Message Pump

The Message Pump (StreamPump) is the heart of xml-pipeline. It orchestrates message flow from ingress through processing to handler dispatch and response handling.

Overview

The pump is built on aiostream: messages move through the pipeline as an async stream, and a message that matches several listeners is fanned out to them concurrently.

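import asyncio

from xml_pipeline.message_bus.stream_pump import StreamPump

async def main(config, raw_bytes: bytes) -> None:
    pump = StreamPump(config)
    await pump.start()

    # Inject a message
    await pump.inject(raw_bytes, from_id="console")

    await pump.shutdown()

# Run with: asyncio.run(main(config, raw_bytes))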

Architecture

                    ┌─────────────────────┐
                    │   Message Source    │
                    │ (Console, WebSocket)│
                    └──────────┬──────────┘
                               │
                               ▼
┌──────────────────────────────────────────────────────────────┐
│                      INGRESS PIPELINE                        │
│                                                              │
│  ┌─────────┐   ┌──────┐   ┌──────────┐   ┌─────────────┐     │
│  │ Repair  │ → │ C14N │ → │ Envelope │ → │   Payload   │     │
│  │  Step   │   │ Step │   │ Validate │   │  Extraction │     │
│  └─────────┘   └──────┘   └──────────┘   └─────────────┘     │
│                                                              │
│  ┌──────────┐   ┌─────────┐   ┌─────────────┐                │
│  │  Thread  │ → │   XSD   │ → │ Deserialize │                │
│  │  Assign  │   │ Validate│   │   to class  │                │
│  └──────────┘   └─────────┘   └─────────────┘                │
│                                                              │
└──────────────────────────────────────────────────────────────┘
                               │
                               ▼
                    ┌─────────────────────┐
                    │   ROUTING TABLE     │
                    │                     │
                    │  root_tag → [       │
                    │    Listener1,       │
                    │    Listener2,       │
                    │  ]                  │
                    └──────────┬──────────┘
                               │
              ┌────────────────┼────────────────┐
              ▼                ▼                ▼
    ┌─────────────────┐ ┌─────────────┐ ┌─────────────────┐
    │   Handler A     │ │  Handler B  │ │   Handler C     │
    │  (async/main)   │ │ (cpu_bound) │ │   (async)       │
    └────────┬────────┘ └──────┬──────┘ └────────┬────────┘
             │                 │                  │
             └─────────────────┼──────────────────┘
                               │
                               ▼
                    ┌─────────────────────┐
                    │  RESPONSE HANDLER   │
                    │                     │
                    │  • Serialize        │
                    │  • Wrap envelope    │
                    │  • Inject <from>    │
                    │  • Re-inject        │
                    └─────────────────────┘

Pipeline Steps

repair_step

Fixes malformed XML using lxml's recover mode:

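async def repair_step(state: MessageState) -> MessageState:
    # recover=True lets lxml build a tree from malformed input where possible
    parser = etree.XMLParser(recover=True)
    state.envelope_tree = etree.fromstring(state.raw_bytes, parser)
    if state.envelope_tree is None:  # nothing could be recovered
        state.error = "XML repair failed"
    return state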

Handles:

  • Missing closing tags
  • Invalid characters
  • Encoding issues

c14n_step

Canonicalizes XML using Exclusive C14N:

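async def c14n_step(state: MessageState) -> MessageState:
    # exclusive=True selects Exclusive C14N; method='c14n' alone is inclusive C14N 1.0
    c14n_bytes = etree.tostring(state.envelope_tree, method='c14n', exclusive=True)
    state.envelope_tree = etree.fromstring(c14n_bytes)
    return state

This gives every message a deterministic byte representation, which signing depends on.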
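To illustrate why canonicalization matters for signing, here is a minimal sketch using the standard library's `xml.etree.ElementTree.canonicalize`. Note that this function implements C14N 2.0, not the Exclusive C14N used by the pump, and the sample documents are made up; it only demonstrates the general idea that equivalent documents collapse to identical bytes.

```python
from xml.etree.ElementTree import canonicalize

# Two serializations of the same logical document (hypothetical sample):
# attribute order and empty-element syntax differ.
doc_a = '<greeting to="world" lang="en"/>'
doc_b = '<greeting lang="en" to="world"></greeting>'

canon_a = canonicalize(doc_a)
canon_b = canonicalize(doc_b)

# The raw strings differ, but their canonical forms are identical, so a
# signature computed over one canonical form also matches the other.
forms_match = canon_a == canon_b
print(forms_match)
```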

envelope_validation_step

Validates the envelope against envelope.xsd. A conforming envelope has this shape:

<message xmlns="https://xml-pipeline.org/ns/envelope/v1">
  <meta>
    <from>...</from>
    <to>...</to>        <!-- optional -->
    <thread>...</thread> <!-- optional -->
  </meta>
  <!-- payload element -->
</message>

payload_extraction_step

Extracts the payload element:

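async def payload_extraction_step(state: MessageState) -> MessageState:
    # The payload is the first element child other than <meta>
    for child in state.envelope_tree:
        if not isinstance(child.tag, str):
            continue  # skip comments and processing instructions
        # Tags are namespace-qualified ({uri}meta), so compare local names
        if etree.QName(child).localname != 'meta':
            state.payload_tree = child
            break
    return state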
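Because the envelope declares a default namespace, parsed tags carry the namespace URI in Clark notation (`{uri}localname`), and a comparison against the bare string `meta` would not match. The following sketch shows the matching logic with the standard library's ElementTree, which uses the same tag notation as lxml. The `extract_payload` helper and the `<greeting>` payload are hypothetical, chosen only for illustration.

```python
import xml.etree.ElementTree as ET
from typing import Optional

ENVELOPE_NS = "https://xml-pipeline.org/ns/envelope/v1"

def extract_payload(envelope: ET.Element) -> Optional[ET.Element]:
    """Return the first child that is not the envelope's <meta> element."""
    meta_tag = f"{{{ENVELOPE_NS}}}meta"  # Clark notation: {namespace}localname
    for child in envelope:
        if child.tag != meta_tag:
            return child
    return None

# Hypothetical envelope; the payload inherits the default namespace here.
raw = (
    f'<message xmlns="{ENVELOPE_NS}">'
    "<meta><from>console</from></meta>"
    "<greeting>hello</greeting>"
    "</message>"
)
payload = extract_payload(ET.fromstring(raw))
print(payload.tag, payload.text)
```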

thread_assignment_step

Assigns or inherits thread UUID:

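ENVELOPE_NS = 'https://xml-pipeline.org/ns/envelope/v1'

async def thread_assignment_step(state: MessageState) -> MessageState:
    # Envelope elements are namespace-qualified, so a bare 'meta' would not match
    meta = state.envelope_tree.find(f'{{{ENVELOPE_NS}}}meta')
    thread_elem = meta.find(f'{{{ENVELOPE_NS}}}thread')

    if thread_elem is not None and thread_elem.text:
        # Existing thread: inherit its ID
        state.thread_id = thread_elem.text
    else:
        # New thread: assign a fresh UUID (requires `import uuid`)
        state.thread_id = str(uuid.uuid4())
    return state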
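The inherit-or-assign rule can be exercised in isolation. This is a sketch using the standard library's ElementTree instead of lxml; `resolve_thread_id` and the sample envelopes are hypothetical, and treating an empty `<thread>` as missing is an assumption.

```python
import uuid
import xml.etree.ElementTree as ET

ENVELOPE_NS = "https://xml-pipeline.org/ns/envelope/v1"
NS = {"env": ENVELOPE_NS}

def resolve_thread_id(envelope: ET.Element) -> str:
    """Inherit <thread> from the envelope, or mint a fresh UUID."""
    text = envelope.findtext("env:meta/env:thread", default="", namespaces=NS)
    # Assumption: an empty or whitespace-only <thread> counts as absent.
    return text.strip() or str(uuid.uuid4())

with_thread = ET.fromstring(
    f'<message xmlns="{ENVELOPE_NS}"><meta><from>a</from>'
    "<thread>t-123</thread></meta><ping/></message>"
)
without_thread = ET.fromstring(
    f'<message xmlns="{ENVELOPE_NS}"><meta><from>a</from></meta><ping/></message>'
)

inherited = resolve_thread_id(with_thread)     # taken from the envelope
generated = resolve_thread_id(without_thread)  # a new random UUID string
```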

xsd_validation_step

Validates payload against listener's schema:

async def xsd_validation_step(state: MessageState) -> MessageState:
    listener = find_listener_for_payload(state.payload_tree)
    schema = load_schema(listener.schema_path)

    if not schema.validate(state.payload_tree):
        state.error = "XSD validation failed"
    return state

deserialization_step

Converts XML to typed dataclass:

async def deserialization_step(state: MessageState) -> MessageState:
    listener = find_listener_for_payload(state.payload_tree)
    state.payload = xmlify_deserialize(
        state.payload_tree,
        listener.payload_class
    )
    return state
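How `xmlify_deserialize` maps XML onto a dataclass is not shown here. As a rough illustration only, the sketch below assumes one child element per dataclass field and converts each value by calling the field's type; the real mapping may differ. `deserialize` and the `Greeting` class are hypothetical.

```python
import xml.etree.ElementTree as ET
from dataclasses import dataclass, fields

@dataclass
class Greeting:
    name: str
    times: int

def deserialize(payload: ET.Element, cls):
    """Build a dataclass from child elements named after its fields.

    Assumption: each field is a simple type (str, int, ...) that can be
    constructed from the element's text.
    """
    kwargs = {}
    for field in fields(cls):
        text = payload.findtext(field.name)
        if text is None:
            raise ValueError(f"missing <{field.name}> element")
        kwargs[field.name] = field.type(text)
    return cls(**kwargs)

payload = ET.fromstring("<greeting><name>Ada</name><times>2</times></greeting>")
greeting = deserialize(payload, Greeting)
```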

Routing

Root Tag Derivation

The root tag is {listener_name}.{dataclass_name}, lowercased:

Listener: greeter
Dataclass: Greeting
Root tag: greeter.greeting
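The derivation rule is simple enough to express directly. This is a minimal sketch; `derive_root_tag` is a hypothetical helper name, not necessarily what the pump calls it.

```python
from dataclasses import dataclass

@dataclass
class Greeting:
    name: str

def derive_root_tag(listener_name: str, payload_class: type) -> str:
    """Join the listener name and the payload class name, lowercased."""
    return f"{listener_name}.{payload_class.__name__}".lower()

root_tag = derive_root_tag("greeter", Greeting)
print(root_tag)  # greeter.greeting
```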

Routing Table

routing_table = {
    "greeter.greeting": [greeter_listener],
    "calculator.add": [calculator_listener],
    "search.query": [google_listener, bing_listener],  # Broadcast
}

Broadcast

Multiple listeners can share a root tag when they are declared with broadcast: true:

listeners:
  - name: search.google
    broadcast: true
    # ...

  - name: search.bing
    broadcast: true
    # ...

All matching listeners execute concurrently.
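The sketch below shows one way broadcast registration and concurrent fan-out could work. It uses plain `asyncio.gather` rather than aiostream, and the `Listener` and `RoutingTable` classes are simplified stand-ins. Rejecting a shared tag unless every participant sets `broadcast` is an assumption about the intended rule.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, List

@dataclass
class Listener:
    name: str
    handler: Callable[[str], Awaitable[str]]
    broadcast: bool = False

class RoutingTable:
    def __init__(self) -> None:
        self._routes: Dict[str, List[Listener]] = defaultdict(list)

    def register(self, root_tag: str, listener: Listener) -> None:
        existing = self._routes[root_tag]
        # Assumption: sharing a tag requires every participant to opt in.
        shared_ok = listener.broadcast and all(o.broadcast for o in existing)
        if existing and not shared_ok:
            raise ValueError(f"{root_tag!r} is already taken; set broadcast: true")
        existing.append(listener)

    async def dispatch(self, root_tag: str, payload: str) -> List[str]:
        listeners = self._routes.get(root_tag, [])
        # gather() runs all handlers concurrently and keeps result order.
        return await asyncio.gather(*(l.handler(payload) for l in listeners))

async def google(query: str) -> str:
    return f"google:{query}"

async def bing(query: str) -> str:
    return f"bing:{query}"

table = RoutingTable()
table.register("search.query", Listener("search.google", google, broadcast=True))
table.register("search.query", Listener("search.bing", bing, broadcast=True))
results = asyncio.run(table.dispatch("search.query", "xml"))
```

Because `asyncio.gather` preserves argument order, `results` lists the responses in registration order even though the handlers run concurrently.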

Handler Dispatch

Async Handlers (Default)

Run in the main event loop:

async def _dispatch_async(self, state, listener):
    metadata = build_metadata(state, listener)
    response = await listener.handler(state.payload, metadata)
    await self._process_response(response, listener, state)

CPU-Bound Handlers

Dispatched to ProcessPoolExecutor:

async def _dispatch_to_process_pool(self, state, listener):
    metadata = build_metadata(state, listener)

    # Store data in shared backend
    payload_uuid, metadata_uuid = store_task_data(
        self._backend, state.payload, metadata
    )

    # Submit to pool
    task = WorkerTask(
        thread_uuid=state.thread_id,
        payload_uuid=payload_uuid,
        handler_path=listener.handler_path,
        metadata_uuid=metadata_uuid,
    )

    # Run the handler in a worker process without blocking the event loop
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        self._process_pool, execute_handler, task
    )

    # Fetch response from backend
    response = fetch_response(self._backend, result.response_uuid)
    await self._process_response(response, listener, state)
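The task carries a `handler_path` string and UUIDs rather than live objects, presumably because everything submitted to a process pool has to be picklable. A worker then needs to turn that path back into a callable. The sketch below assumes a dotted `module.attr` path format, which this page does not confirm, and `resolve_handler` is a hypothetical name.

```python
import importlib
from typing import Any, Callable

def resolve_handler(handler_path: str) -> Callable[..., Any]:
    """Import and return the callable named by a dotted 'module.attr' path.

    Assumption: the path format is 'package.module.function'.
    """
    module_name, _, attr = handler_path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.attr', got {handler_path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr)

# Demonstrated with a standard-library function in place of a real handler.
handler = resolve_handler("json.dumps")
encoded = handler({"a": 1})
```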

Response Processing

When a handler returns a HandlerResponse, the pump validates the target, wraps the payload in a new envelope, and re-injects it:

async def _process_response(self, response, listener, state):
    if response is None:
        return  # Chain terminates

    # Validate target
    if listener.is_agent and listener.peers:
        if response.to not in listener.peers:
            await self._emit_error(state, "Routing error")
            return

    # Build new envelope
    payload_xml = xmlify_serialize(response.payload)
    new_state = MessageState(
        raw_bytes=wrap_in_envelope(
            payload_xml,
            from_id=listener.name,  # injected by the pump, not taken from the handler
            thread_id=get_new_thread_id(response, state),
        ),
    )

    # Re-inject
    await self._process(new_state)
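The two key ideas here are the peer check and the fact that `<from>` is written by the pump. The following standard-library sketch illustrates both. `is_allowed_target` and `build_envelope` are hypothetical stand-ins for the real validation and `wrap_in_envelope`, whose actual signatures and output format may differ.

```python
import xml.etree.ElementTree as ET
from typing import Sequence

ENVELOPE_NS = "https://xml-pipeline.org/ns/envelope/v1"

def is_allowed_target(to: str, is_agent: bool, peers: Sequence[str]) -> bool:
    """Mirror the check above: only agents with declared peers are restricted."""
    if is_agent and peers:
        return to in peers
    return True

def build_envelope(payload_xml: bytes, from_id: str, thread_id: str) -> bytes:
    """Wrap a serialized payload in an envelope with pump-supplied metadata."""
    message = ET.Element(f"{{{ENVELOPE_NS}}}message")
    meta = ET.SubElement(message, f"{{{ENVELOPE_NS}}}meta")
    # The sender identity comes from the listener registration, not from
    # anything the handler returned.
    ET.SubElement(meta, f"{{{ENVELOPE_NS}}}from").text = from_id
    ET.SubElement(meta, f"{{{ENVELOPE_NS}}}thread").text = thread_id
    message.append(ET.fromstring(payload_xml))
    # The serializer picks the namespace prefix; the content is what matters.
    return ET.tostring(message)

envelope_bytes = build_envelope(
    b"<greeting>hi</greeting>", from_id="greeter", thread_id="t-123"
)
```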

Error Handling

Pipeline Errors

If any step sets state.error, processing stops and <huh> is emitted:

<huh xmlns="https://xml-pipeline.org/ns/core/v1">
  <error>XSD validation failed</error>
  <original-attempt>base64-encoded-bytes</original-attempt>
</huh>
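A minimal sketch of this short-circuit behavior follows, assuming steps are async callables that take and return the state. `run_steps`, `build_huh`, and the pared-down `MessageState` are hypothetical; the real pump composes its steps with aiostream and its state has more fields.

```python
import asyncio
import base64
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Optional, Sequence

CORE_NS = "https://xml-pipeline.org/ns/core/v1"

@dataclass
class MessageState:
    raw_bytes: bytes
    error: Optional[str] = None

Step = Callable[[MessageState], Awaitable[MessageState]]

async def run_steps(state: MessageState, steps: Sequence[Step]) -> MessageState:
    """Run steps in order, stopping at the first one that sets state.error."""
    for step in steps:
        state = await step(state)
        if state.error is not None:
            break
    return state

def build_huh(state: MessageState) -> bytes:
    """Build a <huh> element carrying the error and the original bytes."""
    huh = ET.Element(f"{{{CORE_NS}}}huh")
    ET.SubElement(huh, f"{{{CORE_NS}}}error").text = state.error
    original = ET.SubElement(huh, f"{{{CORE_NS}}}original-attempt")
    original.text = base64.b64encode(state.raw_bytes).decode("ascii")
    return ET.tostring(huh)

calls: List[str] = []

async def failing_step(state: MessageState) -> MessageState:
    calls.append("failing")
    state.error = "XSD validation failed"
    return state

async def later_step(state: MessageState) -> MessageState:
    calls.append("later")  # never reached once an earlier step has failed
    return state

final = asyncio.run(run_steps(MessageState(b"<oops>"), [failing_step, later_step]))
huh_bytes = build_huh(final)
```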

Routing Errors

If an agent tries to route to an undeclared peer, a SystemError is emitted:

<SystemError>
  <code>routing</code>
  <message>Message could not be delivered.</message>
  <retry-allowed>true</retry-allowed>
</SystemError>

Lifecycle

# Start
pump = StreamPump(config)
await pump.start()

# Inject messages
await pump.inject(raw_bytes, from_id="console")

# Shutdown (graceful)
await pump.shutdown()

Configuration

process_pool:
  workers: 4
  max_tasks_per_child: 100

backend:
  type: redis
  redis_url: "redis://localhost:6379"

See Also