diff --git a/tests/test_hot_reload.py b/tests/test_hot_reload.py new file mode 100644 index 0000000..5e77b1e --- /dev/null +++ b/tests/test_hot_reload.py @@ -0,0 +1,571 @@ +""" +test_hot_reload.py — Tests for hot-reload functionality. + +Tests that StreamPump can reload its configuration at runtime, +adding/removing/updating listeners without restarting. +""" + +import pytest +import tempfile +import yaml +from pathlib import Path +from dataclasses import dataclass + +from third_party.xmlable import xmlify + + +# ============================================================================ +# Test Payloads +# ============================================================================ + +@xmlify +@dataclass +class TestPayloadA: + """Test payload A.""" + value: str + + +@xmlify +@dataclass +class TestPayloadB: + """Test payload B.""" + count: int + + +@xmlify +@dataclass +class TestPayloadC: + """Test payload C.""" + name: str + + +# ============================================================================ +# Test Handlers +# ============================================================================ + +async def handler_a(payload, metadata): + """Handler A.""" + return None + + +async def handler_b(payload, metadata): + """Handler B.""" + return None + + +async def handler_c(payload, metadata): + """Handler C.""" + return None + + +async def handler_a_updated(payload, metadata): + """Handler A (updated).""" + return None + + +# ============================================================================ +# Fixtures +# ============================================================================ + +@pytest.fixture +def base_config(): + """Base configuration with one listener.""" + return { + "organism": {"name": "test-organism"}, + "listeners": [ + { + "name": "listener-a", + "payload_class": "tests.test_hot_reload.TestPayloadA", + "handler": "tests.test_hot_reload.handler_a", + "description": "Listener A", + } + ], + } + + +@pytest.fixture +def config_file(base_config): + """Create a temporary config file.""" + with tempfile.NamedTemporaryFile( + mode="w", suffix=".yaml", delete=False + ) as f: + yaml.dump(base_config, f) + f.flush() + yield Path(f.name) + # Cleanup + Path(f.name).unlink(missing_ok=True) + + +@pytest.fixture +def pump(config_file): + """Create a StreamPump from the config file.""" + from xml_pipeline.message_bus import StreamPump, ConfigLoader + + config = ConfigLoader.load(str(config_file)) + pump = StreamPump(config, config_path=str(config_file)) + pump.register_all() + return pump + + +# ============================================================================ +# Tests: Reload Adds New Listeners +# ============================================================================ + +class TestReloadAddListeners: + """Test that reload adds new listeners.""" + + def test_add_new_listener(self, pump, config_file): + """New listeners in config should be registered.""" + # Verify initial state + assert "listener-a" in pump.listeners + assert "listener-b" not in pump.listeners + + # Update config to add listener-b + new_config = { + "organism": {"name": "test-organism"}, + "listeners": [ + { + "name": "listener-a", + "payload_class": "tests.test_hot_reload.TestPayloadA", + "handler": "tests.test_hot_reload.handler_a", + "description": "Listener A", + }, + { + "name": "listener-b", + "payload_class": "tests.test_hot_reload.TestPayloadB", + "handler": "tests.test_hot_reload.handler_b", + "description": "Listener B", + }, + ], + } + with open(config_file, "w") as f: + yaml.dump(new_config, f) + + # Reload + result = pump.reload_config() + + # Verify + assert result.success + assert "listener-b" in result.added_listeners + assert "listener-b" in pump.listeners + assert pump.listeners["listener-b"].description == "Listener B" + + def test_add_multiple_listeners(self, pump, config_file): + """Multiple new listeners can be added at once.""" + new_config = { + "organism": {"name": "test-organism"}, + "listeners": [ + { + "name": "listener-a", + "payload_class": "tests.test_hot_reload.TestPayloadA", + "handler": "tests.test_hot_reload.handler_a", + "description": "Listener A", + }, + { + "name": "listener-b", + "payload_class": "tests.test_hot_reload.TestPayloadB", + "handler": "tests.test_hot_reload.handler_b", + "description": "Listener B", + }, + { + "name": "listener-c", + "payload_class": "tests.test_hot_reload.TestPayloadC", + "handler": "tests.test_hot_reload.handler_c", + "description": "Listener C", + }, + ], + } + with open(config_file, "w") as f: + yaml.dump(new_config, f) + + result = pump.reload_config() + + assert result.success + assert set(result.added_listeners) == {"listener-b", "listener-c"} + assert "listener-b" in pump.listeners + assert "listener-c" in pump.listeners + + +# ============================================================================ +# Tests: Reload Removes Listeners +# ============================================================================ + +class TestReloadRemoveListeners: + """Test that reload removes listeners no longer in config.""" + + def test_remove_listener(self, config_file): + """Listeners not in new config should be unregistered.""" + from xml_pipeline.message_bus import StreamPump, ConfigLoader + + # Create pump with two listeners + initial_config = { + "organism": {"name": "test-organism"}, + "listeners": [ + { + "name": "listener-a", + "payload_class": "tests.test_hot_reload.TestPayloadA", + "handler": "tests.test_hot_reload.handler_a", + "description": "Listener A", + }, + { + "name": "listener-b", + "payload_class": "tests.test_hot_reload.TestPayloadB", + "handler": "tests.test_hot_reload.handler_b", + "description": "Listener B", + }, + ], + } + with open(config_file, "w") as f: + yaml.dump(initial_config, f) + + config = ConfigLoader.load(str(config_file)) + pump = StreamPump(config, config_path=str(config_file)) + pump.register_all() + + assert "listener-a" in pump.listeners + assert "listener-b" in pump.listeners + + # Update config to remove listener-b + new_config = { + "organism": {"name": "test-organism"}, + "listeners": [ + { + "name": "listener-a", + "payload_class": "tests.test_hot_reload.TestPayloadA", + "handler": "tests.test_hot_reload.handler_a", + "description": "Listener A", + }, + ], + } + with open(config_file, "w") as f: + yaml.dump(new_config, f) + + # Reload + result = pump.reload_config() + + # Verify + assert result.success + assert "listener-b" in result.removed_listeners + assert "listener-b" not in pump.listeners + assert "listener-a" in pump.listeners # Still exists + + +# ============================================================================ +# Tests: Reload Updates Listeners +# ============================================================================ + +class TestReloadUpdateListeners: + """Test that reload updates changed listeners.""" + + def test_update_handler(self, pump, config_file): + """Changed handler should be updated.""" + old_handler = pump.listeners["listener-a"].handler + + # Update config with new handler + new_config = { + "organism": {"name": "test-organism"}, + "listeners": [ + { + "name": "listener-a", + "payload_class": "tests.test_hot_reload.TestPayloadA", + "handler": "tests.test_hot_reload.handler_a_updated", + "description": "Listener A", + }, + ], + } + with open(config_file, "w") as f: + yaml.dump(new_config, f) + + result = pump.reload_config() + + assert result.success + assert "listener-a" in result.updated_listeners + assert pump.listeners["listener-a"].handler != old_handler + + def test_update_description(self, pump, config_file): + """Changed description should be updated.""" + # Update config with new description + new_config = { + "organism": {"name": "test-organism"}, + "listeners": [ + { + "name": "listener-a", + "payload_class": "tests.test_hot_reload.TestPayloadA", + "handler": "tests.test_hot_reload.handler_a", + "description": "Updated description", + }, + ], + } + with open(config_file, "w") as f: + yaml.dump(new_config, f) + + result = pump.reload_config() + + assert result.success + assert "listener-a" in result.updated_listeners + assert pump.listeners["listener-a"].description == "Updated description" + + def test_update_peers(self, pump, config_file): + """Changed peers should be updated.""" + # Update config with peers + new_config = { + "organism": {"name": "test-organism"}, + "listeners": [ + { + "name": "listener-a", + "payload_class": "tests.test_hot_reload.TestPayloadA", + "handler": "tests.test_hot_reload.handler_a", + "description": "Listener A", + "agent": True, + "peers": ["listener-b"], + }, + ], + } + with open(config_file, "w") as f: + yaml.dump(new_config, f) + + result = pump.reload_config() + + assert result.success + assert "listener-a" in result.updated_listeners + assert pump.listeners["listener-a"].peers == ["listener-b"] + + def test_unchanged_listener_not_updated(self, pump, config_file): + """Unchanged listeners should not appear in updated list.""" + # Reload same config + result = pump.reload_config() + + assert result.success + assert "listener-a" not in result.updated_listeners + assert "listener-a" not in result.added_listeners + assert "listener-a" not in result.removed_listeners + + +# ============================================================================ +# Tests: Reload Events +# ============================================================================ + +class TestReloadEvents: + """Test that reload emits events.""" + + def test_reload_emits_event(self, pump, config_file): + """Reload should emit ReloadEvent.""" + from xml_pipeline.message_bus import ReloadEvent + + events = [] + + def capture_event(event): + events.append(event) + + pump.subscribe_events(capture_event) + + # Add a listener + new_config = { + "organism": {"name": "test-organism"}, + "listeners": [ + { + "name": "listener-a", + "payload_class": "tests.test_hot_reload.TestPayloadA", + "handler": "tests.test_hot_reload.handler_a", + "description": "Listener A", + }, + { + "name": "listener-b", + "payload_class": "tests.test_hot_reload.TestPayloadB", + "handler": "tests.test_hot_reload.handler_b", + "description": "Listener B", + }, + ], + } + with open(config_file, "w") as f: + yaml.dump(new_config, f) + + pump.reload_config() + + # Check event was emitted + reload_events = [e for e in events if isinstance(e, ReloadEvent)] + assert len(reload_events) == 1 + assert reload_events[0].success + assert "listener-b" in reload_events[0].added_listeners + + +# ============================================================================ +# Tests: Reload Error Handling +# ============================================================================ + +class TestReloadErrors: + """Test reload error handling.""" + + def test_reload_no_config_path(self): + """Reload without config path should fail gracefully.""" + from xml_pipeline.message_bus import StreamPump, OrganismConfig + + config = OrganismConfig(name="test") + pump = StreamPump(config) # No config_path + + result = pump.reload_config() + + assert not result.success + assert "No config path" in result.error + + def test_reload_invalid_config(self, pump, config_file): + """Reload with invalid config should fail gracefully.""" + # Write invalid YAML + with open(config_file, "w") as f: + f.write("invalid: yaml: content: [") + + result = pump.reload_config() + + assert not result.success + assert result.error is not None + + def test_reload_missing_file(self, pump): + """Reload with missing file should fail gracefully.""" + pump.config_path = "/nonexistent/path.yaml" + + result = pump.reload_config() + + assert not result.success + assert result.error is not None + + +# ============================================================================ +# Tests: System Listeners Protected +# ============================================================================ + +class TestSystemListenersProtected: + """Test that system listeners are not affected by reload.""" + + def test_system_listeners_not_removed(self, config_file): + """System listeners should not be removed during reload.""" + from xml_pipeline.message_bus import StreamPump, ConfigLoader, ListenerConfig + + # Load config + config = ConfigLoader.load(str(config_file)) + pump = StreamPump(config, config_path=str(config_file)) + + # Manually add a system listener (simulating bootstrap) + @xmlify + @dataclass + class SystemPayload: + msg: str + + async def system_handler(p, m): + return None + + system_lc = ListenerConfig( + name="system.test", + payload_class_path="tests.test_hot_reload.TestPayloadA", + handler_path="tests.test_hot_reload.handler_a", + description="System test listener", + payload_class=SystemPayload, + handler=system_handler, + ) + pump.register_listener(system_lc) + pump.register_all() + + assert "system.test" in pump.listeners + + # Reload with empty config (would remove all non-system listeners) + new_config = { + "organism": {"name": "test-organism"}, + "listeners": [], + } + with open(config_file, "w") as f: + yaml.dump(new_config, f) + + result = pump.reload_config() + + # System listener should still exist + assert result.success + assert "system.test" in pump.listeners + assert "system.test" not in result.removed_listeners + + +# ============================================================================ +# Tests: Routing Table Updates +# ============================================================================ + +class TestRoutingTableUpdates: + """Test that routing table is updated correctly.""" + + def test_routing_table_updated_on_add(self, pump, config_file): + """Routing table should include new listeners.""" + # Add listener-b + new_config = { + "organism": {"name": "test-organism"}, + "listeners": [ + { + "name": "listener-a", + "payload_class": "tests.test_hot_reload.TestPayloadA", + "handler": "tests.test_hot_reload.handler_a", + "description": "Listener A", + }, + { + "name": "listener-b", + "payload_class": "tests.test_hot_reload.TestPayloadB", + "handler": "tests.test_hot_reload.handler_b", + "description": "Listener B", + }, + ], + } + with open(config_file, "w") as f: + yaml.dump(new_config, f) + + pump.reload_config() + + # Check routing table + expected_root_tag = "listener-b.testpayloadb" + assert expected_root_tag in pump.routing_table + + def test_routing_table_updated_on_remove(self, config_file): + """Routing table should not include removed listeners.""" + from xml_pipeline.message_bus import StreamPump, ConfigLoader + + # Create pump with two listeners + initial_config = { + "organism": {"name": "test-organism"}, + "listeners": [ + { + "name": "listener-a", + "payload_class": "tests.test_hot_reload.TestPayloadA", + "handler": "tests.test_hot_reload.handler_a", + "description": "Listener A", + }, + { + "name": "listener-b", + "payload_class": "tests.test_hot_reload.TestPayloadB", + "handler": "tests.test_hot_reload.handler_b", + "description": "Listener B", + }, + ], + } + with open(config_file, "w") as f: + yaml.dump(initial_config, f) + + config = ConfigLoader.load(str(config_file)) + pump = StreamPump(config, config_path=str(config_file)) + pump.register_all() + + # Verify both in routing table + assert "listener-b.testpayloadb" in pump.routing_table + + # Remove listener-b + new_config = { + "organism": {"name": "test-organism"}, + "listeners": [ + { + "name": "listener-a", + "payload_class": "tests.test_hot_reload.TestPayloadA", + "handler": "tests.test_hot_reload.handler_a", + "description": "Listener A", + }, + ], + } + with open(config_file, "w") as f: + yaml.dump(new_config, f) + + pump.reload_config() + + # Check routing table no longer has listener-b + assert "listener-b.testpayloadb" not in pump.routing_table diff --git a/xml_pipeline/message_bus/__init__.py b/xml_pipeline/message_bus/__init__.py index 51fe135..a5e18af 100644 --- a/xml_pipeline/message_bus/__init__.py +++ b/xml_pipeline/message_bus/__init__.py @@ -39,6 +39,7 @@ from xml_pipeline.message_bus.stream_pump import ( MessageSentEvent, AgentStateEvent, ThreadEvent, + ReloadEvent, ) from xml_pipeline.message_bus.message_state import ( @@ -83,6 +84,7 @@ __all__ = [ "MessageSentEvent", "AgentStateEvent", "ThreadEvent", + "ReloadEvent", # Message state "MessageState", "HandlerMetadata", diff --git a/xml_pipeline/message_bus/stream_pump.py b/xml_pipeline/message_bus/stream_pump.py index 039da62..f86aa54 100644 --- a/xml_pipeline/message_bus/stream_pump.py +++ b/xml_pipeline/message_bus/stream_pump.py @@ -96,6 +96,16 @@ class ThreadEvent(PumpEvent): error: Optional[str] = None +@dataclass +class ReloadEvent(PumpEvent): + """Fired when organism configuration is reloaded.""" + success: bool + added_listeners: List[str] = field(default_factory=list) + removed_listeners: List[str] = field(default_factory=list) + updated_listeners: List[str] = field(default_factory=list) + error: Optional[str] = None + + EventCallback = Callable[[PumpEvent], None] @@ -253,8 +263,9 @@ class StreamPump: shared backend (Redis or Manager) for cross-process data access. """ - def __init__(self, config: OrganismConfig): + def __init__(self, config: OrganismConfig, config_path: str = ""): self.config = config + self.config_path = config_path # Store path for hot-reload # Message queue feeds the stream self.queue: asyncio.Queue[MessageState] = asyncio.Queue() @@ -1111,6 +1122,119 @@ class StreamPump: pump_logger.info("ProcessPool shutdown complete") self._process_pool = None + def reload_config(self, config_path: Optional[str] = None) -> ReloadEvent: + """ + Hot-reload organism configuration. + + Re-reads the config file and updates listeners: + - New listeners are registered + - Removed listeners are unregistered + - Changed listeners are updated (handler, peers, description) + + Args: + config_path: Path to config file. Uses stored path if not provided. + + Returns: + ReloadEvent with details of what changed + """ + path = config_path or self.config_path + if not path: + return ReloadEvent( + success=False, + error="No config path available for reload", + ) + + try: + # Re-read config + new_config = ConfigLoader.load(path) + + # Track changes + added: List[str] = [] + removed: List[str] = [] + updated: List[str] = [] + + # Get current listener names (excluding system listeners) + current_names = { + name for name in self.listeners.keys() + if not name.startswith("system.") + } + + # Get new listener names + new_names = {lc.name for lc in new_config.listeners} + + # Find removed listeners + for name in current_names - new_names: + if self.unregister_listener(name): + removed.append(name) + pump_logger.info(f"Hot-reload: removed listener '{name}'") + + # Find new and updated listeners + for lc in new_config.listeners: + # Resolve imports for the listener config + ConfigLoader._resolve_imports(lc) + + if lc.name in current_names: + # Check if changed + existing = self.listeners.get(lc.name) + if existing and self._listener_changed(existing, lc): + # Remove old and re-register + self.unregister_listener(lc.name) + self.register_listener(lc) + updated.append(lc.name) + pump_logger.info(f"Hot-reload: updated listener '{lc.name}'") + else: + # New listener + self.register_listener(lc) + added.append(lc.name) + pump_logger.info(f"Hot-reload: added listener '{lc.name}'") + + # Rebuild usage instructions for all agents (peers may have changed) + for listener in self.listeners.values(): + if listener.is_agent and listener.peers: + listener.usage_instructions = self._build_usage_instructions(listener) + + # Update stored config + self.config = new_config + + # Emit reload event + event = ReloadEvent( + success=True, + added_listeners=added, + removed_listeners=removed, + updated_listeners=updated, + ) + self._emit_event(event) + + pump_logger.info( + f"Hot-reload complete: +{len(added)} -{len(removed)} ~{len(updated)}" + ) + return event + + except Exception as e: + pump_logger.error(f"Hot-reload failed: {e}") + event = ReloadEvent(success=False, error=str(e)) + self._emit_event(event) + return event + + def _listener_changed(self, existing: Listener, new_config: ListenerConfig) -> bool: + """Check if listener config has changed.""" + # Compare key fields + if existing.handler != new_config.handler: + return True + if existing.payload_class != new_config.payload_class: + return True + if existing.description != new_config.description: + return True + if existing.is_agent != new_config.is_agent: + return True + if set(existing.peers) != set(new_config.peers): + return True + if existing.broadcast != new_config.broadcast: + return True + if existing.cpu_bound != new_config.cpu_bound: + return True + return False + # ============================================================================ # Config Loader (same as before) @@ -1208,7 +1332,7 @@ async def bootstrap(config_path: str = "config/organism.yaml") -> StreamPump: print(f"Organism: {config.name}") print(f"Listeners: {len(config.listeners)}") - pump = StreamPump(config) + pump = StreamPump(config, config_path=config_path) # Register system listeners first boot_listener_config = ListenerConfig( diff --git a/xml_pipeline/server/api.py b/xml_pipeline/server/api.py index fc60399..156c790 100644 --- a/xml_pipeline/server/api.py +++ b/xml_pipeline/server/api.py @@ -259,9 +259,21 @@ def create_router(state: "ServerState") -> APIRouter: @router.post("/organism/reload") async def reload_config() -> dict: - """Hot-reload organism configuration.""" - # TODO: Implement hot-reload - return {"success": False, "error": "Hot-reload not yet implemented"} + """ + Hot-reload organism configuration. + + Re-reads organism.yaml and updates listeners: + - New listeners are registered + - Removed listeners are unregistered + - Changed listeners are updated + """ + result = await state.reload_config() + if not result["success"]: + raise HTTPException( + status_code=500, + detail=result.get("error", "Reload failed"), + ) + return result @router.post("/organism/stop") async def stop_organism() -> dict: diff --git a/xml_pipeline/server/state.py b/xml_pipeline/server/state.py index adeb3e5..d6f95a6 100644 --- a/xml_pipeline/server/state.py +++ b/xml_pipeline/server/state.py @@ -222,6 +222,60 @@ class ServerState: """Mark organism as stopping.""" self._status = OrganismStatus.STOPPING + async def reload_config(self, config_path: Optional[str] = None) -> dict: + """ + Hot-reload organism configuration. + + Calls pump.reload_config() and updates local agent state. + + Args: + config_path: Optional path to config file (uses stored path if not provided) + + Returns: + Dict with reload results + """ + from xml_pipeline.message_bus import ReloadEvent + + # Call pump's reload + event = self.pump.reload_config(config_path) + + if event.success: + # Refresh agent states from pump + async with self._lock: + # Remove agents that were removed from pump + for name in event.removed_listeners: + if name in self._agents: + del self._agents[name] + + # Add/update agents + for name in event.added_listeners + event.updated_listeners: + listener = self.pump.listeners.get(name) + if listener: + self._agents[name] = AgentRuntimeState( + name=name, + description=listener.description, + is_agent=listener.is_agent, + peers=list(listener.peers), + payload_class=f"{listener.payload_class.__module__}.{listener.payload_class.__name__}", + ) + + # Notify subscribers of reload + await self._broadcast({ + "event": "reload", + "success": True, + "added": event.added_listeners, + "removed": event.removed_listeners, + "updated": event.updated_listeners, + }) + + return { + "success": event.success, + "added": event.added_listeners, + "removed": event.removed_listeners, + "updated": event.updated_listeners, + "error": event.error, + } + # ========================================================================= # Event Recording (called by pump hooks) # =========================================================================