xml-pipeline/xml_pipeline/server/app.py
dullfig d97c24b1dd
Some checks failed
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.13) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / typecheck (push) Has been cancelled
Add message journal, graceful restart, and clean repo for public release
Three workstreams implemented:

W1 (Repo Split): Remove proprietary BloxServer files and docs, update
pyproject.toml URLs to public GitHub, clean doc references, add CI
workflow (.github/workflows/ci.yml) and CONTRIBUTING.md.

W2 (Message Journal): Add DispatchHook protocol for dispatch lifecycle
events, SQLite-backed MessageJournal with WAL mode for certified-mail
delivery guarantees (PENDING→DISPATCHED→ACKED/FAILED), integrate hooks
into StreamPump._dispatch_to_handlers(), add journal REST endpoints,
and aiosqlite dependency.

W3 (Hot Deployment): Add RestartOrchestrator for graceful restart with
queue drain and journal stats collection, SIGHUP signal handler in CLI,
POST /organism/restart endpoint, restart-aware app lifespan with journal
recovery on boot, and os.execv/subprocess re-exec for Unix/Windows.

All 439 tests pass (37 new tests for W2/W3).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-28 22:27:38 -08:00

169 lines
4.7 KiB
Python

"""
app.py — FastAPI application factory for AgentServer.
Creates the FastAPI app that combines REST API and WebSocket endpoints.
"""
from __future__ import annotations
import asyncio
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Any, AsyncGenerator, Optional
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from xml_pipeline.server.api import create_router
from xml_pipeline.server.state import ServerState
from xml_pipeline.server.websocket import create_websocket_router
if TYPE_CHECKING:
from xml_pipeline.message_bus.stream_pump import StreamPump
def create_app(
    pump: "StreamPump",
    *,
    title: str = "AgentServer API",
    version: str = "1.0.0",
    cors_origins: Optional[list[str]] = None,
) -> FastAPI:
    """
    Create FastAPI application with REST and WebSocket endpoints.

    On startup the app marks the server state as running and, if one of
    ``pump.dispatch_hooks`` is a ``MessageJournal``, re-injects that
    journal's unacknowledged entries into the pump. On shutdown (including
    shutdown caused by an error or cancellation) the state is marked as
    stopping.

    Args:
        pump: The StreamPump instance to wrap
        title: API title for OpenAPI docs
        version: API version
        cors_origins: List of allowed CORS origins (default: all, i.e. ``["*"]``)

    Returns:
        Configured FastAPI application
    """
    # Create state manager
    state = ServerState(pump)

    @asynccontextmanager
    async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
        """Manage app lifecycle - startup and shutdown with journal recovery."""
        import logging

        logger = logging.getLogger(__name__)

        # Startup
        state.set_running()
        try:
            # Journal recovery: replay unacknowledged entries from previous run
            for hook in pump.dispatch_hooks:
                # Imported here rather than at module top, so the journal
                # module is only loaded when at least one hook is registered.
                from xml_pipeline.message_bus.journal import MessageJournal

                if not isinstance(hook, MessageJournal):
                    continue

                # NOTE(review): older_than_seconds=0 presumably selects every
                # unacknowledged entry regardless of age - confirm against
                # MessageJournal.get_unacknowledged.
                entries = await hook.get_unacknowledged(older_than_seconds=0)
                if entries:
                    logger.info(
                        "Journal recovery: replaying %d unacknowledged entries",
                        len(entries),
                    )
                    for entry in entries:
                        await pump.inject(
                            entry["payload_bytes"],
                            thread_id=entry["thread_id"],
                            from_id=entry["from_id"],
                        )
                    logger.info("Journal recovery complete")
                # Only the first journal hook is consulted.
                break

            yield
        finally:
            # Shutdown: runs even if startup recovery fails or an exception /
            # cancellation is thrown in at the ``yield``, so the state is never
            # left reporting "running" after the app has gone away.
            state.set_stopping()

    app = FastAPI(
        title=title,
        version=version,
        description="REST and WebSocket API for monitoring and controlling xml-pipeline organisms.",
        lifespan=lifespan,
    )

    # CORS middleware
    if cors_origins is None:
        cors_origins = ["*"]
    # NOTE(review): FastAPI's CORS documentation says allow_origins,
    # allow_methods and allow_headers must not be ["*"] when
    # allow_credentials=True. Behaviour is kept as-is here so existing clients
    # are unaffected; consider requiring explicit origins for credentialed use.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=cors_origins,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # Include routers
    app.include_router(create_router(state))
    app.include_router(create_websocket_router(state))

    # Store state on app for access if needed
    app.state.server_state = state
    app.state.pump = pump

    @app.get("/health")
    async def health_check() -> dict[str, Any]:
        """Health check endpoint reporting organism name and uptime."""
        info = state.get_organism_info()
        return {
            "status": "healthy",
            "organism": info.name,
            "uptime_seconds": info.uptime_seconds,
        }

    return app
async def run_server(
    pump: "StreamPump",
    *,
    host: str = "0.0.0.0",
    port: int = 8080,
    cors_origins: Optional[list[str]] = None,
) -> None:
    """
    Serve the AgentServer API for ``pump`` using uvicorn.

    Builds the FastAPI application with ``create_app`` and awaits the
    uvicorn server's ``serve()`` coroutine.

    Args:
        pump: The StreamPump instance to wrap
        host: Host to bind to
        port: Port to listen on
        cors_origins: List of allowed CORS origins

    Raises:
        ImportError: If uvicorn is not installed.
    """
    # uvicorn is imported on demand so a missing install produces a
    # targeted message instead of failing at module import time.
    try:
        import uvicorn
    except ImportError as missing:
        raise ImportError(
            "uvicorn is required for the server. Install with: pip install xml-pipeline[server]"
        ) from missing

    uvicorn_settings = uvicorn.Config(
        create_app(pump, cors_origins=cors_origins),
        host=host,
        port=port,
        log_level="info",
    )
    await uvicorn.Server(uvicorn_settings).serve()
def run_server_sync(
    pump: "StreamPump",
    *,
    host: str = "0.0.0.0",
    port: int = 8080,
    cors_origins: Optional[list[str]] = None,
) -> None:
    """
    Blocking entry point that runs the AgentServer.

    Convenience wrapper for CLI usage: drives ``run_server`` to completion
    with ``asyncio.run``.

    Args:
        pump: The StreamPump instance to wrap
        host: Host to bind to
        port: Port to listen on
        cors_origins: List of allowed CORS origins
    """
    serve_forever = run_server(
        pump,
        host=host,
        port=port,
        cors_origins=cors_origins,
    )
    asyncio.run(serve_forever)