""" agent_app.py — Minimal agent-facing FastAPI application (port 8080). Exposes only: - GET /health — Health check - POST /inject — Message injection - WS /ws — Message bus WebSocket - WS /ws/messages — Message stream WebSocket Agents cannot query usage, read config, see other agents, or access audit logs. """ from __future__ import annotations import uuid from contextlib import asynccontextmanager from typing import TYPE_CHECKING, Any, AsyncGenerator from fastapi import APIRouter, FastAPI, HTTPException, WebSocket from xml_pipeline.server.models import InjectRequest, InjectResponse from xml_pipeline.server.state import ServerState from xml_pipeline.server.websocket import create_websocket_router if TYPE_CHECKING: from xml_pipeline.message_bus.stream_pump import StreamPump def create_agent_app( pump: "StreamPump", *, title: str = "AgentOS Agent Bus", version: str = "1.0.0", ) -> FastAPI: """ Create the agent-facing FastAPI app. This app is intentionally minimal — only health, inject, and WebSocket. All monitoring, config, and management endpoints are on the management port. """ state = ServerState(pump) @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: state.set_running() # Journal recovery for hook in pump.dispatch_hooks: from xml_pipeline.message_bus.journal import MessageJournal if isinstance(hook, MessageJournal): entries = await hook.get_unacknowledged(older_than_seconds=0) if entries: import logging logger = logging.getLogger(__name__) logger.info( f"Journal recovery: replaying {len(entries)} unacknowledged entries" ) for entry in entries: await pump.inject( entry["payload_bytes"], thread_id=entry["thread_id"], from_id=entry["from_id"], ) logger.info("Journal recovery complete") break yield state.set_stopping() app = FastAPI( title=title, version=version, description="Agent-facing message bus. No management or monitoring endpoints.", lifespan=lifespan, # No OpenAPI docs on agent port (agents shouldn't see API structure) docs_url=None, redoc_url=None, openapi_url=None, ) # Health check @app.get("/health") async def health_check() -> dict[str, Any]: info = state.get_organism_info() return { "status": "healthy", "organism": info.name, "uptime_seconds": info.uptime_seconds, } # Inject endpoint router = APIRouter() @router.post("/inject", response_model=InjectResponse) async def inject_message(request: InjectRequest) -> InjectResponse: """Inject a message to an agent.""" agent = state.get_agent(request.to) if agent is None: raise HTTPException( status_code=400, detail=f"Unknown target agent: {request.to}", ) thread_id = request.thread_id or str(uuid.uuid4()) payload_type = next(iter(request.payload.keys()), "Payload") msg_id = await state.record_message( thread_id=thread_id, from_id="api", to_id=request.to, payload_type=payload_type, payload=request.payload, ) return InjectResponse(thread_id=thread_id, message_id=msg_id) app.include_router(router) # WebSocket endpoints (message bus) app.include_router(create_websocket_router(state)) # Store state for access app.state.server_state = state app.state.pump = pump return app