From 262c14458e2a639fd857cfe6c8afeec2fc6d5b6f Mon Sep 17 00:00:00 2001 From: dullfig Date: Wed, 31 Dec 2025 15:08:42 -0800 Subject: [PATCH] major changes to base, renamed base to xmlListener --- __init__.py | 2 +- agentserver/listeners/base.py | 83 ----------- agentserver/listeners/base_llm.py | 2 +- agentserver/listeners/xmlListener.py | 104 ++++++++++++++ agentserver/message_bus.py | 206 +++++++++++++++++++++++++-- 5 files changed, 303 insertions(+), 94 deletions(-) delete mode 100644 agentserver/listeners/base.py create mode 100644 agentserver/listeners/xmlListener.py diff --git a/__init__.py b/__init__.py index 4827394..332e2c8 100644 --- a/__init__.py +++ b/__init__.py @@ -6,7 +6,7 @@ Secure, XML-centric multi-listener organism server. """ from agentserver.agentserver import AgentServer as AgentServer -from agentserver.listeners.base import XMLListener as XMLListener +from agentserver.listeners.xmlListener import XMLListener as XMLListener from agentserver.message_bus import MessageBus as MessageBus from agentserver.message_bus import Session as Session diff --git a/agentserver/listeners/base.py b/agentserver/listeners/base.py deleted file mode 100644 index 36835ed..0000000 --- a/agentserver/listeners/base.py +++ /dev/null @@ -1,83 +0,0 @@ -""" -Core base class for all listeners in the xml-pipeline organism. - -All capabilities — personalities, tools, gateways — inherit from this class. -""" - -from __future__ import annotations - -import logging -from typing import List, Optional, Dict, Any - -from lxml import etree - -logger = logging.getLogger(__name__) - - -class XMLListener: - """ - Base class for all capabilities (personalities, tools, gateways). - - Subclasses must: - - Define `listens_to` as a class attribute (list of root tags they handle) - - Implement `async handle()` method - - The `convo_id` received in handle() MUST be preserved in any response payload - (via make_response() helper or manually). - """ - - listens_to: List[str] = [] # Must be overridden in subclass — required - - def __init__( - self, - *, - name: Optional[str] = None, - config: Optional[Dict[str, Any]] = None, - **kwargs, - ): - """ - Args: - name: Optional explicit name (defaults to class name) - config: Owner-provided configuration from privileged registration - """ - self.name = name or self.__class__.__name__ - self.config = config or {} - self.logger = logging.getLogger(f"{__name__}.{self.name}") - - async def handle( - self, msg: etree.Element, convo_id: str - ) -> Optional[etree.Element]: - """ - Process an incoming message whose root tag matches this listener. - - Args: - msg: The payload element (already repaired and C14N'd) - convo_id: Thread/conversation UUID — must be preserved in any response - - Returns: - Response payload element (with convo_id preserved), or None if no response - """ - raise NotImplementedError( - f"{self.__class__.__name__}.handle() must be implemented" - ) - - def make_response( - self, - tag: str, - text: Optional[str] = None, - *, - convo_id: str, - **attribs, - ) -> etree.Element: - """ - Convenience helper to create a response element with preserved convo_id. - - Strongly recommended for all listeners to ensure thread continuity. - """ - elem = etree.Element(tag, convo_id=convo_id, **attribs) - if text is not None: - elem.text = text - return elem - - def __repr__(self) -> str: - return f"<{self.__class__.__name__} name='{self.name}' listens_to={self.listens_to}>" \ No newline at end of file diff --git a/agentserver/listeners/base_llm.py b/agentserver/listeners/base_llm.py index b31a54a..bfebf95 100644 --- a/agentserver/listeners/base_llm.py +++ b/agentserver/listeners/base_llm.py @@ -16,7 +16,7 @@ from typing import Dict, List from lxml import etree -from agentserver.listeners.base import XMLListener +from agentserver.listeners.xmlListener import XMLListener from agentserver.llm_connection import llm_pool from agentserver.prompts.no_paperclippers import MANIFESTO_MESSAGE diff --git a/agentserver/listeners/xmlListener.py b/agentserver/listeners/xmlListener.py new file mode 100644 index 0000000..db9e6a7 --- /dev/null +++ b/agentserver/listeners/xmlListener.py @@ -0,0 +1,104 @@ +""" +xmllistener.py — The Sovereign Contract for All Capabilities + +In xml-pipeline, there are no "agents", no "tools", no "services". +There are only bounded, reactive XMLListeners. + +Every capability in the organism — whether driven by an LLM, +a pure function, a remote gateway, or privileged logic — +must inherit from this class. + +This file is intentionally verbose and heavily documented. +It is the constitution that all organs must obey. +""" + +from __future__ import annotations + +import uuid +from typing import Optional, List, ClassVar +from lxml import etree + +class XMLListener: + """ + Base class for all reactive capabilities in the organism. + + Key Invariants (never break these): + 1. Listeners are passive. They never initiate. They only react. + 2. They declare what they listen to via class variable. + 3. They have a globally unique agent_name. + 4. They receive the full parsed envelope tree (not raw XML). + 5. They return only payload XML (never the envelope). + 6. The MessageBus owns routing, threading, and envelope wrapping. + """ + + # =================================================================== + # Required class declarations — must be overridden in subclasses + # =================================================================== + + listens_to: ClassVar[List[str]] = [] + """ + List of full XML tags this listener reacts to. + Example: ["{https://example.org/chat}message", "{https://example.org/calc}request"] + """ + + agent_name: ClassVar[str] = "" + """ + Globally unique name for this instance. + Enforced by MessageBus at registration. + Used in , routing, logging, and known_peers prompts. + """ + + # =================================================================== + # Core handler — the only method that does work + # =================================================================== + + async def handle( + self, + envelope_tree: etree._Element, + convo_id: str, + sender_name: Optional[str], + ) -> Optional[str]: + """ + React to an incoming enveloped message. + + Parameters: + envelope_tree: Full root (parsed, post-repair/C14N) + convo_id: Current conversation UUID (injected or preserved by bus) + sender_name: The value (mandatory) + + Returns: + Payload XML string (no envelope) if responding, else None. + + The organism guarantees: + - envelope_tree is valid against envelope.xsd + - is present and matches sender_name + - convo_id is a valid UUID + + To reply in the current thread: omit convo_id in response → bus preserves it + To start a new thread: include new-uuid in returned envelope + """ + raise NotImplementedError( + f"{self.__class__.__name__} must implement handle()" + ) + + # =================================================================== + # Optional convenience helpers (can be overridden) + # =================================================================== + + def make_response( + self, + payload: str | etree._Element, + *, + to: Optional[str] = None, + convo_id: Optional[str] = None, + ) -> str: + """ + Helper for building correct response payloads. + Use this in subclasses to avoid envelope boilerplate. + + - If convo_id is None → reply in current thread + - If convo_id provided → force/start new thread + - to overrides default reply-to-sender + """ + # Implementation tomorrow — but declared here for contract clarity + raise NotImplementedError \ No newline at end of file diff --git a/agentserver/message_bus.py b/agentserver/message_bus.py index 4bb3722..d64ba70 100644 --- a/agentserver/message_bus.py +++ b/agentserver/message_bus.py @@ -1,13 +1,201 @@ +import asyncio +import uuid +from typing import Dict, Optional + +from lxml import etree + +from .xmllistener import XMLListener + +ENV_NS = "https://xml-pipeline.org/ns/envelope/1" +ENV = f"{{{ENV_NS}}}" + + class MessageBus: - """Stub for static analysis — the real class is in agentserver.message_bus""" - pass + """ + The central nervous system of the organism. -class Session: - """Stub for static analysis — the real class is in agentserver.message_bus""" - pass + Responsibilities: + - Register/unregister listeners with global agent_name uniqueness + - Immediate routing by payload root tag (± targeted ) + - Envelope handling (convo_id injection on ingress if missing) + - Response envelope construction + - Privileged message fast-path detection + """ -__all__ = [ - "MessageBus", - "Session", -] + def __init__(self): + # root_tag → {agent_name → XMLListener} + self.listeners: Dict[str, Dict[str, XMLListener]] = {} + # global reverse lookup for error messages and future cross-tag targeting + self.global_names: Dict[str, XMLListener] = {} + async def register_listener(self, listener: XMLListener) -> None: + """Register a listener. Enforces global agent_name uniqueness.""" + if not hasattr(listener, "agent_name") or not listener.agent_name: + raise ValueError("Listener must have a non-empty agent_name") + + if listener.agent_name in self.global_names: + raise ValueError(f"Agent name already registered: {listener.agent_name}") + + if not hasattr(listener, "listens_to") or not listener.listens_to: + raise ValueError(f"Listener {listener.agent_name} must declare listens_to") + + self.global_names[listener.agent_name] = listener + + for tag in listener.listens_to: + tag_dict = self.listeners.setdefault(tag, {}) + if listener.agent_name in tag_dict: + raise ValueError( + f"{listener.agent_name} already registered for tag <{tag}>" + ) + tag_dict[listener.agent_name] = listener + + async def unregister_listener(self, agent_name: str) -> None: + """Remove a listener completely.""" + listener = self.global_names.pop(agent_name, None) + if listener: + for tag_dict in self.listeners.values(): + tag_dict.pop(agent_name, None) + # Clean up empty tag entries + empty_tags = [tag for tag, d in self.listeners.items() if not d] + for tag in empty_tags: + del self.listeners[tag] + + async def dispatch( + self, + envelope_xml: str, # repaired, validated, exclusive C14N + client_id: Optional[str] = None, # originating connection identifier + ) -> Optional[str]: + """ + Main entry point for normal (non-privileged) traffic. + Returns serialized response envelope if any listener replied, else None. + """ + tree = etree.fromstring(envelope_xml.encode("utf-8")) + + # Fast-path privileged messages — bypass envelope processing + root_tag = tree.tag + if root_tag == "{https://xml-pipeline.org/ns/privileged/1}privileged-msg": + # In real implementation this will go to PrivilegedMsgListener + # For now, just acknowledge + return None + + meta = tree.find(f"{ENV}meta") + if meta is None: + # This should never happen after validation, but be defensive + return None + + from_name = meta.findtext(f"{ENV}from") + to_name = meta.findtext(f"{ENV}to") + convo_id_elem = meta.find(f"{ENV}convo_id") + convo_id = convo_id_elem.text if convo_id_elem is not None else None + + # Inject convo_id on first ingress if missing + if convo_id is None: + convo_id = str(uuid.uuid4()) + # Insert into tree for listeners to see + new_elem = etree.Element(f"{ENV}convo_id") + new_elem.text = convo_id + meta.append(new_elem) + + # Find the single payload element (foreign namespace) + payload_candidates = [ + child for child in tree if child.tag[:len(ENV)] != ENV + ] + if not payload_candidates: + return None + payload_elem = payload_candidates[0] + payload_tag = payload_elem.tag + + listeners_for_tag = self.listeners.get(payload_tag, {}) + + if to_name: + # Targeted delivery + target = listeners_for_tag.get(to_name) or self.global_names.get(to_name) + if target: + response_payload = await target.handle( + tree, convo_id, from_name or client_id + ) + return self._build_response_envelope( + response_payload, target.agent_name, from_name or client_id, tree + ) + # Unknown target — silent drop or error envelope (future) + return None + + else: + # Fan-out to all listeners for this tag + tasks = [ + listener.handle(tree, convo_id, from_name or client_id) + for listener in listeners_for_tag.values() + ] + responses = await asyncio.gather(*tasks, return_exceptions=True) + + # For v1: return the first non-None response (if any) + # Future: could support multi-response merging + for resp in responses: + if isinstance(resp, Exception): + continue # log in real impl + if resp: + # Use the agent_name of the first responding listener + responding_listener = next( + l for l in listeners_for_tag.values() if l is not None + ) + return self._build_response_envelope( + resp, + responding_listener.agent_name, + from_name or client_id, + tree, + ) + return None + + def _build_response_envelope( + self, + payload_xml: Optional[str], + from_agent: str, + original_sender: Optional[str], + incoming_tree: etree._Element, + ) -> Optional[str]: + """Build a proper response envelope from listener payload.""" + if not payload_xml: + return None + + # Parse the payload the listener returned + payload_tree = etree.fromstring(payload_xml.encode("utf-8")) + + # Extract convo_id from incoming envelope (default reply-in-thread) + incoming_convo = incoming_tree.findtext(f"{ENV}convo_id") + + # Check if listener explicitly overrode convo_id in its returned envelope + # (future-proof — listeners could return full envelope) + if payload_tree.tag == f"{ENV}message": + response_meta = payload_tree.find(f"{ENV}meta") + response_convo = ( + response_meta.findtext(f"{ENV}convo_id") if response_meta is not None else None + ) + response_to = ( + response_meta.findtext(f"{ENV}to") if response_meta is not None else None + ) + convo_id = response_convo or incoming_convo + to_name = response_to or original_sender + actual_payload = next( + (c for c in payload_tree if c.tag[:len(ENV)] != ENV), None + ) + else: + convo_id = incoming_convo + to_name = original_sender + actual_payload = payload_tree + + if actual_payload is None: + return None + + # Build fresh envelope + response_root = etree.Element(f"{ENV}message") + meta = etree.SubElement(response_root, f"{ENV}meta") + etree.SubElement(meta, f"{ENV}from").text = from_agent + if to_name: + etree.SubElement(meta, f"{ENV}to").text = to_name + if convo_id: + etree.SubElement(meta, f"{ENV}convo_id").text = convo_id + + response_root.append(actual_payload) + + # noinspection PyTypeChecker + return etree.tostring(response_root, encoding="unicode", pretty_print=True)