
Shared Backend

The Shared Backend enables cross-process state sharing for multiprocess deployments. It provides storage for the Context Buffer and Thread Registry.

Overview

By default, xml-pipeline uses in-memory storage (single process). For CPU-bound handlers running in separate processes, you need shared state:

┌────────────────────┐     ┌────────────────────┐
│   Main Process     │     │  Worker Process    │
│   (StreamPump)     │     │  (cpu_bound)       │
└─────────┬──────────┘     └──────────┬─────────┘
          │                           │
          └───────────┬───────────────┘
                      │
                      ▼
          ┌─────────────────────┐
          │   Shared Backend    │
          │  (Redis/Manager)    │
          └─────────────────────┘

Backend Types

InMemoryBackend (Default)

Single-process, thread-safe storage using Python dictionaries.

from xml_pipeline.memory import get_shared_backend, BackendConfig

config = BackendConfig(backend_type="memory")
backend = get_shared_backend(config)

Use when:

  • Single-process deployment
  • Development/testing
  • No CPU-bound handlers

ManagerBackend

Uses a multiprocessing.Manager to share state between processes on a single machine.

config = BackendConfig(backend_type="manager")
backend = get_shared_backend(config)

Use when:

  • Local deployment with CPU-bound handlers
  • No Redis available
  • Single machine, multiple processes

RedisBackend

Distributed storage with TTL-based auto-cleanup.

config = BackendConfig(
    backend_type="redis",
    redis_url="redis://localhost:6379",
    redis_prefix="xp:",
    redis_ttl=86400,  # 24 hours
)
backend = get_shared_backend(config)

Use when:

  • Distributed deployment
  • Multiple machines
  • Need persistence
  • Production environments

Configuration

Via organism.yaml

backend:
  type: redis                          # memory | manager | redis
  redis_url: "redis://localhost:6379"  # Redis connection URL
  redis_prefix: "xp:"                  # Key prefix for multi-tenancy
  redis_ttl: 86400                     # Key TTL in seconds

Programmatic

from xml_pipeline.memory import get_shared_backend, BackendConfig

config = BackendConfig(
    backend_type="redis",
    redis_url="redis://localhost:6379",
    redis_prefix="myapp:",
    redis_ttl=3600,
)
backend = get_shared_backend(config)

Storage Schema

Context Buffer

Stores message history per thread.

In-Memory/Manager:

_buffers = {
    "thread-uuid-1": [slot_bytes_0, slot_bytes_1, ...],
    "thread-uuid-2": [...],
}

Redis:

{prefix}buffer:{thread_id} → LIST of pickled BufferSlots

Thread Registry

Maps UUIDs to call chains.

In-Memory/Manager:

_chain_to_uuid = {"console.greeter": "uuid-123"}
_uuid_to_chain = {"uuid-123": "console.greeter"}

Redis:

{prefix}chain:{chain} → {uuid}
{prefix}uuid:{uuid} → {chain}
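Concretely, the key layout above can be expressed as small helper functions. These names are illustrative only; the real RedisBackend builds its keys internally:

```python
# Hypothetical helpers illustrating the Redis key scheme above;
# the actual RedisBackend constructs these keys internally.
def buffer_key(prefix: str, thread_id: str) -> str:
    """Key holding the LIST of pickled BufferSlots for a thread."""
    return f"{prefix}buffer:{thread_id}"

def chain_key(prefix: str, chain: str) -> str:
    """Key mapping a call chain to its thread UUID."""
    return f"{prefix}chain:{chain}"

def uuid_key(prefix: str, uuid: str) -> str:
    """Key mapping a thread UUID back to its call chain."""
    return f"{prefix}uuid:{uuid}"

# With the default prefix "xp:":
# buffer_key("xp:", "thread-uuid-1") -> "xp:buffer:thread-uuid-1"
```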

API

Buffer Operations

# Append a slot
index = backend.buffer_append(thread_id, slot_bytes)

# Get all slots for thread
slots = backend.buffer_get_thread(thread_id)

# Get specific slot
slot = backend.buffer_get_slot(thread_id, index)

# Check thread exists
exists = backend.buffer_thread_exists(thread_id)

# Delete thread
deleted = backend.buffer_delete_thread(thread_id)

# List all threads
threads = backend.buffer_list_threads()

# Clear all (testing)
backend.buffer_clear()

Registry Operations

# Set chain ↔ UUID mapping
backend.registry_set(chain, uuid)

# Get UUID from chain
uuid = backend.registry_get_uuid(chain)

# Get chain from UUID
chain = backend.registry_get_chain(uuid)

# Delete mapping
deleted = backend.registry_delete(uuid)

# List all mappings
all_mappings = backend.registry_list_all()

# Clear all (testing)
backend.registry_clear()

Serialization

Slots are serialized using pickle:

from xml_pipeline.memory import serialize_slot, deserialize_slot

# Serialize for storage
slot_bytes = serialize_slot(buffer_slot)

# Deserialize after retrieval
slot = deserialize_slot(slot_bytes)
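Because pickle can execute arbitrary code on load, only deserialize slots that come from a backend you trust. The round-trip behaves like plain pickle, illustrated here with a stand-in class (BufferSlot's definition is not shown on this page):

```python
import pickle

# Stand-in for BufferSlot, purely for illustration.
class Slot:
    def __init__(self, payload: str):
        self.payload = payload

slot = Slot("hello")
data = pickle.dumps(slot)        # what serialize_slot does, conceptually
restored = pickle.loads(data)    # what deserialize_slot does, conceptually
assert restored.payload == slot.payload
```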

Integration

With ContextBuffer

from xml_pipeline.memory import get_context_buffer

# Uses shared backend automatically if configured
buffer = get_context_buffer(backend=backend)

# Check if using shared storage
print(buffer.is_shared)  # True

With ThreadRegistry

from xml_pipeline.message_bus.thread_registry import get_registry

registry = get_registry(backend=backend)

# Check if using shared storage
print(registry.is_shared)  # True

With StreamPump

The pump automatically uses the configured backend:

backend:
  type: redis
  redis_url: "redis://localhost:6379"

process_pool:
  workers: 4

listeners:
  - name: analyzer
    cpu_bound: true  # Uses shared backend for data exchange

Worker Data Flow

For CPU-bound handlers, data flows through the backend:

1. Main Process
   ├── Serialize payload + metadata
   ├── Store in backend (payload_uuid, metadata_uuid)
   └── Submit WorkerTask to ProcessPool

2. Worker Process
   ├── Fetch payload + metadata from backend
   ├── Execute handler
   ├── Store response in backend (response_uuid)
   └── Return WorkerResult

3. Main Process
   ├── Fetch response from backend
   ├── Clean up temporary data
   └── Process response normally
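The three steps above can be sketched with a plain dict standing in for the shared backend (the helper names and payload shapes here are illustrative, not the library's API):

```python
import pickle
import uuid

store = {}  # stand-in for the shared backend

# 1. Main process: serialize payload and stash it under a fresh UUID.
payload_uuid = str(uuid.uuid4())
store[payload_uuid] = pickle.dumps({"text": "analyze me"})

# 2. Worker process: fetch the payload, run the handler, stash the response.
payload = pickle.loads(store[payload_uuid])
response_uuid = str(uuid.uuid4())
store[response_uuid] = pickle.dumps({"result": payload["text"].upper()})

# 3. Main process: fetch the response and clean up temporary keys.
response = pickle.loads(store.pop(response_uuid))
del store[payload_uuid]
assert response == {"result": "ANALYZE ME"}
```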

TTL and Cleanup

Redis TTL

Redis keys automatically expire:

backend:
  redis_ttl: 86400  # Keys expire after 24 hours

Manual Cleanup

# Delete specific thread
backend.buffer_delete_thread(thread_id)
backend.registry_delete(uuid)

# Clear all (testing only)
backend.buffer_clear()
backend.registry_clear()

Multi-Tenancy

Use prefixes to isolate different organisms:

# Organism A
backend:
  type: redis
  redis_prefix: "orgA:"

# Organism B
backend:
  type: redis
  redis_prefix: "orgB:"

Monitoring

Redis Info

info = backend.info()
# {'buffer_threads': 5, 'registry_entries': 12}

Health Check

is_healthy = backend.ping()  # True if connected

Testing

import pytest
from xml_pipeline.memory import InMemoryBackend

@pytest.fixture
def backend():
    backend = InMemoryBackend()
    yield backend
    backend.close()

def test_buffer_operations(backend):
    backend.buffer_append("thread-1", b"data")
    assert backend.buffer_thread_exists("thread-1")

See Also