xml-pipeline/xml_pipeline/primitives/sequence.py
dullfig a623c534d5 Add Sequence and Buffer orchestration primitives
Implement two virtual node patterns for message flow orchestration:

- Sequence: Chains listeners in order (A→B→C), feeding each step's
  output as input to the next. Uses ephemeral listeners to intercept
  step results without modifying core pump behavior.

- Buffer: Fan-out to parallel worker threads with optional result
  collection. Supports fire-and-forget mode (collect=False) for
  non-blocking dispatch.

New files:
- sequence_registry.py / buffer_registry.py: State tracking
- sequence.py / buffer.py: Payloads and handlers
- test_sequence.py / test_buffer.py: 52 new tests

Pump additions:
- register_generic_listener(): Accept any payload type
- unregister_listener(): Cleanup ephemeral listeners
- Global singleton accessors for pump instance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-25 14:56:15 -08:00

299 lines
9 KiB
Python

"""
sequence.py — Sequence orchestration primitives.
Sequences chain multiple listeners in order, feeding the output of one step
as input to the next. Steps remain transparent - they don't know they're
part of a sequence.
Usage by an agent:
# Start a sequence: add two numbers, then multiply
return HandlerResponse(
payload=SequenceStart(
steps="calculator.add,calculator.multiply",
payload='<AddPayload><a>5</a><b>3</b></AddPayload>',
return_to="my-agent",
),
to="system.sequence",
)
Flow:
1. system.sequence receives SequenceStart
2. Creates ephemeral listener sequence_{id} to receive step results
3. Sends initial payload to first step FROM sequence_{id}
4. Step processes and responds → routes to sequence_{id}
5. Ephemeral handler advances, sends to next step
6. When all steps complete, sends SequenceComplete to return_to
7. Cleans up ephemeral listener
Key insight: Steps use normal .respond() - the ephemeral listener IS the
caller in the thread chain, so responses naturally route back to it.
"""
from dataclasses import dataclass
from typing import Optional
import uuid as uuid_module
import logging
from lxml import etree
from third_party.xmlable import xmlify
from xml_pipeline.message_bus.message_state import (
HandlerMetadata,
HandlerResponse,
MessageState,
)
from xml_pipeline.message_bus.sequence_registry import get_sequence_registry
# Module-level logger named after this module (``__name__``).
logger = logging.getLogger(__name__)
# ============================================================================
# Payloads
# ============================================================================
@xmlify
@dataclass
class SequenceStart:
    """
    Request to run a new sequence; addressed to ``system.sequence``.

    ``steps`` is split on commas, each name is whitespace-trimmed and empty
    entries are dropped; every remaining name must be a registered listener.
    When ``return_to`` is empty, results go back to the sender of this
    request. When ``sequence_id`` is empty, a short random id is generated.
    """

    # Comma-separated listener names, run in the order given.
    steps: str = ""
    # XML payload delivered unchanged to the first step.
    payload: str = ""
    # Listener that receives SequenceComplete / SequenceError.
    return_to: str = ""
    # Optional caller-chosen id; generated when left empty.
    sequence_id: str = ""
@xmlify
@dataclass
class SequenceComplete:
    """
    Success notification for a finished sequence.

    Delivered to the sequence's ``return_to`` listener once the last step
    has responded.
    """

    # Id of the sequence that finished.
    sequence_id: str = ""
    # Serialized XML of the last step's response.
    final_result: str = ""
    # Number of steps in the sequence.
    step_count: int = 0
@xmlify
@dataclass
class SequenceError:
    """
    Failure notification for a sequence.

    Delivered to the sequence's ``return_to`` listener (or the requester when
    ``return_to`` is empty) when a step replies with an error. It is also sent
    when a SequenceStart request is rejected before any step runs (e.g. no
    steps given, or a step names an unknown listener); in that case
    ``failed_step`` may be empty.
    """

    # Id of the sequence ("unknown" if it failed before an id was assigned).
    sequence_id: str = ""
    # Listener name of the step that failed ("" if no specific step applies).
    failed_step: str = ""
    # 0-based index of the failed step.
    step_index: int = 0
    # Human-readable error message (may be truncated).
    error: str = ""
# ============================================================================
# Handlers
# ============================================================================
async def handle_sequence_start(
    payload: SequenceStart,
    metadata: HandlerMetadata,
) -> Optional[HandlerResponse]:
    """
    Handle SequenceStart — begin a sequence execution.

    Validates the request, stores the sequence state in the registry,
    registers an ephemeral generic listener ``sequence_{id}`` that receives
    each step's response, and returns the message that kicks off step one.

    Args:
        payload: The SequenceStart request.
        metadata: Metadata of the incoming message; ``from_id`` is used as the
            result destination when ``payload.return_to`` is empty.

    Returns:
        A HandlerResponse carrying a SequenceError when the request is invalid
        (no steps, unknown step listener, or a sequence id already in flight);
        otherwise the message delivering ``payload.payload`` to the first step.

    Raises:
        Whatever ``register_generic_listener`` raises; the registry entry
        created for this sequence is removed first.
    """
    from xml_pipeline.message_bus.stream_pump import get_stream_pump

    # Resolve the result destination once; every exit path below uses it.
    return_to = payload.return_to or metadata.from_id

    # "a, b,,c" -> ["a", "b", "c"]: trim whitespace, drop empty entries.
    steps = [s.strip() for s in payload.steps.split(",") if s.strip()]
    if not steps:
        logger.error("SequenceStart with no steps")
        return HandlerResponse(
            payload=SequenceError(
                sequence_id=payload.sequence_id or "unknown",
                failed_step="",
                step_index=0,
                error="No steps specified",
            ),
            to=return_to,
        )

    # Generate sequence ID if not provided
    seq_id = payload.sequence_id or str(uuid_module.uuid4())[:8]
    ephemeral_name = f"sequence_{seq_id}"

    # Validate all steps exist before creating any state.
    pump = get_stream_pump()
    for index, step in enumerate(steps):
        if step not in pump.listeners:
            logger.error("SequenceStart: unknown step '%s'", step)
            return HandlerResponse(
                payload=SequenceError(
                    sequence_id=seq_id,
                    failed_step=step,
                    step_index=index,
                    error=f"Unknown listener: {step}",
                ),
                to=return_to,
            )

    # A caller-supplied id that is still in flight would share registry state
    # and the ephemeral listener name with the running sequence, so step
    # responses of one would advance the other. Reject it instead.
    registry = get_sequence_registry()
    if registry.get(seq_id) is not None or ephemeral_name in pump.listeners:
        logger.error("SequenceStart: sequence id '%s' already in use", seq_id)
        return HandlerResponse(
            payload=SequenceError(
                sequence_id=seq_id,
                failed_step="",
                step_index=0,
                error=f"Sequence ID already in use: {seq_id}",
            ),
            to=return_to,
        )

    # Create sequence state
    state = registry.create(
        sequence_id=seq_id,
        steps=steps,
        return_to=return_to,
        thread_id=metadata.thread_id,
        from_id=metadata.from_id,
        initial_payload=payload.payload,
    )

    async def sequence_handler(
        payload_tree: etree._Element,
        meta: HandlerMetadata,
    ) -> Optional[HandlerResponse]:
        """Ephemeral handler that processes step results."""
        return await _handle_sequence_step_result(seq_id, payload_tree, meta)

    # Register ephemeral listener (generic mode - accepts any payload).
    try:
        pump.register_generic_listener(
            name=ephemeral_name,
            handler=sequence_handler,
            description=f"Ephemeral sequence handler for {seq_id}",
        )
    except Exception:
        # Don't leave orphaned registry state behind; let the caller see why.
        registry.remove(seq_id)
        raise

    logger.info(
        "Sequence %s started: %d steps, return_to=%s",
        seq_id,
        len(steps),
        state.return_to,
    )

    # Kick off the first step, sent FROM the ephemeral listener so that the
    # step's response routes back to it.
    return _create_step_message(
        seq_id=seq_id,
        target=steps[0],
        payload_xml=payload.payload,
        from_name=ephemeral_name,
    )
def _local_tag(element: etree._Element) -> str:
    """Return *element*'s tag lower-cased, with any ``{namespace}`` prefix removed."""
    tag = element.tag
    if not isinstance(tag, str):
        # Comments and processing instructions expose a non-string ``tag``.
        return ""
    return tag.rsplit("}", 1)[-1].lower()


def _teardown_sequence(seq_id: str) -> None:
    """Unregister the sequence's ephemeral listener and drop its registry entry."""
    from xml_pipeline.message_bus.stream_pump import get_stream_pump

    get_stream_pump().unregister_listener(f"sequence_{seq_id}")
    get_sequence_registry().remove(seq_id)


async def _handle_sequence_step_result(
    seq_id: str,
    payload_tree: etree._Element,
    metadata: HandlerMetadata,
) -> Optional[HandlerResponse]:
    """
    Handle a step result in the sequence.

    Called by the ephemeral listener ``sequence_{seq_id}`` when a step
    responds.

    - An error reply (``<huh>`` / ``<SystemError>``, case-insensitive,
      namespace ignored) tears the sequence down and returns a SequenceError
      to ``return_to``.
    - Otherwise the registry advances. If that was the last step, the
      sequence is torn down and a SequenceComplete is returned. If not, the
      result is forwarded as the next step's input.

    Args:
        seq_id: Id of the sequence this result belongs to.
        payload_tree: Parsed payload of the step's response.
        metadata: Metadata of the step's response (currently unused).

    Returns:
        The next message to send, or None if the sequence is unknown.
    """
    registry = get_sequence_registry()
    state = registry.get(seq_id)
    if state is None:
        logger.error("Sequence %s not found in registry", seq_id)
        return None

    # Serialize once: used as the next step's input, the final result, and
    # as the fallback error text.
    result_xml = etree.tostring(payload_tree, encoding="unicode")

    if _local_tag(payload_tree) in ("huh", "systemerror"):
        failed_step = state.current_step or "unknown"
        # ``.text`` alone misses messages held in child elements (it is then
        # None or just whitespace), so gather all nested text instead.
        error_text = " ".join(
            chunk.strip() for chunk in payload_tree.itertext() if chunk.strip()
        ) or result_xml
        registry.mark_failed(seq_id, failed_step, error_text)
        _teardown_sequence(seq_id)
        logger.warning(
            "Sequence %s failed at step %s", seq_id, state.current_index
        )
        return HandlerResponse(
            payload=SequenceError(
                sequence_id=seq_id,
                failed_step=failed_step,
                step_index=state.current_index,
                error=error_text[:200],  # Truncate long errors
            ),
            to=state.return_to,
        )

    # Advance to next step
    state = registry.advance(seq_id, result_xml)

    if state.is_complete:
        _teardown_sequence(seq_id)
        logger.info("Sequence %s completed: %d steps", seq_id, len(state.steps))
        return HandlerResponse(
            payload=SequenceComplete(
                sequence_id=seq_id,
                final_result=result_xml,
                step_count=len(state.steps),
            ),
            to=state.return_to,
        )

    # More steps to go - feed this result to the next step.
    next_step = state.current_step
    logger.debug(
        "Sequence %s advancing to step %s: %s",
        seq_id,
        state.current_index,
        next_step,
    )
    return _create_step_message(
        seq_id=seq_id,
        target=next_step,
        payload_xml=result_xml,
        from_name=f"sequence_{seq_id}",
    )
def _create_step_message(
    seq_id: str,
    target: str,
    payload_xml: str,
    from_name: str,
) -> HandlerResponse:
    """
    Create a HandlerResponse that sends *payload_xml* to the step *target*.

    The XML is wrapped in ``_RawPayloadCarrier``, which is meant to tell the
    pump to (1) use the raw XML directly and (2) set the sender to
    *from_name* (the sequence's ephemeral listener). That way the step's
    ``.respond()`` routes back to the sequence.

    Args:
        seq_id: Id of the sequence (not used when building the message).
        target: Listener name of the step to invoke.
        payload_xml: Raw XML payload for the step.
        from_name: Sender name to inject (the ephemeral listener).

    Returns:
        A HandlerResponse addressed to *target*.
    """
    # ``_RawPayloadCarrier`` is a module-level class of this same module, so a
    # plain global lookup at call time finds it. The previous self-import of
    # xml_pipeline.primitives.sequence was a circular import.
    return HandlerResponse(
        payload=_RawPayloadCarrier(xml=payload_xml, from_override=from_name),
        to=target,
    )
class _RawPayloadCarrier:
"""
Internal carrier for raw XML that bypasses normal serialization.
When the pump sees this, it uses the raw XML directly instead of
serializing a dataclass.
"""
def __init__(self, xml: str, from_override: Optional[str] = None):
self.xml = xml
self.from_override = from_override
def to_xml(self) -> str:
"""Return raw XML for envelope wrapping."""
return self.xml