xml-pipeline/xml_pipeline/server/management.py
dullfig 06eeea3dee
Some checks failed
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.13) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / typecheck (push) Has been cancelled
CI / docker (push) Has been cancelled
Add AgentOS container foundation, security hardening, and management plane
Invert the agent model: the agent IS the computer. The message pump
becomes the kernel, handlers are sandboxed apps, and all access is
mediated by the platform.

Phase 1 — Container foundation:
- Multi-stage Dockerfile (python:3.12-slim, non-root user, /data volume)
- deploy/entrypoint.py with --dry-run config validation
- docker-compose.yml (cap_drop ALL, read_only, no-new-privileges)
- docker-compose.dev.yml overlay for development
- CI Docker build smoke test

Phase 2 — Security hardening:
- xml_pipeline/security/ module with default-deny container mode
- Permission gate: per-listener tool allowlist enforcement
- Network policy: egress control (only declared LLM backend domains)
- Shell tool: disabled in container mode
- File tool: restricted to /data and /config in container mode
- Fetch tool: integrates network egress policy
- Config loader: parses security and network YAML sections

Phase 3 — Management plane:
- Agent app (port 8080): minimal /health, /inject, /ws only
- Management app (port 9090): full API, audit log, dashboard
- SQLite-backed audit log for tool invocations and security events
- Static web dashboard (no framework, WebSocket-driven)
- CLI --split flag for dual-port serving

All 439 existing tests pass with zero regressions.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 21:37:24 -08:00

171 lines
5 KiB
Python

"""
management.py — Management plane FastAPI application (port 9090).
Full operator visibility and control:
- All REST API endpoints (organism, agents, threads, usage, journal)
- Audit log viewer
- Configuration management
- Static dashboard
- WebSocket for real-time monitoring
This app should only be accessible to operators, never to agents.
"""
from __future__ import annotations
import time
from contextlib import asynccontextmanager
from pathlib import Path
from typing import TYPE_CHECKING, Any, AsyncGenerator, Optional
from fastapi import APIRouter, FastAPI, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from xml_pipeline.server.api import create_router
from xml_pipeline.server.state import ServerState
from xml_pipeline.server.websocket import create_websocket_router
if TYPE_CHECKING:
from xml_pipeline.message_bus.stream_pump import StreamPump
def create_management_app(
    pump: "StreamPump",
    *,
    title: str = "AgentOS Management",
    version: str = "1.0.0",
    cors_origins: Optional[list[str]] = None,
) -> FastAPI:
    """
    Create the management FastAPI app (full operator access).

    Includes all existing API endpoints plus:
    - Audit log endpoints
    - Dashboard static file serving

    Args:
        pump: The stream pump. It is wrapped in a ``ServerState`` and also
            exposed as ``app.state.pump``.
        title: Title passed to the FastAPI application.
        version: Version string passed to the FastAPI application.
        cors_origins: Origins allowed by the CORS middleware. ``None``
            (the default) allows any origin (``["*"]``).

    Returns:
        The configured FastAPI application. The ``ServerState`` is available
        as ``app.state.server_state``.

    Note:
        Credentialed CORS requests are only enabled when ``cors_origins`` is
        an explicit list without the ``"*"`` wildcard. A wildcard origin
        combined with credentials is not permitted by the CORS protocol and
        would expose the operator-only API to credentialed requests from
        arbitrary sites.
    """
    state = ServerState(pump)

    @asynccontextmanager
    async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
        state.set_running()
        try:
            yield
        finally:
            # Always record shutdown, even if the app exits with an error.
            state.set_stopping()

    app = FastAPI(
        title=title,
        version=version,
        description=(
            "Management plane for AgentOS operators. "
            "Full monitoring, control, audit, and configuration access."
        ),
        lifespan=lifespan,
    )

    # CORS for dashboard. Work on a copy so the caller's list is never shared
    # with the middleware.
    allowed_origins = ["*"] if cors_origins is None else list(cors_origins)
    # Wildcard origins and credentials must not be combined; only allow
    # credentials when every origin has been named explicitly.
    allow_credentials = "*" not in allowed_origins
    app.add_middleware(
        CORSMiddleware,
        allow_origins=allowed_origins,
        allow_credentials=allow_credentials,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # Health check: reports organism name and uptime from the server state.
    # (Deliberately a comment, not a docstring, so the generated OpenAPI
    # description of this endpoint is unchanged.)
    @app.get("/health")
    async def health_check() -> dict[str, Any]:
        info = state.get_organism_info()
        return {
            "status": "healthy",
            "organism": info.name,
            "uptime_seconds": info.uptime_seconds,
            "management": True,
        }

    # Include full API router (all endpoints)
    app.include_router(create_router(state))

    # Include WebSocket endpoints (for dashboard real-time updates)
    app.include_router(create_websocket_router(state))

    # Audit log endpoints
    app.include_router(_create_audit_router())

    # Store state so request handlers can reach it via ``request.app.state``.
    app.state.server_state = state
    app.state.pump = pump

    # Mount dashboard static files from the first candidate directory that
    # exists; if none exists the dashboard is simply not served.
    dashboard_candidates = (
        Path(__file__).parent.parent.parent / "dashboard",  # repo root
        Path("/app/dashboard"),  # container path
    )
    dashboard_dir = next(
        (path for path in dashboard_candidates if path.is_dir()),
        None,
    )
    if dashboard_dir is not None:
        app.mount(
            "/dashboard",
            StaticFiles(directory=str(dashboard_dir), html=True),
            name="dashboard",
        )

    return app
def _create_audit_router() -> APIRouter:
    """Build the router exposing audit log queries under ``/api/v1/audit``."""
    audit_api = APIRouter(prefix="/api/v1/audit", tags=["audit"])

    # NOTE: the audit module is imported inside each handler (not at module
    # level), so it is only loaded when an audit endpoint is actually called.

    @audit_api.get("/events")
    async def get_audit_events(
        event_type: Optional[str] = Query(None, description="Filter by event type"),
        listener: Optional[str] = Query(None, description="Filter by listener name"),
        severity: Optional[str] = Query(None, description="Filter by severity"),
        since: Optional[float] = Query(None, description="Events since timestamp"),
        limit: int = Query(100, ge=1, le=1000),
        offset: int = Query(0, ge=0),
    ) -> dict[str, Any]:
        """Query audit log events."""
        from xml_pipeline.server.audit import get_entries

        # The public query parameter is "listener"; the audit store expects
        # it under the keyword "listener_name".
        filters: dict[str, Any] = {
            "event_type": event_type,
            "listener_name": listener,
            "severity": severity,
            "since": since,
            "limit": limit,
            "offset": offset,
        }
        matched = get_entries(**filters)
        return {"events": matched, "count": len(matched)}

    @audit_api.get("/stats")
    async def get_audit_stats() -> dict[str, Any]:
        """Get audit log statistics."""
        from xml_pipeline.server.audit import get_stats

        stats = get_stats()
        return stats

    @audit_api.get("/security")
    async def get_security_events(
        limit: int = Query(50, ge=1, le=500),
    ) -> dict[str, Any]:
        """Get recent security-related events (warnings and above)."""
        from xml_pipeline.server.audit import get_entries

        # Fetch up to `limit` entries per severity, then keep the `limit`
        # entries with the highest timestamps across all three.
        merged: list[Any] = []
        for level in ("warning", "error", "critical"):
            merged.extend(get_entries(severity=level, limit=limit))

        def by_timestamp(entry: Any) -> Any:
            return entry["timestamp"]

        merged.sort(key=by_timestamp, reverse=True)
        del merged[limit:]
        return {"events": merged, "count": len(merged)}

    return audit_api