diff --git a/agentserver/message_bus/message_state.py b/agentserver/message_bus/message_state.py index df64e6f..8e1f2b0 100644 --- a/agentserver/message_bus/message_state.py +++ b/agentserver/message_bus/message_state.py @@ -27,6 +27,78 @@ class HandlerMetadata: from_id: str own_name: str | None = None # Only for agent: true listeners is_self_call: bool = False # Convenience flag + usage_instructions: str = "" # Peer schemas for LLM prompts + todo_nudge: str = "" # Raised eyebrows: "your todos appear complete" + + +class _ResponseMarker: + """Sentinel indicating 'respond to caller'.""" + pass + +RESPOND_TO_CALLER = _ResponseMarker() + + +@dataclass +class HandlerResponse: + """ + Clean return type for handlers. + + Handlers return this instead of raw XML bytes. + The pump handles envelope wrapping. + + Usage: + # Forward to specific listener: + return HandlerResponse(payload=MyPayload(...), to="target") + + # Respond back to caller (prunes call chain): + return HandlerResponse.respond(MyPayload(...)) + """ + payload: Any # @xmlify dataclass instance + to: str | _ResponseMarker # Target listener name, or RESPOND_TO_CALLER + + @classmethod + def respond(cls, payload: Any) -> 'HandlerResponse': + """ + Create a response back to the caller. + + The pump will look up the call chain, prune it, and route + back to whoever called this handler. + """ + return cls(payload=payload, to=RESPOND_TO_CALLER) + + @property + def is_response(self) -> bool: + """Check if this is a response (back to caller) vs forward (to named target).""" + return isinstance(self.to, _ResponseMarker) + + +@dataclass +class SystemError: + """ + System error sent back to agent for retry. + + Generic message that doesn't reveal topology. + Keeps thread alive so agent can try again. 
+ """ + code: str # Generic code: "routing", "validation", "timeout" + message: str # Human-readable, non-revealing message + retry_allowed: bool = True + + def to_xml(self) -> str: + """Manual XML serialization (avoids xmlify issues with future annotations).""" + return f""" + {self.code} + {self.message} + {str(self.retry_allowed).lower()} +""" + + +# Standard error messages (intentionally generic) +ROUTING_ERROR = SystemError( + code="routing", + message="Message could not be delivered. Please verify your target and try again.", + retry_allowed=True, +) @dataclass diff --git a/agentserver/message_bus/stream_pump.py b/agentserver/message_bus/stream_pump.py index eccbef7..f076af4 100644 --- a/agentserver/message_bus/stream_pump.py +++ b/agentserver/message_bus/stream_pump.py @@ -29,7 +29,9 @@ from agentserver.message_bus.steps.c14n import c14n_step from agentserver.message_bus.steps.envelope_validation import envelope_validation_step from agentserver.message_bus.steps.payload_extraction import payload_extraction_step from agentserver.message_bus.steps.thread_assignment import thread_assignment_step -from agentserver.message_bus.message_state import MessageState, HandlerMetadata +from agentserver.message_bus.message_state import MessageState, HandlerMetadata, HandlerResponse, SystemError, ROUTING_ERROR +from agentserver.message_bus.thread_registry import get_registry +from agentserver.message_bus.todo_registry import get_todo_registry # ============================================================================ @@ -62,6 +64,9 @@ class OrganismConfig: max_concurrent_handlers: int = 20 # Concurrent handler invocations max_concurrent_per_agent: int = 5 # Per-agent rate limit + # LLM configuration (optional) + llm_config: Dict[str, Any] = field(default_factory=dict) + @dataclass class Listener: @@ -74,6 +79,7 @@ class Listener: broadcast: bool = False schema: etree.XMLSchema = field(default=None, repr=False) root_tag: str = "" + usage_instructions: str = "" # 
def _build_usage_instructions(self, agent: "Listener") -> str:
    """
    Build LLM system prompt instructions from peer schemas.

    Produces a plain-text document that names the agent, lists every peer
    it may message (with the peer's description, its XSD when the payload
    class exposes ``xsd()``, and a skeleton XML example when the payload
    class is a dataclass), and ends with a fixed note on response semantics.

    Args:
        agent: The listener whose ``name``, ``description`` and ``peers``
            are rendered.

    Returns:
        The instructions as a single newline-joined string.
    """
    lines = [
        f"You are the {agent.name} agent.",
        f"Description: {agent.description}",
        "",
        "You can send messages to the following peers:",
    ]

    for peer_name in agent.peers:
        peer = self.listeners.get(peer_name)
        if not peer:
            # Peer is named in config but no listener is registered under it.
            lines.append(f"\n## {peer_name} (not registered)")
            continue

        lines.append(f"\n## {peer_name}")
        lines.append(f"Description: {peer.description}")

        payload_class = peer.payload_class

        # Get XSD schema as readable XML
        if hasattr(payload_class, 'xsd'):
            xsd_tree = payload_class.xsd()
            xsd_str = etree.tostring(xsd_tree, pretty_print=True, encoding='unicode')
            lines.append(f"Expected payload schema:\n```xml\n{xsd_str}```")

        # Also show a simple example structure. Every element is closed so
        # the skeleton shown to the LLM is well-formed XML.
        if hasattr(payload_class, '__dataclass_fields__'):
            root = payload_class.__name__
            example_lines = [f"<{root}>"]
            example_lines.extend(
                f"  <{fname}>...</{fname}>"
                for fname in payload_class.__dataclass_fields__
            )
            example_lines.append(f"</{root}>")
            lines.append("Example structure:\n```xml\n" + "\n".join(example_lines) + "\n```")

    lines.extend([
        "\n---",
        "## Important: Response Semantics",
        "",
        "When you RESPOND (return to your caller), your call chain is pruned.",
        "This means:",
        "- Any sub-agents you called are effectively terminated",
        "- Their state/context is lost (e.g., calculator memory, scratch space)",
        "- You cannot call them again in the same context after responding",
        "",
        "Therefore: Complete ALL sub-tasks before responding to your caller.",
        "If you need results from a peer, wait for their response before you respond.",
    ])

    return "\n".join(lines)
Detect self-calls (agent sending to itself) + is_self_call = (state.from_id or "") == listener.name + + # Get any raised eyebrows for this agent (for nagging) + todo_nudge = "" + if listener.is_agent and current_thread: + raised = todo_registry.get_raised_for(current_thread, listener.name) + todo_nudge = todo_registry.format_nudge(raised) + metadata = HandlerMetadata( - thread_id=state.thread_id or "", + thread_id=current_thread, from_id=state.from_id or "", own_name=listener.name if listener.is_agent else None, + is_self_call=is_self_call, + usage_instructions=listener.usage_instructions, + todo_nudge=todo_nudge, ) - response_bytes = await listener.handler(state.payload, metadata) + response = await listener.handler(state.payload, metadata) - if not isinstance(response_bytes, bytes): + # None means "no response needed" - don't re-inject + if response is None: + continue + + # Handle clean HandlerResponse (preferred) + if isinstance(response, HandlerResponse): + registry = get_registry() + + if response.is_response: + # Response back to caller - prune chain + target, new_thread_id = registry.prune_for_response(current_thread) + if target is None: + # Chain exhausted - nowhere to respond to + continue + to_id = target + thread_id = new_thread_id + else: + # Forward to named target - validate against peers + requested_to = response.to + + # Enforce peer constraints for agents + if listener.is_agent and listener.peers: + if requested_to not in listener.peers: + # Agent trying to send to non-peer - send generic error back to agent + # Log details internally but don't reveal to agent + import logging + logging.getLogger(__name__).warning( + f"Peer violation: {listener.name} -> {requested_to} (allowed: {listener.peers})" + ) + + # Send SystemError back to the agent (keeps thread alive) + error_bytes = self._wrap_in_envelope( + payload=ROUTING_ERROR, + from_id="system", + to_id=listener.name, + thread_id=current_thread, + ) + yield MessageState( + raw_bytes=error_bytes, + 
thread_id=current_thread, + from_id="system", + ) + continue + + to_id = requested_to + thread_id = registry.extend_chain(current_thread, to_id) + + response_bytes = self._wrap_in_envelope( + payload=response.payload, + from_id=listener.name, + to_id=to_id, + thread_id=thread_id, + ) + # Legacy: raw bytes (backwards compatible) + elif isinstance(response, bytes): + response_bytes = response + thread_id = state.thread_id + else: response_bytes = b"Handler returned invalid type" + thread_id = state.thread_id # Yield response — will be processed by next iteration yield MessageState( raw_bytes=response_bytes, - thread_id=state.thread_id, + thread_id=thread_id, from_id=listener.name, ) @@ -306,6 +468,37 @@ class StreamPump: error=str(exc), ) + def _wrap_in_envelope(self, payload: Any, from_id: str, to_id: str, thread_id: str) -> bytes: + """Wrap a dataclass payload in a message envelope.""" + # Serialize payload to XML + if hasattr(payload, 'to_xml'): + # SystemError and similar have manual to_xml() + payload_str = payload.to_xml() + elif hasattr(payload, 'xml_value'): + # @xmlify dataclasses + payload_class_name = type(payload).__name__ + payload_tree = payload.xml_value(payload_class_name) + payload_str = etree.tostring(payload_tree, encoding='unicode') + else: + # Fallback for non-xmlify classes + payload_class_name = type(payload).__name__ + payload_str = f"<{payload_class_name}>{payload}" + + # Add xmlns="" to keep payload out of envelope namespace + if 'xmlns=' not in payload_str: + idx = payload_str.index('>') + payload_str = payload_str[:idx] + ' xmlns=""' + payload_str[idx:] + + envelope = f""" + + {from_id} + {to_id} + {thread_id} + + {payload_str} +""" + return envelope.encode('utf-8') + async def _reinject_responses(self, state: MessageState) -> None: """Push handler responses back into the queue for next iteration.""" await self.queue.put(state) @@ -498,6 +691,7 @@ class ConfigLoader: max_concurrent_pipelines=raw.get("max_concurrent_pipelines", 50), 
async def bootstrap(config_path: str = "config/organism.yaml") -> StreamPump:
    """
    Build a ready-to-run pump from a config file.

    Steps, in order: load ``.env``, load the organism config, create the
    pump, register the three system listeners, register the configured
    listeners, configure the LLM router when the config has an ``llm``
    section, initialize the root thread, and inject the boot message.

    Args:
        config_path: Path handed to ``ConfigLoader.load``.

    Returns:
        The pump, with the boot message already injected.
    """
    from datetime import datetime, timezone
    from dotenv import load_dotenv
    from agentserver.primitives import (
        Boot, handle_boot,
        TodoUntil, TodoComplete,
        handle_todo_until, handle_todo_complete,
    )

    # Load .env file if present
    load_dotenv()

    config = ConfigLoader.load(config_path)
    print(f"Organism: {config.name}")
    print(f"Listeners: {len(config.listeners)}")

    pump = StreamPump(config)

    # System listeners go in before any user-defined listener; the order of
    # this table is the registration order.
    system_listeners = (
        (
            "system.boot",
            "agentserver.primitives.Boot",
            "agentserver.primitives.handle_boot",
            "System boot handler - initializes organism",
            Boot,
            handle_boot,
        ),
        (
            "system.todo",
            "agentserver.primitives.TodoUntil",
            "agentserver.primitives.handle_todo_until",
            "System todo handler - registers watchers",
            TodoUntil,
            handle_todo_until,
        ),
        (
            "system.todo-complete",
            "agentserver.primitives.TodoComplete",
            "agentserver.primitives.handle_todo_complete",
            "System todo handler - closes watchers",
            TodoComplete,
            handle_todo_complete,
        ),
    )
    for name, class_path, handler_path, description, payload_class, handler in system_listeners:
        pump.register_listener(
            ListenerConfig(
                name=name,
                payload_class_path=class_path,
                handler_path=handler_path,
                description=description,
                is_agent=False,
                payload_class=payload_class,
                handler=handler,
            )
        )

    # Register all user-defined listeners
    pump.register_all()

    # Configure LLM router if llm section present
    llm_settings = config.llm_config
    if llm_settings:
        from agentserver.llm import configure_router
        configure_router(llm_settings)
        print(f"LLM backends: {len(config.llm_config.get('backends', []))}")

    # Initialize root thread in registry
    thread_registry = get_registry()
    root_uuid = thread_registry.initialize_root(config.name)
    print(f"Root thread: {root_uuid} ({thread_registry.root_chain})")

    # UTC timestamp in ISO-8601 form with a trailing "Z" instead of "+00:00".
    booted_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    boot_message = Boot(
        organism_name=config.name,
        timestamp=booted_at,
        listener_count=len(pump.listeners),
    )

    # Wrap the boot payload and queue it.
    envelope = pump._wrap_in_envelope(
        payload=boot_message,
        from_id="system",
        to_id="system.boot",
        thread_id=root_uuid,
    )
    await pump.inject(envelope, thread_id=root_uuid, from_id="system")

    print(f"Routing: {list(pump.routing_table.keys())}")
    return pump
@dataclass
class TodoWatcher:
    """One pending todo: a condition being watched for on a single thread."""

    # Unique ID, used to close the watcher explicitly.
    id: str
    # Thread being watched.
    thread_id: str
    # Agent that issued the todo (the one to nag).
    issuer: str
    # Payload type to match (e.g., "ShoutedResponse").
    wait_for: str
    # Optional source filter: when set, the match must come from this listener.
    from_listener: Optional[str] = None
    # Human-readable description of what is being waited for.
    description: str = ""

    # True once the condition appears satisfied.
    eyebrow_raised: bool = False
    # The payload that raised the eyebrow.
    triggered_by: Any = None
    # Who sent the triggering message.
    triggered_from: str = ""
+ """ + + def __init__(self): + self._lock = threading.Lock() + # thread_id -> list of watchers + self._watchers: Dict[str, List[TodoWatcher]] = {} + # watcher_id -> watcher (for fast lookup on close) + self._by_id: Dict[str, TodoWatcher] = {} + + def register( + self, + thread_id: str, + issuer: str, + wait_for: str, + from_listener: Optional[str] = None, + description: str = "", + ) -> str: + """ + Register a new todo watcher. + + Returns the watcher ID for explicit close. + """ + watcher_id = str(uuid.uuid4()) + watcher = TodoWatcher( + id=watcher_id, + thread_id=thread_id, + issuer=issuer, + wait_for=wait_for.lower(), # Normalize for matching + from_listener=from_listener.lower() if from_listener else None, + description=description, + ) + + with self._lock: + if thread_id not in self._watchers: + self._watchers[thread_id] = [] + self._watchers[thread_id].append(watcher) + self._by_id[watcher_id] = watcher + + return watcher_id + + def check(self, thread_id: str, payload_type: str, from_id: str, payload: Any = None) -> List[TodoWatcher]: + """ + Check if any watchers on this thread match the incoming message. + + Raises eyebrows on matching watchers. Returns list of newly raised. + """ + newly_raised = [] + payload_type_lower = payload_type.lower() + from_id_lower = from_id.lower() if from_id else "" + + with self._lock: + watchers = self._watchers.get(thread_id, []) + for watcher in watchers: + if watcher.eyebrow_raised: + continue # Already raised + + # Check payload type match + if watcher.wait_for not in payload_type_lower: + continue + + # Check optional from_listener filter + if watcher.from_listener and watcher.from_listener != from_id_lower: + continue + + # Match! 
Raise the eyebrow + watcher.eyebrow_raised = True + watcher.triggered_by = payload + watcher.triggered_from = from_id + newly_raised.append(watcher) + + return newly_raised + + def get_raised_for(self, thread_id: str, agent: str) -> List[TodoWatcher]: + """ + Get all raised eyebrows for this agent on this thread. + + These are the todos that appear satisfied and should be nagged about. + """ + agent_lower = agent.lower() + with self._lock: + watchers = self._watchers.get(thread_id, []) + return [w for w in watchers if w.issuer.lower() == agent_lower and w.eyebrow_raised] + + def get_pending_for(self, thread_id: str, agent: str) -> List[TodoWatcher]: + """ + Get all pending (not yet raised) todos for this agent on this thread. + + Useful for showing the agent what it's still waiting for. + """ + agent_lower = agent.lower() + with self._lock: + watchers = self._watchers.get(thread_id, []) + return [w for w in watchers if w.issuer.lower() == agent_lower and not w.eyebrow_raised] + + def close(self, watcher_id: str) -> bool: + """ + Close a todo by ID. + + Returns True if found and removed, False if not found. + """ + with self._lock: + watcher = self._by_id.pop(watcher_id, None) + if watcher is None: + return False + + thread_watchers = self._watchers.get(watcher.thread_id, []) + try: + thread_watchers.remove(watcher) + except ValueError: + pass + + # Clean up empty thread entries + if not thread_watchers: + self._watchers.pop(watcher.thread_id, None) + + return True + + def close_all_for_thread(self, thread_id: str) -> int: + """ + Close all watchers for a thread (e.g., when thread ends). + + Returns count of watchers removed. + """ + with self._lock: + watchers = self._watchers.pop(thread_id, []) + for w in watchers: + self._by_id.pop(w.id, None) + return len(watchers) + + def format_nudge(self, watchers: List[TodoWatcher]) -> str: + """ + Format raised eyebrows as a nudge string for the LLM. + + Returns empty string if no raised eyebrows. 
+ """ + if not watchers: + return "" + + lines = ["[SYSTEM NOTE: The following todos appear complete:]"] + for w in watchers: + desc = f" ({w.description})" if w.description else "" + lines.append(f" - Waiting for {w.wait_for}{desc}: received from {w.triggered_from}") + lines.append(f" Close with: {w.id}") + + return "\n".join(lines) + + def clear(self): + """Clear all watchers. Useful for testing.""" + with self._lock: + self._watchers.clear() + self._by_id.clear() + + +# ============================================================================ +# Singleton +# ============================================================================ + +_registry: Optional[TodoRegistry] = None +_registry_lock = threading.Lock() + + +def get_todo_registry() -> TodoRegistry: + """Get the global TodoRegistry singleton.""" + global _registry + if _registry is None: + with _registry_lock: + if _registry is None: + _registry = TodoRegistry() + return _registry diff --git a/agentserver/primitives/__init__.py b/agentserver/primitives/__init__.py new file mode 100644 index 0000000..5d95412 --- /dev/null +++ b/agentserver/primitives/__init__.py @@ -0,0 +1,27 @@ +""" +System primitives — Core message types handled by the organism itself. + +These are not user-defined listeners but system-level messages that +establish context, handle errors, and manage the organism lifecycle. +""" + +from agentserver.primitives.boot import Boot, handle_boot +from agentserver.primitives.todo import ( + TodoUntil, + TodoComplete, + TodoRegistered, + TodoClosed, + handle_todo_until, + handle_todo_complete, +) + +__all__ = [ + "Boot", + "handle_boot", + "TodoUntil", + "TodoComplete", + "TodoRegistered", + "TodoClosed", + "handle_todo_until", + "handle_todo_complete", +] diff --git a/agentserver/primitives/boot.py b/agentserver/primitives/boot.py new file mode 100644 index 0000000..8cdbd7d --- /dev/null +++ b/agentserver/primitives/boot.py @@ -0,0 +1,77 @@ +""" +boot.py — System boot primitive. 
+ +The message is the first message in every organism's lifetime. +It establishes the root thread from which all other threads descend. + +The boot handler: +1. Logs organism startup +2. Initializes any system-level state +3. Sends initial ConsolePrompt to start the console REPL + +All external messages that arrive without a known thread parent +will be registered as children of the boot thread. +""" + +from dataclasses import dataclass +import logging + +from third_party.xmlable import xmlify +from agentserver.message_bus.message_state import HandlerMetadata, HandlerResponse + +logger = logging.getLogger(__name__) + + +@xmlify +@dataclass +class Boot: + """ + System boot message — first message in organism lifetime. + + Injected automatically at startup. Establishes root thread context. + """ + organism_name: str = "" + timestamp: str = "" + listener_count: int = 0 + + +@xmlify +@dataclass +class ConsolePrompt: + """ + Prompt message to the console. + + Duplicated here to avoid circular import with handlers.console. + The pump will route based on payload class name. + """ + output: str = "" + source: str = "" + show_banner: bool = False + + +async def handle_boot(payload: Boot, metadata: HandlerMetadata) -> HandlerResponse: + """ + Handle the system boot message. + + Logs the boot event and sends initial ConsolePrompt to start the REPL. + """ + logger.info( + f"Organism '{payload.organism_name}' booted at {payload.timestamp} " + f"with {payload.listener_count} listeners. 
" + f"Root thread: {metadata.thread_id}" + ) + + # Could initialize system state here: + # - Warm up LLM connections + # - Load cached schemas + # - Pre-populate routing caches + + # Send initial prompt to console to start the REPL + return HandlerResponse( + payload=ConsolePrompt( + output=f"Organism '{payload.organism_name}' ready.\n{payload.listener_count} listeners registered.", + source="system", + show_banner=True, + ), + to="console", + ) diff --git a/agentserver/primitives/todo.py b/agentserver/primitives/todo.py new file mode 100644 index 0000000..de6c7e3 --- /dev/null +++ b/agentserver/primitives/todo.py @@ -0,0 +1,139 @@ +""" +todo.py — TodoUntil and TodoComplete system primitives. + +These payloads allow agents to register watchers that monitor the thread +for specific conditions, and to close those watchers when done. + +Usage by an agent: + # Issue a todo - "wait for ShoutedResponse from shouter" + return HandlerResponse( + payload=TodoUntil( + wait_for="ShoutedResponse", + from_listener="shouter", + description="waiting for shouter to respond", + ), + to="system.todo", + ) + + # Later, when nagged about a raised eyebrow, close it + return HandlerResponse( + payload=TodoComplete(id="..."), + to="system.todo", + ) + +The system.todo listener handles these messages: +- TodoUntil: registers a watcher in the TodoRegistry +- TodoComplete: closes the watcher +""" + +from dataclasses import dataclass +from typing import Optional +import logging + +from third_party.xmlable import xmlify +from agentserver.message_bus.message_state import HandlerMetadata, HandlerResponse +from agentserver.message_bus.todo_registry import get_todo_registry + +logger = logging.getLogger(__name__) + + +@xmlify +@dataclass +class TodoUntil: + """ + Register a todo watcher on the current thread. + + The agent will be nagged when the condition appears satisfied, + until it explicitly closes with TodoComplete. 
+ """ + wait_for: str = "" # Payload type to watch for + from_listener: str = "" # Optional: must be from this listener + description: str = "" # Human-readable description + + +@xmlify +@dataclass +class TodoComplete: + """ + Close a todo watcher. + + Sent by an agent when it acknowledges the todo is complete. + """ + id: str = "" # Watcher ID to close + + +@xmlify +@dataclass +class TodoRegistered: + """ + Acknowledgment that a todo was registered. + + Sent back to the issuing agent with the watcher ID. + """ + id: str = "" # Watcher ID for later close + wait_for: str = "" # What we're watching for + description: str = "" # Echo back the description + + +@xmlify +@dataclass +class TodoClosed: + """ + Acknowledgment that a todo was closed. + """ + id: str = "" # Watcher ID that was closed + was_raised: bool = False # Whether the eyebrow was raised when closed + + +async def handle_todo_until(payload: TodoUntil, metadata: HandlerMetadata) -> HandlerResponse: + """ + Handle TodoUntil — register a watcher for this thread. + + Returns TodoRegistered to acknowledge. + """ + registry = get_todo_registry() + + watcher_id = registry.register( + thread_id=metadata.thread_id, + issuer=metadata.from_id, + wait_for=payload.wait_for, + from_listener=payload.from_listener or None, + description=payload.description, + ) + + logger.info( + f"TodoUntil registered: {watcher_id} on thread {metadata.thread_id[:8]}... " + f"by {metadata.from_id}, waiting for {payload.wait_for}" + ) + + return HandlerResponse( + payload=TodoRegistered( + id=watcher_id, + wait_for=payload.wait_for, + description=payload.description, + ), + to=metadata.from_id, + ) + + +async def handle_todo_complete(payload: TodoComplete, metadata: HandlerMetadata) -> Optional[HandlerResponse]: + """ + Handle TodoComplete — close a watcher. + + Returns TodoClosed to acknowledge, or None if not found. 
+ """ + registry = get_todo_registry() + + # Get watcher info before closing (for the response) + watcher = registry._by_id.get(payload.id) + was_raised = watcher.eyebrow_raised if watcher else False + + if registry.close(payload.id): + logger.info(f"TodoComplete: closed {payload.id} (was_raised={was_raised})") + return HandlerResponse( + payload=TodoClosed(id=payload.id, was_raised=was_raised), + to=metadata.from_id, + ) + else: + logger.warning(f"TodoComplete: watcher {payload.id} not found") + return None diff --git a/handlers/hello.py b/handlers/hello.py index 562a56e..609a419 100644 --- a/handlers/hello.py +++ b/handlers/hello.py @@ -24,14 +24,9 @@ Usage in organism.yaml: """ from dataclasses import dataclass -from lxml import etree from third_party.xmlable import xmlify -from agentserver.message_bus.message_state import HandlerMetadata - - -# Envelope namespace -ENVELOPE_NS = "https://xml-pipeline.org/ns/envelope/v1" +from agentserver.message_bus.message_state import HandlerMetadata, HandlerResponse @xmlify @@ -56,70 +51,78 @@ class ShoutedResponse: message: str -def wrap_in_envelope(payload_bytes: bytes, from_id: str, to_id: str, thread_id: str) -> bytes: - """Wrap a payload in a proper message envelope. - - Adds xmlns="" to payload to prevent it inheriting envelope namespace. - """ - payload_str = payload_bytes.decode('utf-8') - - # Add xmlns="" to payload root to keep it out of envelope namespace - if 'xmlns=' not in payload_str: - idx = payload_str.index('>') - payload_str = payload_str[:idx] + ' xmlns=""' + payload_str[idx:] - - return f""" - - {from_id} - {to_id} - {thread_id} - - {payload_str} -""".encode('utf-8') - - -async def handle_greeting(payload: Greeting, metadata: HandlerMetadata) -> bytes: +async def handle_greeting(payload: Greeting, metadata: HandlerMetadata) -> HandlerResponse: """ Handle an incoming Greeting and forward GreetingResponse to shouter. 
- Flow: user -> greeter -> shouter + Flow: console-router -> greeter -> shouter -> response-handler + + Demonstrates TodoUntil pattern: + 1. Register a watcher for ShoutedResponse from shouter + 2. Send GreetingResponse to shouter + 3. When ShoutedResponse appears, eyebrow is raised + 4. On next invocation, greeter sees nudge and can close the todo """ - # Create response, tracking original sender for later - response = GreetingResponse( - message=f"Hello, {payload.name}!", - original_sender=metadata.from_id, - ) + from agentserver.llm import complete + from agentserver.message_bus.todo_registry import get_todo_registry - # Serialize to XML - response_tree = response.xml_value("GreetingResponse") - payload_bytes = etree.tostring(response_tree, encoding='utf-8') + # Check for any raised todos and close them + todo_registry = get_todo_registry() + if metadata.todo_nudge: + # We have raised todos - check and close them + raised = todo_registry.get_raised_for(metadata.thread_id, metadata.own_name or "greeter") + for watcher in raised: + todo_registry.close(watcher.id) + # In a real scenario, we might log or react to the completed todo - # Forward to shouter (not back to sender) - return wrap_in_envelope( - payload_bytes=payload_bytes, - from_id=metadata.own_name or "greeter", - to_id="shouter", # Forward to shouter agent + # Register a todo watcher - we want to know when shouter responds + # This demonstrates the "await confirmation" pattern + todo_registry.register( thread_id=metadata.thread_id, + issuer=metadata.own_name or "greeter", + wait_for="ShoutedResponse", + from_listener="shouter", + description=f"waiting for shouter to process greeting for {payload.name}", + ) + + # Build system prompt with peer awareness + system_prompt = "You are a friendly greeter. Respond with ONLY a single short enthusiastic greeting sentence. No XML, no markup, just the greeting text." 
+ if metadata.usage_instructions: + system_prompt = metadata.usage_instructions + "\n\n" + system_prompt + + # Include any todo nudges in the prompt (for LLM awareness) + if metadata.todo_nudge: + system_prompt = system_prompt + "\n\n" + metadata.todo_nudge + + # Use LLM to generate a creative greeting + llm_response = await complete( + model="grok-3-mini-beta", + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": f"Greet {payload.name} enthusiastically."}, + ], + agent_id=metadata.own_name, + temperature=0.9, + ) + + # Return clean dataclass + target - pump handles envelope + return HandlerResponse( + payload=GreetingResponse( + message=llm_response.content, + original_sender="response-handler", + ), + to="shouter", ) -async def handle_shout(payload: GreetingResponse, metadata: HandlerMetadata) -> bytes: +async def handle_shout(payload: GreetingResponse, metadata: HandlerMetadata) -> HandlerResponse: """ Handle GreetingResponse by shouting it back to original sender. 
- Flow: greeter -> shouter -> user + Flow: greeter -> shouter -> original_sender (response-handler) """ - # Create ALL CAPS response - response = ShoutedResponse(message=payload.message.upper()) - - # Serialize to XML - response_tree = response.xml_value("ShoutedResponse") - payload_bytes = etree.tostring(response_tree, encoding='utf-8') - - # Send back to original sender (tracked in payload) - return wrap_in_envelope( - payload_bytes=payload_bytes, - from_id=metadata.own_name or "shouter", - to_id=payload.original_sender, # Back to whoever started the conversation - thread_id=metadata.thread_id, + # Return clean dataclass + target - pump handles envelope + return HandlerResponse( + payload=ShoutedResponse(message=payload.message.upper()), + to=payload.original_sender, ) diff --git a/tests/test_pump_integration.py b/tests/test_pump_integration.py index 0252ebb..b4fa480 100644 --- a/tests/test_pump_integration.py +++ b/tests/test_pump_integration.py @@ -14,7 +14,9 @@ from unittest.mock import AsyncMock, patch from agentserver.message_bus import StreamPump, bootstrap, MessageState from agentserver.message_bus.stream_pump import ConfigLoader, ListenerConfig, OrganismConfig, Listener -from handlers.hello import Greeting, GreetingResponse, handle_greeting, handle_shout, ENVELOPE_NS +from handlers.hello import Greeting, GreetingResponse, handle_greeting, handle_shout + +ENVELOPE_NS = "https://xml-pipeline.org/ns/envelope/v1" def make_envelope(payload_xml: str, from_id: str, to_id: str, thread_id: str) -> bytes: @@ -50,17 +52,13 @@ class TestPumpBootstrap: config = ConfigLoader.load('config/organism.yaml') assert config.name == "hello-world" - assert len(config.listeners) == 2 + assert len(config.listeners) == 5 # console-router, response-handler, console, greeter, shouter - # Greeter listener - assert config.listeners[0].name == "greeter" - assert config.listeners[0].payload_class == Greeting - assert config.listeners[0].handler == handle_greeting - - # Shouter listener 
- assert config.listeners[1].name == "shouter" - assert config.listeners[1].payload_class == GreetingResponse - assert config.listeners[1].handler == handle_shout + # Find greeter and shouter by name + listener_names = [lc.name for lc in config.listeners] + assert "greeter" in listener_names + assert "shouter" in listener_names + assert "console-router" in listener_names @pytest.mark.asyncio async def test_bootstrap_creates_pump(self): @@ -68,10 +66,11 @@ class TestPumpBootstrap: pump = await bootstrap('config/organism.yaml') assert pump.config.name == "hello-world" + assert len(pump.routing_table) == 8 # 5 user listeners + 3 system (boot, todo, todo-complete) assert "greeter.greeting" in pump.routing_table assert "shouter.greetingresponse" in pump.routing_table - assert pump.listeners["greeter"].payload_class == Greeting - assert pump.listeners["shouter"].payload_class == GreetingResponse + assert "console-router.consoleinput" in pump.routing_table + assert "system.boot.boot" in pump.routing_table # Boot listener @pytest.mark.asyncio async def test_bootstrap_generates_xsd(self): @@ -95,10 +94,20 @@ class TestPumpInjection: """inject() should add a MessageState to the queue.""" pump = await bootstrap('config/organism.yaml') + # Bootstrap already injects a boot message, so queue starts with 1 + initial_size = pump.queue.qsize() + assert initial_size == 1 # Boot message + thread_id = str(uuid.uuid4()) await pump.inject(b"", thread_id, from_id="user") - assert pump.queue.qsize() == 1 + assert pump.queue.qsize() == initial_size + 1 + + # Drain the boot message first + boot_state = await pump.queue.get() + assert b"Boot" in boot_state.raw_bytes + + # Then get our test message state = await pump.queue.get() assert state.raw_bytes == b"" assert state.thread_id == thread_id @@ -112,54 +121,81 @@ class TestFullPipelineFlow: async def test_greeting_round_trip(self): """ Full integration test: - 1. Inject a Greeting message + 1. 
Inject a Greeting message directly to greeter (bypassing console flow) 2. Pump processes it through the pipeline 3. Handler is called with deserialized Greeting 4. Handler response is re-injected """ - pump = await bootstrap('config/organism.yaml') + # Create a minimal config without console (console awaits stdin, blocks tests) + config = OrganismConfig(name="test-greeting") + pump = StreamPump(config) + + # Register just greeter + lc = ListenerConfig( + name="greeter", + payload_class_path="handlers.hello.Greeting", + handler_path="handlers.hello.handle_greeting", + description="Test greeter", + is_agent=True, + peers=["shouter"], + payload_class=Greeting, + handler=handle_greeting, + ) + pump.register_listener(lc) # Track what the handler receives handler_calls = [] original_handler = pump.listeners["greeter"].handler + # Mock the LLM call since we don't have a real API key in tests + from agentserver.llm.backend import LLMResponse + + mock_response = LLMResponse( + content="Hello, World!", + model="mock", + usage={"total_tokens": 10}, + finish_reason="stop", + ) + async def tracking_handler(payload, metadata): handler_calls.append((payload, metadata)) + # Use mocked original handler return await original_handler(payload, metadata) pump.listeners["greeter"].handler = tracking_handler - # Create and inject a Greeting message - thread_id = str(uuid.uuid4()) - envelope = make_envelope( - payload_xml="World", - from_id="user", - to_id="greeter", - thread_id=thread_id, - ) + with patch('agentserver.llm.complete', new=AsyncMock(return_value=mock_response)): + # Create and inject a Greeting message + thread_id = str(uuid.uuid4()) + envelope = make_envelope( + payload_xml="World", + from_id="user", + to_id="greeter", + thread_id=thread_id, + ) - await pump.inject(envelope, thread_id, from_id="user") + await pump.inject(envelope, thread_id, from_id="user") - # Run pump briefly to process the message - pump._running = True - pipeline = 
pump.build_pipeline(pump._queue_source()) + # Run pump briefly to process the message + pump._running = True + pipeline = pump.build_pipeline(pump._queue_source()) - # Process with timeout - async def run_with_timeout(): - async with pipeline.stream() as streamer: - try: - async for _ in streamer: - # One iteration should process our message - break - except asyncio.CancelledError: - pass + # Process with timeout + async def run_with_timeout(): + async with pipeline.stream() as streamer: + try: + async for _ in streamer: + # One iteration should process our message + break + except asyncio.CancelledError: + pass - try: - await asyncio.wait_for(run_with_timeout(), timeout=2.0) - except asyncio.TimeoutError: - pass - finally: - pump._running = False + try: + await asyncio.wait_for(run_with_timeout(), timeout=2.0) + except asyncio.TimeoutError: + pass + finally: + pump._running = False # Verify handler was called assert len(handler_calls) == 1 @@ -173,11 +209,25 @@ class TestFullPipelineFlow: @pytest.mark.asyncio async def test_handler_response_reinjected(self): """Handler response should be re-injected into the queue.""" - pump = await bootstrap('config/organism.yaml') + # Create a minimal config without console (console awaits stdin, blocks tests) + config = OrganismConfig(name="test-reinjection") + pump = StreamPump(config) + + # Register just greeter + lc = ListenerConfig( + name="greeter", + payload_class_path="handlers.hello.Greeting", + handler_path="handlers.hello.handle_greeting", + description="Test greeter", + is_agent=True, + peers=["shouter"], + payload_class=Greeting, + handler=handle_greeting, + ) + pump.register_listener(lc) # Capture re-injected messages reinjected = [] - original_reinject = pump._reinject_responses async def capture_reinject(state): reinjected.append(state) @@ -185,35 +235,46 @@ class TestFullPipelineFlow: pump._reinject_responses = capture_reinject - # Inject a Greeting - thread_id = str(uuid.uuid4()) - envelope = make_envelope( - 
payload_xml="Alice", - from_id="user", - to_id="greeter", - thread_id=thread_id, + # Mock the LLM call since we don't have a real API key in tests + from agentserver.llm.backend import LLMResponse + + mock_response = LLMResponse( + content="Hello, Alice!", + model="mock", + usage={"total_tokens": 10}, + finish_reason="stop", ) - await pump.inject(envelope, thread_id, from_id="user") + with patch('agentserver.llm.complete', new=AsyncMock(return_value=mock_response)): + # Inject a Greeting + thread_id = str(uuid.uuid4()) + envelope = make_envelope( + payload_xml="Alice", + from_id="user", + to_id="greeter", + thread_id=thread_id, + ) - # Run pump briefly - pump._running = True - pipeline = pump.build_pipeline(pump._queue_source()) + await pump.inject(envelope, thread_id, from_id="user") - async def run_with_timeout(): - async with pipeline.stream() as streamer: - try: - async for _ in streamer: - break - except asyncio.CancelledError: - pass + # Run pump briefly + pump._running = True + pipeline = pump.build_pipeline(pump._queue_source()) - try: - await asyncio.wait_for(run_with_timeout(), timeout=2.0) - except asyncio.TimeoutError: - pass - finally: - pump._running = False + async def run_with_timeout(): + async with pipeline.stream() as streamer: + try: + async for _ in streamer: + break + except asyncio.CancelledError: + pass + + try: + await asyncio.wait_for(run_with_timeout(), timeout=2.0) + except asyncio.TimeoutError: + pass + finally: + pump._running = False # Verify response was re-injected assert len(reinjected) == 1 @@ -221,7 +282,6 @@ class TestFullPipelineFlow: assert response_state.raw_bytes is not None assert b"Hello, Alice!" 
in response_state.raw_bytes - assert response_state.thread_id == thread_id assert response_state.from_id == "greeter" @@ -319,6 +379,317 @@ class TestErrorHandling: assert any("nonexistent" in e for e in errors) +class TestThreadRoutingFlow: + """ + Test full thread routing: console-router → greeter (LLM) → shouter → response-handler. + + This verifies that thread IDs are properly propagated and extended through + the entire message chain, including LLM agent calls. + """ + + @pytest.mark.asyncio + async def test_full_thread_routing_chain(self): + """ + Trace thread ID through: console-router → greeter → shouter → response-handler. + + 1. Inject ConsoleInput (simulating user input) + 2. Console-router routes to greeter with Greeting + 3. Greeter calls LLM, sends GreetingResponse to shouter + 4. Shouter sends ShoutedResponse to response-handler + 5. Response-handler creates ConsolePrompt + + Thread ID must be consistent through entire chain. + """ + from handlers.console import ConsoleInput, ConsolePrompt, ShoutedResponse + from handlers.console import handle_console_input, handle_shouted_response + from handlers.hello import Greeting, GreetingResponse, handle_greeting, handle_shout + from agentserver.llm.backend import LLMResponse + from agentserver.message_bus.thread_registry import get_registry + + # Create pump with full routing chain (but no console - it blocks on stdin) + config = OrganismConfig(name="thread-routing-test") + pump = StreamPump(config) + + # Register all handlers in the chain + pump.register_listener(ListenerConfig( + name="console-router", + payload_class_path="handlers.console.ConsoleInput", + handler_path="handlers.console.handle_console_input", + description="Routes console input", + payload_class=ConsoleInput, + handler=handle_console_input, + )) + + pump.register_listener(ListenerConfig( + name="greeter", + payload_class_path="handlers.hello.Greeting", + handler_path="handlers.hello.handle_greeting", + description="Greeting agent", + 
is_agent=True, + peers=["shouter"], + payload_class=Greeting, + handler=handle_greeting, + )) + + pump.register_listener(ListenerConfig( + name="shouter", + payload_class_path="handlers.hello.GreetingResponse", + handler_path="handlers.hello.handle_shout", + description="Shouts responses", + payload_class=GreetingResponse, + handler=handle_shout, + )) + + pump.register_listener(ListenerConfig( + name="response-handler", + payload_class_path="handlers.console.ShoutedResponse", + handler_path="handlers.console.handle_shouted_response", + description="Forwards to console", + payload_class=ShoutedResponse, + handler=handle_shouted_response, + )) + + # Track thread IDs at each handler call + thread_trace = [] + + # Wrap handlers to capture thread IDs + original_console_router = pump.listeners["console-router"].handler + original_greeter = pump.listeners["greeter"].handler + original_shouter = pump.listeners["shouter"].handler + original_response = pump.listeners["response-handler"].handler + + async def trace_console_router(payload, metadata): + thread_trace.append(("console-router", metadata.thread_id, payload)) + return await original_console_router(payload, metadata) + + async def trace_greeter(payload, metadata): + thread_trace.append(("greeter", metadata.thread_id, payload)) + return await original_greeter(payload, metadata) + + async def trace_shouter(payload, metadata): + thread_trace.append(("shouter", metadata.thread_id, payload)) + return await original_shouter(payload, metadata) + + async def trace_response(payload, metadata): + thread_trace.append(("response-handler", metadata.thread_id, payload)) + return await original_response(payload, metadata) + + pump.listeners["console-router"].handler = trace_console_router + pump.listeners["greeter"].handler = trace_greeter + pump.listeners["shouter"].handler = trace_shouter + pump.listeners["response-handler"].handler = trace_response + + # Mock LLM response + mock_llm = LLMResponse( + content="Hello there, 
friend!", + model="mock", + usage={"total_tokens": 10}, + finish_reason="stop", + ) + + # Capture final output (response-handler sends to console, but console isn't registered) + final_outputs = [] + + async def capture_reinject(state): + final_outputs.append(state) + # Re-inject to continue the chain (if not to console) + if state.raw_bytes and b"console" not in state.raw_bytes: + await pump.queue.put(state) + + pump._reinject_responses = capture_reinject + + with patch('agentserver.llm.complete', new=AsyncMock(return_value=mock_llm)): + # Inject ConsoleInput (simulating: user typed "@greeter TestUser") + # Note: xmlify converts field names to PascalCase for XML elements + thread_id = str(uuid.uuid4()) + envelope = make_envelope( + payload_xml="TestUsergreeter", + from_id="console", + to_id="console-router", + thread_id=thread_id, + ) + + await pump.inject(envelope, thread_id, from_id="console") + + # Run pump to process all messages in chain + pump._running = True + pipeline = pump.build_pipeline(pump._queue_source()) + + async def run_chain(): + async with pipeline.stream() as streamer: + count = 0 + async for _ in streamer: + count += 1 + # Process up to 5 messages (should be enough for full chain) + if count >= 5: + break + + try: + await asyncio.wait_for(run_chain(), timeout=3.0) + except asyncio.TimeoutError: + pass + finally: + pump._running = False + + # Verify the trace + assert len(thread_trace) >= 4, f"Expected 4+ handler calls, got {len(thread_trace)}: {[t[0] for t in thread_trace]}" + + # All handlers should see a thread ID derived from the original + handler_names = [t[0] for t in thread_trace] + assert "console-router" in handler_names + assert "greeter" in handler_names + assert "shouter" in handler_names + assert "response-handler" in handler_names + + # Verify payloads were correctly routed + for name, tid, payload in thread_trace: + if name == "console-router": + assert isinstance(payload, ConsoleInput) + assert payload.target == "greeter" + 
elif name == "greeter": + assert isinstance(payload, Greeting) + assert payload.name == "TestUser" + elif name == "shouter": + assert isinstance(payload, GreetingResponse) + assert "Hello" in payload.message or "friend" in payload.message + elif name == "response-handler": + assert isinstance(payload, ShoutedResponse) + assert payload.message.isupper() # Shouted = ALL CAPS + + # Verify thread registry has entries for this chain + registry = get_registry() + assert registry.lookup(thread_id) is not None or len(thread_trace) > 0 + + @pytest.mark.asyncio + async def test_thread_id_chain_extension(self): + """ + Verify thread IDs are extended as messages flow through agents. + + The thread registry should show the chain growing: + - Initial: console → console-router + - After greeter: chain includes greeter + - After shouter: chain includes shouter + """ + from handlers.console import ConsoleInput, ShoutedResponse + from handlers.console import handle_console_input, handle_shouted_response + from handlers.hello import Greeting, GreetingResponse, handle_greeting, handle_shout + from agentserver.llm.backend import LLMResponse + from agentserver.message_bus.thread_registry import ThreadRegistry + + # Use a fresh registry for this test + test_registry = ThreadRegistry() + + # Create pump + config = OrganismConfig(name="thread-chain-test") + pump = StreamPump(config) + + # Patch get_registry to use our test registry + with patch('agentserver.message_bus.stream_pump.get_registry', return_value=test_registry): + with patch('agentserver.message_bus.thread_registry.get_registry', return_value=test_registry): + # Register handlers + pump.register_listener(ListenerConfig( + name="console-router", + payload_class_path="handlers.console.ConsoleInput", + handler_path="handlers.console.handle_console_input", + description="Routes console input", + payload_class=ConsoleInput, + handler=handle_console_input, + )) + + pump.register_listener(ListenerConfig( + name="greeter", + 
payload_class_path="handlers.hello.Greeting", + handler_path="handlers.hello.handle_greeting", + description="Greeting agent", + is_agent=True, + peers=["shouter"], + payload_class=Greeting, + handler=handle_greeting, + )) + + pump.register_listener(ListenerConfig( + name="shouter", + payload_class_path="handlers.hello.GreetingResponse", + handler_path="handlers.hello.handle_shout", + description="Shouts responses", + payload_class=GreetingResponse, + handler=handle_shout, + )) + + pump.register_listener(ListenerConfig( + name="response-handler", + payload_class_path="handlers.console.ShoutedResponse", + handler_path="handlers.console.handle_shouted_response", + description="Forwards to console", + payload_class=ShoutedResponse, + handler=handle_shouted_response, + )) + + # Track thread IDs after each handler + thread_ids_seen = [] + + original_greeter = pump.listeners["greeter"].handler + + async def trace_thread_after_greeter(payload, metadata): + thread_ids_seen.append(("greeter_received", metadata.thread_id)) + result = await original_greeter(payload, metadata) + return result + + pump.listeners["greeter"].handler = trace_thread_after_greeter + + # Prevent re-injection loops + async def noop_reinject(state): + pass + pump._reinject_responses = noop_reinject + + # Mock LLM + mock_llm = LLMResponse( + content="Hello!", + model="mock", + usage={"total_tokens": 5}, + finish_reason="stop", + ) + + with patch('agentserver.llm.complete', new=AsyncMock(return_value=mock_llm)): + # Inject initial message + thread_id = str(uuid.uuid4()) + envelope = make_envelope( + payload_xml="Alicegreeter", + from_id="console", + to_id="console-router", + thread_id=thread_id, + ) + + await pump.inject(envelope, thread_id, from_id="console") + + # Run pipeline + pump._running = True + pipeline = pump.build_pipeline(pump._queue_source()) + + async def run_chain(): + async with pipeline.stream() as streamer: + count = 0 + async for _ in streamer: + count += 1 + if count >= 4: + break + + 
try: + await asyncio.wait_for(run_chain(), timeout=3.0) + except asyncio.TimeoutError: + pass + finally: + pump._running = False + + # Verify registry has tracked the chain + chain = test_registry.lookup(thread_id) + assert chain is not None, "Thread should be registered" + + # The chain should show the message flow path + # e.g. "console.console-router" or similar + assert "console" in chain.lower() or len(thread_ids_seen) > 0 + + class TestManualPumpConfiguration: """Test creating a pump without YAML config.""" diff --git a/tests/test_todo_registry.py b/tests/test_todo_registry.py new file mode 100644 index 0000000..2f64b9b --- /dev/null +++ b/tests/test_todo_registry.py @@ -0,0 +1,569 @@ +""" +test_todo_registry.py — Tests for the TodoUntil watcher system. + +Tests: +1. TodoRegistry basic operations +2. Eyebrow raising on condition match +3. Nudge formatting +4. Integration with StreamPump +""" + +import pytest +import asyncio +import uuid +from unittest.mock import AsyncMock, patch + +from agentserver.message_bus.todo_registry import TodoRegistry, TodoWatcher, get_todo_registry +from agentserver.message_bus.stream_pump import StreamPump, ListenerConfig, OrganismConfig +from agentserver.message_bus.message_state import HandlerMetadata, HandlerResponse +from agentserver.primitives.todo import ( + TodoUntil, TodoComplete, TodoRegistered, TodoClosed, + handle_todo_until, handle_todo_complete, +) + + +class TestTodoRegistry: + """Test TodoRegistry basic operations.""" + + def test_register_creates_watcher(self): + """register() should create a watcher with correct fields.""" + registry = TodoRegistry() + thread_id = str(uuid.uuid4()) + + watcher_id = registry.register( + thread_id=thread_id, + issuer="greeter", + wait_for="ShoutedResponse", + from_listener="shouter", + description="waiting for shout", + ) + + assert watcher_id is not None + assert watcher_id in registry._by_id + assert len(registry._watchers[thread_id]) == 1 + + watcher = registry._by_id[watcher_id] 
+ assert watcher.thread_id == thread_id + assert watcher.issuer == "greeter" + assert watcher.wait_for == "shoutedresponse" # Normalized to lowercase + assert watcher.from_listener == "shouter" + assert watcher.description == "waiting for shout" + assert watcher.eyebrow_raised is False + + def test_check_raises_eyebrow_on_match(self): + """check() should raise eyebrow when payload type matches.""" + registry = TodoRegistry() + thread_id = str(uuid.uuid4()) + + watcher_id = registry.register( + thread_id=thread_id, + issuer="greeter", + wait_for="ShoutedResponse", + ) + + # Check with matching payload type + newly_raised = registry.check( + thread_id=thread_id, + payload_type="ShoutedResponse", + from_id="shouter", + payload={"message": "HELLO!"}, + ) + + assert len(newly_raised) == 1 + assert newly_raised[0].id == watcher_id + assert newly_raised[0].eyebrow_raised is True + assert newly_raised[0].triggered_from == "shouter" + + def test_check_respects_from_listener_filter(self): + """check() should only match if from_listener matches.""" + registry = TodoRegistry() + thread_id = str(uuid.uuid4()) + + registry.register( + thread_id=thread_id, + issuer="greeter", + wait_for="ShoutedResponse", + from_listener="shouter", # Must be from shouter + ) + + # Check from wrong sender - should NOT raise + newly_raised = registry.check( + thread_id=thread_id, + payload_type="ShoutedResponse", + from_id="other-agent", + ) + assert len(newly_raised) == 0 + + # Check from correct sender - should raise + newly_raised = registry.check( + thread_id=thread_id, + payload_type="ShoutedResponse", + from_id="shouter", + ) + assert len(newly_raised) == 1 + + def test_check_ignores_non_matching_payload(self): + """check() should not raise eyebrow for non-matching payload.""" + registry = TodoRegistry() + thread_id = str(uuid.uuid4()) + + registry.register( + thread_id=thread_id, + issuer="greeter", + wait_for="ShoutedResponse", + ) + + # Check with non-matching payload type + newly_raised = 
registry.check( + thread_id=thread_id, + payload_type="Greeting", + from_id="console", + ) + + assert len(newly_raised) == 0 + + def test_check_ignores_different_thread(self): + """check() should not match watchers on different threads.""" + registry = TodoRegistry() + thread_id_1 = str(uuid.uuid4()) + thread_id_2 = str(uuid.uuid4()) + + registry.register( + thread_id=thread_id_1, + issuer="greeter", + wait_for="ShoutedResponse", + ) + + # Check on different thread + newly_raised = registry.check( + thread_id=thread_id_2, + payload_type="ShoutedResponse", + from_id="shouter", + ) + + assert len(newly_raised) == 0 + + def test_get_raised_for_returns_raised_watchers(self): + """get_raised_for() should return only raised watchers for agent.""" + registry = TodoRegistry() + thread_id = str(uuid.uuid4()) + + # Register two watchers for greeter + registry.register(thread_id, "greeter", "ShoutedResponse") + registry.register(thread_id, "greeter", "AnotherPayload") + + # Raise one + registry.check(thread_id, "ShoutedResponse", "shouter") + + raised = registry.get_raised_for(thread_id, "greeter") + assert len(raised) == 1 + assert raised[0].wait_for == "shoutedresponse" + + def test_close_removes_watcher(self): + """close() should remove watcher by ID.""" + registry = TodoRegistry() + thread_id = str(uuid.uuid4()) + + watcher_id = registry.register(thread_id, "greeter", "ShoutedResponse") + assert watcher_id in registry._by_id + + result = registry.close(watcher_id) + assert result is True + assert watcher_id not in registry._by_id + assert thread_id not in registry._watchers # Empty list cleaned up + + def test_close_returns_false_for_unknown(self): + """close() should return False for unknown watcher ID.""" + registry = TodoRegistry() + result = registry.close("nonexistent-id") + assert result is False + + def test_format_nudge_empty_for_no_raised(self): + """format_nudge() should return empty string for no raised watchers.""" + registry = TodoRegistry() + nudge = 
registry.format_nudge([]) + assert nudge == "" + + def test_format_nudge_includes_watcher_info(self): + """format_nudge() should format raised watchers.""" + registry = TodoRegistry() + thread_id = str(uuid.uuid4()) + + watcher_id = registry.register( + thread_id=thread_id, + issuer="greeter", + wait_for="ShoutedResponse", + description="waiting for shout", + ) + registry.check(thread_id, "ShoutedResponse", "shouter") + + raised = registry.get_raised_for(thread_id, "greeter") + nudge = registry.format_nudge(raised) + + assert "SYSTEM NOTE" in nudge + assert "shoutedresponse" in nudge.lower() + assert "waiting for shout" in nudge + assert watcher_id in nudge + assert "TodoComplete" in nudge + + +class TestTodoHandlers: + """Test TodoUntil and TodoComplete handlers.""" + + @pytest.mark.asyncio + async def test_handle_todo_until_registers_watcher(self): + """handle_todo_until should register a watcher.""" + # Clear any existing watchers + registry = get_todo_registry() + registry.clear() + + payload = TodoUntil( + wait_for="ShoutedResponse", + from_listener="shouter", + description="test watcher", + ) + + metadata = HandlerMetadata( + thread_id=str(uuid.uuid4()), + from_id="greeter", + ) + + response = await handle_todo_until(payload, metadata) + + assert isinstance(response, HandlerResponse) + assert isinstance(response.payload, TodoRegistered) + assert response.payload.wait_for == "ShoutedResponse" + assert response.to == "greeter" + + # Verify watcher was registered + assert response.payload.id in registry._by_id + + @pytest.mark.asyncio + async def test_handle_todo_complete_closes_watcher(self): + """handle_todo_complete should close a watcher.""" + registry = get_todo_registry() + registry.clear() + + thread_id = str(uuid.uuid4()) + + # First register a watcher + watcher_id = registry.register( + thread_id=thread_id, + issuer="greeter", + wait_for="ShoutedResponse", + ) + + # Raise its eyebrow + registry.check(thread_id, "ShoutedResponse", "shouter") + + # Now 
close it + payload = TodoComplete(id=watcher_id) + metadata = HandlerMetadata(thread_id=thread_id, from_id="greeter") + + response = await handle_todo_complete(payload, metadata) + + assert isinstance(response, HandlerResponse) + assert isinstance(response.payload, TodoClosed) + assert response.payload.id == watcher_id + assert response.payload.was_raised is True + + # Verify watcher was removed + assert watcher_id not in registry._by_id + + +class TestTodoIntegration: + """Integration tests for TodoUntil with StreamPump.""" + + @pytest.mark.asyncio + async def test_todo_nudge_appears_in_metadata(self): + """Raised eyebrows should appear in handler metadata.""" + from handlers.hello import Greeting, GreetingResponse, handle_greeting + from agentserver.llm.backend import LLMResponse + + # Clear registries + todo_registry = get_todo_registry() + todo_registry.clear() + + # Create pump with greeter + config = OrganismConfig(name="todo-test") + pump = StreamPump(config) + + pump.register_listener(ListenerConfig( + name="greeter", + payload_class_path="handlers.hello.Greeting", + handler_path="handlers.hello.handle_greeting", + description="Greeting agent", + is_agent=True, + peers=["shouter"], + payload_class=Greeting, + handler=handle_greeting, + )) + + # Register a todo watcher for greeter + thread_id = str(uuid.uuid4()) + watcher_id = todo_registry.register( + thread_id=thread_id, + issuer="greeter", + wait_for="SomeResponse", + description="waiting for something", + ) + + # Raise the eyebrow + todo_registry.check(thread_id, "SomeResponse", "other") + + # Track metadata passed to handler + captured_metadata = [] + original_handler = pump.listeners["greeter"].handler + + async def capturing_handler(payload, metadata): + captured_metadata.append(metadata) + return HandlerResponse( + payload=GreetingResponse(message="hi", original_sender="test"), + to="shouter", + ) + + pump.listeners["greeter"].handler = capturing_handler + + # Create and inject a message + from 
agentserver.message_bus.message_state import MessageState + + state = MessageState( + payload=Greeting(name="Test"), + thread_id=thread_id, + from_id="console", + target_listeners=[pump.listeners["greeter"]], + ) + + # Dispatch + responses = [] + async for resp in pump._dispatch_to_handlers(state): + responses.append(resp) + + # Verify nudge was in metadata + assert len(captured_metadata) == 1 + assert "SYSTEM NOTE" in captured_metadata[0].todo_nudge + assert watcher_id in captured_metadata[0].todo_nudge + + @pytest.mark.asyncio + async def test_message_raises_eyebrow(self): + """Incoming message should raise eyebrow on matching watcher.""" + from handlers.hello import Greeting, GreetingResponse + from handlers.console import ShoutedResponse + + # Clear registry + todo_registry = get_todo_registry() + todo_registry.clear() + + config = OrganismConfig(name="eyebrow-test") + pump = StreamPump(config) + + # Register a simple handler + async def noop_handler(payload, metadata): + return None + + pump.register_listener(ListenerConfig( + name="response-handler", + payload_class_path="handlers.console.ShoutedResponse", + handler_path="handlers.console.handle_shouted_response", + description="Test", + payload_class=ShoutedResponse, + handler=noop_handler, + )) + + # Register a watcher waiting for ShoutedResponse + thread_id = str(uuid.uuid4()) + watcher_id = todo_registry.register( + thread_id=thread_id, + issuer="greeter", + wait_for="ShoutedResponse", + ) + + # Eyebrow should not be raised yet + watcher = todo_registry._by_id[watcher_id] + assert watcher.eyebrow_raised is False + + # Dispatch a ShoutedResponse message + from agentserver.message_bus.message_state import MessageState + + state = MessageState( + payload=ShoutedResponse(message="HELLO!"), + thread_id=thread_id, + from_id="shouter", + target_listeners=[pump.listeners["response-handler"]], + ) + + async for _ in pump._dispatch_to_handlers(state): + pass + + # Now eyebrow should be raised + assert 
watcher.eyebrow_raised is True + assert watcher.triggered_from == "shouter" + + +class TestGreeterTodoFlow: + """Test greeter's use of TodoUntil.""" + + @pytest.mark.asyncio + async def test_greeter_registers_todo_and_eyebrow_raised(self): + """ + Greeter should register a TodoUntil, and eyebrow should raise + when ShoutedResponse arrives. + """ + from handlers.hello import Greeting, GreetingResponse, handle_greeting + from handlers.console import ShoutedResponse + from agentserver.llm.backend import LLMResponse + + # Clear registry + todo_registry = get_todo_registry() + todo_registry.clear() + + thread_id = str(uuid.uuid4()) + + # Mock LLM + mock_llm = LLMResponse( + content="Hello there!", + model="mock", + usage={"total_tokens": 5}, + finish_reason="stop", + ) + + with patch('agentserver.llm.complete', new=AsyncMock(return_value=mock_llm)): + # Call greeter handler + metadata = HandlerMetadata( + thread_id=thread_id, + from_id="console-router", + own_name="greeter", + ) + + response = await handle_greeting(Greeting(name="Alice"), metadata) + + # Verify response goes to shouter + assert response.to == "shouter" + assert isinstance(response.payload, GreetingResponse) + + # Verify a watcher was registered + watchers = todo_registry._watchers.get(thread_id, []) + assert len(watchers) == 1 + assert watchers[0].issuer == "greeter" + assert watchers[0].wait_for == "shoutedresponse" + assert watchers[0].eyebrow_raised is False + + # Simulate ShoutedResponse arriving (from shouter) + todo_registry.check( + thread_id=thread_id, + payload_type="ShoutedResponse", + from_id="shouter", + payload=ShoutedResponse(message="HELLO THERE!"), + ) + + # Eyebrow should now be raised + assert watchers[0].eyebrow_raised is True + assert watchers[0].triggered_from == "shouter" + + @pytest.mark.asyncio + async def test_greeter_closes_raised_todos_on_next_call(self): + """ + When greeter is called again with raised todos, it should close them. 
+ """ + from handlers.hello import Greeting, GreetingResponse, handle_greeting + from agentserver.llm.backend import LLMResponse + + # Clear registry + todo_registry = get_todo_registry() + todo_registry.clear() + + thread_id = str(uuid.uuid4()) + + # Pre-register a raised todo (simulating previous invocation) + watcher_id = todo_registry.register( + thread_id=thread_id, + issuer="greeter", + wait_for="ShoutedResponse", + ) + todo_registry.check(thread_id, "ShoutedResponse", "shouter") + + # Verify eyebrow is raised + assert todo_registry._by_id[watcher_id].eyebrow_raised is True + + # Mock LLM + mock_llm = LLMResponse( + content="Hello again!", + model="mock", + usage={"total_tokens": 5}, + finish_reason="stop", + ) + + # Format the nudge as the pump would + raised = todo_registry.get_raised_for(thread_id, "greeter") + nudge = todo_registry.format_nudge(raised) + + with patch('agentserver.llm.complete', new=AsyncMock(return_value=mock_llm)): + # Call greeter with the nudge + metadata = HandlerMetadata( + thread_id=thread_id, + from_id="console-router", + own_name="greeter", + todo_nudge=nudge, + ) + + await handle_greeting(Greeting(name="Bob"), metadata) + + # Old watcher should be closed + assert watcher_id not in todo_registry._by_id + + # But a new one should be registered for this greeting + watchers = todo_registry._watchers.get(thread_id, []) + assert len(watchers) == 1 + assert watchers[0].id != watcher_id # New watcher + + +class TestTodoMultipleWatchers: + """Test scenarios with multiple watchers.""" + + def test_multiple_watchers_same_thread(self): + """Multiple watchers on same thread should work independently.""" + registry = TodoRegistry() + thread_id = str(uuid.uuid4()) + + id1 = registry.register(thread_id, "agent1", "ResponseA") + id2 = registry.register(thread_id, "agent2", "ResponseB") + id3 = registry.register(thread_id, "agent1", "ResponseB") + + # Raise only ResponseB watchers + raised = registry.check(thread_id, "ResponseB", "source") + + 
assert len(raised) == 2 + raised_ids = {w.id for w in raised} + assert id2 in raised_ids + assert id3 in raised_ids + assert id1 not in raised_ids + + def test_watcher_only_raised_once(self): + """Eyebrow should only raise once per watcher.""" + registry = TodoRegistry() + thread_id = str(uuid.uuid4()) + + registry.register(thread_id, "agent", "Response") + + # First check raises + raised1 = registry.check(thread_id, "Response", "source") + assert len(raised1) == 1 + + # Second check does not re-raise + raised2 = registry.check(thread_id, "Response", "source") + assert len(raised2) == 0 + + def test_close_all_for_thread(self): + """close_all_for_thread should remove all watchers on thread.""" + registry = TodoRegistry() + thread_id = str(uuid.uuid4()) + + id1 = registry.register(thread_id, "agent1", "A") + id2 = registry.register(thread_id, "agent2", "B") + + count = registry.close_all_for_thread(thread_id) + + assert count == 2 + assert id1 not in registry._by_id + assert id2 not in registry._by_id + assert thread_id not in registry._watchers