diff --git a/docs/logic-and-iteration.md b/docs/archive-obsolete/logic-and-iteration.md similarity index 100% rename from docs/logic-and-iteration.md rename to docs/archive-obsolete/logic-and-iteration.md diff --git a/docs/thread-management.md b/docs/archive-obsolete/thread-management.md similarity index 100% rename from docs/thread-management.md rename to docs/archive-obsolete/thread-management.md diff --git a/docs/token-scheduling-issues.md b/docs/archive-obsolete/token-scheduling-issues.md similarity index 100% rename from docs/token-scheduling-issues.md rename to docs/archive-obsolete/token-scheduling-issues.md diff --git a/docs/configuration.md b/docs/configuration.md index eb11239..b22e6c6 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1,115 +1,128 @@ -G# Configuration — organism.yaml (v2.1) +**AgentServer v2.1 — Organism Configuration** + +This file is the canonical reference for `organism.yaml` format in v2.1. +The old `configuration.md` is hereby obsolete and superseded. The entire organism is declared in a single YAML file (default: `config/organism.yaml`). -Loaded at bootstrap — single source of truth for initial composition. -Runtime changes (hot-reload) via local OOB privileged commands. +It is the single source of truth for initial composition, loaded at bootstrap. +Runtime structural changes (add/remove/replace listeners) are performed exclusively via privileged OOB commands (hot-reload). 
-## Example Full Configuration +### Full Example (ResearchSwarm-01) ```yaml organism: name: "ResearchSwarm-01" - identity: "config/identity/private.ed25519" # Ed25519 private key - port: 8765 # Main message bus WSS + identity: "config/identity/private.ed25519" # Ed25519 private key path + port: 8765 # Main WSS message bus tls: cert: "certs/fullchain.pem" key: "certs/privkey.pem" -oob: # Out-of-band privileged channel (GUI/hot-reload ready) +oob: # Out-of-band privileged channel enabled: true - bind: "127.0.0.1" # Localhost-only default - port: 8766 # Separate WSS port - # unix_socket: "/tmp/organism.sock" # Alternative + bind: "127.0.0.1" # Localhost-only by default (GUI safe) + port: 8766 # Separate WSS port from main bus + # unix_socket: "/tmp/organism.sock" # Alternative binding -thread_scheduling: "breadth-first" # or "depth-first" (default: breadth-first) +thread_scheduling: "breadth-first" # or "depth-first" meta: enabled: true allow_list_capabilities: true - allow_schema_requests: "admin" # "admin" | "authenticated" | "none" + allow_schema_requests: "admin" # "admin" | "authenticated" | "none" allow_example_requests: "admin" allow_prompt_requests: "admin" - allow_remote: false # Federation peers query meta + allow_remote: false # Federation peers may query meta listeners: - name: calculator.add payload_class: examples.calculator.AddPayload handler: examples.calculator.add_handler - description: "Adds two integers and returns their sum." # Mandatory for usable tool prompts + description: "Adds two integers and returns their sum." - - name: summarizer + - name: calculator.multiply + payload_class: examples.calculator.MultiplyPayload + handler: examples.calculator.multiply_handler + description: "Multiplies two integers and returns their product." + + - name: local_summarizer payload_class: agents.summarizer.SummarizePayload handler: agents.summarizer.summarize_handler description: "Summarizes text via local LLM." 
-agents: - name: researcher - system_prompt: "prompts/researcher_system.txt" - tools: + payload_class: agents.researcher.ResearchPayload + handler: agents.researcher.research_handler + description: "Primary research agent that reasons and coordinates tools." + agent: true # LLM agent → unique root tag, own_name exposed + peers: # Allowed call targets - calculator.add - - summarizer - - name: web_search - remote: true - gateways: - - search_node1 - - search_node2 - - search_node3 # list = broadcast to all - mode: "first-answer-wins" # optional: "all" (collect responses), default "single" if one gateway - + - calculator.multiply + - local_summarizer + - web_search # gateway group, defined below + + - name: search.google + payload_class: gateways.google.SearchPayload + handler: gateways.google.search_handler + description: "Google search gateway." + broadcast: true # Shares root tag with other search.* listeners + + - name: search.bing + payload_class: gateways.google.SearchPayload # Identical dataclass required + handler: gateways.bing.search_handler + description: "Bing search gateway." + broadcast: true + gateways: - name: web_search remote_url: "wss://trusted-search-node.example.org" trusted_identity: "pubkeys/search_node.ed25519.pub" - description: "Federated web search capability." + description: "Federated web search gateway group." ``` -## Sections Explained +### Sections Explained -### `organism` -Core settings. -- `name`: Logs/discovery. -- `identity`: Ed25519 private key path. -- `port` / `tls`: Main WSS bus. +#### `organism` +Core identity and main bus. +- `name`: Human identifier, used in logs and discovery. +- `identity`: Path to Ed25519 private key (signing, federation, OOB auth). +- `port` / `tls`: Main encrypted message bus. -### `oob` -Privileged local control channel. -- `enabled: false` → pure static (restart for changes). -- Localhost default for GUI safety. -- Separate from main port — bus oblivious. 
+#### `oob` +Privileged local control channel (GUI/hot-reload ready). +- Disabled → fully static configuration (restart required for changes). +- Bound to localhost by default for security. -### `thread_scheduling` -Balanced subthread execution. -- `"breadth-first"`: Fair round-robin (default, prevents deep starvation). -- `"depth-first"`: Dive deep into branches. +#### `thread_scheduling` +Subthread execution policy across the organism. +- `"breadth-first"` (default): fair round-robin, prevents deep branch starvation. +- `"depth-first"`: aggressive dive into branches. -### `meta` -Introspection controls (`https://xml-pipeline.org/ns/meta/v1`). +#### `meta` +Introspection controls (`https://xml-pipeline.org/ns/meta/v1` namespace). +- Flags control who may request capability lists, schemas, examples, prompts. -### `listeners` -Bounded capabilities. -- `name`: Discovery/logging (dots for hierarchy). -- `payload_class`: Full import to `@xmlify` dataclass. -- `handler`: Full import to function (dataclass → bytes). -- `description`: **Mandatory** human-readable blurb (lead-in for auto-prompt; fallback to generic if omitted). +#### `listeners` +All bounded capabilities (tools and agents). +- `name`: Unique registered name (dots allowed for hierarchy). Becomes prefix of derived root tag. +- `payload_class`: Full import path to `@xmlify` dataclass. +- `handler`: Full import path to async handler function. +- `description`: **Mandatory** short blurb — leads auto-generated tool prompts. +- `agent: true`: Designates LLM-driven listener → enforces unique root tag, exposes `own_name` in HandlerMetadata. +- `peers:`: List of registered names (or gateway groups) this listener is allowed to address. Enforced by pump for agents. +- `broadcast: true`: Opt-in flag allowing multiple listeners to share the exact same derived root tag (used for parallel gateways). -At startup/hot-reload: imports → Listener instantiation → bus.register() → XSD/example/prompt synthesis. 
+#### `gateways` +Federation peers (trusted remote organisms). +- Declared separately for clarity. +- Referenced in agent `peers:` lists by their registered `name`. -Cached XSDs: `schemas//v1.xsd`. +### Key Invariants (v2.1) +- Root tag = `{lowercase_name}.{lowercase_dataclass_name}` — fully derived, never written manually. +- Registered names must be unique across the organism. +- Normal listeners have globally unique root tags. +- Broadcast listeners may share root tags intentionally (same dataclass required). +- Agents always have unique root tags (enforced automatically). +- All structural changes after bootstrap require privileged OOB hot-reload. -### `agents` -LLM reasoners. -- `system_prompt`: Static file path. -- `tools`: Local names or remote references. -- Auto-injected live tool prompts at runtime. - -### `gateways` -Federation peers. -- Trusted public key required. -- Bidirectional regular traffic only. - -## Notes -- Hot-reload: Future privileged OOB commands (apply new YAML fragments, add/remove listeners). -- Namespaces: Capabilities under `https://xml-pipeline.org/ns///v1` (served live if configured). -- Edit → reload/restart → new bounded minds, self-describing and attack-resistant. - -This YAML is the organism's DNA — precise, auditable, and evolvable locally. \ No newline at end of file +This YAML is the organism’s DNA — precise, auditable, minimal, and fully aligned with listener-class-v2.1.md. diff --git a/docs/message-pump-v2.1.md b/docs/message-pump-v2.1.md index f8a7e89..83384d6 100644 --- a/docs/message-pump-v2.1.md +++ b/docs/message-pump-v2.1.md @@ -1,350 +1,185 @@ +**AgentServer v2.1 — Message Pump & Pipeline Architecture** -# Message Pump Architecture v2.1 -**January 06, 2026** -**AgentServer: Pipeline-per-Listener + Dispatcher Pattern** - -This document is the canonical specification for the AgentServer message pump. All implementation must conform to this architecture. 
+This document is the canonical specification for the AgentServer message pump in v2.1. +The previous version dated January 06, 2026 is hereby superseded. +All implementation must conform to this architecture. --- -## Core Pattern: Dictionary of Pipelines → Message Pump → Dispatcher +### Core Model -The message pump implements a three-stage architecture: - -1. **Pipeline Stage**: Parallel preprocessing pipelines (one per registered listener) that sanitize, validate, and prepare messages -2. **Message Pump**: Async event loop that orchestrates concurrent message processing, manages scheduling and backpressure -3. **Dispatcher**: Simple async function that delivers messages to handlers and awaits responses - -``` -Raw Message Ingress - ↓ -Pipeline Lookup & Assignment - ↓ -[Pipeline 1] [Pipeline 2] [Pipeline N] (parallel preprocessing) - ↓ ↓ ↓ -Pipeline Output Queues (processed messages ready for dispatch) - ↓ -Message Pump Event Loop - - Gathers ready messages - - Launches concurrent dispatcher(msg, handler) invocations - - Manages concurrency/scheduling/backpressure - ↓ -[dispatcher()] [dispatcher()] [dispatcher()] (concurrent, async) - ↓ ↓ ↓ -Handler Execution → await Response - ↓ -Message Pump Response Processing - - Extract multi-payloads (dummy wrap → parse → extract) - - Create envelopes with injection - - Re-inject to appropriate pipelines - ↓ -Pipeline Re-injection (cycle continues) -``` +- **Pipeline-per-listener** — each registered listener owns one dedicated preprocessing pipeline. +- **Permanent system pipeline** — always exists at bootstrap, even with zero user listeners. +- **Configurable ordered steps** — each pipeline is an ordered list of async coroutine functions that transform a universal `MessageState`. +- **Routing resolution inside pipeline** — routing is just another step; the dispatcher receives fully routed messages. +- **Dumb dispatcher** — only awaits handler(s) and processes responses. 
+- **Hard-coded multi-payload extraction** — handler responses are specially processed outside normal pipelines to support 1..n emitted payloads. --- -## Pipeline Architecture +### Universal Intermediate Representation: MessageState -### Pipeline Registration - -At boot (or hot-reload), each listener registration creates: -- Dedicated preprocessing pipeline instance -- Entry in routing table: `Dict[root_tag, Dict[listener_name, Pipeline]]` -- Cached XSD schema (derived from `@xmlify` dataclass) -- Example XML and tool description fragments - -**Example Registration**: ```python -@xmlify @dataclass -class CalculatorAdd: - """Add two numbers and return the sum.""" - a: float - b: float - -# Creates: -# - Pipeline instance for "calculator/add" -# - XSD cached at schemas/calculator/add/v1.xsd -# - Routing entry: pipelines["add"]["calculator"] = pipeline_instance +class MessageState: + raw_bytes: bytes | None = None # Initial ingress or extracted payload bytes + envelope_tree: Element | None = None # Full envelope after repair/C14N + payload_tree: Element | None = None # Extracted payload element + payload: Any | None = None # Deserialized @xmlify dataclass instance + thread_id: str | None = None # Opaque UUID inherited/carried + from_id: str | None = None # Registered name of sender (trustworthy) + target_listeners: list[Listener] | None = None # Resolved by routing step + error: str | None = None # Diagnostic message if step fails + metadata: dict[str, Any] = field(default_factory=dict) # Extension point ``` -### Pipeline Structure - -Each pipeline is identical in structure but operates on messages bound for its specific listener. A pipeline consists of an ordered array of processing tasks: - -**Standard Task Sequence**: -1. **Repair**: Fix malformed XML (lxml recovery mode) -2. **Canonicalization (C14N)**: Normalize whitespace, attributes, namespaces -3. **Envelope Validation**: Verify against `envelope.xsd` -4. 
**Payload Extraction**: Extract payload from `` wrapper -5. **XSD Validation**: Validate payload against listener's cached schema -6. **Deserialization**: Convert XML to typed `@dataclass` instance via `xmlable.from_xml` -7. **Error Injection**: On failure, inject `` error tag instead of discarding - -**Error Handling Philosophy**: -- Early pipelines (repair, C14N): May discard truly corrupt messages -- Later stages (validation): Inject `error description` into response -- LLMs see their errors and can self-correct -- Prevents silent failures while maintaining flow - -### System Pipeline - -A special system pipeline handles messages not bound for user listeners: -- Processes `` messages (startup trigger for human/keyboard listeners) -- Handles system-generated error responses -- Uses same task sequence but no XSD validation step +Every pipeline step receives and returns a `MessageState`. --- -## Dispatcher Architecture - -### Dispatcher Responsibilities - -The dispatcher is a **simple async function** that delivers a message to a handler and awaits the response: +### Default Listener Pipeline Steps (in order) ```python -async def dispatcher(msg, handler): - """Thin async routing layer - delivers message and awaits response""" - response = await handler(msg) - return response +default_listener_steps = [ + repair_step, # raw_bytes → envelope_tree (lxml recovery) + c14n_step, # normalize envelope_tree + envelope_validation_step, # validate against envelope.xsd + payload_extraction_step, # set payload_tree + xsd_validation_step, # validate against listener's cached XSD + deserialization_step, # set payload (dataclass instance) + routing_resolution_step, # set target_listeners based on root tag +] ``` -**Critical Property**: The dispatcher itself has no loop, no queue management, no concurrency control. It's a pure async delivery mechanism. All orchestration happens in the message pump. 
- -### Routing Logic - -**Lookup Key**: `(root_tag, listener_name)` from pipeline's registered listener - -**Delivery Rules**: -- **`` present**: Direct delivery to specific listener at `root_tag/listener_name` -- **`` absent**: Broadcast to ALL listeners registered for `root_tag` - -**Broadcast Semantics**: -- All handlers for a given root tag execute concurrently (via concurrent task launch). -- Responses are processed progressively as each handler completes (streaming/as-completed semantics). -- Each response is fully handled independently (multi-payload extraction, enveloping, re-injection). -- Responses bubble up in completion order (nondeterministic); no waiting for the full group. -- Ideal for racing parallel tools; agents handle any needed synchronization. - -**Example**: Message with root tag `` and no ``: -``` -Pump sees: root_tag="search", to=None -Lookup: pipelines["search"] → {"google": pipeline_1, "bing": pipeline_2} -Execute: - - Launch concurrent dispatchers for all handlers - - Monitor tasks via asyncio.as_completed - - As each completes: extract payloads, envelope, re-inject immediately - - No batch wait—fast responses bubble first -``` +Each step is an `async def step(state: MessageState) -> MessageState`. 
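To make the step signature concrete, here is a minimal sketch of one extra step written against that contract. The `size_limit_step` name, the `MAX_BYTES` limit, and the trimmed `MessageState` stand-in are assumptions made for illustration; none of them is part of the specification.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class MessageState:
    """Trimmed stand-in for the spec's MessageState (illustration only)."""
    raw_bytes: bytes | None = None
    error: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)


MAX_BYTES = 64 * 1024  # hypothetical limit, not defined by the spec


async def size_limit_step(state: MessageState) -> MessageState:
    """Hypothetical step: record the input size and flag oversized input."""
    size = len(state.raw_bytes or b"")
    state.metadata["raw_size"] = size
    if size > MAX_BYTES:
        # Report the problem on the state instead of raising.
        state.error = f"Payload too large: {size} bytes (limit {MAX_BYTES})"
    return state


ok = asyncio.run(size_limit_step(MessageState(raw_bytes=b"<add/>")))
too_big = asyncio.run(
    size_limit_step(MessageState(raw_bytes=b"x" * (MAX_BYTES + 1)))
)
print(ok.error, ok.metadata["raw_size"])
print(too_big.error)
```

A step written this way reports failure through `state.error`, so a runner that checks that field after each step can stop early without special-casing exceptions.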
--- -## Message Pump Event Loop - -The message pump is the orchestration layer that manages concurrency, scheduling, and message flow: +### System Pipeline (fixed, shorter steps) ```python -async def message_pump(): - """Main event loop - orchestrates concurrent message processing""" - while True: - # Gather all ready messages from pipeline outputs - ready_messages = await gather_ready_messages_from_pipelines() - - # For each message, lookup handler(s) and launch dispatcher(s) - tasks = [] - for msg in ready_messages: - handlers = lookup_handlers(msg) # may return multiple for broadcast - for handler in handlers: - task = asyncio.create_task(dispatcher(msg, handler)) - tasks.append(task) - - # Process responses as they complete (streaming) - for completed_task in asyncio.as_completed(tasks): - response = await completed_task - # Extract multi-payloads (dummy wrap → parse → extract) - payloads = extract_payloads(response) - - # Wrap each in envelope, inject , re-route to pipelines - for payload in payloads: - enveloped = create_envelope(payload, response.context) - await send_to_pipeline(enveloped) +system_steps = [ + repair_step, + c14n_step, + envelope_validation_step, + payload_extraction_step, + system_routing_and_handler_step, # handles unknown roots, meta, leaked privileged, boot, emits or system messages +] ``` -**Key Responsibilities**: -1. **Concurrency Control**: Decides how many dispatchers to launch simultaneously -2. **Fair Scheduling**: Can implement priority queues, round-robin, or other fairness policies -3. **Backpressure**: Monitors pipeline queue depths, throttles if needed -4. **Response Handling**: Extracts multi-payloads and re-injects each response as soon as its handler completes (progressive streaming for broadcasts) - -**Concurrency Model**: Unbounded concurrent dispatchers; responses stream independently. Future enhancements include per-listener semaphores, global limits, and token-rate throttling. 
+The system pipeline is instantiated at organism bootstrap and never removed. --- -## Message Flow Example: Complete Cycle (Broadcast ) +### Pipeline Execution (shared by all pipelines) -1. **Ingress**: External `root` -2. **Pipeline Assignment**: Root tag "search" → multiple pipelines (google, bing) -3. **Pipeline Processing** (parallel per listener): Repair/C14N/validation/deserialization -4. **Message Pump**: Gathers ready messages, launches concurrent dispatchers -5. **Concurrent Handler Execution**: - - google_handler completes first (500ms): `Sunny, 72°F` → processed/bubbled immediately - - bing_handler completes second (700ms): `Clear skies, 70°F` → processed/bubbled next - - No waiting—receiver sees results as they arrive -6. **Response Processing** (progressive): As each completes, extract, envelope with ``, re-inject to target pipeline -7. **Response Bubbling**: Results route back to parent (e.g., researcher/user) in completion order - ---- - -## Boot Sequence - -1. On startup, system generates: `systemroot` -2. Sent to system pipeline -3. Dispatched to ALL listeners registered for `` root tag -4. Human listener can register for `` to: - - Display welcome message - - Await keyboard input - - Initiate first real conversation - -**Example Human Listener**: ```python -@xmlify -@dataclass -class Boot: - """System boot notification""" - pass +async def run_pipeline(state: MessageState, pipeline: Pipeline): + for step in pipeline.steps: + try: + state = await step(state) + if state.error: # early diagnostic + break + except Exception as exc: + state.error = f"Pipeline step {step.__name__} failed: {exc}" + break -async def human_boot_handler(msg: Boot) -> bytes: - print("System ready. 
Type your message:") - user_input = await async_input() - return f"{user_input}".encode() + if state.target_listeners: + await dispatcher(state) + else: + # Unroutable → send to system pipeline for + await system_pipeline.process(state) ``` ---- - -## Out-of-Band (OOB) Privileged Messages - -### Separation of Concerns - -Privileged operations (defined in `privileged-msg.xsd`) operate on a completely separate channel: -- Dedicated websocket port (or Unix socket) -- Bound to localhost by default -- Uses Ed25519 signature verification - -**The message pump dispatcher has NO knowledge of privileged messages**: -- Main dispatcher only routes messages with user/capability payloads -- Privileged messages like ``, ``, `` are handled by separate OOB handler -- No possibility of privilege escalation via main message flow - -**Security Guarantee**: Remote clients cannot send privileged messages (channel not exposed). Even if leaked to main port, dispatcher would fail routing lookup (no pipeline registered for privileged root tags). +Pipelines run concurrently; messages within a single pipeline are processed sequentially. 
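One way to get "concurrent across pipelines, sequential within a pipeline" is to give each pipeline its own queue and a single worker task. The sketch below assumes that design; the spec does not say how the ordering is implemented. The `pipeline_worker` name and the `None` shutdown sentinel are hypothetical.

```python
import asyncio


async def pipeline_worker(name: str, inbox: asyncio.Queue, log: list) -> None:
    """Drain one pipeline's inbox, one message at a time, to keep per-pipeline order."""
    while True:
        msg = await inbox.get()
        if msg is None:  # hypothetical sentinel: stop this worker
            break
        await asyncio.sleep(0.01)  # stand-in for running the pipeline's steps
        log.append((name, msg))


async def main() -> list:
    log: list = []
    names = ("calculator.add", "local_summarizer")
    inboxes = {name: asyncio.Queue() for name in names}
    # One worker task per pipeline: workers overlap with each other,
    # but each worker handles its own messages strictly in arrival order.
    workers = [
        asyncio.create_task(pipeline_worker(name, inbox, log))
        for name, inbox in inboxes.items()
    ]
    for i in range(3):
        for inbox in inboxes.values():
            inbox.put_nowait(i)
    for inbox in inboxes.values():
        inbox.put_nowait(None)
    await asyncio.gather(*workers)
    return log


log = asyncio.run(main())
per_pipeline = {
    name: [msg for owner, msg in log if owner == name]
    for name in ("calculator.add", "local_summarizer")
}
print(per_pipeline)
```

The order of entries across pipelines in `log` depends on scheduling, while the order within each pipeline follows arrival order.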
--- -## Pipeline Optimization & Scheduling +### Handler Response Processing (hard-coded path) -### Pipeline Parallelism +After dispatcher awaits a handler: -Pipelines process independently and in parallel: -- Each listener's pipeline can execute simultaneously -- No shared state between pipelines (XSD schemas are cached read-only) -- Enables high throughput for multi-listener broadcasts +```python +response_bytes = await handler(state.payload, metadata) -### Future: Token-Rate Monitoring +# Safety guard +if response_bytes is None or not isinstance(response_bytes, bytes): + response_bytes = b"Handler failed to return valid bytes — likely missing return or wrong type" -Currently not implemented, but architecture supports: -- Each pipeline tracks tokens processed per minute -- Dispatcher can throttle high-volume agents -- Fair-share scheduling to prevent LLM monopolization +# Dedicated multi-payload extraction (hard-coded, tolerant) +payloads_bytes_list = await multi_payload_extract(response_bytes) -**Placeholder**: Token counting will be integrated once LLM abstraction layer is defined. - ---- - -## Configuration & Wiring - -### YAML Bootstrap (`organism.yaml`) - -Defines initial swarm topology: -```yaml -listeners: - - name: calculator - capability: calculator.add - root_tag: add - namespace: https://xml-pipeline.org/ns/tools/calculator/v1 - - - name: researcher - capability: llm.researcher - root_tag: research-query - namespace: https://xml-pipeline.org/ns/agents/researcher/v1 - tools: - - calculator # researcher can see/call calculator - - websearch - - - name: websearch - capability: tools.google_search - root_tag: search - namespace: https://xml-pipeline.org/ns/tools/websearch/v1 - -agents: - - name: researcher - type: llm - model: claude-sonnet-4 - system_prompt: "You are a research assistant..." 
- visible_tools: # restricts which listeners this agent can call - - calculator - - websearch - -meta: - allow_list_capabilities: admin # or "all", "none" - allow_schema_requests: admin +for payload_bytes in payloads_bytes_list: + # Create fresh initial state for each emitted payload + new_state = MessageState( + raw_bytes=payload_bytes, + thread_id=state.thread_id, # inherited + from_id=current_listener.name, # provenance injection + ) + # Route through normal pipeline resolution (root tag lookup) + await route_and_process(new_state) ``` -**Key Properties**: -- Defines initial routing table (`root_tag → listener_name`) -- Controls visibility (agent A may not know agent B exists) -- Meta introspection privileges -- All structural changes require OOB privileged commands (hot-reload) +`multi_payload_extract` wraps in `` (idempotent), repairs/parses, extracts all root elements, returns list of bytes. If none found → single diagnostic ``. --- -## Summary: Critical Invariants +### Routing Resolution Step -1. **Pipeline-per-Listener**: Each registered listener has dedicated preprocessing pipeline -2. **Async Concurrency**: Message pump launches concurrent dispatcher invocations; handlers run in parallel via asyncio -3. **Stateless Dispatcher**: Dispatcher is a simple async function `(msg, handler) → response`, no loop or state -4. **Pump Orchestrates**: Message pump event loop controls concurrency, scheduling, backpressure, and response handling -5. **UUID Privacy**: Thread paths are opaque UUIDs; system maintains actual tree privately -6. **Error Injection**: Validation failures inject `` instead of silent discard -7. **Multi-Payload Extraction**: Handlers may emit multiple payloads; pump extracts, envelopes, and re-injects each -8. **Broadcast = Streaming Concurrent**: Multiple listeners execute in parallel; responses processed and bubbled as they complete (no group wait) -9. **OOB Isolation**: Privileged messages never touch main message pump or dispatcher -10. 
**Boot Message**: System-generated `` enables listener-only architecture -11. **Stateless Handlers**: All routing, thread context, and identity is managed externally; handlers remain pure -12. **Parallel Everything**: Pipelines preprocess concurrently, pump launches dispatchers concurrently, responses stream progressively +Inside the pipeline, after deserialization: + +- Compute root tag = `{state.from_id.lower()}.{type(state.payload).__name__.lower()}` +- Lookup in primary routing table (root_tag → Listener) +- If found → `state.target_listeners = [listener]` +- If broadcast case matches → `state.target_listeners = list_of_matching_listeners` +- Else → `state.error = "Unknown capability"` + +Agents calling peers: pump enforces payload root tag is in allowed peers list (or broadcast group when we add it). --- -## Next Steps +### Dispatcher (dumb fire-and-await) -This document establishes the foundational architecture. Implementation priorities: +```python +async def dispatcher(state: MessageState): + if not state.target_listeners: + return -1. **Immediate (Echo Chamber Milestone)**: - - Implement basic pipeline task sequence (repair → C14N → validate) - - Implement sequential dispatcher with simple routing - - Basic `` error injection on validation failure - - Boot message generation + if len(state.target_listeners) == 1: + await process_single_handler(state) + else: # broadcast + tasks = [ + process_single_handler(state, listener_override=listener) + for listener in state.target_listeners + ] + for future in asyncio.as_completed(tasks): + await future # responses processed immediately as they complete +``` -2. **Near-Term**: - - Multi-payload extraction and re-injection - - UUID path registry and privacy enforcement - - YAML-driven listener registration - - Pipeline parallelism - -3. 
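The lookup part of the routing bullets can be sketched as a small function over two tables. This is an illustration under assumptions: the root tag is taken as an input that has already been computed, the peers check is left out, and `Listener`, `RoutingState`, `resolve_targets`, and the example tags are hypothetical stand-ins rather than the real classes.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Listener:
    """Minimal stand-in for a registered listener (illustration only)."""
    name: str


@dataclass
class RoutingState:
    """Only the MessageState fields that routing resolution touches."""
    target_listeners: list[Listener] | None = None
    error: str | None = None


def resolve_targets(
    state: RoutingState,
    root_tag: str,
    primary: dict[str, Listener],
    broadcast: dict[str, list[Listener]],
) -> RoutingState:
    """Set target_listeners from the routing tables, or set an error."""
    if root_tag in primary:
        state.target_listeners = [primary[root_tag]]
    elif root_tag in broadcast:
        state.target_listeners = list(broadcast[root_tag])
    else:
        state.error = "Unknown capability"
    return state


# Hypothetical tables; the tag strings are made up for the example.
primary = {"calculator.add.addpayload": Listener("calculator.add")}
broadcast = {
    "search.searchpayload": [Listener("search.google"), Listener("search.bing")]
}

single = resolve_targets(
    RoutingState(), "calculator.add.addpayload", primary, broadcast
)
fanout = resolve_targets(RoutingState(), "search.searchpayload", primary, broadcast)
unknown = resolve_targets(RoutingState(), "nope.nothing", primary, broadcast)

print([listener.name for listener in single.target_listeners])
print([listener.name for listener in fanout.target_listeners])
print(unknown.error)
```

Keeping broadcast groups in a separate table means a primary entry always maps to exactly one listener, which is one way to make the single-target and broadcast cases easy to tell apart.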
**Future**: - - Token-rate monitoring per pipeline - - Fair-share dispatcher scheduling - - Advanced error recovery strategies - - Hot-reload capability via OOB +`process_single_handler` awaits the handler and triggers the hard-coded response processing path above. --- -**Status**: This document is now the single source of truth for message pump architecture. All code, diagrams, and decisions must align with this specification. +### Key Invariants (v2.1) + +1. One dedicated pipeline per registered listener + permanent system pipeline. +2. Pipelines are ordered lists of async steps operating on universal `MessageState`. +3. Routing resolution is a normal pipeline step → dispatcher receives pre-routed targets. +4. Handler responses go through hard-coded multi-payload extraction → each payload becomes fresh `MessageState` routed normally. +5. Provenance (``) and thread continuity injected by pump, never by handlers. +6. `` guards protect against missing returns and step failures. +7. Extensibility: new steps (token counting, rate limiting, logging) insert anywhere in default list. + +--- + +### Future Extensions (not v2.1) + +- Hot-reload replace pipeline step list per listener +- Broadcast groups via `group:` YAML key (v2.2 candidate) +- Per-thread token bucket enforcement step + +--- + +This specification is now aligned with listener-class-v2.1.md and configuration-v2.1.md. +The message pump is simple, auditable, high-throughput, and infinitely extensible via pipeline steps. +