From ab207d8f0b88ba1e59fdbc6427152cd3ad05401a Mon Sep 17 00:00:00 2001 From: dullfig Date: Thu, 8 Jan 2026 12:30:58 -0800 Subject: [PATCH] fixing docs --- .../message_bus/steps/envelope_validation.py | 57 ++++++++++++ .../message_bus/steps/payload_extraction.py | 91 +++++++++++++++++++ agentserver/message_bus/steps/test_c14n.py | 0 agentserver/message_bus/steps/test_repair.py | 0 .../message_bus/steps/thread_assignment.py | 57 ++++++++++++ .../message_bus/steps/xsd_validation.py | 91 +++++++++++++++++++ docs/handler-contract-v2.1.md | 77 ++++++++++++++++ docs/listener-class-v2.1.md | 20 +--- docs/primitives.md | 40 ++++++++ 9 files changed, 415 insertions(+), 18 deletions(-) create mode 100644 agentserver/message_bus/steps/envelope_validation.py create mode 100644 agentserver/message_bus/steps/payload_extraction.py create mode 100644 agentserver/message_bus/steps/test_c14n.py create mode 100644 agentserver/message_bus/steps/test_repair.py create mode 100644 agentserver/message_bus/steps/thread_assignment.py create mode 100644 agentserver/message_bus/steps/xsd_validation.py create mode 100644 docs/handler-contract-v2.1.md create mode 100644 docs/primitives.md diff --git a/agentserver/message_bus/steps/envelope_validation.py b/agentserver/message_bus/steps/envelope_validation.py new file mode 100644 index 0000000..739b1b8 --- /dev/null +++ b/agentserver/message_bus/steps/envelope_validation.py @@ -0,0 +1,57 @@ +""" +envelope_validation.py — Validates the canonicalized envelope against envelope.xsd. + +After repair_step and c14n_step, we have a normalized envelope_tree. +This step enforces the outer structure (thread, from, optional to, etc.) +using the strict envelope.xsd schema. + +Failure here is serious — invalid envelope means the message is malformed at the protocol level, +so we set a clear error and let downstream steps handle it (typically route to system pipeline +for diagnostic ). + +Part of AgentServer v2.1 message pump. 
async def envelope_validation_step(state: MessageState) -> MessageState:
    """
    Validate the canonicalized envelope_tree against the fixed envelope.xsd schema.

    What the schema is expected to enforce (per the module description):
      - Root is a valid envelope <message> with the required <thread> and <from>
      - Optional <to>, etc.
      - Namespace must match https://xml-pipeline.org/ns/envelope/v1

    Args:
        state: The message state flowing through the pump. Reads
            ``state.envelope_tree`` and ``state.error``; writes ``state.error``.

    Returns:
        The same ``state`` object. This step never raises: every failure is
        reported through ``state.error`` so downstream steps can short-circuit.
    """
    # An earlier step already failed: keep its (more specific) diagnostic
    # instead of overwriting it with a generic message from this step.
    if getattr(state, "error", None):
        return state

    if state.envelope_tree is None:
        state.error = "envelope_validation_step: no envelope_tree (previous step failed)"
        return state

    try:
        # lxml schema validation — raises etree.DocumentInvalid on failure
        _ENVELOPE_XSD.assertValid(state.envelope_tree)

        # Defence in depth: the schema should already guarantee this.
        # NOTE(review): can be removed later if proven redundant.
        if state.envelope_tree.tag != "{https://xml-pipeline.org/ns/envelope/v1}message":
            raise ValueError("Root element is not in expected namespace")

    except etree.DocumentInvalid as exc:
        # Use the log attached to the exception rather than the log on the
        # shared module-level schema object, so the report describes exactly
        # this validation run.
        error_lines = [
            f"{entry.level_name}: {entry.message} (line {entry.line})"
            for entry in exc.error_log
        ]
        state.error = "envelope_validation_step: invalid envelope\n" + "\n".join(error_lines)

    except Exception as exc:  # pylint: disable=broad-except
        # Boundary of the step: convert anything unexpected into a diagnostic.
        state.error = f"envelope_validation_step failed: {exc}"

    return state
# Envelope namespace for easy reference
_ENVELOPE_NS = "https://xml-pipeline.org/ns/envelope/v1"
_MESSAGE_TAG = f"{{{_ENVELOPE_NS}}}message"

# Qualified names of the envelope control elements (thread, from, optional to).
# Built once at import time instead of once per child inside the step.
_ENVELOPE_CONTROL_TAGS = frozenset(
    f"{{{_ENVELOPE_NS}}}{local_name}" for local_name in ("thread", "from", "to")
)


async def payload_extraction_step(state: MessageState) -> MessageState:
    """
    Extract the single payload element from the validated envelope.

    Expected structure (element names other than the control elements are
    illustrative):

        <message xmlns="https://xml-pipeline.org/ns/envelope/v1">
          <thread>uuid</thread>
          <from>sender</from>
          <to>receiver</to>        (optional)
          <payload-root>           <- this is the one we want
            ...
          </payload-root>
        </message>

    Args:
        state: The message state flowing through the pump. Reads
            ``state.envelope_tree`` and ``state.error``.

    Returns:
        The same ``state`` object.
        On success: ``state.payload_tree`` is set to the payload element and
        ``state.thread_id`` / ``state.from_id`` are filled from non-empty
        <thread> / <from> text (whitespace-stripped).
        On failure: ``state.error`` is set with a clear diagnostic.
    """
    # A previous step (e.g. envelope validation) already reported a failure.
    # Do not pull "trustworthy" provenance out of an envelope that was never
    # successfully validated.
    if getattr(state, "error", None):
        return state

    root = state.envelope_tree
    if root is None:
        state.error = "payload_extraction_step: no envelope_tree (previous step failed)"
        return state

    # Basic sanity — root must be in correct namespace (already checked by schema,
    # but we double-check for defence in depth)
    if root.tag != _MESSAGE_TAG:
        state.error = "payload_extraction_step: root tag is not in envelope namespace"
        return state

    # Direct children that are real elements and not envelope control elements.
    # Comments and processing instructions expose a non-string ``tag`` and must
    # not be counted as payload roots.
    payload_candidates = [
        child
        for child in root
        if isinstance(child.tag, str) and child.tag not in _ENVELOPE_CONTROL_TAGS
    ]

    if not payload_candidates:
        state.error = "payload_extraction_step: no payload element found inside <message>"
        return state

    if len(payload_candidates) > 1:
        state.error = (
            "payload_extraction_step: multiple payload roots found — "
            "exactly one capability payload element is allowed"
        )
        return state

    # Optional: capture provenance from envelope for later use
    thread_elem = root.find(f"{{{_ENVELOPE_NS}}}thread")
    from_elem = root.find(f"{{{_ENVELOPE_NS}}}from")

    # ``is not None`` matters: childless elements can be falsy.
    if thread_elem is not None and thread_elem.text:
        state.thread_id = thread_elem.text.strip()

    if from_elem is not None and from_elem.text:
        state.from_id = from_elem.text.strip()

    # Success — exactly one payload element
    state.payload_tree = payload_candidates[0]

    return state
a/agentserver/message_bus/steps/thread_assignment.py b/agentserver/message_bus/steps/thread_assignment.py new file mode 100644 index 0000000..7646ac1 --- /dev/null +++ b/agentserver/message_bus/steps/thread_assignment.py @@ -0,0 +1,57 @@ +""" +thread_assignment.py — Ensure every message has a valid opaque thread UUID. + +The envelope.xsd requires , but external clients may: + - Omit it (first message) + - Send invalid format + - Send duplicate/malformed UUID + +This step enforces: + - Presence of a valid UUID v4 string in + - If missing or invalid → generate a new one (new root thread) + - Store it in state.thread_id for all downstream use + +This guarantees thread continuity and privacy (external parties never see internal hierarchy). + +Part of AgentServer v2.1 message pump. +""" + +import uuid +from agentserver.message_bus.message_state import MessageState + + +def _is_valid_uuid(val: str) -> bool: + """Simple UUID v4 validation — accepts standard string formats.""" + try: + uuid_obj = uuid.UUID(val, version=4) + return str(uuid_obj) == val # Ensures canonical lowercase format + except ValueError: + return False + + +async def thread_assignment_step(state: MessageState) -> MessageState: + """ + Assign or validate the thread UUID. + + - If state.thread_id is already set and valid → keep it + - Else → generate new UUID v4 + - Always normalizes to lowercase canonical string + + This is the source of truth for thread identity throughout the organism. 
+ """ + if state.thread_id and _is_valid_uuid(state.thread_id): + # Already valid — nothing to do + return state + + # Invalid, missing, or malformed — generate new root thread + new_thread_id = str(uuid.uuid4()) + + # Optional: log warning if external client sent bad thread + if state.thread_id: + state.metadata.setdefault("diagnostics", []).append( + f"Invalid external thread ID '{state.thread_id}' — replaced with new root thread" + ) + + state.thread_id = new_thread_id + + return state \ No newline at end of file diff --git a/agentserver/message_bus/steps/xsd_validation.py b/agentserver/message_bus/steps/xsd_validation.py new file mode 100644 index 0000000..27a4c2c --- /dev/null +++ b/agentserver/message_bus/steps/xsd_validation.py @@ -0,0 +1,91 @@ +""" +payload_extraction.py — Extract the inner payload from the validated envelope. + +After envelope_validation_step confirms a correct outer envelope, +this step removes the envelope elements (, , optional , etc.) +and isolates the single child element that is the actual payload. + +The payload is expected to be exactly one root element (the capability-specific XML). +If zero or multiple payload roots are found, we set a clear error — this protects +against malformed or ambiguous messages. + +Part of AgentServer v2.1 message pump. +""" + +from lxml import etree +from agentserver.message_bus.message_state import MessageState + +# Envelope namespace for easy reference +_ENVELOPE_NS = "https://xml-pipeline.org/ns/envelope/v1" +_MESSAGE_TAG = f"{{{ _ENVELOPE_NS }}}message" + + +async def payload_extraction_step(state: MessageState) -> MessageState: + """ + Extract the single payload element from the validated envelope. + + Expected structure: + + uuid + sender + + ← this is the one we want + ... + + + + On success: state.payload_tree is set to the payload Element. + On failure: state.error is set with a clear diagnostic. 
+ """ + if state.envelope_tree is None: + state.error = "payload_extraction_step: no envelope_tree (previous step failed)" + return state + + # Basic sanity — root must be in correct namespace (already checked by schema, + # but we double-check for defence in depth) + if state.envelope_tree.tag != _MESSAGE_TAG: + state.error = f"payload_extraction_step: root tag is not in envelope namespace" + return state + + # Find all direct children that are not envelope control elements + # Envelope control elements are: thread, from, to (optional) + payload_candidates = [ + child + for child in state.envelope_tree + if not ( + child.tag in { + f"{{{ _ENVELOPE_NS }}}thread", + f"{{{ _ENVELOPE_NS }}}from", + f"{{{ _ENVELOPE_NS }}}to", + } + ) + ] + + if len(payload_candidates) == 0: + state.error = "payload_extraction_step: no payload element found inside " + return state + + if len(payload_candidates) > 1: + state.error = ( + "payload_extraction_step: multiple payload roots found — " + "exactly one capability payload element is allowed" + ) + return state + + # Success — exactly one payload element + payload_element = payload_candidates[0] + + # Optional: capture provenance from envelope for later use + # (these will be trustworthy because envelope was validated) + thread_elem = state.envelope_tree.find(f"{{{ _ENVELOPE_NS }}}thread") + from_elem = state.envelope_tree.find(f"{{{ _ENVELOPE_NS }}}from") + + if thread_elem is not None and thread_elem.text: + state.thread_id = thread_elem.text.strip() + + if from_elem is not None and from_elem.text: + state.from_id = from_elem.text.strip() + + state.payload_tree = payload_element + + return state \ No newline at end of file diff --git a/docs/handler-contract-v2.1.md b/docs/handler-contract-v2.1.md new file mode 100644 index 0000000..231f30c --- /dev/null +++ b/docs/handler-contract-v2.1.md @@ -0,0 +1,77 @@ +# AgentServer v2.1 — Handler Contract +**January 08, 2026** + +This document is the single canonical specification for all 
capability handlers in AgentServer v2.1. +All examples, documentation, and implementation must conform to this contract. + +## Handler Signature (Locked) + +Every handler **must** be declared with the following exact signature: + +```python +async def handler( + payload: PayloadDataclass, # XSD-validated, deserialized @xmlify dataclass instance + metadata: HandlerMetadata # Minimal trustworthy context provided by the message pump +) -> bytes: + ... +``` + +- Handlers **must** be asynchronous (`async def`). +- Synchronous functions are not permitted and will not be auto-wrapped. +- The `metadata` parameter is mandatory. +- The return value **must** be a `bytes` object containing one or more raw XML payload fragments. +- Returning `None` or any non-`bytes` value is a programming error and will trigger a protective `` emission. + +## HandlerMetadata + +```python +@dataclass(frozen=True) +class HandlerMetadata: + thread_id: str # Opaque thread UUID — safe for thread-scoped storage + own_name: str | None = None # Registered name of the executing listener. + # Populated ONLY for listeners with `agent: true` in organism.yaml +``` + +### Field Rationale +- `thread_id`: Enables isolated per-thread state (e.g., conversation memory, calculator history) without exposing topology. +- `own_name`: Allows LLM agents to produce self-referential reasoning text while remaining blind to routing mechanics. + +No sender identity (`from_id`) is provided — preserving full topology privacy. + +## Security Model + +The message pump captures all security-critical information (sender name, thread hierarchy, peers list enforcement) in trusted coroutine scope **before** invoking the handler. + +Handlers are treated as **untrusted code**. 
They receive only the minimal safe context defined above and cannot: +- Forge provenance +- Escape thread boundaries +- Probe or leak topology +- Route arbitrarily + +## Example Handlers + +**Pure tool (no agent flag):** +```python +async def add_handler(payload: AddPayload, metadata: HandlerMetadata) -> bytes: + result = payload.a + payload.b + return f"{result}".encode("utf-8") +``` + +**LLM agent (agent: true):** +```python +async def research_handler(payload: ResearchPayload, metadata: HandlerMetadata) -> bytes: + own = metadata.own_name or "researcher" # safe fallback + return b""" + I am the """ + own.encode() + b""" agent. Next step... + 735 + """ +``` + +## References in Other Documentation + +- All code examples in README.md, self-grammar-generation.md, and configuration.md must match this contract. +- listener-class-v2.1.md now references this file as the authoritative source for signature and metadata. + +--- + +This contract is now **locked** for v2.1 \ No newline at end of file diff --git a/docs/listener-class-v2.1.md b/docs/listener-class-v2.1.md index 5ec23bf..b5f988a 100644 --- a/docs/listener-class-v2.1.md +++ b/docs/listener-class-v2.1.md @@ -91,24 +91,8 @@ async def add_handler( return f"{result}".encode("utf-8") ``` -### Handler Signature (Locked) -```python -async def handler( - payload: PayloadDataclass, # Deserialized, XSD-validated instance - metadata: HandlerMetadata # Small, trustworthy context -) -> bytes: - ... 
-``` - -### HandlerMetadata (frozen, read-only) -```python -@dataclass(frozen=True) -class HandlerMetadata: - thread_id: str # Opaque UUID matching in envelope - from_id: str # Registered name of the sender (pump-injected, trustworthy) - own_name: str | None = None # Populated ONLY for listeners with agent: true - is_self_call: bool = False # Convenience flag: from_id == own_name -``` +### Handler Signature and Metadata (Locked) +See [handler-contract-v2.1.md](handler-contract-v2.1.md) for the canonical handler signature and metadata definition. Typical uses: - Stateful tools → key persistent data by `thread_id` diff --git a/docs/primitives.md b/docs/primitives.md new file mode 100644 index 0000000..8b1b3a5 --- /dev/null +++ b/docs/primitives.md @@ -0,0 +1,40 @@ +# AgentServer v2.1 — System Primitives (Magic Tags) + +These payload root elements receive special routing and/or side effects in the message pump. +They reside in the reserved namespace `https://xml-pipeline.org/ns/core/v1`. + +## `` +### `` +- Emitted exclusively by the system +- Routes back to the listener that triggered the error +- Payload structure: + ```xml + + Brief canned error message (e.g., "Invalid payload structure") + Base64-encoded raw bytes of the failed attempt (truncated if large) + + ``` +- Purpose: Safe, LLM-friendly diagnostic feedback +- Security note: Error messages are abstract and canned — no raw validator output is exposed to agents +- Security note: + - Certain classes of errors (payload schema violations, unknown root tags, etc.) are intentionally reported with identical abstract messages. + - This prevents topology probing: an agent or external caller cannot distinguish between "wrong schema for existing capability" and "capability does not exist". + - Authorized introspection is available only via controlled meta queries. 
+ +## `` +- May be emitted by any listener +- Routes to self (uses the emitting listener's unique root tag mechanism) +- No side effects +- Purpose: Optional visible scaffolding for structured reasoning and iteration planning + +## `` +- May be emitted by any listener +- Routes to the immediate parent listener in the private thread hierarchy +- Side effect: The Current subthread below the current listener is pruned after successful delivery of message.
After pruning, the current thread tail is the current listener. +- Purpose: Explicit return-to-caller semantics with automatic cleanup + +## `` +- May be emitted by any listener +- Routes to the immediate parent listener in the private thread hierarchy +- Side effect: The entire thread is pruned up to and including the current listener.
After pruning, the current thread tail is the parent listener. +- Purpose: Explicit termination of the current thread and all its subthreads \ No newline at end of file