xml-pipeline/agentserver/message_bus/steps/routing_resolution.py
dullfig 82b5fcdd78 Replace MessageBus with aiostream-based StreamPump
Major refactor of the message pump architecture:

- Replace bus.py with stream_pump.py using aiostream for composable
  stream processing with natural fan-out via flatmap
- Add to_id field to MessageState for explicit routing
- Fix routing to use to_id.class format (e.g., "greeter.greeting")
- Generate XSD schemas from xmlified payload classes
- Fix xmlable imports (absolute -> relative) and parse_element ctx

New features:
- handlers/hello.py: Sample Greeting/GreetingResponse handler
- config/organism.yaml: Sample organism configuration
- 41 tests (31 unit + 10 integration) all passing

Schema changes:
- envelope.xsd: Allow any namespace payloads (##other -> ##any)

Dependencies added to pyproject.toml:
- aiostream>=0.5 (core dependency)
- pyhumps, termcolor (for xmlable)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 10:41:17 -08:00

70 lines
2.2 KiB
Python

"""
routing_resolution.py — Resolve routing based on derived root tag.
This step computes the root tag from the deserialized payload and looks it up
in a routing table (root_tag → list[Listener]).
NOTE: The StreamPump has routing built-in via _route_step(). This standalone
step is provided for custom pipeline configurations or testing.
Usage:
routing_step = make_routing_step(routing_table)
state = await routing_step(state)
Part of AgentServer v2.1 message pump.
"""
from __future__ import annotations
from typing import Dict, List, Callable, Awaitable, TYPE_CHECKING
from agentserver.message_bus.message_state import MessageState
if TYPE_CHECKING:
from agentserver.message_bus.stream_pump import Listener
def make_routing_step(
routing_table: Dict[str, List["Listener"]]
) -> Callable[[MessageState], Awaitable[MessageState]]:
"""
Factory: create a routing step with a specific routing table.
The routing table maps root tags to lists of listeners:
{"agent.payload": [listener1, listener2], ...}
"""
async def routing_resolution_step(state: MessageState) -> MessageState:
"""
Resolve which listener(s) should handle this payload.
Root tag = f"{from_id.lower()}.{payload_class_name.lower()}"
Supports:
- Normal unique routing (one listener)
- Broadcast (multiple listeners if same root tag)
If no match → error, falls to system pipeline.
"""
if state.payload is None:
state.error = "routing_resolution_step: no deserialized payload"
return state
if state.to_id is None:
state.error = "routing_resolution_step: missing to_id"
return state
payload_class_name = type(state.payload).__name__.lower()
root_tag = f"{state.to_id.lower()}.{payload_class_name}"
targets = routing_table.get(root_tag)
if not targets:
state.error = f"routing_resolution_step: unknown root tag '{root_tag}'"
return state
state.target_listeners = targets
return state
routing_resolution_step.__name__ = "routing_resolution_step"
return routing_resolution_step