major changes to message bus, and xml listener

This commit is contained in:
dullfig 2026-01-01 15:05:13 -08:00
parent 2fd40bd664
commit 48b4a2b2bd
6 changed files with 88 additions and 163 deletions

View file

@ -6,7 +6,7 @@ Secure, XML-centric multi-listener organism server.
""" """
from agentserver.agentserver import AgentServer as AgentServer from agentserver.agentserver import AgentServer as AgentServer
from agentserver.listeners.xmlListener import XMLListener as XMLListener from agentserver.xml_listener import XMLListener as XMLListener
from agentserver.message_bus import MessageBus as MessageBus from agentserver.message_bus import MessageBus as MessageBus
from agentserver.message_bus import Session as Session from agentserver.message_bus import Session as Session

View file

@ -16,8 +16,8 @@ from typing import Dict, List
from lxml import etree from lxml import etree
from agentserver.listeners.xmlListener import XMLListener from agentserver.xml_listener import XMLListener
from agentserver.llm_connection import llm_pool from agentserver.utils.llm_connection import llm_pool
from agentserver.prompts.no_paperclippers import MANIFESTO_MESSAGE from agentserver.prompts.no_paperclippers import MANIFESTO_MESSAGE
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View file

@ -1,201 +1,126 @@
import asyncio import asyncio
import uuid import logging
from typing import Dict, Optional from typing import Dict, Optional, Callable
from lxml import etree from lxml import etree
from .xmllistener import XMLListener from .xml_listener import XMLListener
from .utils.message import repair_and_canonicalize, XmlTamperError
# Constants for Internal Physics
ENV_NS = "https://xml-pipeline.org/ns/envelope/1" ENV_NS = "https://xml-pipeline.org/ns/envelope/1"
ENV = f"{{{ENV_NS}}}" ENV = f"{{{ENV_NS}}}"
LOG_TAG = "{https://xml-pipeline.org/ns/logger/1}log"
logger = logging.getLogger("agentserver.bus")
class MessageBus: class MessageBus:
""" """
The central nervous system of the organism. The Pure Carrier: Routes XML trees between sovereign listeners.
- Core: High-speed tree-to-tree routing.
Responsibilities: - Border: 'Air Lock' for ingesting raw bytes.
- Register/unregister listeners with global agent_name uniqueness - Physics: Hardcoded hooks for the Audit Witness.
- Immediate routing by payload root tag (± targeted <to>)
- Envelope handling (convo_id injection on ingress if missing)
- Response envelope construction
- Privileged message fast-path detection
""" """
def __init__(self): def __init__(self, log_hook: Callable[[etree.Element], None]):
# root_tag → {agent_name → XMLListener} # root_tag -> {agent_name -> XMLListener}
self.listeners: Dict[str, Dict[str, XMLListener]] = {} self.listeners: Dict[str, Dict[str, XMLListener]] = {}
# global reverse lookup for error messages and future cross-tag targeting # Global lookup for directed <to/> routing
self.global_names: Dict[str, XMLListener] = {} self.global_names: Dict[str, XMLListener] = {}
# The 'Physics' hook provided by the Host (AgentServer)
self._log_hook = log_hook
async def register_listener(self, listener: XMLListener) -> None: async def register_listener(self, listener: XMLListener) -> None:
"""Register a listener. Enforces global agent_name uniqueness.""" """Register an organ. Enforces global identity uniqueness."""
if not hasattr(listener, "agent_name") or not listener.agent_name:
raise ValueError("Listener must have a non-empty agent_name")
if listener.agent_name in self.global_names: if listener.agent_name in self.global_names:
raise ValueError(f"Agent name already registered: {listener.agent_name}") raise ValueError(f"Identity collision: {listener.agent_name}")
if not hasattr(listener, "listens_to") or not listener.listens_to:
raise ValueError(f"Listener {listener.agent_name} must declare listens_to")
self.global_names[listener.agent_name] = listener self.global_names[listener.agent_name] = listener
for tag in listener.listens_to: for tag in listener.listens_to:
tag_dict = self.listeners.setdefault(tag, {}) tag_dict = self.listeners.setdefault(tag, {})
if listener.agent_name in tag_dict:
raise ValueError(
f"{listener.agent_name} already registered for tag <{tag}>"
)
tag_dict[listener.agent_name] = listener tag_dict[listener.agent_name] = listener
async def unregister_listener(self, agent_name: str) -> None: logger.info(f"Registered organ: {listener.agent_name}")
"""Remove a listener completely."""
listener = self.global_names.pop(agent_name, None)
if listener: async def deliver_bytes(
for tag_dict in self.listeners.values(): self,
tag_dict.pop(agent_name, None) raw_xml: bytes,
# Clean up empty tag entries client_id: Optional[str] = None
empty_tags = [tag for tag, d in self.listeners.items() if not d] ) -> Optional[str]:
for tag in empty_tags: """
del self.listeners[tag] The Air Lock: Ingests raw bytes from unauthenticated or foreign sources.
Repairs and validates them before injecting the tree into the core.
"""
try:
# 1. Pressurize (Bytes -> Tree)
envelope_tree: etree._Element = repair_and_canonicalize(raw_xml)
# 2. Inject into core
return await self.dispatch(envelope_tree, client_id)
except XmlTamperError as e:
logger.warning(f"Air Lock Reject: {e}")
return None
async def dispatch( async def dispatch(
self, self,
envelope_xml: str, # repaired, validated, exclusive C14N envelope_tree: etree._Element,
client_id: Optional[str] = None, # originating connection identifier client_id: Optional[str] = None
) -> Optional[str]: ) -> Optional[str]:
""" """
Main entry point for normal (non-privileged) traffic. The Pure Heart: Routes a tree. Returns a validated response string.
Returns serialized response envelope if any listener replied, else None.
""" """
tree = etree.fromstring(envelope_xml.encode("utf-8")) # 1. WITNESS: Automatic logging of the truth
self._log_hook(envelope_tree)
# Fast-path privileged messages — bypass envelope processing
root_tag = tree.tag
if root_tag == "{https://xml-pipeline.org/ns/privileged/1}privileged-msg":
# In real implementation this will go to PrivilegedMsgListener
# For now, just acknowledge
return None
meta = tree.find(f"{ENV}meta")
if meta is None:
# This should never happen after validation, but be defensive
return None
# 2. Extract Metadata
meta = envelope_tree.find(f"{ENV}meta")
from_name = meta.findtext(f"{ENV}from") from_name = meta.findtext(f"{ENV}from")
to_name = meta.findtext(f"{ENV}to") to_name = meta.findtext(f"{ENV}to")
convo_id_elem = meta.find(f"{ENV}convo_id") thread_id = meta.findtext(f"{ENV}thread")
convo_id = convo_id_elem.text if convo_id_elem is not None else None
# Inject convo_id on first ingress if missing payload_elem = next((c for c in envelope_tree.iterchildren() if c.tag != f"{ENV}meta"), None)
if convo_id is None: if payload_elem is None: return None
convo_id = str(uuid.uuid4())
# Insert into tree for listeners to see
new_elem = etree.Element(f"{ENV}convo_id")
new_elem.text = convo_id
meta.append(new_elem)
# Find the single payload element (foreign namespace)
payload_candidates = [
child for child in tree if child.tag[:len(ENV)] != ENV
]
if not payload_candidates:
return None
payload_elem = payload_candidates[0]
payload_tag = payload_elem.tag payload_tag = payload_elem.tag
# 3. AUTONOMIC REFLEX: The Logger Hook (Hardcoded Physics)
if payload_tag == LOG_TAG:
self._log_hook(envelope_tree)
return f"<message xmlns='{ENV_NS}'><meta><from>system</from><to>{from_name}</to><thread_id>{thread_id}</thread_id></meta><logged status='success'/></message>"
# 4. ROUTING
listeners_for_tag = self.listeners.get(payload_tag, {}) listeners_for_tag = self.listeners.get(payload_tag, {})
response_tree = None
responding_agent_name = "unknown"
if to_name: if to_name:
# Targeted delivery # Directed Mode
target = listeners_for_tag.get(to_name) or self.global_names.get(to_name) target = listeners_for_tag.get(to_name) or self.global_names.get(to_name)
if target: if target:
response_payload = await target.handle( responding_agent_name = target.agent_name
tree, convo_id, from_name or client_id response_tree = await target.handle(envelope_tree, thread_id, from_name or client_id)
)
return self._build_response_envelope(
response_payload, target.agent_name, from_name or client_id, tree
)
# Unknown target — silent drop or error envelope (future)
return None
else: else:
# Fan-out to all listeners for this tag # Broadcast Mode
tasks = [ tasks = [l.handle(envelope_tree, thread_id, from_name or client_id) for l in listeners_for_tag.values()]
listener.handle(tree, convo_id, from_name or client_id) results = await asyncio.gather(*tasks, return_exceptions=True)
for listener in listeners_for_tag.values() for i, resp in enumerate(results):
] if isinstance(resp, etree._Element):
responses = await asyncio.gather(*tasks, return_exceptions=True) responding_agent_name = list(listeners_for_tag.values())[i].agent_name
response_tree = resp
break
# For v1: return the first non-None response (if any) # 5. EGRESS CUSTOMS: Final validation before bytes hit the wire
# Future: could support multi-response merging if response_tree is not None:
for resp in responses: return self._inspect_and_serialize(response_tree, responding_agent_name)
if isinstance(resp, Exception):
continue # log in real impl return None
if resp:
# Use the agent_name of the first responding listener
responding_listener = next( def _inspect_and_serialize(self, tree: etree._Element, expected_from: str) -> Optional[str]:
l for l in listeners_for_tag.values() if l is not None """Ensures identity integrity and performs final string serialization."""
) actual_from = tree.findtext(f"{ENV}meta/{ENV}from")
return self._build_response_envelope( if actual_from != expected_from:
resp, logger.critical(f"IDENTITY THEFT BLOCKED: {expected_from} spoofed {actual_from}")
responding_listener.agent_name,
from_name or client_id,
tree,
)
return None return None
def _build_response_envelope(
self,
payload_xml: Optional[str],
from_agent: str,
original_sender: Optional[str],
incoming_tree: etree._Element,
) -> Optional[str]:
"""Build a proper response envelope from listener payload."""
if not payload_xml:
return None
# Parse the payload the listener returned
payload_tree = etree.fromstring(payload_xml.encode("utf-8"))
# Extract convo_id from incoming envelope (default reply-in-thread)
incoming_convo = incoming_tree.findtext(f"{ENV}convo_id")
# Check if listener explicitly overrode convo_id in its returned envelope
# (future-proof — listeners could return full envelope)
if payload_tree.tag == f"{ENV}message":
response_meta = payload_tree.find(f"{ENV}meta")
response_convo = (
response_meta.findtext(f"{ENV}convo_id") if response_meta is not None else None
)
response_to = (
response_meta.findtext(f"{ENV}to") if response_meta is not None else None
)
convo_id = response_convo or incoming_convo
to_name = response_to or original_sender
actual_payload = next(
(c for c in payload_tree if c.tag[:len(ENV)] != ENV), None
)
else:
convo_id = incoming_convo
to_name = original_sender
actual_payload = payload_tree
if actual_payload is None:
return None
# Build fresh envelope
response_root = etree.Element(f"{ENV}message")
meta = etree.SubElement(response_root, f"{ENV}meta")
etree.SubElement(meta, f"{ENV}from").text = from_agent
if to_name:
etree.SubElement(meta, f"{ENV}to").text = to_name
if convo_id:
etree.SubElement(meta, f"{ENV}convo_id").text = convo_id
response_root.append(actual_payload)
# noinspection PyTypeChecker # noinspection PyTypeChecker
return etree.tostring(response_root, encoding="unicode", pretty_print=True) return etree.tostring(tree, encoding="unicode", pretty_print=True)

View file

@ -13,7 +13,7 @@
<xs:sequence> <xs:sequence>
<xs:element name="from" type="xs:string" minOccurs="1"/> <xs:element name="from" type="xs:string" minOccurs="1"/>
<xs:element name="to" type="xs:string" minOccurs="0"/> <xs:element name="to" type="xs:string" minOccurs="0"/>
<xs:element name="convo_id" type="xs:string" minOccurs="0"/> <xs:element name="thread" type="xs:string" minOccurs="0"/>
<!-- Reserved for future standard fields (timestamp, priority, etc.) --> <!-- Reserved for future standard fields (timestamp, priority, etc.) -->
<xs:any namespace="##other" processContents="lax" minOccurs="0" maxOccurs="unbounded"/> <xs:any namespace="##other" processContents="lax" minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence> </xs:sequence>

View file

@ -56,7 +56,7 @@ class XMLListener:
self, self,
envelope_tree: etree._Element, envelope_tree: etree._Element,
convo_id: str, convo_id: str,
sender_name: Optional[str], sender_name: str,
) -> Optional[str]: ) -> Optional[str]:
""" """
React to an incoming enveloped message. React to an incoming enveloped message.