fixing docs

This commit is contained in:
dullfig 2026-01-07 13:12:08 -08:00
parent 6696c06e4f
commit 3105648fd1
5 changed files with 210 additions and 362 deletions

View file

@ -1,115 +1,128 @@
G# Configuration — organism.yaml (v2.1)
**AgentServer v2.1 — Organism Configuration**
This file is the canonical reference for `organism.yaml` format in v2.1.
The old `configuration.md` is hereby obsolete and superseded.
The entire organism is declared in a single YAML file (default: `config/organism.yaml`).
Loaded at bootstrap — single source of truth for initial composition.
Runtime changes (hot-reload) via local OOB privileged commands.
It is the single source of truth for initial composition, loaded at bootstrap.
Runtime structural changes (add/remove/replace listeners) are performed exclusively via privileged OOB commands (hot-reload).
## Example Full Configuration
### Full Example (ResearchSwarm-01)
```yaml
organism:
name: "ResearchSwarm-01"
identity: "config/identity/private.ed25519" # Ed25519 private key
port: 8765 # Main message bus WSS
identity: "config/identity/private.ed25519" # Ed25519 private key path
port: 8765 # Main WSS message bus
tls:
cert: "certs/fullchain.pem"
key: "certs/privkey.pem"
oob: # Out-of-band privileged channel (GUI/hot-reload ready)
oob: # Out-of-band privileged channel
enabled: true
bind: "127.0.0.1" # Localhost-only default
port: 8766 # Separate WSS port
# unix_socket: "/tmp/organism.sock" # Alternative
bind: "127.0.0.1" # Localhost-only by default (GUI safe)
port: 8766 # Separate WSS port from main bus
# unix_socket: "/tmp/organism.sock" # Alternative binding
thread_scheduling: "breadth-first" # or "depth-first" (default: breadth-first)
thread_scheduling: "breadth-first" # or "depth-first"
meta:
enabled: true
allow_list_capabilities: true
allow_schema_requests: "admin" # "admin" | "authenticated" | "none"
allow_schema_requests: "admin" # "admin" | "authenticated" | "none"
allow_example_requests: "admin"
allow_prompt_requests: "admin"
allow_remote: false # Federation peers query meta
allow_remote: false # Federation peers may query meta
listeners:
- name: calculator.add
payload_class: examples.calculator.AddPayload
handler: examples.calculator.add_handler
description: "Adds two integers and returns their sum." # Mandatory for usable tool prompts
description: "Adds two integers and returns their sum."
- name: summarizer
- name: calculator.multiply
payload_class: examples.calculator.MultiplyPayload
handler: examples.calculator.multiply_handler
description: "Multiplies two integers and returns their product."
- name: local_summarizer
payload_class: agents.summarizer.SummarizePayload
handler: agents.summarizer.summarize_handler
description: "Summarizes text via local LLM."
agents:
- name: researcher
system_prompt: "prompts/researcher_system.txt"
tools:
payload_class: agents.researcher.ResearchPayload
handler: agents.researcher.research_handler
description: "Primary research agent that reasons and coordinates tools."
agent: true # LLM agent → unique root tag, own_name exposed
peers: # Allowed call targets
- calculator.add
- summarizer
- name: web_search
remote: true
gateways:
- search_node1
- search_node2
- search_node3 # list = broadcast to all
mode: "first-answer-wins" # optional: "all" (collect responses), default "single" if one gateway
- calculator.multiply
- local_summarizer
- web_search # gateway group, defined below
- name: search.google
payload_class: gateways.google.SearchPayload
handler: gateways.google.search_handler
description: "Google search gateway."
broadcast: true # Shares root tag with other search.* listeners
- name: search.bing
payload_class: gateways.google.SearchPayload # Identical dataclass required
handler: gateways.bing.search_handler
description: "Bing search gateway."
broadcast: true
gateways:
- name: web_search
remote_url: "wss://trusted-search-node.example.org"
trusted_identity: "pubkeys/search_node.ed25519.pub"
description: "Federated web search capability."
description: "Federated web search gateway group."
```
## Sections Explained
### Sections Explained
### `organism`
Core settings.
- `name`: Logs/discovery.
- `identity`: Ed25519 private key path.
- `port` / `tls`: Main WSS bus.
#### `organism`
Core identity and main bus.
- `name`: Human identifier, used in logs and discovery.
- `identity`: Path to Ed25519 private key (signing, federation, OOB auth).
- `port` / `tls`: Main encrypted message bus.
### `oob`
Privileged local control channel.
- `enabled: false` → pure static (restart for changes).
- Localhost default for GUI safety.
- Separate from main port — bus oblivious.
#### `oob`
Privileged local control channel (GUI/hot-reload ready).
- Disabled → fully static configuration (restart required for changes).
- Bound to localhost by default for security.
### `thread_scheduling`
Balanced subthread execution.
- `"breadth-first"`: Fair round-robin (default, prevents deep starvation).
- `"depth-first"`: Dive deep into branches.
#### `thread_scheduling`
Subthread execution policy across the organism.
- `"breadth-first"` (default): fair round-robin, prevents deep branch starvation.
- `"depth-first"`: aggressive dive into branches.
### `meta`
Introspection controls (`https://xml-pipeline.org/ns/meta/v1`).
#### `meta`
Introspection controls (`https://xml-pipeline.org/ns/meta/v1` namespace).
- Flags control who may request capability lists, schemas, examples, prompts.
### `listeners`
Bounded capabilities.
- `name`: Discovery/logging (dots for hierarchy).
- `payload_class`: Full import to `@xmlify` dataclass.
- `handler`: Full import to function (dataclass → bytes).
- `description`: **Mandatory** human-readable blurb (lead-in for auto-prompt; fallback to generic if omitted).
#### `listeners`
All bounded capabilities (tools and agents).
- `name`: Unique registered name (dots allowed for hierarchy). Becomes prefix of derived root tag.
- `payload_class`: Full import path to `@xmlify` dataclass.
- `handler`: Full import path to async handler function.
- `description`: **Mandatory** short blurb — leads auto-generated tool prompts.
- `agent: true`: Designates LLM-driven listener → enforces unique root tag, exposes `own_name` in HandlerMetadata.
- `peers:`: List of registered names (or gateway groups) this listener is allowed to address. Enforced by pump for agents.
- `broadcast: true`: Opt-in flag allowing multiple listeners to share the exact same derived root tag (used for parallel gateways).
At startup/hot-reload: imports → Listener instantiation → bus.register() → XSD/example/prompt synthesis.
#### `gateways`
Federation peers (trusted remote organisms).
- Declared separately for clarity.
- Referenced in agent `peers:` lists by their registered `name`.
Cached XSDs: `schemas/<name>/v1.xsd`.
### Key Invariants (v2.1)
- Root tag = `{lowercase_name}.{lowercase_dataclass_name}` — fully derived, never written manually.
- Registered names must be unique across the organism.
- Normal listeners have globally unique root tags.
- Broadcast listeners may share root tags intentionally (same dataclass required).
- Agents always have unique root tags (enforced automatically).
- All structural changes after bootstrap require privileged OOB hot-reload.
### `agents`
LLM reasoners.
- `system_prompt`: Static file path.
- `tools`: Local names or remote references.
- Auto-injected live tool prompts at runtime.
### `gateways`
Federation peers.
- Trusted public key required.
- Bidirectional regular traffic only.
## Notes
- Hot-reload: Future privileged OOB commands (apply new YAML fragments, add/remove listeners).
- Namespaces: Capabilities under `https://xml-pipeline.org/ns/<category>/<name>/v1` (served live if configured).
- Edit → reload/restart → new bounded minds, self-describing and attack-resistant.
This YAML is the organism's DNA — precise, auditable, and evolvable locally.
This YAML is the organisms DNA — precise, auditable, minimal, and fully aligned with listener-class-v2.1.md.

View file

@ -1,350 +1,185 @@
**AgentServer v2.1 — Message Pump & Pipeline Architecture**
# Message Pump Architecture v2.1
**January 06, 2026**
**AgentServer: Pipeline-per-Listener + Dispatcher Pattern**
This document is the canonical specification for the AgentServer message pump. All implementation must conform to this architecture.
This document is the canonical specification for the AgentServer message pump in v2.1.
The previous version dated January 06, 2026 is hereby superseded.
All implementation must conform to this architecture.
---
## Core Pattern: Dictionary of Pipelines → Message Pump → Dispatcher
### Core Model
The message pump implements a three-stage architecture:
1. **Pipeline Stage**: Parallel preprocessing pipelines (one per registered listener) that sanitize, validate, and prepare messages
2. **Message Pump**: Async event loop that orchestrates concurrent message processing, manages scheduling and backpressure
3. **Dispatcher**: Simple async function that delivers messages to handlers and awaits responses
```
Raw Message Ingress
Pipeline Lookup & Assignment
[Pipeline 1] [Pipeline 2] [Pipeline N] (parallel preprocessing)
↓ ↓ ↓
Pipeline Output Queues (processed messages ready for dispatch)
Message Pump Event Loop
- Gathers ready messages
- Launches concurrent dispatcher(msg, handler) invocations
- Manages concurrency/scheduling/backpressure
[dispatcher()] [dispatcher()] [dispatcher()] (concurrent, async)
↓ ↓ ↓
Handler Execution → await Response
Message Pump Response Processing
- Extract multi-payloads (dummy wrap → parse → extract)
- Create envelopes with <from> injection
- Re-inject to appropriate pipelines
Pipeline Re-injection (cycle continues)
```
- **Pipeline-per-listener** — each registered listener owns one dedicated preprocessing pipeline.
- **Permanent system pipeline** — always exists at bootstrap, even with zero user listeners.
- **Configurable ordered steps** — each pipeline is an ordered list of async coroutine functions that transform a universal `MessageState`.
- **Routing resolution inside pipeline** — routing is just another step; the dispatcher receives fully routed messages.
- **Dumb dispatcher** — only awaits handler(s) and processes responses.
- **Hard-coded multi-payload extraction** — handler responses are specially processed outside normal pipelines to support 1..n emitted payloads.
---
## Pipeline Architecture
### Universal Intermediate Representation: MessageState
### Pipeline Registration
At boot (or hot-reload), each listener registration creates:
- Dedicated preprocessing pipeline instance
- Entry in routing table: `Dict[root_tag, Dict[listener_name, Pipeline]]`
- Cached XSD schema (derived from `@xmlify` dataclass)
- Example XML and tool description fragments
**Example Registration**:
```python
@xmlify
@dataclass
class CalculatorAdd:
"""Add two numbers and return the sum."""
a: float
b: float
# Creates:
# - Pipeline instance for "calculator/add"
# - XSD cached at schemas/calculator/add/v1.xsd
# - Routing entry: pipelines["add"]["calculator"] = pipeline_instance
class MessageState:
raw_bytes: bytes | None = None # Initial ingress or extracted payload bytes
envelope_tree: Element | None = None # Full <message> envelope after repair/C14N
payload_tree: Element | None = None # Extracted payload element
payload: Any | None = None # Deserialized @xmlify dataclass instance
thread_id: str | None = None # Opaque UUID inherited/carried
from_id: str | None = None # Registered name of sender (trustworthy)
target_listeners: list[Listener] | None = None # Resolved by routing step
error: str | None = None # Diagnostic message if step fails
metadata: dict[str, Any] = field(default_factory=dict) # Extension point
```
### Pipeline Structure
Each pipeline is identical in structure but operates on messages bound for its specific listener. A pipeline consists of an ordered array of processing tasks:
**Standard Task Sequence**:
1. **Repair**: Fix malformed XML (lxml recovery mode)
2. **Canonicalization (C14N)**: Normalize whitespace, attributes, namespaces
3. **Envelope Validation**: Verify against `envelope.xsd`
4. **Payload Extraction**: Extract payload from `<message>` wrapper
5. **XSD Validation**: Validate payload against listener's cached schema
6. **Deserialization**: Convert XML to typed `@dataclass` instance via `xmlable.from_xml`
7. **Error Injection**: On failure, inject `<huh>` error tag instead of discarding
**Error Handling Philosophy**:
- Early pipelines (repair, C14N): May discard truly corrupt messages
- Later stages (validation): Inject `<huh>error description</huh>` into response
- LLMs see their errors and can self-correct
- Prevents silent failures while maintaining flow
### System Pipeline
A special system pipeline handles messages not bound for user listeners:
- Processes `<boot/>` messages (startup trigger for human/keyboard listeners)
- Handles system-generated error responses
- Uses same task sequence but no XSD validation step
Every pipeline step receives and returns a `MessageState`.
---
## Dispatcher Architecture
### Dispatcher Responsibilities
The dispatcher is a **simple async function** that delivers a message to a handler and awaits the response:
### Default Listener Pipeline Steps (in order)
```python
async def dispatcher(msg, handler):
"""Thin async routing layer - delivers message and awaits response"""
response = await handler(msg)
return response
default_listener_steps = [
repair_step, # raw_bytes → envelope_tree (lxml recovery)
c14n_step, # normalize envelope_tree
envelope_validation_step, # validate against envelope.xsd
payload_extraction_step, # set payload_tree
xsd_validation_step, # validate against listener's cached XSD
deserialization_step, # set payload (dataclass instance)
routing_resolution_step, # set target_listeners based on root tag
]
```
**Critical Property**: The dispatcher itself has no loop, no queue management, no concurrency control. It's a pure async delivery mechanism. All orchestration happens in the message pump.
### Routing Logic
**Lookup Key**: `(root_tag, listener_name)` from pipeline's registered listener
**Delivery Rules**:
- **`<to/>` present**: Direct delivery to specific listener at `root_tag/listener_name`
- **`<to/>` absent**: Broadcast to ALL listeners registered for `root_tag`
**Broadcast Semantics**:
- All handlers for a given root tag execute concurrently (via concurrent task launch).
- Responses are processed progressively as each handler completes (streaming/as-completed semantics).
- Each response is fully handled independently (multi-payload extraction, enveloping, re-injection).
- Responses bubble up in completion order (nondeterministic); no waiting for the full group.
- Ideal for racing parallel tools; agents handle any needed synchronization.
**Example**: Message with root tag `<search>` and no `<to/>`:
```
Pump sees: root_tag="search", to=None
Lookup: pipelines["search"] → {"google": pipeline_1, "bing": pipeline_2}
Execute:
- Launch concurrent dispatchers for all handlers
- Monitor tasks via asyncio.as_completed
- As each completes: extract payloads, envelope, re-inject immediately
- No batch wait—fast responses bubble first
```
Each step is an `async def step(state: MessageState) -> MessageState`.
---
## Message Pump Event Loop
The message pump is the orchestration layer that manages concurrency, scheduling, and message flow:
### System Pipeline (fixed, shorter steps)
```python
async def message_pump():
"""Main event loop - orchestrates concurrent message processing"""
while True:
# Gather all ready messages from pipeline outputs
ready_messages = await gather_ready_messages_from_pipelines()
# For each message, lookup handler(s) and launch dispatcher(s)
tasks = []
for msg in ready_messages:
handlers = lookup_handlers(msg) # may return multiple for broadcast
for handler in handlers:
task = asyncio.create_task(dispatcher(msg, handler))
tasks.append(task)
# Process responses as they complete (streaming)
for completed_task in asyncio.as_completed(tasks):
response = await completed_task
# Extract multi-payloads (dummy wrap → parse → extract)
payloads = extract_payloads(response)
# Wrap each in envelope, inject <from>, re-route to pipelines
for payload in payloads:
enveloped = create_envelope(payload, response.context)
await send_to_pipeline(enveloped)
system_steps = [
repair_step,
c14n_step,
envelope_validation_step,
payload_extraction_step,
system_routing_and_handler_step, # handles unknown roots, meta, leaked privileged, boot, emits <huh> or system messages
]
```
**Key Responsibilities**:
1. **Concurrency Control**: Decides how many dispatchers to launch simultaneously
2. **Fair Scheduling**: Can implement priority queues, round-robin, or other fairness policies
3. **Backpressure**: Monitors pipeline queue depths, throttles if needed
4. **Response Handling**: Extracts multi-payloads and re-injects each response as soon as its handler completes (progressive streaming for broadcasts)
**Concurrency Model**: Unbounded concurrent dispatchers; responses stream independently. Future enhancements include per-listener semaphores, global limits, and token-rate throttling.
The system pipeline is instantiated at organism bootstrap and never removed.
---
## Message Flow Example: Complete Cycle (Broadcast <search>)
### Pipeline Execution (shared by all pipelines)
1. **Ingress**: External `<message><thread>root</thread><search query="weather today"/></message>`
2. **Pipeline Assignment**: Root tag "search" → multiple pipelines (google, bing)
3. **Pipeline Processing** (parallel per listener): Repair/C14N/validation/deserialization
4. **Message Pump**: Gathers ready messages, launches concurrent dispatchers
5. **Concurrent Handler Execution**:
- google_handler completes first (500ms): `<search-result>Sunny, 72°F</search-result>` → processed/bubbled immediately
- bing_handler completes second (700ms): `<search-result>Clear skies, 70°F</search-result>` → processed/bubbled next
- No waiting—receiver sees results as they arrive
6. **Response Processing** (progressive): As each completes, extract, envelope with `<from>`, re-inject to target pipeline
7. **Response Bubbling**: Results route back to parent (e.g., researcher/user) in completion order
---
## Boot Sequence
1. On startup, system generates: `<message><from>system</from><thread>root</thread><boot/></message>`
2. Sent to system pipeline
3. Dispatched to ALL listeners registered for `<boot/>` root tag
4. Human listener can register for `<boot/>` to:
- Display welcome message
- Await keyboard input
- Initiate first real conversation
**Example Human Listener**:
```python
@xmlify
@dataclass
class Boot:
"""System boot notification"""
pass
async def run_pipeline(state: MessageState, pipeline: Pipeline):
for step in pipeline.steps:
try:
state = await step(state)
if state.error: # early diagnostic
break
except Exception as exc:
state.error = f"Pipeline step {step.__name__} failed: {exc}"
break
async def human_boot_handler(msg: Boot) -> bytes:
print("System ready. Type your message:")
user_input = await async_input()
return f"<message>{user_input}</message>".encode()
if state.target_listeners:
await dispatcher(state)
else:
# Unroutable → send to system pipeline for <huh>
await system_pipeline.process(state)
```
---
## Out-of-Band (OOB) Privileged Messages
### Separation of Concerns
Privileged operations (defined in `privileged-msg.xsd`) operate on a completely separate channel:
- Dedicated websocket port (or Unix socket)
- Bound to localhost by default
- Uses Ed25519 signature verification
**The message pump dispatcher has NO knowledge of privileged messages**:
- Main dispatcher only routes messages with user/capability payloads
- Privileged messages like `<add-listener>`, `<remove-listener>`, `<hot-reload>` are handled by separate OOB handler
- No possibility of privilege escalation via main message flow
**Security Guarantee**: Remote clients cannot send privileged messages (channel not exposed). Even if leaked to main port, dispatcher would fail routing lookup (no pipeline registered for privileged root tags).
Pipelines run concurrently; messages within a single pipeline are processed sequentially.
---
## Pipeline Optimization & Scheduling
### Handler Response Processing (hard-coded path)
### Pipeline Parallelism
After dispatcher awaits a handler:
Pipelines process independently and in parallel:
- Each listener's pipeline can execute simultaneously
- No shared state between pipelines (XSD schemas are cached read-only)
- Enables high throughput for multi-listener broadcasts
```python
response_bytes = await handler(state.payload, metadata)
### Future: Token-Rate Monitoring
# Safety guard
if response_bytes is None or not isinstance(response_bytes, bytes):
response_bytes = b"<huh>Handler failed to return valid bytes — likely missing return or wrong type</huh>"
Currently not implemented, but architecture supports:
- Each pipeline tracks tokens processed per minute
- Dispatcher can throttle high-volume agents
- Fair-share scheduling to prevent LLM monopolization
# Dedicated multi-payload extraction (hard-coded, tolerant)
payloads_bytes_list = await multi_payload_extract(response_bytes)
**Placeholder**: Token counting will be integrated once LLM abstraction layer is defined.
---
## Configuration & Wiring
### YAML Bootstrap (`organism.yaml`)
Defines initial swarm topology:
```yaml
listeners:
- name: calculator
capability: calculator.add
root_tag: add
namespace: https://xml-pipeline.org/ns/tools/calculator/v1
- name: researcher
capability: llm.researcher
root_tag: research-query
namespace: https://xml-pipeline.org/ns/agents/researcher/v1
tools:
- calculator # researcher can see/call calculator
- websearch
- name: websearch
capability: tools.google_search
root_tag: search
namespace: https://xml-pipeline.org/ns/tools/websearch/v1
agents:
- name: researcher
type: llm
model: claude-sonnet-4
system_prompt: "You are a research assistant..."
visible_tools: # restricts which listeners this agent can call
- calculator
- websearch
meta:
allow_list_capabilities: admin # or "all", "none"
allow_schema_requests: admin
for payload_bytes in payloads_bytes_list:
# Create fresh initial state for each emitted payload
new_state = MessageState(
raw_bytes=payload_bytes,
thread_id=state.thread_id, # inherited
from_id=current_listener.name, # provenance injection
)
# Route through normal pipeline resolution (root tag lookup)
await route_and_process(new_state)
```
**Key Properties**:
- Defines initial routing table (`root_tag → listener_name`)
- Controls visibility (agent A may not know agent B exists)
- Meta introspection privileges
- All structural changes require OOB privileged commands (hot-reload)
`multi_payload_extract` wraps in `<dummy>` (idempotent), repairs/parses, extracts all root elements, returns list of bytes. If none found → single diagnostic `<huh>`.
---
## Summary: Critical Invariants
### Routing Resolution Step
1. **Pipeline-per-Listener**: Each registered listener has dedicated preprocessing pipeline
2. **Async Concurrency**: Message pump launches concurrent dispatcher invocations; handlers run in parallel via asyncio
3. **Stateless Dispatcher**: Dispatcher is a simple async function `(msg, handler) → response`, no loop or state
4. **Pump Orchestrates**: Message pump event loop controls concurrency, scheduling, backpressure, and response handling
5. **UUID Privacy**: Thread paths are opaque UUIDs; system maintains actual tree privately
6. **Error Injection**: Validation failures inject `<huh>` instead of silent discard
7. **Multi-Payload Extraction**: Handlers may emit multiple payloads; pump extracts, envelopes, and re-injects each
8. **Broadcast = Streaming Concurrent**: Multiple listeners execute in parallel; responses processed and bubbled as they complete (no group wait)
9. **OOB Isolation**: Privileged messages never touch main message pump or dispatcher
10. **Boot Message**: System-generated `<boot/>` enables listener-only architecture
11. **Stateless Handlers**: All routing, thread context, and identity is managed externally; handlers remain pure
12. **Parallel Everything**: Pipelines preprocess concurrently, pump launches dispatchers concurrently, responses stream progressively
Inside the pipeline, after deserialization:
- Compute root tag = `{state.from_id.lower()}.{type(state.payload).__name__.lower()}`
- Lookup in primary routing table (root_tag → Listener)
- If found → `state.target_listeners = [listener]`
- If broadcast case matches → `state.target_listeners = list_of_matching_listeners`
- Else → `state.error = "Unknown capability"`
Agents calling peers: pump enforces payload root tag is in allowed peers list (or broadcast group when we add it).
---
## Next Steps
### Dispatcher (dumb fire-and-await)
This document establishes the foundational architecture. Implementation priorities:
```python
async def dispatcher(state: MessageState):
if not state.target_listeners:
return
1. **Immediate (Echo Chamber Milestone)**:
- Implement basic pipeline task sequence (repair → C14N → validate)
- Implement sequential dispatcher with simple routing
- Basic `<huh>` error injection on validation failure
- Boot message generation
if len(state.target_listeners) == 1:
await process_single_handler(state)
else: # broadcast
tasks = [
process_single_handler(state, listener_override=listener)
for listener in state.target_listeners
]
for future in asyncio.as_completed(tasks):
await future # responses processed immediately as they complete
```
2. **Near-Term**:
- Multi-payload extraction and re-injection
- UUID path registry and privacy enforcement
- YAML-driven listener registration
- Pipeline parallelism
3. **Future**:
- Token-rate monitoring per pipeline
- Fair-share dispatcher scheduling
- Advanced error recovery strategies
- Hot-reload capability via OOB
`process_single_handler` awaits the handler and triggers the hard-coded response processing path above.
---
**Status**: This document is now the single source of truth for message pump architecture. All code, diagrams, and decisions must align with this specification.
### Key Invariants (v2.1)
1. One dedicated pipeline per registered listener + permanent system pipeline.
2. Pipelines are ordered lists of async steps operating on universal `MessageState`.
3. Routing resolution is a normal pipeline step → dispatcher receives pre-routed targets.
4. Handler responses go through hard-coded multi-payload extraction → each payload becomes fresh `MessageState` routed normally.
5. Provenance (`<from>`) and thread continuity injected by pump, never by handlers.
6. `<huh>` guards protect against missing returns and step failures.
7. Extensibility: new steps (token counting, rate limiting, logging) insert anywhere in default list.
---
### Future Extensions (not v2.1)
- Hot-reload replace pipeline step list per listener
- Broadcast groups via `group:` YAML key (v2.2 candidate)
- Per-thread token bucket enforcement step
---
This specification is now aligned with listener-class-v2.1.md and configuration-v2.1.md.
The message pump is simple, auditable, high-throughput, and infinitely extensible via pipeline steps.