Some checks failed
Invert the agent model: the agent IS the computer. The message pump becomes the kernel, handlers are sandboxed apps, and all access is mediated by the platform.

Phase 1 — Container foundation:
- Multi-stage Dockerfile (python:3.12-slim, non-root user, /data volume)
- deploy/entrypoint.py with --dry-run config validation
- docker-compose.yml (cap_drop ALL, read_only, no-new-privileges)
- docker-compose.dev.yml overlay for development
- CI Docker build smoke test

Phase 2 — Security hardening:
- xml_pipeline/security/ module with default-deny container mode
- Permission gate: per-listener tool allowlist enforcement
- Network policy: egress control (only declared LLM backend domains)
- Shell tool: disabled in container mode
- File tool: restricted to /data and /config in container mode
- Fetch tool: integrates network egress policy
- Config loader: parses security and network YAML sections

Phase 3 — Management plane:
- Agent app (port 8080): minimal /health, /inject, /ws only
- Management app (port 9090): full API, audit log, dashboard
- SQLite-backed audit log for tool invocations and security events
- Static web dashboard (no framework, WebSocket-driven)
- CLI --split flag for dual-port serving

All 439 existing tests pass with zero regressions.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
128 lines
3.9 KiB
Python
128 lines
3.9 KiB
Python
"""
agent_app.py — Minimal agent-facing FastAPI application (port 8080).

Exposes only:
- GET /health — Health check
- POST /inject — Message injection
- WS /ws — Message bus WebSocket
- WS /ws/messages — Message stream WebSocket

Agents cannot query usage, read config, see other agents, or access audit logs.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
from contextlib import asynccontextmanager
|
|
from typing import TYPE_CHECKING, Any, AsyncGenerator
|
|
|
|
from fastapi import APIRouter, FastAPI, HTTPException, WebSocket
|
|
|
|
from xml_pipeline.server.models import InjectRequest, InjectResponse
|
|
from xml_pipeline.server.state import ServerState
|
|
from xml_pipeline.server.websocket import create_websocket_router
|
|
|
|
if TYPE_CHECKING:
|
|
from xml_pipeline.message_bus.stream_pump import StreamPump
|
|
|
|
|
|
async def _replay_unacknowledged_journal(pump: "StreamPump") -> None:
    """
    Re-inject journal entries that were never acknowledged.

    Looks through ``pump.dispatch_hooks`` for the first ``MessageJournal``
    hook, asks it for its unacknowledged entries and feeds each one back
    into the pump via ``pump.inject``. Does nothing when no journal hook is
    installed or the journal has nothing pending.

    Args:
        pump: The message pump whose dispatch hooks are inspected and into
            which pending entries are re-injected.
    """
    hooks = list(pump.dispatch_hooks)
    if not hooks:
        # Nothing to inspect; skip the journal import entirely.
        return

    # Imported lazily so the journal module is only loaded when at least one
    # dispatch hook exists.
    import logging

    from xml_pipeline.message_bus.journal import MessageJournal

    # Only the first journal hook is consulted.
    journal = next(
        (hook for hook in hooks if isinstance(hook, MessageJournal)),
        None,
    )
    if journal is None:
        return

    entries = await journal.get_unacknowledged(older_than_seconds=0)
    if not entries:
        return

    logger = logging.getLogger(__name__)
    logger.info(
        "Journal recovery: replaying %d unacknowledged entries",
        len(entries),
    )
    for entry in entries:
        await pump.inject(
            entry["payload_bytes"],
            thread_id=entry["thread_id"],
            from_id=entry["from_id"],
        )
    logger.info("Journal recovery complete")


def create_agent_app(
    pump: "StreamPump",
    *,
    title: str = "AgentOS Agent Bus",
    version: str = "1.0.0",
) -> FastAPI:
    """
    Create the agent-facing FastAPI app.

    This app is intentionally minimal — only health, inject, and WebSocket.
    All monitoring, config, and management endpoints are on the management port.

    Args:
        pump: The message pump backing this app. It is wrapped in a
            ``ServerState`` and also exposed as ``app.state.pump``.
        title: Title passed to the FastAPI constructor.
        version: Version string passed to the FastAPI constructor.

    Returns:
        The configured FastAPI application, with the ``ServerState`` stored
        on ``app.state.server_state``.
    """
    state = ServerState(pump)

    @asynccontextmanager
    async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
        """Mark the server running, replay the journal, then mark it stopping."""
        state.set_running()

        # Journal recovery
        await _replay_unacknowledged_journal(pump)

        try:
            yield
        finally:
            # Runs even when shutdown reaches us as an exception or a
            # cancellation thrown in at the yield point.
            state.set_stopping()

    app = FastAPI(
        title=title,
        version=version,
        description="Agent-facing message bus. No management or monitoring endpoints.",
        lifespan=lifespan,
        # No OpenAPI docs on agent port (agents shouldn't see API structure)
        docs_url=None,
        redoc_url=None,
        openapi_url=None,
    )

    # Health check
    @app.get("/health")
    async def health_check() -> dict[str, Any]:
        """Return a fixed "healthy" status plus organism name and uptime."""
        info = state.get_organism_info()
        return {
            "status": "healthy",
            "organism": info.name,
            "uptime_seconds": info.uptime_seconds,
        }

    # Inject endpoint
    router = APIRouter()

    @router.post("/inject", response_model=InjectResponse)
    async def inject_message(request: InjectRequest) -> InjectResponse:
        """
        Inject a message to an agent.

        Raises:
            HTTPException: 400 when ``request.to`` does not name a known agent.
        """
        agent = state.get_agent(request.to)
        if agent is None:
            raise HTTPException(
                status_code=400,
                detail=f"Unknown target agent: {request.to}",
            )

        # Callers may continue an existing thread; otherwise start a new one.
        thread_id = request.thread_id or str(uuid.uuid4())
        # The first payload key doubles as the payload type name.
        payload_type = next(iter(request.payload.keys()), "Payload")

        # NOTE(review): only state.record_message() is called here; no direct
        # hand-off to the pump is visible in this handler — confirm that
        # record_message is what delivers the message to the target agent.
        msg_id = await state.record_message(
            thread_id=thread_id,
            from_id="api",
            to_id=request.to,
            payload_type=payload_type,
            payload=request.payload,
        )

        return InjectResponse(thread_id=thread_id, message_id=msg_id)

    app.include_router(router)

    # WebSocket endpoints (message bus)
    app.include_router(create_websocket_router(state))

    # Store state for access
    app.state.server_state = state
    app.state.pump = pump

    return app
|