diff --git a/agentserver/message_bus.py b/agentserver/message_bus.py
index bf11e16..5f2c9a1 100644
--- a/agentserver/message_bus.py
+++ b/agentserver/message_bus.py
@@ -1,6 +1,10 @@
+# agentserver/message_bus.py
+# Refactored January 01, 2026 – MessageBus with run() pump and out-of-band shutdown
+
import asyncio
import logging
-from typing import Dict, Optional, Callable
+from typing import AsyncIterator, Callable, Dict, Optional, Awaitable
+
from lxml import etree
from .xml_listener import XMLListener
@@ -15,21 +19,26 @@ logger = logging.getLogger("agentserver.bus")
class MessageBus:
- """
- The Pure Carrier: Routes XML trees between sovereign listeners.
- - Core: High-speed tree-to-tree routing.
- - Border: 'Air Lock' for ingesting raw bytes.
- - Physics: Hardcoded hooks for the Audit Witness.
+ """The sovereign message carrier.
+
+ - Routes canonical XML trees by root tag and meta.
+ - Pure dispatch: tree → optional response tree.
+ - Active pump via run(): handles serialization and egress.
+ - Out-of-band shutdown via asyncio.Event (fast-path, flood-immune).
"""
- def __init__(self, log_hook: Callable[[etree.Element], None]):
+ def __init__(self, log_hook: Callable[[etree._Element], None]):
# root_tag -> {agent_name -> XMLListener}
self.listeners: Dict[str, Dict[str, XMLListener]] = {}
# Global lookup for directed routing
self.global_names: Dict[str, XMLListener] = {}
- # The 'Physics' hook provided by the Host (AgentServer)
+
+ # The Sovereign Witness hook
self._log_hook = log_hook
+ # Out-of-band shutdown signal (set only by AgentServer on privileged command)
+ self.shutdown_event = asyncio.Event()
+
async def register_listener(self, listener: XMLListener) -> None:
"""Register an organ. Enforces global identity uniqueness."""
if listener.agent_name in self.global_names:
@@ -42,85 +51,108 @@ class MessageBus:
logger.info(f"Registered organ: {listener.agent_name}")
-
- async def deliver_bytes(
- self,
- raw_xml: bytes,
- client_id: Optional[str] = None
- ) -> Optional[str]:
- """
- The Air Lock: Ingests raw bytes from unauthenticated or foreign sources.
- Repairs and validates them before injecting the tree into the core.
- """
+ async def deliver_bytes(self, raw_xml: bytes, client_id: Optional[str] = None) -> None:
+ """Air Lock: ingest raw bytes, repair/canonicalize, inject into core."""
try:
- # 1. Pressurize (Bytes -> Tree)
- envelope_tree: etree._Element = repair_and_canonicalize(raw_xml)
- # 2. Inject into core
- return await self.dispatch(envelope_tree, client_id)
+ envelope_tree = repair_and_canonicalize(raw_xml)
+ await self.dispatch(envelope_tree, client_id)
except XmlTamperError as e:
logger.warning(f"Air Lock Reject: {e}")
- return None
-
async def dispatch(
- self,
- envelope_tree: etree._Element,
- client_id: Optional[str] = None
- ) -> Optional[str]:
- """
- The Pure Heart: Routes a tree. Returns a validated response string.
- """
- # 1. WITNESS: Automatic logging of the truth
+ self,
+ envelope_tree: etree._Element,
+ client_id: Optional[str] = None,
+ ) -> etree._Element | None:
+ """Pure routing heart. Returns validated response tree or None."""
+ # 1. WITNESS – every canonical envelope is seen
self._log_hook(envelope_tree)
- # 2. Extract Metadata
+ # 2. Extract envelope metadata
meta = envelope_tree.find(f"{ENV}meta")
+ if meta is None:
+ return None
from_name = meta.findtext(f"{ENV}from")
to_name = meta.findtext(f"{ENV}to")
- thread_id = meta.findtext(f"{ENV}thread")
+ thread_id = meta.findtext(f"{ENV}thread_id") or meta.findtext(f"{ENV}thread")
- payload_elem = next((c for c in envelope_tree.iterchildren() if c.tag != f"{ENV}meta"), None)
- if payload_elem is None: return None
+ # Find payload (first non-meta child)
+ payload_elem = next((c for c in envelope_tree if c.tag != f"{ENV}meta"), None)
+ if payload_elem is None:
+ return None
payload_tag = payload_elem.tag
- # 3. AUTONOMIC REFLEX: The Logger Hook (Hardcoded Physics)
+ # 3. AUTONOMIC REFLEX: explicit
if payload_tag == LOG_TAG:
- self._log_hook(envelope_tree)
- return f"system{from_name}{thread_id}"
+ self._log_hook(envelope_tree) # extra vent
+ # Minimal ack envelope
+ ack = etree.Element(f"{ENV}message")
+ meta_ack = etree.SubElement(ack, f"{ENV}meta")
+ etree.SubElement(meta_ack, f"{ENV}from").text = "system"
+ if from_name:
+ etree.SubElement(meta_ack, f"{ENV}to").text = from_name
+ if thread_id:
+ etree.SubElement(meta_ack, f"{ENV}thread_id").text = thread_id
+ etree.SubElement(ack, "logged", status="success")
+ return ack
# 4. ROUTING
listeners_for_tag = self.listeners.get(payload_tag, {})
- response_tree = None
+ response_tree: Optional[etree._Element] = None
responding_agent_name = "unknown"
if to_name:
- # Directed Mode
+ # Directed
target = listeners_for_tag.get(to_name) or self.global_names.get(to_name)
if target:
responding_agent_name = target.agent_name
response_tree = await target.handle(envelope_tree, thread_id, from_name or client_id)
else:
- # Broadcast Mode
- tasks = [l.handle(envelope_tree, thread_id, from_name or client_id) for l in listeners_for_tag.values()]
+ # Broadcast – first non-None wins (current policy)
+ tasks = [
+ l.handle(envelope_tree, thread_id, from_name or client_id)
+ for l in listeners_for_tag.values()
+ ]
results = await asyncio.gather(*tasks, return_exceptions=True)
- for i, resp in enumerate(results):
- if isinstance(resp, etree._Element):
- responding_agent_name = list(listeners_for_tag.values())[i].agent_name
- response_tree = resp
+ for listener, result in zip(listeners_for_tag.values(), results):
+ if isinstance(result, etree._Element):
+ responding_agent_name = listener.agent_name
+ response_tree = result
+ break # first-wins
+
+ # 5. IDENTITY INSPECTION – prevent spoofing
+ if response_tree is not None:
+ actual_from = response_tree.findtext(f"{ENV}meta/{ENV}from")
+ if actual_from != responding_agent_name:
+ logger.critical(
+ f"IDENTITY THEFT BLOCKED: expected {responding_agent_name}, got {actual_from}"
+ )
+ return None
+
+ return response_tree
+
+ async def run(
+ self,
+ inbound: AsyncIterator[etree._Element],
+ outbound: Callable[[bytes], Awaitable[None]],
+ client_id: Optional[str] = None,
+ ) -> None:
+ """Active pump for a connection. Handles serialization and egress."""
+ try:
+ async for envelope_tree in inbound:
+ if self.shutdown_event.is_set():
break
- # 5. EGRESS CUSTOMS: Final validation before bytes hit the wire
- if response_tree is not None:
- return self._inspect_and_serialize(response_tree, responding_agent_name)
-
- return None
-
-
- def _inspect_and_serialize(self, tree: etree._Element, expected_from: str) -> Optional[str]:
- """Ensures identity integrity and performs final string serialization."""
- actual_from = tree.findtext(f"{ENV}meta/{ENV}from")
- if actual_from != expected_from:
- logger.critical(f"IDENTITY THEFT BLOCKED: {expected_from} spoofed {actual_from}")
- return None
- # noinspection PyTypeChecker
- return etree.tostring(tree, encoding="unicode", pretty_print=True)
\ No newline at end of file
+ response_tree = await self.dispatch(envelope_tree, client_id)
+ if response_tree is not None:
+ serialized = etree.tostring(
+ response_tree, encoding="utf-8", pretty_print=True
+ )
+ await outbound(serialized)
+ finally:
+ # Optional final courtesy message on clean exit
+ goodbye = b""
+ try:
+ await outbound(goodbye)
+ except Exception:
+ pass # connection already gone
\ No newline at end of file