From 515c738abb7c578299a62f76a22c51902030e972 Mon Sep 17 00:00:00 2001 From: dullfig Date: Tue, 20 Jan 2026 20:40:47 -0800 Subject: [PATCH] Add wiki documentation for xml-pipeline.org Comprehensive documentation set for XWiki: - Home, Installation, Quick Start guides - Writing Handlers and LLM Router guides - Architecture docs (Overview, Message Pump, Thread Registry, Shared Backend) - Reference docs (Configuration, Handler Contract, CLI) - Hello World tutorial - Why XML rationale - Pandoc conversion scripts (bash + PowerShell) Co-Authored-By: Claude Opus 4.5 --- docs/wiki/Home.md | 67 ++++ docs/wiki/Installation.md | 97 ++++++ docs/wiki/LLM-Router.md | 303 +++++++++++++++++ docs/wiki/Quick-Start.md | 159 +++++++++ docs/wiki/README.md | 97 ++++++ docs/wiki/Why-XML.md | 254 +++++++++++++++ docs/wiki/Writing-Handlers.md | 308 ++++++++++++++++++ docs/wiki/architecture/Message-Pump.md | 346 ++++++++++++++++++++ docs/wiki/architecture/Overview.md | 256 +++++++++++++++ docs/wiki/architecture/Shared-Backend.md | 339 +++++++++++++++++++ docs/wiki/architecture/Thread-Registry.md | 261 +++++++++++++++ docs/wiki/convert-to-xwiki.ps1 | 76 +++++ docs/wiki/convert-to-xwiki.sh | 75 +++++ docs/wiki/reference/CLI.md | 129 ++++++++ docs/wiki/reference/Configuration.md | 196 +++++++++++ docs/wiki/reference/Handler-Contract.md | 293 +++++++++++++++++ docs/wiki/tutorials/Hello-World.md | 376 ++++++++++++++++++++++ 17 files changed, 3632 insertions(+) create mode 100644 docs/wiki/Home.md create mode 100644 docs/wiki/Installation.md create mode 100644 docs/wiki/LLM-Router.md create mode 100644 docs/wiki/Quick-Start.md create mode 100644 docs/wiki/README.md create mode 100644 docs/wiki/Why-XML.md create mode 100644 docs/wiki/Writing-Handlers.md create mode 100644 docs/wiki/architecture/Message-Pump.md create mode 100644 docs/wiki/architecture/Overview.md create mode 100644 docs/wiki/architecture/Shared-Backend.md create mode 100644 docs/wiki/architecture/Thread-Registry.md create mode 
100644 docs/wiki/convert-to-xwiki.ps1 create mode 100644 docs/wiki/convert-to-xwiki.sh create mode 100644 docs/wiki/reference/CLI.md create mode 100644 docs/wiki/reference/Configuration.md create mode 100644 docs/wiki/reference/Handler-Contract.md create mode 100644 docs/wiki/tutorials/Hello-World.md diff --git a/docs/wiki/Home.md b/docs/wiki/Home.md new file mode 100644 index 0000000..a9efdd6 --- /dev/null +++ b/docs/wiki/Home.md @@ -0,0 +1,67 @@ +# xml-pipeline + +**A tamper-proof nervous system for multi-agent AI systems.** + +xml-pipeline (also called AgentServer) provides a schema-driven, Turing-complete message bus where AI agents communicate through validated XML payloads. It features automatic XSD generation, handler isolation, and built-in security guarantees against agent misbehavior. + +## Why XML? + +While JSON dominates web APIs, XML provides critical features for secure multi-agent systems: + +- **Schema validation** — XSD enforces exact contracts on the wire +- **Namespaces** — Safely mix vocabularies without collision +- **Canonicalization** — C14N enables deterministic signing +- **Repair tolerance** — Malformed XML can often be recovered (lxml recover mode); strict JSON parsers reject malformed input outright + +See [[Why XML]] for the full rationale. 
+ +## Key Features + +| Feature | Description | +|---------|-------------| +| **Schema-Driven** | Define payloads as Python dataclasses; XSD generated automatically | +| **Handler Isolation** | Handlers are sandboxed—cannot forge identity or escape threads | +| **Thread Tracking** | Opaque UUIDs hide topology; call chains tracked privately | +| **LLM Router** | Multi-backend routing with failover, rate limiting, retries | +| **Multiprocess Ready** | CPU-bound handlers run in ProcessPoolExecutor | +| **Shared State** | Redis/Manager backends for distributed deployments | + +## Quick Links + +### Getting Started +- [[Installation]] — Install the package +- [[Quick Start]] — Run your first organism in 5 minutes +- [[Configuration]] — Configure organisms via YAML + +### Guides +- [[Writing Handlers]] — Create message handlers +- [[Using the LLM Router]] — Call language models from handlers +- [[Multiprocess Handlers]] — Run CPU-bound work in separate processes + +### Architecture +- [[Architecture Overview]] — How xml-pipeline works +- [[Message Pump]] — The streaming message processor +- [[Thread Registry]] — Call chain tracking with opaque UUIDs +- [[Shared Backend]] — Cross-process state with Redis + +### Tutorials +- [[Hello World Tutorial]] — Build a greeting agent step by step +- [[Calculator Tool Tutorial]] — Create a tool that agents can call + +### Reference +- [[Handler Contract]] — Handler function signature and return types +- [[Configuration Reference]] — Complete organism.yaml specification +- [[CLI Reference]] — Command-line interface + +## Version + +Current version: **0.4.0** + +## License + +MIT License + +## Links + +- [GitHub Repository](https://github.com/xml-pipeline/xml-pipeline) +- [PyPI Package](https://pypi.org/project/xml-pipeline/) diff --git a/docs/wiki/Installation.md b/docs/wiki/Installation.md new file mode 100644 index 0000000..e3b232a --- /dev/null +++ b/docs/wiki/Installation.md @@ -0,0 +1,97 @@ +# Installation + +## Requirements + +- 
Python 3.11 or higher +- pip, uv, or pipx + +## Install from PyPI + +```bash +pip install xml-pipeline +``` + +## Install with Optional Features + +xml-pipeline has optional dependencies for different use cases: + +```bash +# Core only (minimal) +pip install xml-pipeline + +# With Anthropic Claude support +pip install xml-pipeline[anthropic] + +# With OpenAI support +pip install xml-pipeline[openai] + +# With Redis for distributed deployments +pip install xml-pipeline[redis] + +# With web search capability +pip install xml-pipeline[search] + +# With interactive console (for examples) +pip install xml-pipeline[console] + +# Everything +pip install xml-pipeline[all] + +# Development (includes testing tools) +pip install xml-pipeline[dev] +``` + +## Install from Source + +```bash +# Clone the repository +git clone https://github.com/xml-pipeline/xml-pipeline.git +cd xml-pipeline + +# Create virtual environment +python -m venv .venv + +# Activate (Windows) +.venv\Scripts\activate + +# Activate (Linux/macOS) +source .venv/bin/activate + +# Install in development mode +pip install -e ".[all]" +``` + +## Verify Installation + +```bash +# Check version +xml-pipeline version + +# Or use the short alias +xp version +``` + +Expected output: +``` +xml-pipeline 0.4.0 +Python 3.11.x +Features: anthropic, console, redis, search +``` + +## Environment Variables + +Create a `.env` file in your project root for API keys: + +```env +# LLM Provider Keys (add the ones you need) +XAI_API_KEY=xai-... +ANTHROPIC_API_KEY=sk-ant-... +OPENAI_API_KEY=sk-... +``` + +The library automatically loads `.env` files via python-dotenv. + +## Next Steps + +- [[Quick Start]] — Run your first organism +- [[Configuration]] — Learn about organism.yaml diff --git a/docs/wiki/LLM-Router.md b/docs/wiki/LLM-Router.md new file mode 100644 index 0000000..939bf32 --- /dev/null +++ b/docs/wiki/LLM-Router.md @@ -0,0 +1,303 @@ +# LLM Router + +The LLM Router provides a unified interface for language model calls. 
Agents request a model by name; the router handles backend selection, failover, rate limiting, and retries. + +## Overview + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Agent Handler │ +│ response = await complete("grok-4.1", messages) │ +└─────────────────────────────────┬───────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ LLM Router │ +│ • Find backends serving model │ +│ • Select backend (strategy) │ +│ • Retry on failure │ +│ • Track usage per agent │ +└────────────┬────────────────┬────────────────┬──────────────────┘ + │ │ │ + ▼ ▼ ▼ + ┌──────────┐ ┌──────────┐ ┌──────────┐ + │ XAI │ │Anthropic │ │ Ollama │ + │ Backend │ │ Backend │ │ Backend │ + └──────────┘ └──────────┘ └──────────┘ +``` + +## Quick Start + +### Basic Usage + +```python +from xml_pipeline.platform.llm_api import complete + +response = await complete( + model="grok-4.1", + messages=[ + {"role": "system", "content": "You are helpful."}, + {"role": "user", "content": "Hello!"}, + ], +) + +print(response.content) +``` + +### In a Handler + +```python +async def my_agent(payload: Query, metadata: HandlerMetadata) -> HandlerResponse: + from xml_pipeline.platform.llm_api import complete + + response = await complete( + model="grok-4.1", + messages=[ + {"role": "system", "content": metadata.usage_instructions}, + {"role": "user", "content": payload.question}, + ], + temperature=0.7, + max_tokens=2048, + ) + + return HandlerResponse( + payload=Answer(text=response.content), + to="output", + ) +``` + +## Configuration + +### organism.yaml + +```yaml +llm: + strategy: failover # Backend selection strategy + retries: 3 # Max retry attempts + retry_base_delay: 1.0 # Base delay for backoff + retry_max_delay: 60.0 # Max delay between retries + + backends: + - provider: xai + api_key_env: XAI_API_KEY + priority: 1 # Lower = preferred + rate_limit_tpm: 100000 # Tokens per minute + max_concurrent: 20 # Concurrent 
request limit + + - provider: anthropic + api_key_env: ANTHROPIC_API_KEY + priority: 2 + + - provider: openai + api_key_env: OPENAI_API_KEY + priority: 3 + + - provider: ollama + base_url: http://localhost:11434 + supported_models: [llama3, mistral] +``` + +### Environment Variables + +```env +# .env file +XAI_API_KEY=xai-abc123... +ANTHROPIC_API_KEY=sk-ant-... +OPENAI_API_KEY=sk-... +``` + +## Supported Providers + +| Provider | Models | Auth | +|----------|--------|------| +| `xai` | grok-* | Bearer token | +| `anthropic` | claude-* | x-api-key header | +| `openai` | gpt-*, o1-*, o3-* | Bearer token | +| `ollama` | Any local model | None (local) | + +### Model Routing + +The router automatically selects backends based on model name: + +- `grok-4.1` → XAI backend +- `claude-sonnet-4` → Anthropic backend +- `gpt-4o` → OpenAI backend +- `llama3` → Ollama (if in `supported_models`) + +## Strategies + +### Failover (Default) + +Tries backends in priority order. Falls back on error. + +```yaml +llm: + strategy: failover + backends: + - provider: xai + priority: 1 # Try first + - provider: anthropic + priority: 2 # Fallback +``` + +### Round-Robin + +Distributes requests evenly across backends. + +```yaml +llm: + strategy: round-robin +``` + +### Least-Loaded + +Routes to the backend with lowest current load. 
+ +```yaml +llm: + strategy: least-loaded +``` + +## Response Format + +```python +@dataclass +class LLMResponse: + content: str # Generated text + model: str # Model used + usage: Dict[str, int] # Token counts + finish_reason: str # stop, length, tool_calls + raw: Any # Provider-specific response +``` + +### Usage Dict + +```python +response.usage = { + "prompt_tokens": 150, + "completion_tokens": 50, + "total_tokens": 200, +} +``` + +## Parameters + +```python +response = await complete( + model="grok-4.1", # Required: model name + messages=[...], # Required: conversation + temperature=0.7, # Optional: randomness (0-2) + max_tokens=2048, # Optional: response limit + top_p=0.9, # Optional: nucleus sampling + stop=["END"], # Optional: stop sequences +) +``` + +## Error Handling + +### Rate Limits + +On 429 responses: +1. Reads `Retry-After` header +2. Falls back to exponential backoff with jitter +3. Tries next backend (if failover) + +### Provider Errors + +On 5xx responses: +1. Logs error +2. Retries with backoff +3. Tries next backend (if failover) + +### All Backends Failed + +```python +from xml_pipeline.llm.router import BackendError + +try: + response = await complete(model, messages) +except BackendError as e: + # All backends failed + logger.error(f"LLM call failed: {e}") +``` + +## Rate Limiting + +Each backend has independent limits: + +- **Token bucket**: Limits tokens per minute (`rate_limit_tpm`) +- **Semaphore**: Limits concurrent requests (`max_concurrent`) + +Requests wait if limits are reached. + +## Token Tracking + +Track usage per agent: + +```python +from xml_pipeline.llm.router import get_router + +router = get_router() + +# Get usage for agent +usage = router.get_agent_usage("greeter") +print(f"Total tokens: {usage.total_tokens}") +print(f"Requests: {usage.request_count}") + +# Reset tracking +router.reset_agent_usage("greeter") +``` + +## Best Practices + +### 1. 
Use System Prompts + +```python +response = await complete( + model="grok-4.1", + messages=[ + {"role": "system", "content": metadata.usage_instructions}, + {"role": "user", "content": payload.query}, + ], +) +``` + +### 2. Handle Errors Gracefully + +```python +try: + response = await complete(model, messages) +except BackendError: + return HandlerResponse( + payload=ErrorResponse(message="LLM unavailable"), + to=metadata.from_id, + ) +``` + +### 3. Set Appropriate Limits + +```yaml +llm: + backends: + - provider: xai + rate_limit_tpm: 50000 # Conservative limit + max_concurrent: 10 # Prevent overload +``` + +### 4. Use Failover for Reliability + +```yaml +llm: + strategy: failover + backends: + - provider: xai + priority: 1 + - provider: anthropic + priority: 2 # Backup +``` + +## See Also + +- [[Writing Handlers]] — Using LLM in handlers +- [[Configuration]] — Full LLM configuration +- [[Architecture Overview]] — System architecture diff --git a/docs/wiki/Quick-Start.md b/docs/wiki/Quick-Start.md new file mode 100644 index 0000000..d668d0a --- /dev/null +++ b/docs/wiki/Quick-Start.md @@ -0,0 +1,159 @@ +# Quick Start + +Get an organism running in 5 minutes. + +## 1. Install the Package + +```bash +pip install xml-pipeline[console] +``` + +## 2. Create a Project Directory + +```bash +mkdir my-organism +cd my-organism +``` + +## 3. Initialize Configuration + +```bash +xml-pipeline init my-organism +``` + +This creates: +``` +my-organism/ +├── config/ +│ └── organism.yaml +├── handlers/ +│ └── hello.py +└── .env.example +``` + +## 4. 
Examine the Generated Files + +### config/organism.yaml + +```yaml +organism: + name: my-organism + port: 8765 + +listeners: + - name: greeter + payload_class: handlers.hello.Greeting + handler: handlers.hello.handle_greeting + description: A friendly greeting handler + peers: [] +``` + +### handlers/hello.py + +```python +from dataclasses import dataclass +from third_party.xmlable import xmlify +from xml_pipeline.message_bus.message_state import HandlerMetadata, HandlerResponse + +@xmlify +@dataclass +class Greeting: + """A greeting request.""" + name: str + +@xmlify +@dataclass +class GreetingResponse: + """A greeting response.""" + message: str + +async def handle_greeting(payload: Greeting, metadata: HandlerMetadata) -> HandlerResponse: + """Handle a greeting and respond.""" + return HandlerResponse( + payload=GreetingResponse(message=f"Hello, {payload.name}!"), + to=metadata.from_id, # Reply to sender + ) +``` + +## 5. Run the Organism + +```bash +xml-pipeline run config/organism.yaml +``` + +You should see: +``` +Organism: my-organism +Listeners: 1 +Root thread: abc123-... +Routing: ['greeter.greeting'] +``` + +## 6. Try the Interactive Console + +If you installed with `[console]`: + +```bash +python -m examples.console +``` + +Type `@greeter Alice` to send a greeting message. + +## What Just Happened? + +1. **Payload defined** — `Greeting` dataclass with `@xmlify` decorator +2. **XSD generated** — Schema auto-created at `schemas/greeter/v1.xsd` +3. **Handler registered** — `handle_greeting` mapped to `greeter.greeting` root tag +4. **Message pump started** — Waiting for messages + +## Understanding the Message Flow + +``` +Input: @greeter Alice + │ + ▼ +┌─────────────────────────────────────┐ +│ XML Envelope Created │ +│ │ +│ │ +│ console │ +│ greeter │ +│ uuid-123 │ +│ │ +│ │ +│ Alice │ +│ │ +│ │ +└─────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────┐ +│ Pipeline Processing │ +│ 1. Repair (fix malformed XML) │ +│ 2. 
C14N (canonicalize) │ +│ 3. Envelope validation │ +│ 4. Payload extraction │ +│ 5. XSD validation │ +│ 6. Deserialization → Greeting │ +│ 7. Route to greeter handler │ +└─────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────┐ +│ Handler Execution │ +│ handle_greeting( │ +│ payload=Greeting(name="Alice"), │ +│ metadata=HandlerMetadata(...) │ +│ ) │ +│ → HandlerResponse(...) │ +└─────────────────────────────────────┘ + │ + ▼ +Output: Hello, Alice! +``` + +## Next Steps + +- [[Writing Handlers]] — Create your own handlers +- [[Configuration]] — Customize organism.yaml +- [[Hello World Tutorial]] — Step-by-step tutorial diff --git a/docs/wiki/README.md b/docs/wiki/README.md new file mode 100644 index 0000000..1c61603 --- /dev/null +++ b/docs/wiki/README.md @@ -0,0 +1,97 @@ +# Wiki Documentation + +This directory contains documentation for the xml-pipeline.org XWiki. + +## Structure + +``` +wiki/ +├── Home.md # Wiki home page +├── Installation.md # Installation guide +├── Quick-Start.md # Quick start guide +├── Writing-Handlers.md # Handler guide +├── LLM-Router.md # LLM router guide +├── Why-XML.md # Rationale for XML +├── architecture/ +│ ├── Overview.md # Architecture overview +│ ├── Message-Pump.md # Message pump details +│ ├── Thread-Registry.md # Thread registry details +│ └── Shared-Backend.md # Shared backend details +├── reference/ +│ ├── Configuration.md # Full configuration reference +│ ├── Handler-Contract.md # Handler specification +│ └── CLI.md # CLI reference +├── tutorials/ +│ └── Hello-World.md # Step-by-step tutorial +├── convert-to-xwiki.sh # Bash conversion script +├── convert-to-xwiki.ps1 # PowerShell conversion script +└── README.md # This file +``` + +## Converting to XWiki Format + +### Prerequisites + +Install Pandoc: https://pandoc.org/installing.html + +### Convert All Files + +**Windows (PowerShell):** +```powershell +cd docs/wiki +.\convert-to-xwiki.ps1 +``` + +**Linux/macOS (Bash):** +```bash +cd docs/wiki 
+chmod +x convert-to-xwiki.sh +./convert-to-xwiki.sh +``` + +### Output + +Converted files are placed in the `xwiki/` directory with a `.xwiki` extension. + +## Uploading to XWiki + +### Option 1: XWiki REST API + +```bash +# Upload a single page (--data-binary preserves newlines; -d would strip them) +curl -u admin:password -X PUT \ + 'https://xml-pipeline.org/rest/wikis/xwiki/spaces/Docs/pages/Home' \ + -H 'Content-Type: text/plain' \ + --data-binary @xwiki/Home.xwiki +``` + +### Option 2: XWiki Import + +1. Go to XWiki Administration +2. Content → Import +3. Upload the files + +### Option 3: Copy/Paste + +1. Create page in XWiki +2. Switch to Wiki editing mode +3. Paste converted content + +## Wiki Links + +The Markdown files use `[[Page Name]]` wiki-link syntax. Pandoc converts these to XWiki's `[[Page Name]]` format. + +If your XWiki uses a different space structure, you may need to adjust links: + +``` +[[Installation]] → [[Docs.Installation]] +[[architecture/Overview]] → [[Docs.Architecture.Overview]] +``` + +## Updating Documentation + +1. Edit the Markdown files in this directory +2. Run the conversion script +3. Upload to XWiki + +Keep Markdown as the source of truth for version control. diff --git a/docs/wiki/Why-XML.md b/docs/wiki/Why-XML.md new file mode 100644 index 0000000..b5e1d5d --- /dev/null +++ b/docs/wiki/Why-XML.md @@ -0,0 +1,254 @@ +# Why XML? + +XML is the right format for a sovereign, attack-resistant message bus in a multi-agent system. JSON is not. + +## The Short Answer + +| Feature | XML | JSON | +|---------|-----|------| +| Schema validation | XSD (built-in, precise) | JSON Schema (optional, lossy) | +| Namespaces | Native support | None | +| Canonicalization | C14N standard | No standard | +| Repair tolerance | lxml recover mode | Parser fails | +| Comments | Supported | Forbidden | +| Mixed content | Native | Fragile | + +## JSON's Origins + +JSON (JavaScript Object Notation) was invented in the early 2000s as a subset of JavaScript literal syntax for simple data exchange in web browsers. 
It was never designed as a general-purpose format—just a quick way to serialize objects for Ajax calls. + +It became popular because: +- Simple for JavaScript developers +- Human-readable +- Web API boom (REST over SOAP) +- Low barrier to entry + +## Why JSON Fails for Multi-Agent Systems + +### No Schema Enforcement + +JSON Schema exists but is: +- Optional (rarely enforced on wire) +- Lossy (can't express all constraints) +- Inconsistently implemented + +Result: Messages accepted without validation, bugs discovered at runtime. + +### No Namespaces + +Can't safely mix vocabularies: + +```json +{ + "name": "Alice", // User name? Product name? + "type": "admin" // User type? Message type? +} +``` + +### No Canonicalization + +No standard way to normalize for signing: + +```json +{"a": 1, "b": 2} +{"b": 2, "a": 1} +``` + +Same data? Different bytes. Can't sign reliably. + +### No Repair Tolerance + +One syntax error → entire payload rejected: + +```json +{"name": "Alice",} // Trailing comma → FAIL +``` + +### Escaping Hell + +Strings with special characters are fragile: + +```json +{"message": "She said \"hello\""} // Manual escaping +``` + +Easy to break, security vulnerability vector. + +## Why JSON Fails for LLM Integration + +### Hallucination Fragility + +LLMs routinely produce invalid JSON: +- Trailing commas +- Missing quotes +- Wrong nesting +- Comments (forbidden!) + +Result: Massive prompt bloat ("You MUST output valid JSON, NO trailing commas EVER...") and post-processing parsers. + +### No Graceful Degradation + +One parse error → entire response lost. No partial recovery. + +### Injection Attacks + +User input in strings can break JSON structure: + +```json +{"user_input": "Alice", "role": "admin"} +``` + +If user provides `", "role": "admin"` in their name → injection. + +## Why XML Succeeds + +### Schema as Contract + +XSD enforces exact structure on the wire: + +```xml + + + + + + + +``` + +Every message validated before processing. No ambiguity. 
+ +### Namespaces + +Safe vocabulary mixing: + +```xml + + + Alice + + +``` + +### Canonicalization (C14N) + +Deterministic representation for signing: + +```python +c14n_bytes = etree.tostring(tree, method='c14n') +signature = sign(c14n_bytes) +``` + +Same logical content → same bytes → verifiable signatures. + +### Repair Tolerance + +lxml recover mode fixes common issues: + +```python +parser = etree.XMLParser(recover=True) +tree = etree.fromstring(broken_xml, parser) +``` + +Partial documents, encoding issues, missing tags → recovered. + +### Self-Describing + +Elements carry meaning: + +```xml + + Alice + +``` + +vs JSON: + +```json +["Alice"] // What is this? +``` + +## LLM + XML = Reliable + +### Natural Streaming + +XML streams naturally (can process before complete). + +### Repair on Output + +LLM produces broken XML? lxml fixes it: + +```python +from lxml import etree + +parser = etree.XMLParser(recover=True) +tree = etree.fromstring(llm_output, parser) +# Works even with minor errors +``` + +### Schema-Guided Generation + +XSD tells LLM exactly what to produce: + +``` +Generate XML matching this schema: +string +``` + +Clear contract, fewer hallucinations. + +### Graceful Validation + +Validation errors become helpful feedback: + +```xml + + Element 'greeting' missing required element 'name' + +``` + +LLM can self-correct. + +## The Trade-Offs + +### XML is More Verbose + +```xml +Alice +``` + +vs + +```json +{"name": "Alice"} +``` + +**But:** Compression eliminates this on wire. And verbosity aids debugging. + +### XML Parsing is Slower + +Microseconds more than JSON parsing. + +**But:** Network latency dominates. And lxml is highly optimized. + +### XML is "Old" + +True. Also mature, battle-tested, standards-based. + +## Conclusion + +JSON won the web because it was "good enough" for stateless HTTP requests. 
+ +XML wins for multi-agent systems because: +- Security requires schema enforcement +- Signing requires canonicalization +- LLMs require repair tolerance +- Complexity requires namespaces + +**JSON won the web. XML wins the swarm.** + +## Further Reading + +- [W3C XML Schema](https://www.w3.org/XML/Schema) +- [Exclusive XML Canonicalization](https://www.w3.org/TR/xml-exc-c14n/) +- [lxml Documentation](https://lxml.de/) diff --git a/docs/wiki/Writing-Handlers.md b/docs/wiki/Writing-Handlers.md new file mode 100644 index 0000000..443f733 --- /dev/null +++ b/docs/wiki/Writing-Handlers.md @@ -0,0 +1,308 @@ +# Writing Handlers + +Handlers are async Python functions that process messages. This guide covers everything you need to know to write effective handlers. + +## Basic Handler Structure + +Every handler follows this pattern: + +```python +from dataclasses import dataclass +from third_party.xmlable import xmlify +from xml_pipeline.message_bus.message_state import HandlerMetadata, HandlerResponse + +# 1. Define your payload (input) +@xmlify +@dataclass +class MyPayload: + """Description of what this payload represents.""" + field1: str + field2: int + +# 2. Define your response (output) +@xmlify +@dataclass +class MyResponse: + """Description of the response.""" + result: str + +# 3. 
Write the handler +async def my_handler(payload: MyPayload, metadata: HandlerMetadata) -> HandlerResponse: + """Process the payload and return a response.""" + result = f"Processed {payload.field1} with {payload.field2}" + + return HandlerResponse( + payload=MyResponse(result=result), + to="next-listener", # Where to send the response + ) +``` + +## The @xmlify Decorator + +The `@xmlify` decorator enables automatic XML serialization and XSD generation: + +```python +from dataclasses import dataclass +from third_party.xmlable import xmlify + +@xmlify +@dataclass +class Greeting: + name: str # Required field + formal: bool = False # Optional with default + count: int = 1 # Optional with default +``` + +This generates XML like: +```xml + + Alice + true + 3 + +``` + +### Supported Types + +| Python Type | XML Representation | +|-------------|-------------------| +| `str` | Text content | +| `int` | Integer text | +| `float` | Decimal text | +| `bool` | `true` / `false` | +| `list[T]` | Repeated elements | +| `Optional[T]` | Optional element | +| `@xmlify` class | Nested element | + +### Nested Payloads + +```python +@xmlify +@dataclass +class Address: + street: str + city: str + +@xmlify +@dataclass +class Person: + name: str + address: Address # Nested payload +``` + +## HandlerMetadata + +The `metadata` parameter provides context about the message: + +```python +@dataclass +class HandlerMetadata: + thread_id: str # Opaque thread UUID + from_id: str # Who sent this message + own_name: str | None # This listener's name (agents only) + is_self_call: bool # True if message is from self + usage_instructions: str # Peer schemas for LLM prompts + todo_nudge: str # System reminders about pending todos +``` + +### Usage Examples + +```python +async def my_handler(payload: MyPayload, metadata: HandlerMetadata) -> HandlerResponse: + # Log who sent the message + print(f"Received from: {metadata.from_id}") + + # Check if this is a self-call (agent iterating) + if 
metadata.is_self_call: + print("This is a self-call") + + # For agents: use peer schemas in LLM prompts + if metadata.usage_instructions: + system_prompt = metadata.usage_instructions + "\n\nYour custom instructions..." +``` + +## Response Types + +### Forward to Target + +Send the response to a specific listener: + +```python +return HandlerResponse( + payload=MyResponse(result="done"), + to="next-listener", +) +``` + +### Respond to Caller + +Return to whoever sent the message: + +```python +return HandlerResponse.respond( + payload=ResultPayload(value=42) +) +``` + +This uses the thread registry to route back up the call chain. + +### Terminate Chain + +End processing with no response: + +```python +return None +``` + +Use this for terminal handlers (logging, display, etc.). + +## Handler Patterns + +### Simple Tool + +A stateless transformation: + +```python +@xmlify +@dataclass +class AddPayload: + a: int + b: int + +@xmlify +@dataclass +class AddResult: + sum: int + +async def add_handler(payload: AddPayload, metadata: HandlerMetadata) -> HandlerResponse: + result = payload.a + payload.b + return HandlerResponse.respond(payload=AddResult(sum=result)) +``` + +### LLM Agent + +An agent that uses language models: + +```python +async def research_handler(payload: ResearchQuery, metadata: HandlerMetadata) -> HandlerResponse: + from xml_pipeline.platform.llm_api import complete + + # Build prompt with peer awareness + system_prompt = f""" +{metadata.usage_instructions} + +You are a research agent. Answer the query using available tools. 
+""" + + response = await complete( + model="grok-4.1", + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": payload.query}, + ], + ) + + return HandlerResponse( + payload=ResearchResult(answer=response.content), + to="summarizer", + ) +``` + +### Terminal Handler + +A handler that displays output and ends the chain: + +```python +async def console_output(payload: TextOutput, metadata: HandlerMetadata) -> None: + print(f"[{payload.source}] {payload.text}") + return None # Chain ends here +``` + +### Self-Iterating Agent + +An agent that calls itself to continue reasoning: + +```python +async def thinking_agent(payload: ThinkPayload, metadata: HandlerMetadata) -> HandlerResponse: + # Check if we should continue thinking + if payload.iteration >= 5: + return HandlerResponse( + payload=FinalAnswer(answer=payload.current_answer), + to="output", + ) + + # Continue thinking by calling self + return HandlerResponse( + payload=ThinkPayload( + iteration=payload.iteration + 1, + current_answer=f"Refined: {payload.current_answer}", + ), + to=metadata.own_name, # Self-call + ) +``` + +## Error Handling + +Handlers should handle errors gracefully: + +```python +async def safe_handler(payload: MyPayload, metadata: HandlerMetadata) -> HandlerResponse: + try: + result = await risky_operation(payload) + return HandlerResponse.respond(payload=SuccessResult(data=result)) + except ValidationError as e: + return HandlerResponse.respond(payload=ErrorResult(error=str(e))) + except Exception as e: + # Log and return generic error + logger.exception("Handler failed") + return HandlerResponse.respond(payload=ErrorResult(error="Internal error")) +``` + +## Registration + +Register handlers in `organism.yaml`: + +```yaml +listeners: + - name: calculator.add + payload_class: handlers.calc.AddPayload + handler: handlers.calc.add_handler + description: "Adds two numbers and returns the sum" +``` + +The `description` is important—it's used in auto-generated 
tool prompts for LLM agents. + +## CPU-Bound Handlers + +For computationally expensive handlers, mark them as `cpu_bound`: + +```yaml +listeners: + - name: analyzer + payload_class: handlers.analyze.AnalyzePayload + handler: handlers.analyze.analyze_handler + description: "Heavy document analysis" + cpu_bound: true +``` + +These run in a separate process pool to avoid blocking the event loop. + +## Security Notes + +Handlers are **untrusted code**. The system enforces: + +1. **Identity injection** — The sender identity is always set by the pump, never by handlers +2. **Thread isolation** — Handlers see only opaque UUIDs +3. **Peer constraints** — Agents can only send to declared peers + +Even compromised handlers cannot: +- Forge sender identity +- Access other threads +- Discover organism topology +- Route to undeclared peers + +## See Also + +- [[Handler Contract]] — Complete handler specification +- [[Configuration]] — Registering handlers +- [[LLM Router]] — Using language models diff --git a/docs/wiki/architecture/Message-Pump.md b/docs/wiki/architecture/Message-Pump.md new file mode 100644 index 0000000..3391c67 --- /dev/null +++ b/docs/wiki/architecture/Message-Pump.md @@ -0,0 +1,346 @@ +# Message Pump + +The Message Pump (StreamPump) is the heart of xml-pipeline. It orchestrates message flow from ingress through processing to handler dispatch and response handling. + +## Overview + +The pump uses [aiostream](https://aiostream.readthedocs.io/) for stream-based processing with concurrent fan-out capabilities. 
+ +```python +from xml_pipeline.message_bus.stream_pump import StreamPump + +pump = StreamPump(config) +await pump.start() + +# Inject a message +await pump.inject(raw_bytes, from_id="console") + +await pump.shutdown() +``` + +## Architecture + +``` + ┌─────────────────────┐ + │ Message Source │ + │ (Console, WebSocket)│ + └──────────┬──────────┘ + │ + ▼ +┌──────────────────────────────────────────────────────────────┐ +│ INGRESS PIPELINE │ +│ │ +│ ┌─────────┐ ┌──────┐ ┌──────────┐ ┌─────────────┐ │ +│ │ Repair │ → │ C14N │ → │ Envelope │ → │ Payload │ │ +│ │ Step │ │ Step │ │ Validate │ │ Extraction │ │ +│ └─────────┘ └──────┘ └──────────┘ └─────────────┘ │ +│ │ +│ ┌──────────┐ ┌─────────┐ ┌─────────────┐ │ +│ │ Thread │ → │ XSD │ → │ Deserialize │ │ +│ │ Assign │ │ Validate│ │ to class │ │ +│ └──────────┘ └─────────┘ └─────────────┘ │ +│ │ +└──────────────────────────────────────────────────────────────┘ + │ + ▼ + ┌─────────────────────┐ + │ ROUTING TABLE │ + │ │ + │ root_tag → [ │ + │ Listener1, │ + │ Listener2, │ + │ ] │ + └──────────┬──────────┘ + │ + ┌────────────────┼────────────────┐ + ▼ ▼ ▼ + ┌─────────────────┐ ┌─────────────┐ ┌─────────────────┐ + │ Handler A │ │ Handler B │ │ Handler C │ + │ (async/main) │ │ (cpu_bound) │ │ (async) │ + └────────┬────────┘ └──────┬──────┘ └────────┬────────┘ + │ │ │ + └─────────────────┼──────────────────┘ + │ + ▼ + ┌─────────────────────┐ + │ RESPONSE HANDLER │ + │ │ + │ • Serialize │ + │ • Wrap envelope │ + │ • Inject │ + │ • Re-inject │ + └─────────────────────┘ +``` + +## Pipeline Steps + +### repair_step + +Fixes malformed XML using lxml's recover mode: + +```python +async def repair_step(state: MessageState) -> MessageState: + parser = etree.XMLParser(recover=True) + state.envelope_tree = etree.fromstring(state.raw_bytes, parser) + return state +``` + +Handles: +- Missing closing tags +- Invalid characters +- Encoding issues + +### c14n_step + +Canonicalizes XML using Exclusive C14N: + +```python +async def 
c14n_step(state: MessageState) -> MessageState: + # exclusive=True selects Exclusive C14N; lxml defaults to inclusive C14N + c14n_bytes = etree.tostring(state.envelope_tree, method='c14n', exclusive=True) + state.envelope_tree = etree.fromstring(c14n_bytes) + return state +``` + +Ensures deterministic representation for signing. + +### envelope_validation_step + +Validates against `envelope.xsd`: + +```xml + + + ... + ... + ... + + + +``` + +### payload_extraction_step + +Extracts the payload element: + +```python +async def payload_extraction_step(state: MessageState) -> MessageState: + # Find first non-meta child + for child in state.envelope_tree: + if child.tag != 'meta': + state.payload_tree = child + break + return state +``` + +### thread_assignment_step + +Assigns or inherits thread UUID: + +```python +async def thread_assignment_step(state: MessageState) -> MessageState: + meta = state.envelope_tree.find('meta') + # Guard against a missing meta element before looking up the thread + thread_elem = meta.find('thread') if meta is not None else None + + if thread_elem is not None: + state.thread_id = thread_elem.text + else: + # New thread - assign UUID + state.thread_id = str(uuid.uuid4()) + return state +``` + +### xsd_validation_step + +Validates payload against listener's schema: + +```python +async def xsd_validation_step(state: MessageState) -> MessageState: + listener = find_listener_for_payload(state.payload_tree) + schema = load_schema(listener.schema_path) + + if not schema.validate(state.payload_tree): + state.error = "XSD validation failed" + return state +``` + +### deserialization_step + +Converts XML to typed dataclass: + +```python +async def deserialization_step(state: MessageState) -> MessageState: + listener = find_listener_for_payload(state.payload_tree) + state.payload = xmlify_deserialize( + state.payload_tree, + listener.payload_class + ) + return state +``` + +## Routing + +### Root Tag Derivation + +Root tag = `{listener_name}.{dataclass_name}` (lowercase): + +``` +Listener: greeter +Dataclass: Greeting +Root tag: greeter.greeting +``` + +### Routing Table + +```python +routing_table = { + "greeter.greeting":
[greeter_listener], + "calculator.add": [calculator_listener], + "search.query": [google_listener, bing_listener], # Broadcast +} +``` + +### Broadcast + +Multiple listeners can share a root tag (`broadcast: true`): + +```yaml +listeners: + - name: search.google + broadcast: true + # ... + + - name: search.bing + broadcast: true + # ... +``` + +All matching listeners execute concurrently. + +## Handler Dispatch + +### Async Handlers (Default) + +Run in the main event loop: + +```python +async def _dispatch_async(self, state, listener): + metadata = build_metadata(state, listener) + response = await listener.handler(state.payload, metadata) + await self._process_response(response, listener, state) +``` + +### CPU-Bound Handlers + +Dispatched to ProcessPoolExecutor: + +```python +async def _dispatch_to_process_pool(self, state, listener): + # Build metadata, then store data in shared backend + metadata = build_metadata(state, listener) + payload_uuid, metadata_uuid = store_task_data( + self._backend, state.payload, metadata + ) + + # Submit to pool + task = WorkerTask( + thread_uuid=state.thread_id, + payload_uuid=payload_uuid, + handler_path=listener.handler_path, + metadata_uuid=metadata_uuid, + ) + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor( + self._process_pool, execute_handler, task + ) + + # Fetch response from backend + response = fetch_response(self._backend, result.response_uuid) + await self._process_response(response, listener, state) +``` + +## Response Processing + +When a handler returns `HandlerResponse`: + +```python +async def _process_response(self, response, listener, state): + if response is None: + return # Chain terminates + + # Validate target + if listener.is_agent and listener.peers: + if response.to not in listener.peers: + await self._emit_error(state, "Routing error") + return + + # Build new envelope + payload_xml = xmlify_serialize(response.payload) + new_state = MessageState( + raw_bytes=wrap_in_envelope( + payload_xml, + from_id=listener.name, # SYSTEM INJECTS +
thread_id=get_new_thread_id(response, state), + ), + ) + + # Re-inject + await self._process(new_state) +``` + +## Error Handling + +### Pipeline Errors + +If any step sets `state.error`, processing stops and `` is emitted: + +```xml + + XSD validation failed + base64-encoded-bytes + +``` + +### Routing Errors + +If an agent tries to route to an undeclared peer: + +```xml + + routing + Message could not be delivered. + true + +``` + +## Lifecycle + +```python +# Start +pump = StreamPump(config) +await pump.start() + +# Inject messages +await pump.inject(raw_bytes, from_id="console") + +# Shutdown (graceful) +await pump.shutdown() +``` + +## Configuration + +```yaml +process_pool: + workers: 4 + max_tasks_per_child: 100 + +backend: + type: redis + redis_url: "redis://localhost:6379" +``` + +## See Also + +- [[Architecture Overview]] — High-level view +- [[Thread Registry]] — Thread tracking +- [[Shared Backend]] — Cross-process state +- [[Writing Handlers]] — Handler patterns diff --git a/docs/wiki/architecture/Overview.md b/docs/wiki/architecture/Overview.md new file mode 100644 index 0000000..f56f2cf --- /dev/null +++ b/docs/wiki/architecture/Overview.md @@ -0,0 +1,256 @@ +# Architecture Overview + +xml-pipeline implements a stream-based message pump where all communication flows through validated XML envelopes. The architecture enforces strict isolation between handlers (untrusted code) and the system (trusted zone). 
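The trust boundary described here can be pictured with a small sketch. This is an illustration only, not the project's implementation: `Response`, `Envelope`, `dispatch`, and the `peers` set are hypothetical names introduced for the example. It shows the trusted side, rather than the handler, stamping the sender and rejecting undeclared targets.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass(frozen=True)
class Response:
    """Hypothetical handler return value: a payload and a target name."""
    payload: Any
    to: str


@dataclass(frozen=True)
class Envelope:
    """Hypothetical system-built wrapper; handlers never construct it."""
    from_id: str
    to: str
    thread_id: str
    payload: Any


Handler = Callable[[Any, str], Awaitable[Optional[Response]]]


async def dispatch(
    listener_name: str,
    peers: set[str],
    handler: Handler,
    payload: Any,
    thread_id: str,
) -> Optional[Envelope]:
    """Run an untrusted handler and wrap its response on the trusted side."""
    response = await handler(payload, thread_id)
    if response is None:
        return None  # chain ends here
    if response.to not in peers:
        raise PermissionError(f"{listener_name} may not send to {response.to}")
    # The sender comes from the registered listener name, never from handler output.
    return Envelope(
        from_id=listener_name,
        to=response.to,
        thread_id=thread_id,
        payload=response.payload,
    )


async def greeter(payload: str, thread_id: str) -> Response:
    """Example handler: it chooses a target but has no way to set the sender."""
    return Response(payload=f"Hello, {payload}!", to="shouter")


envelope = asyncio.run(dispatch("greeter", {"shouter"}, greeter, "Alice", "uuid-123"))
print(envelope)
```

Because `from_id` is filled in from `listener_name`, nothing a handler returns can change who the message appears to come from, and a target outside `peers` is refused before any envelope is built.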
+ +## High-Level Architecture + +``` +┌─────────────────────────────────────────────────────────────────────┐ +│ TRUSTED ZONE (System) │ +│ • Thread registry (UUID ↔ call chain mapping) │ +│ • Listener registry (name → peers, schema) │ +│ • Envelope injection (, , ) │ +│ • Peer constraint enforcement │ +└─────────────────────────────────────────────────────────────────────┘ + ↕ + Coroutine Capture Boundary + ↕ +┌─────────────────────────────────────────────────────────────────────┐ +│ UNTRUSTED ZONE (Handlers) │ +│ • Receive typed payload + metadata │ +│ • Return HandlerResponse or None │ +│ • Cannot forge identity, escape thread, or probe topology │ +└─────────────────────────────────────────────────────────────────────┘ +``` + +## Core Components + +### Message Pump (StreamPump) + +The central orchestrator that: +1. Receives raw XML bytes +2. Runs messages through preprocessing pipeline +3. Routes to appropriate handlers +4. Processes responses and re-injects + +See [[Message Pump]] for details. + +### Pipeline Steps + +Messages flow through ordered processing stages: + +``` +Raw Bytes + │ + ▼ +┌─────────────────┐ +│ repair_step │ Fix malformed XML (lxml recover mode) +└────────┬────────┘ + ▼ +┌─────────────────┐ +│ c14n_step │ Canonicalize XML (Exclusive C14N) +└────────┬────────┘ + ▼ +┌─────────────────┐ +│ envelope_valid │ Validate against envelope.xsd +└────────┬────────┘ + ▼ +┌─────────────────┐ +│ payload_extract │ Extract payload from envelope +└────────┬────────┘ + ▼ +┌─────────────────┐ +│ thread_assign │ Assign or inherit thread UUID +└────────┬────────┘ + ▼ +┌─────────────────┐ +│ xsd_validate │ Validate against listener's XSD +└────────┬────────┘ + ▼ +┌─────────────────┐ +│ deserialize │ XML → @xmlify dataclass +└────────┬────────┘ + ▼ +┌─────────────────┐ +│ routing │ Match to listener(s) +└────────┬────────┘ + ▼ + Handler +``` + +### Thread Registry + +Maps opaque UUIDs to call chains: + +``` +UUID: 550e8400-e29b-41d4-... 
+Chain: system.organism.console.greeter.calculator + │ │ │ │ │ + │ │ │ │ └─ Current handler + │ │ │ └─ Previous hop + │ │ └─ Entry point + │ └─ Organism name + └─ Root +``` + +Handlers only see the UUID. The actual chain is private to the system. + +See [[Thread Registry]] for details. + +### Listener Registry + +Tracks registered listeners: + +``` +name: "greeter" + ├── payload_class: Greeting + ├── handler: handle_greeting + ├── description: "Friendly greeting handler" + ├── agent: true + ├── peers: [shouter, calculator] + └── schema: schemas/greeter/v1.xsd +``` + +### Context Buffer + +Stores message history per thread: + +``` +Thread: uuid-123 + ├── Slot 0: Greeting(name="Alice") from console + ├── Slot 1: GreetingResponse(message="Hello!") from greeter + └── Slot 2: ShoutResponse(text="HELLO!") from shouter +``` + +Append-only, immutable slots. Auto-GC when thread is pruned. + +## Message Flow + +### 1. Message Arrival + +External message arrives (console, WebSocket, etc.): + +```xml + + + console + greeter + + + Alice + + +``` + +### 2. Pipeline Processing + +Message flows through pipeline steps. Each step transforms `MessageState`: + +```python +@dataclass +class MessageState: + raw_bytes: bytes | None # Input + envelope_tree: Element | None # After repair + payload_tree: Element | None # After extraction + payload: Any | None # After deserialization + thread_id: str | None # After assignment + from_id: str | None # Sender + target_listeners: list | None # After routing + error: str | None # If step fails +``` + +### 3. Handler Dispatch + +Handler receives typed payload + metadata: + +```python +async def handle_greeting(payload: Greeting, metadata: HandlerMetadata): + # payload.name == "Alice" + # metadata.thread_id == "uuid-123" + # metadata.from_id == "console" +``` + +### 4. 
Response Processing + +Handler returns `HandlerResponse`: + +```python +return HandlerResponse( + payload=GreetingResponse(message="Hello, Alice!"), + to="shouter", +) +``` + +System: +1. Validates `to` against peer list +2. Serializes payload to XML +3. Creates new envelope with injected `<from>` +4. Re-injects into pipeline + +## Trust Boundaries + +### What the System Controls + +| Aspect | System Responsibility | +|--------|----------------------| +| `<from>` | Always injected from listener.name | +| `<thread>` | Managed by thread registry | +| `<to>` validation | Checked against peers list | +| Schema enforcement | XSD validation on every message | +| Call chain | Private, never exposed to handlers | + +### What Handlers Control + +| Aspect | Handler Capability | +|--------|-------------------| +| Payload content | Full control | +| Target selection | Via `HandlerResponse.to` (validated) | +| Response/no response | Return value | +| Self-iteration | Call own name | + +### What Handlers Cannot Do + +- Forge sender identity +- Access other threads +- Discover topology +- Route to undeclared peers +- Modify message history +- Access other handlers' state + +## Multiprocess Architecture + +For CPU-bound handlers: + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Main Process (StreamPump) │ +│ - Ingress pipeline │ +│ - Routing decisions │ +│ - Response re-injection │ +└───────────────────────────┬─────────────────────────────────────┘ + │ UUID + handler_path (minimal IPC) + ┌─────────────┼─────────────┐ + ▼ ▼ ▼ +┌─────────────────┐ ┌─────────────┐ ┌─────────────────┐ +│ Python Async │ │ ProcessPool │ │ (Future: WASM) │ +│ (main process) │ │ (N workers) │ │ │ +│ - Default mode │ │ - cpu_bound │ │ │ +└────────┬────────┘ └──────┬──────┘ └────────┬────────┘ + │ │ │ + └─────────────────┼──────────────────┘ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ Shared Backend (Redis / Manager / Memory) │ +│ - Context buffer slots │ +│ - Thread
registry mappings │ +└─────────────────────────────────────────────────────────────────┘ +``` + +See [[Shared Backend]] for details. + +## See Also + +- [[Message Pump]] — Detailed pump architecture +- [[Thread Registry]] — Call chain tracking +- [[Shared Backend]] — Cross-process state +- [[Handler Contract]] — Handler specification diff --git a/docs/wiki/architecture/Shared-Backend.md b/docs/wiki/architecture/Shared-Backend.md new file mode 100644 index 0000000..e38a17b --- /dev/null +++ b/docs/wiki/architecture/Shared-Backend.md @@ -0,0 +1,339 @@ +# Shared Backend + +The Shared Backend enables cross-process state sharing for multiprocess deployments. It provides storage for the Context Buffer and Thread Registry. + +## Overview + +By default, xml-pipeline uses in-memory storage (single process). For CPU-bound handlers running in separate processes, you need shared state: + +``` +┌────────────────────┐ ┌────────────────────┐ +│ Main Process │ │ Worker Process │ +│ (StreamPump) │ │ (cpu_bound) │ +└─────────┬──────────┘ └──────────┬─────────┘ + │ │ + └───────────┬───────────────┘ + │ + ▼ + ┌─────────────────────┐ + │ Shared Backend │ + │ (Redis/Manager) │ + └─────────────────────┘ +``` + +## Backend Types + +### InMemoryBackend (Default) + +Single-process, thread-safe storage using Python dictionaries. + +```python +from xml_pipeline.memory import get_shared_backend, BackendConfig + +config = BackendConfig(backend_type="memory") +backend = get_shared_backend(config) +``` + +**Use when:** +- Single process deployment +- Development/testing +- No CPU-bound handlers + +### ManagerBackend + +Uses `multiprocessing.Manager` for local multi-process sharing. + +```python +config = BackendConfig(backend_type="manager") +backend = get_shared_backend(config) +``` + +**Use when:** +- Local deployment with CPU-bound handlers +- No Redis available +- Single machine, multiple processes + +### RedisBackend + +Distributed storage with TTL-based auto-cleanup. 
+ +```python +config = BackendConfig( + backend_type="redis", + redis_url="redis://localhost:6379", + redis_prefix="xp:", + redis_ttl=86400, # 24 hours +) +backend = get_shared_backend(config) +``` + +**Use when:** +- Distributed deployment +- Multiple machines +- Need persistence +- Production environments + +## Configuration + +### Via organism.yaml + +```yaml +backend: + type: redis # memory | manager | redis + redis_url: "redis://localhost:6379" # Redis connection URL + redis_prefix: "xp:" # Key prefix for multi-tenancy + redis_ttl: 86400 # Key TTL in seconds +``` + +### Programmatic + +```python +from xml_pipeline.memory import get_shared_backend, BackendConfig + +config = BackendConfig( + backend_type="redis", + redis_url="redis://localhost:6379", + redis_prefix="myapp:", + redis_ttl=3600, +) +backend = get_shared_backend(config) +``` + +## Storage Schema + +### Context Buffer + +Stores message history per thread. + +**In-Memory/Manager:** +```python +_buffers = { + "thread-uuid-1": [slot_bytes_0, slot_bytes_1, ...], + "thread-uuid-2": [...], +} +``` + +**Redis:** +``` +{prefix}buffer:{thread_id} → LIST of pickled BufferSlots +``` + +### Thread Registry + +Maps UUIDs to call chains. 
+ +**In-Memory/Manager:** +```python +_chain_to_uuid = {"console.greeter": "uuid-123"} +_uuid_to_chain = {"uuid-123": "console.greeter"} +``` + +**Redis:** +``` +{prefix}chain:{chain} → {uuid} +{prefix}uuid:{uuid} → {chain} +``` + +## API + +### Buffer Operations + +```python +# Append a slot +index = backend.buffer_append(thread_id, slot_bytes) + +# Get all slots for thread +slots = backend.buffer_get_thread(thread_id) + +# Get specific slot +slot = backend.buffer_get_slot(thread_id, index) + +# Check thread exists +exists = backend.buffer_thread_exists(thread_id) + +# Delete thread +deleted = backend.buffer_delete_thread(thread_id) + +# List all threads +threads = backend.buffer_list_threads() + +# Clear all (testing) +backend.buffer_clear() +``` + +### Registry Operations + +```python +# Set chain ↔ UUID mapping +backend.registry_set(chain, uuid) + +# Get UUID from chain +uuid = backend.registry_get_uuid(chain) + +# Get chain from UUID +chain = backend.registry_get_chain(uuid) + +# Delete mapping +deleted = backend.registry_delete(uuid) + +# List all mappings +all_mappings = backend.registry_list_all() + +# Clear all (testing) +backend.registry_clear() +``` + +### Serialization + +Slots are serialized using pickle: + +```python +from xml_pipeline.memory import serialize_slot, deserialize_slot + +# Serialize for storage +slot_bytes = serialize_slot(buffer_slot) + +# Deserialize after retrieval +slot = deserialize_slot(slot_bytes) +``` + +## Integration + +### With ContextBuffer + +```python +from xml_pipeline.memory import get_context_buffer + +# Uses shared backend automatically if configured +buffer = get_context_buffer(backend=backend) + +# Check if using shared storage +print(buffer.is_shared) # True +``` + +### With ThreadRegistry + +```python +from xml_pipeline.message_bus.thread_registry import get_registry + +registry = get_registry(backend=backend) + +# Check if using shared storage +print(registry.is_shared) # True +``` + +### With StreamPump + +The 
pump automatically uses the configured backend: + +```yaml +backend: + type: redis + redis_url: "redis://localhost:6379" + +process_pool: + workers: 4 + +listeners: + - name: analyzer + cpu_bound: true # Uses shared backend for data exchange +``` + +## Worker Data Flow + +For CPU-bound handlers, data flows through the backend: + +``` +1. Main Process + ├── Serialize payload + metadata + ├── Store in backend (payload_uuid, metadata_uuid) + └── Submit WorkerTask to ProcessPool + +2. Worker Process + ├── Fetch payload + metadata from backend + ├── Execute handler + ├── Store response in backend (response_uuid) + └── Return WorkerResult + +3. Main Process + ├── Fetch response from backend + ├── Clean up temporary data + └── Process response normally +``` + +## TTL and Cleanup + +### Redis TTL + +Redis keys automatically expire: + +```yaml +backend: + redis_ttl: 86400 # Keys expire after 24 hours +``` + +### Manual Cleanup + +```python +# Delete specific thread +backend.buffer_delete_thread(thread_id) +backend.registry_delete(uuid) + +# Clear all (testing only) +backend.buffer_clear() +backend.registry_clear() +``` + +## Multi-Tenancy + +Use prefixes to isolate different organisms: + +```yaml +# Organism A +backend: + type: redis + redis_prefix: "orgA:" + +# Organism B +backend: + type: redis + redis_prefix: "orgB:" +``` + +## Monitoring + +### Redis Info + +```python +info = backend.info() +# {'buffer_threads': 5, 'registry_entries': 12} +``` + +### Health Check + +```python +is_healthy = backend.ping() # True if connected +``` + +## Testing + +```python +import pytest +from xml_pipeline.memory import InMemoryBackend + +@pytest.fixture +def backend(): + backend = InMemoryBackend() + yield backend + backend.close() + +def test_buffer_operations(backend): + backend.buffer_append("thread-1", b"data") + assert backend.buffer_thread_exists("thread-1") +``` + +## See Also + +- [[Architecture Overview]] — High-level architecture +- [[Message Pump]] — How the pump uses 
backends +- [[Configuration]] — Backend configuration options diff --git a/docs/wiki/architecture/Thread-Registry.md b/docs/wiki/architecture/Thread-Registry.md new file mode 100644 index 0000000..2166785 --- /dev/null +++ b/docs/wiki/architecture/Thread-Registry.md @@ -0,0 +1,261 @@ +# Thread Registry + +The Thread Registry maps opaque UUIDs to call chains, enabling thread tracking while hiding topology from handlers. + +## Purpose + +When agents communicate, they form call chains: + +``` +console → greeter → calculator → back to greeter → shouter +``` + +The registry: +1. **Tracks call chains** for routing responses +2. **Provides opaque UUIDs** to handlers (hiding topology) +3. **Manages chain pruning** when handlers respond + +## Concepts + +### Call Chain + +A dot-separated path showing message flow: + +``` +system.organism.console.greeter.calculator +│ │ │ │ │ +│ │ │ │ └─ Current position +│ │ │ └─ Greeter called calculator +│ │ └─ Console called greeter +│ └─ Organism name +└─ Root +``` + +### Opaque UUID + +What handlers actually see: + +``` +550e8400-e29b-41d4-a716-446655440000 +``` + +Handlers never see the actual chain. 
This prevents: +- Topology probing +- Call chain forgery +- Thread hijacking + +## API + +### Initialize Root + +At boot time: + +```python +from xml_pipeline.message_bus.thread_registry import get_registry + +registry = get_registry() +root_uuid = registry.initialize_root("my-organism") +# Creates: system.my-organism → uuid +``` + +### Get or Create + +Get UUID for a chain (creates if needed): + +```python +uuid = registry.get_or_create("console.greeter") +# Returns: existing UUID or creates new one +``` + +### Lookup + +Get chain for a UUID: + +```python +chain = registry.lookup(uuid) +# Returns: "console.greeter" or None +``` + +### Extend Chain + +When forwarding to a new handler: + +```python +new_uuid = registry.extend_chain(current_uuid, "calculator") +# Before: console.greeter (uuid-123) +# After: console.greeter.calculator (uuid-456) +``` + +### Prune for Response + +When a handler returns `.respond()`: + +```python +target, new_uuid = registry.prune_for_response(current_uuid) +# Before: console.greeter.calculator (uuid-456) +# After: console.greeter (uuid-123) +# target: "greeter" +``` + +### Register External Thread + +For messages arriving with pre-assigned UUIDs: + +```python +registry.register_thread( + thread_id="external-uuid", + initiator="console", + target="greeter" +) +# Creates: system.organism.console.greeter → external-uuid +``` + +## Thread Lifecycle + +### Creation + +``` +1. External message arrives without thread + │ + ▼ +2. thread_assignment_step generates UUID + │ + ▼ +3. Registry maps: chain → UUID +``` + +### Extension + +``` +1. Handler A forwards to Handler B + │ + ▼ +2. Pump calls extend_chain(uuid_A, "B") + │ + ▼ +3. Registry creates: chain.B → uuid_B +``` + +### Pruning + +``` +1. Handler B calls .respond() + │ + ▼ +2. Pump calls prune_for_response(uuid_B) + │ + ▼ +3. Registry: + - Looks up chain: "...A.B" + - Prunes last segment: "...A" + - Returns target "A" and uuid_A + │ + ▼ +4. 
Response routed to Handler A +``` + +### Cleanup + +``` +1. Chain exhausted (root reached) or + Handler returns None + │ + ▼ +2. UUID mapping removed + │ + ▼ +3. Context buffer for thread deleted +``` + +## Shared Backend Support + +For multiprocess deployments, the registry can use a shared backend: + +```python +from xml_pipeline.memory.shared_backend import get_shared_backend, BackendConfig + +# Use Redis for distributed deployments +config = BackendConfig(backend_type="redis", redis_url="redis://localhost:6379") +backend = get_shared_backend(config) +registry = get_registry(backend=backend) +``` + +### Storage Schema (Redis) + +``` +xp:chain:{chain} → {uuid} # Chain to UUID +xp:uuid:{uuid} → {chain} # UUID to Chain +``` + +## Security Properties + +### What Handlers See + +```python +metadata.thread_id = "550e8400-..." # Opaque UUID +metadata.from_id = "greeter" # Only immediate caller +``` + +### What Handlers Don't See + +- Full call chain +- Other thread UUIDs +- Thread count or topology +- Parent/child relationships + +### Why This Matters + +Even compromised handlers cannot: +- **Forge thread IDs** — UUIDs are cryptographically random +- **Discover topology** — Chain hidden behind UUID +- **Hijack threads** — Registry validates all operations +- **Probe other threads** — No enumeration API + +## Debugging + +For operators (not exposed to handlers): + +```python +# Dump all mappings +chains = registry.debug_dump() +# {'uuid-123': 'console.greeter', 'uuid-456': 'console.greeter.calc'} + +# Clear (testing only) +registry.clear() +``` + +## Example Flow + +``` +1. Console sends @greeter hello + ├── UUID assigned: uuid-1 + └── Chain: system.org.console.greeter + +2. Greeter forwards to calculator + ├── extend_chain(uuid-1, "calculator") + ├── New UUID: uuid-2 + └── Chain: system.org.console.greeter.calculator + +3. Calculator responds + ├── prune_for_response(uuid-2) + ├── Target: "greeter" + └── UUID: uuid-1 (back to greeter's context) + +4. 
Greeter responds + ├── prune_for_response(uuid-1) + ├── Target: "console" + └── Chain exhausted → cleanup +``` + +## Configuration + +No explicit configuration needed. The registry: +- Initializes automatically at pump startup +- Uses shared backend if configured +- Cleans up on thread termination + +## See Also + +- [[Architecture Overview]] — High-level architecture +- [[Message Pump]] — How the pump uses the registry +- [[Shared Backend]] — Cross-process storage diff --git a/docs/wiki/convert-to-xwiki.ps1 b/docs/wiki/convert-to-xwiki.ps1 new file mode 100644 index 0000000..0bdcdf2 --- /dev/null +++ b/docs/wiki/convert-to-xwiki.ps1 @@ -0,0 +1,76 @@ +# Convert Markdown wiki docs to XWiki format using Pandoc +# +# Prerequisites: +# - Pandoc installed (https://pandoc.org/installing.html) +# - Run from docs/wiki directory +# +# Usage: +# .\convert-to-xwiki.ps1 +# +# Output: +# Creates xwiki/ directory with converted files + +$ErrorActionPreference = "Stop" + +$ScriptDir = Split-Path -Parent $MyInvocation.MyCommand.Path +$OutputDir = Join-Path $ScriptDir "xwiki" + +Write-Host "Converting Markdown to XWiki format..." 
+Write-Host "Output directory: $OutputDir" +Write-Host "" + +# Create output directory structure +New-Item -ItemType Directory -Force -Path $OutputDir | Out-Null +New-Item -ItemType Directory -Force -Path (Join-Path $OutputDir "architecture") | Out-Null +New-Item -ItemType Directory -Force -Path (Join-Path $OutputDir "reference") | Out-Null +New-Item -ItemType Directory -Force -Path (Join-Path $OutputDir "tutorials") | Out-Null + +# Function to convert a single file +# ($Input is a PowerShell automatic variable, so the parameters use distinct names) +function Convert-File { + param ( + [string]$InputPath, + [string]$OutputPath + ) + + if (Test-Path $InputPath) { + Write-Host "Converting: $InputPath" + pandoc -f markdown -t xwiki $InputPath -o $OutputPath + } +} + +# Convert root level docs +Convert-File (Join-Path $ScriptDir "Home.md") (Join-Path $OutputDir "Home.xwiki") +Convert-File (Join-Path $ScriptDir "Installation.md") (Join-Path $OutputDir "Installation.xwiki") +Convert-File (Join-Path $ScriptDir "Quick-Start.md") (Join-Path $OutputDir "Quick-Start.xwiki") +Convert-File (Join-Path $ScriptDir "Writing-Handlers.md") (Join-Path $OutputDir "Writing-Handlers.xwiki") +Convert-File (Join-Path $ScriptDir "LLM-Router.md") (Join-Path $OutputDir "LLM-Router.xwiki") +Convert-File (Join-Path $ScriptDir "Why-XML.md") (Join-Path $OutputDir "Why-XML.xwiki") + +# Convert architecture docs +Convert-File (Join-Path $ScriptDir "architecture\Overview.md") (Join-Path $OutputDir "architecture\Overview.xwiki") +Convert-File (Join-Path $ScriptDir "architecture\Message-Pump.md") (Join-Path $OutputDir "architecture\Message-Pump.xwiki") +Convert-File (Join-Path $ScriptDir "architecture\Thread-Registry.md") (Join-Path $OutputDir "architecture\Thread-Registry.xwiki") +Convert-File (Join-Path $ScriptDir "architecture\Shared-Backend.md") (Join-Path $OutputDir "architecture\Shared-Backend.xwiki") + +# Convert reference docs +Convert-File (Join-Path $ScriptDir "reference\Configuration.md") (Join-Path $OutputDir "reference\Configuration.xwiki") +Convert-File (Join-Path $ScriptDir
"reference\Handler-Contract.md") (Join-Path $OutputDir "reference\Handler-Contract.xwiki") +Convert-File (Join-Path $ScriptDir "reference\CLI.md") (Join-Path $OutputDir "reference\CLI.xwiki") + +# Convert tutorials +Convert-File (Join-Path $ScriptDir "tutorials\Hello-World.md") (Join-Path $OutputDir "tutorials\Hello-World.xwiki") + +Write-Host "" +Write-Host "Conversion complete!" +Write-Host "" +Write-Host "Files created in: $OutputDir" +Write-Host "" +Write-Host "Next steps:" +Write-Host " 1. Review the converted files" +Write-Host " 2. Upload to XWiki via REST API or import feature" +Write-Host "" +Write-Host "XWiki REST API example:" +Write-Host " curl -u admin:password -X PUT ``" +Write-Host " 'https://xml-pipeline.org/rest/wikis/xwiki/spaces/Docs/pages/Home' ``" +Write-Host " -H 'Content-Type: text/plain' ``" +Write-Host " -d @$OutputDir\Home.xwiki" diff --git a/docs/wiki/convert-to-xwiki.sh b/docs/wiki/convert-to-xwiki.sh new file mode 100644 index 0000000..478e9db --- /dev/null +++ b/docs/wiki/convert-to-xwiki.sh @@ -0,0 +1,75 @@ +#!/bin/bash +# Convert Markdown wiki docs to XWiki format using Pandoc +# +# Prerequisites: +# - Pandoc installed (https://pandoc.org/installing.html) +# - Run from docs/wiki directory +# +# Usage: +# ./convert-to-xwiki.sh +# +# Output: +# Creates xwiki/ directory with converted files + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +OUTPUT_DIR="$SCRIPT_DIR/xwiki" + +echo "Converting Markdown to XWiki format..." 
+echo "Output directory: $OUTPUT_DIR" +echo "" + +# Create output directory structure +mkdir -p "$OUTPUT_DIR" +mkdir -p "$OUTPUT_DIR/architecture" +mkdir -p "$OUTPUT_DIR/reference" +mkdir -p "$OUTPUT_DIR/tutorials" + +# Function to convert a single file +convert_file() { + local input="$1" + local output="$2" + + if [ -f "$input" ]; then + echo "Converting: $input" + pandoc -f markdown -t xwiki "$input" -o "$output" + fi +} + +# Convert root level docs +convert_file "$SCRIPT_DIR/Home.md" "$OUTPUT_DIR/Home.xwiki" +convert_file "$SCRIPT_DIR/Installation.md" "$OUTPUT_DIR/Installation.xwiki" +convert_file "$SCRIPT_DIR/Quick-Start.md" "$OUTPUT_DIR/Quick-Start.xwiki" +convert_file "$SCRIPT_DIR/Writing-Handlers.md" "$OUTPUT_DIR/Writing-Handlers.xwiki" +convert_file "$SCRIPT_DIR/LLM-Router.md" "$OUTPUT_DIR/LLM-Router.xwiki" +convert_file "$SCRIPT_DIR/Why-XML.md" "$OUTPUT_DIR/Why-XML.xwiki" + +# Convert architecture docs +convert_file "$SCRIPT_DIR/architecture/Overview.md" "$OUTPUT_DIR/architecture/Overview.xwiki" +convert_file "$SCRIPT_DIR/architecture/Message-Pump.md" "$OUTPUT_DIR/architecture/Message-Pump.xwiki" +convert_file "$SCRIPT_DIR/architecture/Thread-Registry.md" "$OUTPUT_DIR/architecture/Thread-Registry.xwiki" +convert_file "$SCRIPT_DIR/architecture/Shared-Backend.md" "$OUTPUT_DIR/architecture/Shared-Backend.xwiki" + +# Convert reference docs +convert_file "$SCRIPT_DIR/reference/Configuration.md" "$OUTPUT_DIR/reference/Configuration.xwiki" +convert_file "$SCRIPT_DIR/reference/Handler-Contract.md" "$OUTPUT_DIR/reference/Handler-Contract.xwiki" +convert_file "$SCRIPT_DIR/reference/CLI.md" "$OUTPUT_DIR/reference/CLI.xwiki" + +# Convert tutorials +convert_file "$SCRIPT_DIR/tutorials/Hello-World.md" "$OUTPUT_DIR/tutorials/Hello-World.xwiki" + +echo "" +echo "Conversion complete!" +echo "" +echo "Files created in: $OUTPUT_DIR" +echo "" +echo "Next steps:" +echo " 1. Review the converted files" +echo " 2. 
Upload to XWiki via REST API or import feature" +echo "" +echo "XWiki REST API example:" +echo " curl -u admin:password -X PUT \\" +echo " 'https://xml-pipeline.org/rest/wikis/xwiki/spaces/Docs/pages/Home' \\" +echo " -H 'Content-Type: text/plain' \\" +echo " --data-binary @$OUTPUT_DIR/Home.xwiki" diff --git a/docs/wiki/reference/CLI.md b/docs/wiki/reference/CLI.md new file mode 100644 index 0000000..23b7d29 --- /dev/null +++ b/docs/wiki/reference/CLI.md @@ -0,0 +1,129 @@ +# CLI Reference + +xml-pipeline provides a command-line interface for running and managing organisms. + +## Commands + +### xml-pipeline run + +Run an organism from a configuration file. + +```bash +xml-pipeline run [CONFIG_PATH] +``` + +**Arguments:** +- `CONFIG_PATH` — Path to organism.yaml (default: `config/organism.yaml`) + +**Examples:** +```bash +xml-pipeline run config/organism.yaml +xml-pipeline run # Uses default path +xp run config/my-organism.yaml # Short alias +``` + +### xml-pipeline init + +Create a new organism configuration from template. + +```bash +xml-pipeline init [NAME] +``` + +**Arguments:** +- `NAME` — Organism name (default: `my-organism`) + +**Creates:** +``` +{NAME}/ +├── config/ +│ └── organism.yaml +├── handlers/ +│ └── hello.py +└── .env.example +``` + +**Examples:** +```bash +xml-pipeline init my-agent +xml-pipeline init # Uses default name +``` + +### xml-pipeline check + +Validate configuration without running. + +```bash +xml-pipeline check [CONFIG_PATH] +``` + +**Arguments:** +- `CONFIG_PATH` — Path to organism.yaml (default: `config/organism.yaml`) + +**Output:** +``` +Config valid: hello-world + Listeners: 3 + LLM backends: 1 +``` + +**Examples:** +```bash +xml-pipeline check config/organism.yaml +xml-pipeline check # Uses default path +``` + +### xml-pipeline version + +Show version and installed features.
+ +```bash +xml-pipeline version +``` + +**Output:** +``` +xml-pipeline 0.4.0 +Python 3.11.5 +Features: anthropic, console, redis, search +``` + +## Short Alias + +The `xp` command is a short alias for `xml-pipeline`: + +```bash +xp run config/organism.yaml +xp init my-agent +xp check +xp version +``` + +## Environment Variables + +| Variable | Description | +|----------|-------------| +| `XAI_API_KEY` | xAI (Grok) API key | +| `ANTHROPIC_API_KEY` | Anthropic (Claude) API key | +| `OPENAI_API_KEY` | OpenAI API key | + +Create a `.env` file in your project root: + +```env +XAI_API_KEY=xai-... +ANTHROPIC_API_KEY=sk-ant-... +``` + +## Exit Codes + +| Code | Meaning | +|------|---------| +| 0 | Success | +| 1 | Configuration error | +| 2 | Runtime error | + +## See Also + +- [[Installation]] — Installing xml-pipeline +- [[Quick Start]] — Getting started +- [[Configuration]] — Configuration reference diff --git a/docs/wiki/reference/Configuration.md b/docs/wiki/reference/Configuration.md new file mode 100644 index 0000000..c9a8d0f --- /dev/null +++ b/docs/wiki/reference/Configuration.md @@ -0,0 +1,196 @@ +# Configuration Reference + +Organisms are configured via YAML files. The default location is `config/organism.yaml`. 
+ +## Minimal Configuration + +```yaml +organism: + name: my-organism + +listeners: + - name: greeter + payload_class: handlers.hello.Greeting + handler: handlers.hello.handle_greeting + description: Greeting handler +``` + +## Full Configuration Reference + +```yaml +# ============================================================ +# ORGANISM SECTION +# Core identity and network settings +# ============================================================ +organism: + name: "my-organism" # Human-readable name (required) + port: 8765 # WebSocket port (optional) + identity: "config/identity.key" # Ed25519 private key path (optional) + tls: # TLS settings (optional) + cert: "certs/fullchain.pem" + key: "certs/privkey.pem" + +# ============================================================ +# LLM SECTION +# Language model routing configuration +# ============================================================ +llm: + strategy: failover # failover | round-robin | least-loaded + retries: 3 # Max retry attempts + retry_base_delay: 1.0 # Base delay for exponential backoff + retry_max_delay: 60.0 # Maximum delay between retries + + backends: + - provider: xai # xai | anthropic | openai | ollama + api_key_env: XAI_API_KEY # Environment variable name + priority: 1 # Lower = preferred (for failover) + rate_limit_tpm: 100000 # Tokens per minute limit + max_concurrent: 20 # Max concurrent requests + + - provider: anthropic + api_key_env: ANTHROPIC_API_KEY + priority: 2 + + - provider: ollama + base_url: http://localhost:11434 + supported_models: [llama3, mistral] + +# ============================================================ +# BACKEND SECTION (Optional) +# Shared state for multiprocess deployments +# ============================================================ +backend: + type: memory # memory | manager | redis + # Redis-specific settings (when type: redis) + redis_url: "redis://localhost:6379" + redis_prefix: "xp:" # Key prefix for multi-tenancy + redis_ttl: 86400 # TTL in seconds (24 
hours) + +# ============================================================ +# PROCESS POOL SECTION (Optional) +# Worker processes for CPU-bound handlers +# ============================================================ +process_pool: + workers: 4 # Number of worker processes + max_tasks_per_child: 100 # Restart workers after N tasks + +# ============================================================ +# LISTENERS SECTION +# Message handlers (tools and agents) +# ============================================================ +listeners: + # Simple tool (non-agent) + - name: calculator.add + payload_class: handlers.calc.AddPayload + handler: handlers.calc.add_handler + description: "Adds two numbers" + + # LLM Agent + - name: researcher + payload_class: handlers.research.ResearchQuery + handler: handlers.research.research_handler + description: "Research agent that searches and synthesizes" + agent: true # Marks as LLM agent + peers: # Allowed call targets + - calculator.add + - web_search + prompt: | # System prompt for LLM + You are a research assistant. + Use tools to find information. 
+ + # CPU-bound handler (runs in process pool) + - name: librarian + payload_class: handlers.librarian.Query + handler: handlers.librarian.handle_query + description: "Document analysis with heavy computation" + cpu_bound: true # Dispatch to ProcessPoolExecutor + +# ============================================================ +# GATEWAYS SECTION (Optional) +# Federation with remote organisms +# ============================================================ +gateways: + - name: remote_search + remote_url: "wss://search.example.org" + trusted_identity: "keys/search_node.pub" + description: "Federated search gateway" +``` + +## Section Details + +### organism + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `name` | string | Yes | Human-readable organism name | +| `port` | int | No | WebSocket server port | +| `identity` | path | No | Ed25519 private key for signing | +| `tls.cert` | path | No | TLS certificate path | +| `tls.key` | path | No | TLS private key path | + +### llm.backends[] + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `provider` | string | Yes | `xai`, `anthropic`, `openai`, `ollama` | +| `api_key_env` | string | Depends | Env var containing API key | +| `base_url` | string | No | Override API endpoint | +| `priority` | int | No | Lower = preferred (default: 1) | +| `rate_limit_tpm` | int | No | Tokens per minute limit | +| `max_concurrent` | int | No | Max concurrent requests | +| `supported_models` | list | No | Models this backend serves (ollama) | + +### listeners[] + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `name` | string | Yes | Unique listener name | +| `payload_class` | string | Yes | Import path to `@xmlify` dataclass | +| `handler` | string | Yes | Import path to handler function | +| `description` | string | Yes | Short description (used in prompts) | +| `agent` | bool | No | Is this an LLM agent? 
(default: false) | +| `peers` | list | No | Allowed call targets for agents | +| `prompt` | string | No | System prompt for LLM agents | +| `cpu_bound` | bool | No | Run in ProcessPoolExecutor (default: false) | +| `broadcast` | bool | No | Allow shared root tag (default: false) | + +### backend + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `type` | string | No | `memory`, `manager`, `redis` (default: memory) | +| `redis_url` | string | If redis | Redis connection URL | +| `redis_prefix` | string | No | Key prefix (default: `xp:`) | +| `redis_ttl` | int | No | Key TTL in seconds | + +### process_pool + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `workers` | int | No | Number of worker processes (default: CPU count) | +| `max_tasks_per_child` | int | No | Tasks before worker restart | + +## Environment Variables + +API keys should be stored in environment variables, referenced via `api_key_env`: + +```env +# .env file +XAI_API_KEY=xai-abc123... +ANTHROPIC_API_KEY=sk-ant-... +OPENAI_API_KEY=sk-... +``` + +## Validation + +Validate your configuration without running: + +```bash +xml-pipeline check config/organism.yaml +``` + +## See Also + +- [[Quick Start]] — Get started quickly +- [[Writing Handlers]] — Create handlers +- [[LLM Router]] — LLM backend details diff --git a/docs/wiki/reference/Handler-Contract.md b/docs/wiki/reference/Handler-Contract.md new file mode 100644 index 0000000..9475276 --- /dev/null +++ b/docs/wiki/reference/Handler-Contract.md @@ -0,0 +1,293 @@ +# Handler Contract + +The complete specification for handler functions in xml-pipeline. + +## Signature + +Every handler must be declared as: + +```python +async def handler( + payload: PayloadDataclass, + metadata: HandlerMetadata +) -> HandlerResponse | None: + ... 
+``` + +### Requirements + +| Aspect | Requirement | +|--------|-------------| +| Function type | Must be `async def` | +| First parameter | XSD-validated `@xmlify` dataclass | +| Second parameter | `HandlerMetadata` (required) | +| Return type | `HandlerResponse` or `None` | + +## HandlerMetadata + +```python +@dataclass +class HandlerMetadata: + thread_id: str # Opaque thread UUID + from_id: str # Who sent this message + own_name: str | None # This listener's name (agents only) + is_self_call: bool # True if message from self + usage_instructions: str # Peer schemas for LLM prompts + todo_nudge: str # System reminders +``` + +### Field Details + +#### thread_id + +Opaque UUID for the current conversation thread. + +- Use for thread-scoped storage +- Never parse or make assumptions about format +- Maps internally to call chain (hidden) + +#### from_id + +The registered name of the listener that sent this message. + +- Only shows immediate sender, not full chain +- Use for logging/debugging +- Don't use for routing (use `HandlerResponse.to`) + +#### own_name + +This listener's registered name. Only set for agents (`agent: true`). + +- Enables self-referential reasoning +- Used for self-iteration: `to=metadata.own_name` +- `None` for non-agent listeners + +#### is_self_call + +`True` if this message was sent by this same handler. + +- Detect iteration loops +- Handle self-messages differently if needed + +#### usage_instructions + +Auto-generated documentation of peer capabilities. + +- Contains XSD schemas of declared peers +- Inject into LLM system prompts +- Empty if no peers declared + +#### todo_nudge + +System-generated reminders about pending todos. 
+ +- Contains info about raised watchers +- Used by agents for task tracking +- Empty if no pending todos + +## HandlerResponse + +```python +@dataclass +class HandlerResponse: + payload: Any # @xmlify dataclass instance + to: str # Target listener name +``` + +### Construction Methods + +#### Direct Construction + +Forward to a specific listener: + +```python +return HandlerResponse( + payload=MyResponse(data="result"), + to="next-handler", +) +``` + +#### Respond to Caller + +Return to whoever sent the message: + +```python +return HandlerResponse.respond( + payload=ResultPayload(value=42) +) +``` + +### Return None + +End the chain with no response: + +```python +return None +``` + +## Return Types + +| Return | Effect | +|--------|--------| +| `HandlerResponse(to="x")` | Forward to listener "x" | +| `HandlerResponse.respond()` | Return to caller (prunes chain) | +| `None` | Terminate chain | + +## Envelope Control + +The system enforces these rules on responses: + +| Field | Handler Control | System Override | +|-------|-----------------|-----------------| +| `` | None | Always `listener.name` | +| `` | Via `response.to` | Validated against peers | +| `` | None | Managed by registry | +| `` | Full control | — | + +## Peer Constraints + +Agents can only send to declared peers: + +```yaml +listeners: + - name: greeter + agent: true + peers: [shouter, logger] # Only these allowed +``` + +### Violation Handling + +If agent sends to undeclared peer: + +1. Message **blocked** (never routed) +2. `SystemError` returned to agent +3. Thread stays alive (can retry) + +```xml + + routing + Message could not be delivered. 
+ true + +``` + +## Response Semantics + +### Critical: Pruning on Respond + +When you call `.respond()`, the call chain is **pruned**: + +``` +Before: console → greeter → calculator + ↑ (you respond here) + +After: console → greeter + ↑ (response delivered here) +``` + +**Consequences:** + +- Sub-agents you called are terminated +- Their state/context is lost +- You cannot call them again in this context + +**Therefore:** Complete ALL sub-tasks before responding. + +## Examples + +### Simple Tool + +```python +@xmlify +@dataclass +class AddPayload: + a: int + b: int + +@xmlify +@dataclass +class AddResult: + sum: int + +async def add_handler(payload: AddPayload, metadata: HandlerMetadata) -> HandlerResponse: + result = payload.a + payload.b + return HandlerResponse.respond(payload=AddResult(sum=result)) +``` + +### LLM Agent + +```python +async def research_handler(payload: ResearchQuery, metadata: HandlerMetadata) -> HandlerResponse: + from xml_pipeline.platform.llm_api import complete + + response = await complete( + model="grok-4.1", + messages=[ + {"role": "system", "content": metadata.usage_instructions}, + {"role": "user", "content": payload.query}, + ], + ) + + return HandlerResponse( + payload=ResearchResult(answer=response.content), + to="summarizer", + ) +``` + +### Self-Iterating Agent + +```python +async def thinking_agent(payload: ThinkPayload, metadata: HandlerMetadata) -> HandlerResponse: + if payload.iteration >= 5: + # Done thinking - respond to caller + return HandlerResponse.respond( + payload=FinalAnswer(answer=payload.current_answer) + ) + + # Continue thinking by calling self + return HandlerResponse( + payload=ThinkPayload( + iteration=payload.iteration + 1, + current_answer=f"Refined: {payload.current_answer}", + ), + to=metadata.own_name, # Self-call + ) +``` + +### Terminal Handler + +```python +async def console_output(payload: TextOutput, metadata: HandlerMetadata) -> None: + print(f"[{payload.source}] {payload.text}") + return None # 
Chain ends +``` + +### Error Handling + +```python +async def safe_handler(payload: MyPayload, metadata: HandlerMetadata) -> HandlerResponse: + try: + result = await risky_operation(payload) + return HandlerResponse.respond(payload=SuccessResult(data=result)) + except ValidationError as e: + return HandlerResponse.respond(payload=ErrorResult(error=str(e))) + except Exception: + logger.exception("Handler failed") + return HandlerResponse.respond(payload=ErrorResult(error="Internal error")) +``` + +## Security Properties + +Handlers are **untrusted code**. Even compromised handlers cannot: + +- Forge sender identity +- Access other threads +- Discover organism topology +- Route to undeclared peers +- Modify message history + +## See Also + +- [[Writing Handlers]] — Practical guide +- [[Configuration]] — Registering handlers +- [[Architecture Overview]] — System architecture diff --git a/docs/wiki/tutorials/Hello-World.md b/docs/wiki/tutorials/Hello-World.md new file mode 100644 index 0000000..6aa2be2 --- /dev/null +++ b/docs/wiki/tutorials/Hello-World.md @@ -0,0 +1,376 @@ +# Hello World Tutorial + +Build a complete greeting agent from scratch. By the end, you'll understand payloads, handlers, configuration, and message flow. + +## What We're Building + +A greeting system with three components: + +``` +User Input → Greeter Agent → Shouter Tool → Output + │ │ │ │ + │ │ │ │ + "Alice" "Hello, "HELLO, Displayed + Alice!" ALICE!" 
to user +``` + +## Prerequisites + +- Python 3.11+ +- xml-pipeline installed (`pip install "xml-pipeline[console]"`) + +## Step 1: Create Project Structure + +```bash +mkdir hello-world +cd hello-world +mkdir -p config handlers +``` + +## Step 2: Define Payloads + +Create `handlers/payloads.py`: + +```python +from dataclasses import dataclass +from third_party.xmlable import xmlify + +@xmlify +@dataclass +class Greeting: + """Request to greet someone.""" + name: str + +@xmlify +@dataclass +class GreetingResponse: + """A friendly greeting.""" + message: str + +@xmlify +@dataclass +class ShoutRequest: + """Request to shout text.""" + text: str + +@xmlify +@dataclass +class ShoutResponse: + """Shouted text.""" + text: str + +@xmlify +@dataclass +class ConsoleOutput: + """Text to display.""" + text: str + source: str = "system" +``` + +**What's happening:** +- `@xmlify` enables XML serialization +- `@dataclass` provides the fields +- Each class becomes a valid XML payload + +## Step 3: Write Handlers + +Create `handlers/greeter.py`: + +```python +from xml_pipeline.message_bus.message_state import HandlerMetadata, HandlerResponse +from handlers.payloads import Greeting, ShoutRequest + +async def handle_greeting(payload: Greeting, metadata: HandlerMetadata) -> HandlerResponse: + """ + Receive a greeting request, create a friendly message, + then forward to the shouter to make it exciting. + """ + # Create the greeting + message = f"Hello, {payload.name}! Welcome to xml-pipeline!" + + # Forward to the shouter; it passes the result on to console-output, so nothing returns to this handler + return HandlerResponse( + payload=ShoutRequest(text=message), + to="shouter", + ) +``` + +Create `handlers/shouter.py`: + +```python +from xml_pipeline.message_bus.message_state import HandlerMetadata, HandlerResponse +from handlers.payloads import ShoutRequest, ConsoleOutput + +async def handle_shout(payload: ShoutRequest, metadata: HandlerMetadata) -> HandlerResponse: + """ + Take text and SHOUT IT! 
+ Then send to console for display. + """ + shouted = payload.text.upper() + "!!!" + + return HandlerResponse( + payload=ConsoleOutput(text=shouted, source="shouter"), + to="console-output", + ) +``` + +Create `handlers/output.py`: + +```python +from xml_pipeline.message_bus.message_state import HandlerMetadata +from handlers.payloads import ConsoleOutput + +async def handle_output(payload: ConsoleOutput, metadata: HandlerMetadata) -> None: + """ + Display text to console. + Returns None to end the message chain. + """ + print(f"\n[{payload.source}] {payload.text}\n") + return None # Chain ends here +``` + +## Step 4: Configure the Organism + +Create `config/organism.yaml`: + +```yaml +organism: + name: hello-world + +listeners: + # The greeter agent + - name: greeter + payload_class: handlers.payloads.Greeting + handler: handlers.greeter.handle_greeting + description: "Greets people by name" + peers: + - shouter + + # The shouter tool + - name: shouter + payload_class: handlers.payloads.ShoutRequest + handler: handlers.shouter.handle_shout + description: "Makes text LOUD" + peers: + - console-output + + # Output handler + - name: console-output + payload_class: handlers.payloads.ConsoleOutput + handler: handlers.output.handle_output + description: "Displays text to console" +``` + +## Step 5: Verify Configuration + +```bash +xml-pipeline check config/organism.yaml +``` + +Expected output: +``` +Config valid: hello-world + Listeners: 3 + LLM backends: 0 +``` + +## Step 6: Create Test Script + +Create `test_greeting.py`: + +```python +import asyncio +from xml_pipeline.message_bus.stream_pump import StreamPump +from xml_pipeline.config.loader import load_config + +async def main(): + # Load configuration + config = load_config("config/organism.yaml") + + # Create and start the pump + pump = StreamPump(config) + await pump.start() + + print("Organism started! 
Injecting greeting...") + + # Create a greeting message + greeting_xml = b""" + + + test + greeter + + + Alice + + + """ + + # Inject the message + await pump.inject(greeting_xml, from_id="test") + + # Give it time to process + await asyncio.sleep(1) + + # Shutdown + await pump.shutdown() + print("Done!") + +if __name__ == "__main__": + asyncio.run(main()) +``` + +## Step 7: Run It + +```bash +python test_greeting.py +``` + +Expected output: +``` +Organism started! Injecting greeting... + +[shouter] HELLO, ALICE! WELCOME TO XML-PIPELINE!!!! + +Done! +``` + +## What Just Happened? + +Let's trace the message flow: + +### 1. Message Injection + +```xml + + Alice + +``` + +Injected with `from=test`, `to=greeter`. + +### 2. Pipeline Processing + +``` +Raw bytes + ↓ +repair_step → Valid XML tree + ↓ +c14n_step → Canonicalized + ↓ +envelope_valid → Matches envelope.xsd + ↓ +payload_extract → Extracts + ↓ +thread_assign → UUID: abc-123 + ↓ +xsd_validate → Matches Greeting schema + ↓ +deserialize → Greeting(name="Alice") + ↓ +routing → target: greeter +``` + +### 3. Handler Dispatch + +```python +# greeter receives: +payload = Greeting(name="Alice") +metadata.thread_id = "abc-123" +metadata.from_id = "test" +``` + +### 4. Response Processing + +Greeter returns: +```python +HandlerResponse( + payload=ShoutRequest(text="Hello, Alice!..."), + to="shouter", +) +``` + +System: +1. Validates `shouter` is in greeter's peers ✓ +2. Serializes ShoutRequest to XML +3. Wraps in envelope with `from=greeter` +4. Re-injects into pipeline + +### 5. Chain Continues + +Shouter receives ShoutRequest, returns ConsoleOutput to `console-output`. + +### 6. Chain Terminates + +`handle_output` returns `None` → chain ends. 
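The six steps above can be condensed into a toy dispatch loop. The sketch below is a simplified stand-in, not the actual `StreamPump`: `Response`, `LISTENERS`, and `run_chain` are hypothetical names introduced for illustration, and XML repair, canonicalization, and XSD validation are left out entirely. It models only the behavior described above: a handler returns a target and a payload, the target is checked against the sender's declared peers, and returning `None` ends the chain.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass
class Response:
    """Hypothetical stand-in for HandlerResponse: a payload plus a target name."""
    payload: Any
    to: str


Handler = Callable[[Any], Awaitable[Optional[Response]]]


async def greeter(name: str) -> Response:
    return Response(payload=f"Hello, {name}! Welcome to xml-pipeline!", to="shouter")


async def shouter(text: str) -> Response:
    return Response(payload=text.upper() + "!!!", to="console-output")


async def console_output(text: str) -> None:
    print(f"[shouter] {text}")
    return None  # returning None terminates the chain


# Listener name -> (handler, declared peers), mirroring the tutorial's organism.yaml.
LISTENERS: dict[str, tuple[Handler, set[str]]] = {
    "greeter": (greeter, {"shouter"}),
    "shouter": (shouter, {"console-output"}),
    "console-output": (console_output, set()),
}


async def run_chain(target: str, payload: Any) -> list[str]:
    """Dispatch until a handler returns None; return the listener names visited."""
    visited: list[str] = []
    while True:
        handler, peers = LISTENERS[target]
        visited.append(target)
        response = await handler(payload)
        if response is None:
            return visited
        if response.to not in peers:
            # Simplification: this sketch raises instead of returning an
            # error message to the sender and keeping the thread alive.
            raise ValueError(f"{target} may not send to {response.to}")
        target, payload = response.to, response.payload


if __name__ == "__main__":
    print(asyncio.run(run_chain("greeter", "Alice")))
```

Keeping the peer set next to each handler makes the routing check a single membership test, which is the part of the flow most worth keeping in mind when wiring new listeners.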
+ +## Exercises + +### Exercise 1: Add a Counter + +Modify the shouter to count how many times it's been called: + +```python +# Hint: Use a module-level variable (simple) or +# the context buffer (proper way) +``` + +### Exercise 2: Add Error Handling + +What happens if someone sends an empty name? Add validation, and add `console-output` to greeter's `peers` in `config/organism.yaml`, since the error branch sends there directly: + +```python +async def handle_greeting(payload: Greeting, metadata: HandlerMetadata): + if not payload.name.strip(): + return HandlerResponse( + payload=ConsoleOutput(text="Error: Name required", source="greeter"), + to="console-output", + ) + # ... rest of handler +``` + +### Exercise 3: Make Greeter an LLM Agent + +Convert greeter to use an LLM for creative greetings: + +```python +from xml_pipeline.platform.llm_api import complete + +async def handle_greeting(payload: Greeting, metadata: HandlerMetadata): + response = await complete( + model="grok-4.1", + messages=[ + {"role": "system", "content": "Generate a creative, friendly greeting."}, + {"role": "user", "content": f"Greet someone named {payload.name}"}, + ], + ) + + return HandlerResponse( + payload=ShoutRequest(text=response.content), + to="shouter", + ) +``` + +Don't forget to add LLM configuration: + +```yaml +llm: + backends: + - provider: xai + api_key_env: XAI_API_KEY +``` + +## Summary + +You've learned: +- **Payloads**: Define messages with `@xmlify` dataclasses +- **Handlers**: Async functions that process and respond +- **Configuration**: Wire everything in organism.yaml +- **Message Flow**: How messages traverse the pipeline +- **Chaining**: Handlers forward to each other via `HandlerResponse` + +## Next Steps + +- [[Writing Handlers]] — More handler patterns +- [[Configuration]] — Full configuration reference +- [[Architecture Overview]] — Deep dive into internals