""" restart.py — Graceful restart orchestrator for AgentServer. Handles the restart protocol: 1. SIGNAL → SIGHUP or POST /organism/restart 2. DRAIN → pump._running = False; wait for queue to drain 3. PERSIST → Journal already has all in-flight state 4. STOP → Shutdown pump, server, process pool 5. EXEC → os.execv() (Unix) or subprocess (Windows) 6. BOOT → bootstrap() runs, journal replays unacknowledged 7. VERIFY → Compare journal stats pre/post restart The journal (W2) provides the safety net: unacknowledged entries are replayed on boot, ensuring no messages are lost during restart. """ from __future__ import annotations import asyncio import logging import os import subprocess import sys from dataclasses import dataclass, field from typing import Any, Dict, Optional, TYPE_CHECKING if TYPE_CHECKING: from xml_pipeline.message_bus.stream_pump import StreamPump logger = logging.getLogger(__name__) @dataclass class RestartResult: """Result of a restart drain operation.""" success: bool drained: bool journal_stats: Dict[str, Any] = field(default_factory=dict) error: Optional[str] = None class RestartOrchestrator: """ Orchestrates graceful restart of the organism. The restart protocol: 1. Drain the message queue (stop accepting new messages) 2. Wait for in-flight handlers to complete (with timeout) 3. Collect journal stats for post-restart verification 4. Re-exec the process (or signal the caller to do so) """ def __init__(self, pump: "StreamPump") -> None: self._pump = pump self._restarting = False @property def is_restarting(self) -> bool: return self._restarting async def initiate_restart(self, timeout: float = 30.0) -> RestartResult: """ Drain the pump and prepare for restart. Args: timeout: Maximum seconds to wait for drain Returns: RestartResult with drain status and journal stats """ if self._restarting: return RestartResult( success=False, drained=False, error="Restart already in progress", ) self._restarting = True logger.info("Restart initiated — draining message queue...") # Collect pre-restart journal stats journal_stats: Dict[str, Any] = {} for hook in self._pump.dispatch_hooks: from xml_pipeline.message_bus.journal import MessageJournal if isinstance(hook, MessageJournal): journal_stats = await hook.get_stats() break # Stop accepting new messages self._pump._running = False # Wait for queue to drain with timeout drained = False try: await asyncio.wait_for( self._pump.queue.join(), timeout=timeout, ) drained = True logger.info("Message queue drained successfully") except asyncio.TimeoutError: logger.warning( f"Queue drain timed out after {timeout}s — " f"{self._pump.queue.qsize()} messages remaining" ) # Shutdown process pool if active if self._pump._process_pool: self._pump._process_pool.shutdown(wait=True) logger.info("ProcessPool shutdown complete") return RestartResult( success=True, drained=drained, journal_stats=journal_stats, ) @staticmethod def exec_restart() -> None: """ Re-exec the current process. On Unix, uses os.execv() for in-place replacement. On Windows, starts a new process and exits. """ python = sys.executable args = sys.argv[:] logger.info(f"Re-executing: {python} {' '.join(args)}") if sys.platform == "win32": # Windows: start new process and exit subprocess.Popen([python] + args) sys.exit(0) else: # Unix: in-place replacement os.execv(python, [python] + args)