diff --git a/__init__.py b/__init__.py
index 4827394..332e2c8 100644
--- a/__init__.py
+++ b/__init__.py
@@ -6,7 +6,7 @@ Secure, XML-centric multi-listener organism server.
"""
from agentserver.agentserver import AgentServer as AgentServer
-from agentserver.listeners.base import XMLListener as XMLListener
+from agentserver.listeners.xmlListener import XMLListener as XMLListener
from agentserver.message_bus import MessageBus as MessageBus
from agentserver.message_bus import Session as Session
diff --git a/agentserver/listeners/base.py b/agentserver/listeners/base.py
deleted file mode 100644
index 36835ed..0000000
--- a/agentserver/listeners/base.py
+++ /dev/null
@@ -1,83 +0,0 @@
-"""
-Core base class for all listeners in the xml-pipeline organism.
-
-All capabilities — personalities, tools, gateways — inherit from this class.
-"""
-
-from __future__ import annotations
-
-import logging
-from typing import List, Optional, Dict, Any
-
-from lxml import etree
-
-logger = logging.getLogger(__name__)
-
-
-class XMLListener:
- """
- Base class for all capabilities (personalities, tools, gateways).
-
- Subclasses must:
- - Define `listens_to` as a class attribute (list of root tags they handle)
- - Implement `async handle()` method
-
- The `convo_id` received in handle() MUST be preserved in any response payload
- (via make_response() helper or manually).
- """
-
- listens_to: List[str] = [] # Must be overridden in subclass — required
-
- def __init__(
- self,
- *,
- name: Optional[str] = None,
- config: Optional[Dict[str, Any]] = None,
- **kwargs,
- ):
- """
- Args:
- name: Optional explicit name (defaults to class name)
- config: Owner-provided configuration from privileged registration
- """
- self.name = name or self.__class__.__name__
- self.config = config or {}
- self.logger = logging.getLogger(f"{__name__}.{self.name}")
-
- async def handle(
- self, msg: etree.Element, convo_id: str
- ) -> Optional[etree.Element]:
- """
- Process an incoming message whose root tag matches this listener.
-
- Args:
- msg: The payload element (already repaired and C14N'd)
- convo_id: Thread/conversation UUID — must be preserved in any response
-
- Returns:
- Response payload element (with convo_id preserved), or None if no response
- """
- raise NotImplementedError(
- f"{self.__class__.__name__}.handle() must be implemented"
- )
-
- def make_response(
- self,
- tag: str,
- text: Optional[str] = None,
- *,
- convo_id: str,
- **attribs,
- ) -> etree.Element:
- """
- Convenience helper to create a response element with preserved convo_id.
-
- Strongly recommended for all listeners to ensure thread continuity.
- """
- elem = etree.Element(tag, convo_id=convo_id, **attribs)
- if text is not None:
- elem.text = text
- return elem
-
- def __repr__(self) -> str:
- return f"<{self.__class__.__name__} name='{self.name}' listens_to={self.listens_to}>"
\ No newline at end of file
diff --git a/agentserver/listeners/base_llm.py b/agentserver/listeners/base_llm.py
index b31a54a..bfebf95 100644
--- a/agentserver/listeners/base_llm.py
+++ b/agentserver/listeners/base_llm.py
@@ -16,7 +16,7 @@ from typing import Dict, List
from lxml import etree
-from agentserver.listeners.base import XMLListener
+from agentserver.listeners.xmlListener import XMLListener
from agentserver.llm_connection import llm_pool
from agentserver.prompts.no_paperclippers import MANIFESTO_MESSAGE
diff --git a/agentserver/listeners/xmlListener.py b/agentserver/listeners/xmlListener.py
new file mode 100644
index 0000000..db9e6a7
--- /dev/null
+++ b/agentserver/listeners/xmlListener.py
@@ -0,0 +1,104 @@
+"""
+xmllistener.py — The Sovereign Contract for All Capabilities
+
+In xml-pipeline, there are no "agents", no "tools", no "services".
+There are only bounded, reactive XMLListeners.
+
+Every capability in the organism — whether driven by an LLM,
+a pure function, a remote gateway, or privileged logic —
+must inherit from this class.
+
+This file is intentionally verbose and heavily documented.
+It is the constitution that all organs must obey.
+"""
+
+from __future__ import annotations
+
+import uuid
+from typing import Optional, List, ClassVar
+from lxml import etree
+
+class XMLListener:
+ """
+ Base class for all reactive capabilities in the organism.
+
+ Key Invariants (never break these):
+ 1. Listeners are passive. They never initiate. They only react.
+ 2. They declare what they listen to via class variable.
+ 3. They have a globally unique agent_name.
+ 4. They receive the full parsed envelope tree (not raw XML).
+ 5. They return only payload XML (never the envelope).
+ 6. The MessageBus owns routing, threading, and envelope wrapping.
+ """
+
+ # ===================================================================
+ # Required class declarations — must be overridden in subclasses
+ # ===================================================================
+
+ listens_to: ClassVar[List[str]] = []
+ """
+ List of full XML tags this listener reacts to.
+ Example: ["{https://example.org/chat}message", "{https://example.org/calc}request"]
+ """
+
+ agent_name: ClassVar[str] = ""
+ """
+ Globally unique name for this instance.
+ Enforced by MessageBus at registration.
+ Used in , routing, logging, and known_peers prompts.
+ """
+
+ # ===================================================================
+ # Core handler — the only method that does work
+ # ===================================================================
+
+ async def handle(
+ self,
+ envelope_tree: etree._Element,
+ convo_id: str,
+ sender_name: Optional[str],
+ ) -> Optional[str]:
+ """
+ React to an incoming enveloped message.
+
+ Parameters:
+ envelope_tree: Full root (parsed, post-repair/C14N)
+ convo_id: Current conversation UUID (injected or preserved by bus)
+ sender_name: The value (mandatory)
+
+ Returns:
+ Payload XML string (no envelope) if responding, else None.
+
+ The organism guarantees:
+ - envelope_tree is valid against envelope.xsd
+ - is present and matches sender_name
+ - convo_id is a valid UUID
+
+ To reply in the current thread: omit convo_id in response → bus preserves it
+ To start a new thread: include new-uuid in returned envelope
+ """
+ raise NotImplementedError(
+ f"{self.__class__.__name__} must implement handle()"
+ )
+
+ # ===================================================================
+ # Optional convenience helpers (can be overridden)
+ # ===================================================================
+
+ def make_response(
+ self,
+ payload: str | etree._Element,
+ *,
+ to: Optional[str] = None,
+ convo_id: Optional[str] = None,
+ ) -> str:
+ """
+ Helper for building correct response payloads.
+ Use this in subclasses to avoid envelope boilerplate.
+
+ - If convo_id is None → reply in current thread
+ - If convo_id provided → force/start new thread
+ - to overrides default reply-to-sender
+ """
+ # Implementation tomorrow — but declared here for contract clarity
+ raise NotImplementedError
\ No newline at end of file
diff --git a/agentserver/message_bus.py b/agentserver/message_bus.py
index 4bb3722..d64ba70 100644
--- a/agentserver/message_bus.py
+++ b/agentserver/message_bus.py
@@ -1,13 +1,201 @@
+import asyncio
+import uuid
+from typing import Dict, Optional
+
+from lxml import etree
+
+from .xmllistener import XMLListener
+
+ENV_NS = "https://xml-pipeline.org/ns/envelope/1"
+ENV = f"{{{ENV_NS}}}"
+
+
class MessageBus:
- """Stub for static analysis — the real class is in agentserver.message_bus"""
- pass
+ """
+ The central nervous system of the organism.
-class Session:
- """Stub for static analysis — the real class is in agentserver.message_bus"""
- pass
+ Responsibilities:
+ - Register/unregister listeners with global agent_name uniqueness
+ - Immediate routing by payload root tag (± targeted )
+ - Envelope handling (convo_id injection on ingress if missing)
+ - Response envelope construction
+ - Privileged message fast-path detection
+ """
-__all__ = [
- "MessageBus",
- "Session",
-]
+ def __init__(self):
+ # root_tag → {agent_name → XMLListener}
+ self.listeners: Dict[str, Dict[str, XMLListener]] = {}
+ # global reverse lookup for error messages and future cross-tag targeting
+ self.global_names: Dict[str, XMLListener] = {}
+ async def register_listener(self, listener: XMLListener) -> None:
+ """Register a listener. Enforces global agent_name uniqueness."""
+ if not hasattr(listener, "agent_name") or not listener.agent_name:
+ raise ValueError("Listener must have a non-empty agent_name")
+
+ if listener.agent_name in self.global_names:
+ raise ValueError(f"Agent name already registered: {listener.agent_name}")
+
+ if not hasattr(listener, "listens_to") or not listener.listens_to:
+ raise ValueError(f"Listener {listener.agent_name} must declare listens_to")
+
+ self.global_names[listener.agent_name] = listener
+
+ for tag in listener.listens_to:
+ tag_dict = self.listeners.setdefault(tag, {})
+ if listener.agent_name in tag_dict:
+ raise ValueError(
+ f"{listener.agent_name} already registered for tag <{tag}>"
+ )
+ tag_dict[listener.agent_name] = listener
+
+ async def unregister_listener(self, agent_name: str) -> None:
+ """Remove a listener completely."""
+ listener = self.global_names.pop(agent_name, None)
+ if listener:
+ for tag_dict in self.listeners.values():
+ tag_dict.pop(agent_name, None)
+ # Clean up empty tag entries
+ empty_tags = [tag for tag, d in self.listeners.items() if not d]
+ for tag in empty_tags:
+ del self.listeners[tag]
+
+ async def dispatch(
+ self,
+ envelope_xml: str, # repaired, validated, exclusive C14N
+ client_id: Optional[str] = None, # originating connection identifier
+ ) -> Optional[str]:
+ """
+ Main entry point for normal (non-privileged) traffic.
+ Returns serialized response envelope if any listener replied, else None.
+ """
+ tree = etree.fromstring(envelope_xml.encode("utf-8"))
+
+ # Fast-path privileged messages — bypass envelope processing
+ root_tag = tree.tag
+ if root_tag == "{https://xml-pipeline.org/ns/privileged/1}privileged-msg":
+ # In real implementation this will go to PrivilegedMsgListener
+ # For now, just acknowledge
+ return None
+
+ meta = tree.find(f"{ENV}meta")
+ if meta is None:
+ # This should never happen after validation, but be defensive
+ return None
+
+ from_name = meta.findtext(f"{ENV}from")
+ to_name = meta.findtext(f"{ENV}to")
+ convo_id_elem = meta.find(f"{ENV}convo_id")
+ convo_id = convo_id_elem.text if convo_id_elem is not None else None
+
+ # Inject convo_id on first ingress if missing
+ if convo_id is None:
+ convo_id = str(uuid.uuid4())
+ # Insert into tree for listeners to see
+ new_elem = etree.Element(f"{ENV}convo_id")
+ new_elem.text = convo_id
+ meta.append(new_elem)
+
+ # Find the single payload element (foreign namespace)
+ payload_candidates = [
+ child for child in tree if child.tag[:len(ENV)] != ENV
+ ]
+ if not payload_candidates:
+ return None
+ payload_elem = payload_candidates[0]
+ payload_tag = payload_elem.tag
+
+ listeners_for_tag = self.listeners.get(payload_tag, {})
+
+ if to_name:
+ # Targeted delivery
+ target = listeners_for_tag.get(to_name) or self.global_names.get(to_name)
+ if target:
+ response_payload = await target.handle(
+ tree, convo_id, from_name or client_id
+ )
+ return self._build_response_envelope(
+ response_payload, target.agent_name, from_name or client_id, tree
+ )
+ # Unknown target — silent drop or error envelope (future)
+ return None
+
+ else:
+ # Fan-out to all listeners for this tag
+ tasks = [
+ listener.handle(tree, convo_id, from_name or client_id)
+ for listener in listeners_for_tag.values()
+ ]
+ responses = await asyncio.gather(*tasks, return_exceptions=True)
+
+ # For v1: return the first non-None response (if any)
+ # Future: could support multi-response merging
+ for resp in responses:
+ if isinstance(resp, Exception):
+ continue # log in real impl
+ if resp:
+ # Use the agent_name of the first responding listener
+ responding_listener = next(
+ l for l in listeners_for_tag.values() if l is not None
+ )
+ return self._build_response_envelope(
+ resp,
+ responding_listener.agent_name,
+ from_name or client_id,
+ tree,
+ )
+ return None
+
+ def _build_response_envelope(
+ self,
+ payload_xml: Optional[str],
+ from_agent: str,
+ original_sender: Optional[str],
+ incoming_tree: etree._Element,
+ ) -> Optional[str]:
+ """Build a proper response envelope from listener payload."""
+ if not payload_xml:
+ return None
+
+ # Parse the payload the listener returned
+ payload_tree = etree.fromstring(payload_xml.encode("utf-8"))
+
+ # Extract convo_id from incoming envelope (default reply-in-thread)
+ incoming_convo = incoming_tree.findtext(f"{ENV}convo_id")
+
+ # Check if listener explicitly overrode convo_id in its returned envelope
+ # (future-proof — listeners could return full envelope)
+ if payload_tree.tag == f"{ENV}message":
+ response_meta = payload_tree.find(f"{ENV}meta")
+ response_convo = (
+ response_meta.findtext(f"{ENV}convo_id") if response_meta is not None else None
+ )
+ response_to = (
+ response_meta.findtext(f"{ENV}to") if response_meta is not None else None
+ )
+ convo_id = response_convo or incoming_convo
+ to_name = response_to or original_sender
+ actual_payload = next(
+ (c for c in payload_tree if c.tag[:len(ENV)] != ENV), None
+ )
+ else:
+ convo_id = incoming_convo
+ to_name = original_sender
+ actual_payload = payload_tree
+
+ if actual_payload is None:
+ return None
+
+ # Build fresh envelope
+ response_root = etree.Element(f"{ENV}message")
+ meta = etree.SubElement(response_root, f"{ENV}meta")
+ etree.SubElement(meta, f"{ENV}from").text = from_agent
+ if to_name:
+ etree.SubElement(meta, f"{ENV}to").text = to_name
+ if convo_id:
+ etree.SubElement(meta, f"{ENV}convo_id").text = convo_id
+
+ response_root.append(actual_payload)
+
+ # noinspection PyTypeChecker
+ return etree.tostring(response_root, encoding="unicode", pretty_print=True)