From 48b4a2b2bd8ddc0c1978ed280d1c2d2713733ef6 Mon Sep 17 00:00:00 2001 From: dullfig Date: Thu, 1 Jan 2026 15:05:13 -0800 Subject: [PATCH] major changes to message bus, and xml listener --- __init__.py | 2 +- agentserver/listeners/base_llm.py | 4 +- agentserver/message_bus.py | 241 ++++++------------ agentserver/schema/envelope.xsd | 2 +- agentserver/{ => utils}/llm_connection.py | 0 .../xmlListener.py => xml_listener.py} | 2 +- 6 files changed, 88 insertions(+), 163 deletions(-) rename agentserver/{ => utils}/llm_connection.py (100%) rename agentserver/{listeners/xmlListener.py => xml_listener.py} (99%) diff --git a/__init__.py b/__init__.py index 332e2c8..6672898 100644 --- a/__init__.py +++ b/__init__.py @@ -6,7 +6,7 @@ Secure, XML-centric multi-listener organism server. """ from agentserver.agentserver import AgentServer as AgentServer -from agentserver.listeners.xmlListener import XMLListener as XMLListener +from agentserver.xml_listener import XMLListener as XMLListener from agentserver.message_bus import MessageBus as MessageBus from agentserver.message_bus import Session as Session diff --git a/agentserver/listeners/base_llm.py b/agentserver/listeners/base_llm.py index bfebf95..97794c8 100644 --- a/agentserver/listeners/base_llm.py +++ b/agentserver/listeners/base_llm.py @@ -16,8 +16,8 @@ from typing import Dict, List from lxml import etree -from agentserver.listeners.xmlListener import XMLListener -from agentserver.llm_connection import llm_pool +from agentserver.xml_listener import XMLListener +from agentserver.utils.llm_connection import llm_pool from agentserver.prompts.no_paperclippers import MANIFESTO_MESSAGE logger = logging.getLogger(__name__) diff --git a/agentserver/message_bus.py b/agentserver/message_bus.py index d64ba70..bf11e16 100644 --- a/agentserver/message_bus.py +++ b/agentserver/message_bus.py @@ -1,201 +1,126 @@ import asyncio -import uuid -from typing import Dict, Optional - +import logging +from typing import Dict, Optional, Callable from lxml import etree -from .xmllistener import XMLListener +from .xml_listener import XMLListener +from .utils.message import repair_and_canonicalize, XmlTamperError +# Constants for Internal Physics ENV_NS = "https://xml-pipeline.org/ns/envelope/1" ENV = f"{{{ENV_NS}}}" +LOG_TAG = "{https://xml-pipeline.org/ns/logger/1}log" + +logger = logging.getLogger("agentserver.bus") class MessageBus: """ - The central nervous system of the organism. - - Responsibilities: - - Register/unregister listeners with global agent_name uniqueness - - Immediate routing by payload root tag (± targeted ) - - Envelope handling (convo_id injection on ingress if missing) - - Response envelope construction - - Privileged message fast-path detection + The Pure Carrier: Routes XML trees between sovereign listeners. + - Core: High-speed tree-to-tree routing. + - Border: 'Air Lock' for ingesting raw bytes. + - Physics: Hardcoded hooks for the Audit Witness. """ - def __init__(self): - # root_tag → {agent_name → XMLListener} + def __init__(self, log_hook: Callable[[etree.Element], None]): + # root_tag -> {agent_name -> XMLListener} self.listeners: Dict[str, Dict[str, XMLListener]] = {} - # global reverse lookup for error messages and future cross-tag targeting + # Global lookup for directed routing self.global_names: Dict[str, XMLListener] = {} + # The 'Physics' hook provided by the Host (AgentServer) + self._log_hook = log_hook async def register_listener(self, listener: XMLListener) -> None: - """Register a listener. Enforces global agent_name uniqueness.""" - if not hasattr(listener, "agent_name") or not listener.agent_name: - raise ValueError("Listener must have a non-empty agent_name") - + """Register an organ. Enforces global identity uniqueness.""" if listener.agent_name in self.global_names: - raise ValueError(f"Agent name already registered: {listener.agent_name}") - - if not hasattr(listener, "listens_to") or not listener.listens_to: - raise ValueError(f"Listener {listener.agent_name} must declare listens_to") + raise ValueError(f"Identity collision: {listener.agent_name}") self.global_names[listener.agent_name] = listener - for tag in listener.listens_to: tag_dict = self.listeners.setdefault(tag, {}) - if listener.agent_name in tag_dict: - raise ValueError( - f"{listener.agent_name} already registered for tag <{tag}>" - ) tag_dict[listener.agent_name] = listener - async def unregister_listener(self, agent_name: str) -> None: - """Remove a listener completely.""" - listener = self.global_names.pop(agent_name, None) - if listener: - for tag_dict in self.listeners.values(): - tag_dict.pop(agent_name, None) - # Clean up empty tag entries - empty_tags = [tag for tag, d in self.listeners.items() if not d] - for tag in empty_tags: - del self.listeners[tag] + logger.info(f"Registered organ: {listener.agent_name}") + + + async def deliver_bytes( + self, + raw_xml: bytes, + client_id: Optional[str] = None + ) -> Optional[str]: + """ + The Air Lock: Ingests raw bytes from unauthenticated or foreign sources. + Repairs and validates them before injecting the tree into the core. + """ + try: + # 1. Pressurize (Bytes -> Tree) + envelope_tree: etree._Element = repair_and_canonicalize(raw_xml) + # 2. Inject into core + return await self.dispatch(envelope_tree, client_id) + except XmlTamperError as e: + logger.warning(f"Air Lock Reject: {e}") + return None + async def dispatch( - self, - envelope_xml: str, # repaired, validated, exclusive C14N - client_id: Optional[str] = None, # originating connection identifier + self, + envelope_tree: etree._Element, + client_id: Optional[str] = None ) -> Optional[str]: """ - Main entry point for normal (non-privileged) traffic. - Returns serialized response envelope if any listener replied, else None. + The Pure Heart: Routes a tree. Returns a validated response string. """ - tree = etree.fromstring(envelope_xml.encode("utf-8")) - - # Fast-path privileged messages — bypass envelope processing - root_tag = tree.tag - if root_tag == "{https://xml-pipeline.org/ns/privileged/1}privileged-msg": - # In real implementation this will go to PrivilegedMsgListener - # For now, just acknowledge - return None - - meta = tree.find(f"{ENV}meta") - if meta is None: - # This should never happen after validation, but be defensive - return None + # 1. WITNESS: Automatic logging of the truth + self._log_hook(envelope_tree) + # 2. Extract Metadata + meta = envelope_tree.find(f"{ENV}meta") from_name = meta.findtext(f"{ENV}from") to_name = meta.findtext(f"{ENV}to") - convo_id_elem = meta.find(f"{ENV}convo_id") - convo_id = convo_id_elem.text if convo_id_elem is not None else None + thread_id = meta.findtext(f"{ENV}thread") - # Inject convo_id on first ingress if missing - if convo_id is None: - convo_id = str(uuid.uuid4()) - # Insert into tree for listeners to see - new_elem = etree.Element(f"{ENV}convo_id") - new_elem.text = convo_id - meta.append(new_elem) - - # Find the single payload element (foreign namespace) - payload_candidates = [ - child for child in tree if child.tag[:len(ENV)] != ENV - ] - if not payload_candidates: - return None - payload_elem = payload_candidates[0] + payload_elem = next((c for c in envelope_tree.iterchildren() if c.tag != f"{ENV}meta"), None) + if payload_elem is None: return None payload_tag = payload_elem.tag + # 3. AUTONOMIC REFLEX: The Logger Hook (Hardcoded Physics) + if payload_tag == LOG_TAG: + self._log_hook(envelope_tree) + return f"system{from_name}{thread_id}" + + # 4. ROUTING listeners_for_tag = self.listeners.get(payload_tag, {}) + response_tree = None + responding_agent_name = "unknown" if to_name: - # Targeted delivery + # Directed Mode target = listeners_for_tag.get(to_name) or self.global_names.get(to_name) if target: - response_payload = await target.handle( - tree, convo_id, from_name or client_id - ) - return self._build_response_envelope( - response_payload, target.agent_name, from_name or client_id, tree - ) - # Unknown target — silent drop or error envelope (future) - return None - + responding_agent_name = target.agent_name + response_tree = await target.handle(envelope_tree, thread_id, from_name or client_id) else: - # Fan-out to all listeners for this tag - tasks = [ - listener.handle(tree, convo_id, from_name or client_id) - for listener in listeners_for_tag.values() - ] - responses = await asyncio.gather(*tasks, return_exceptions=True) + # Broadcast Mode + tasks = [l.handle(envelope_tree, thread_id, from_name or client_id) for l in listeners_for_tag.values()] + results = await asyncio.gather(*tasks, return_exceptions=True) + for i, resp in enumerate(results): + if isinstance(resp, etree._Element): + responding_agent_name = list(listeners_for_tag.values())[i].agent_name + response_tree = resp + break - # For v1: return the first non-None response (if any) - # Future: could support multi-response merging - for resp in responses: - if isinstance(resp, Exception): - continue # log in real impl - if resp: - # Use the agent_name of the first responding listener - responding_listener = next( - l for l in listeners_for_tag.values() if l is not None - ) - return self._build_response_envelope( - resp, - responding_listener.agent_name, - from_name or client_id, - tree, - ) + # 5. EGRESS CUSTOMS: Final validation before bytes hit the wire + if response_tree is not None: + return self._inspect_and_serialize(response_tree, responding_agent_name) + + return None + + + def _inspect_and_serialize(self, tree: etree._Element, expected_from: str) -> Optional[str]: + """Ensures identity integrity and performs final string serialization.""" + actual_from = tree.findtext(f"{ENV}meta/{ENV}from") + if actual_from != expected_from: + logger.critical(f"IDENTITY THEFT BLOCKED: {expected_from} spoofed {actual_from}") return None - - def _build_response_envelope( - self, - payload_xml: Optional[str], - from_agent: str, - original_sender: Optional[str], - incoming_tree: etree._Element, - ) -> Optional[str]: - """Build a proper response envelope from listener payload.""" - if not payload_xml: - return None - - # Parse the payload the listener returned - payload_tree = etree.fromstring(payload_xml.encode("utf-8")) - - # Extract convo_id from incoming envelope (default reply-in-thread) - incoming_convo = incoming_tree.findtext(f"{ENV}convo_id") - - # Check if listener explicitly overrode convo_id in its returned envelope - # (future-proof — listeners could return full envelope) - if payload_tree.tag == f"{ENV}message": - response_meta = payload_tree.find(f"{ENV}meta") - response_convo = ( - response_meta.findtext(f"{ENV}convo_id") if response_meta is not None else None - ) - response_to = ( - response_meta.findtext(f"{ENV}to") if response_meta is not None else None - ) - convo_id = response_convo or incoming_convo - to_name = response_to or original_sender - actual_payload = next( - (c for c in payload_tree if c.tag[:len(ENV)] != ENV), None - ) - else: - convo_id = incoming_convo - to_name = original_sender - actual_payload = payload_tree - - if actual_payload is None: - return None - - # Build fresh envelope - response_root = etree.Element(f"{ENV}message") - meta = etree.SubElement(response_root, f"{ENV}meta") - etree.SubElement(meta, f"{ENV}from").text = from_agent - if to_name: - etree.SubElement(meta, f"{ENV}to").text = to_name - if convo_id: - etree.SubElement(meta, f"{ENV}convo_id").text = convo_id - - response_root.append(actual_payload) - # noinspection PyTypeChecker - return etree.tostring(response_root, encoding="unicode", pretty_print=True) + return etree.tostring(tree, encoding="unicode", pretty_print=True) \ No newline at end of file diff --git a/agentserver/schema/envelope.xsd b/agentserver/schema/envelope.xsd index 1356593..e621124 100644 --- a/agentserver/schema/envelope.xsd +++ b/agentserver/schema/envelope.xsd @@ -13,7 +13,7 @@ - + diff --git a/agentserver/llm_connection.py b/agentserver/utils/llm_connection.py similarity index 100% rename from agentserver/llm_connection.py rename to agentserver/utils/llm_connection.py diff --git a/agentserver/listeners/xmlListener.py b/agentserver/xml_listener.py similarity index 99% rename from agentserver/listeners/xmlListener.py rename to agentserver/xml_listener.py index db9e6a7..d79f6ef 100644 --- a/agentserver/listeners/xmlListener.py +++ b/agentserver/xml_listener.py @@ -56,7 +56,7 @@ class XMLListener: self, envelope_tree: etree._Element, convo_id: str, - sender_name: Optional[str], + sender_name: str, ) -> Optional[str]: """ React to an incoming enveloped message.