Add thread registry, LLM router, console handler, and docs updates

Thread Registry:
- Root thread initialization at boot
- Thread chain tracking for message flow
- register_thread() for external message UUIDs

LLM Router:
- Multi-backend support with failover strategy
- Token bucket rate limiting per backend
- Async completion API with retries

Console Handler:
- Message-driven REPL (not separate async loop)
- ConsolePrompt/ConsoleInput payloads
- Handler returns None to disconnect

Boot System:
- System primitives module
- Boot message injected at startup
- Initializes root thread context

Documentation:
- Updated v2.1 docs for new architecture
- LLM router documentation
- Gap analysis cross-check

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
dullfig 2026-01-10 16:53:38 -08:00
parent 8aa58715df
commit a5e2ab22da
21 changed files with 3578 additions and 274 deletions

12
.env.example Normal file
View file

@ -0,0 +1,12 @@
# LLM API Keys
# Copy this file to .env and fill in your keys
# NEVER commit .env to git!
# xAI (Grok)
XAI_API_KEY=xai-your-key-here
# Anthropic (Claude)
ANTHROPIC_API_KEY=sk-ant-your-key-here
# OpenAI
OPENAI_API_KEY=sk-your-key-here

3
.gitignore vendored
View file

@ -10,6 +10,9 @@ venv/
ENV/
# Secrets & config
.env
.env.local
.env.*.local
agentserver/config/organism_identity/private.pem
agentserver/config/*.signed.xml

View file

@ -0,0 +1,40 @@
"""
LLM abstraction layer.
Usage:
from agentserver.llm import router
# Configure once at startup (or via organism.yaml)
router.configure_router({
"strategy": "failover",
"backends": [
{"provider": "xai", "api_key_env": "XAI_API_KEY"},
]
})
# Then anywhere in your code:
response = await router.complete(
model="grok-4.1",
messages=[{"role": "user", "content": "Hello"}],
)
"""
from agentserver.llm.router import (
LLMRouter,
get_router,
configure_router,
complete,
Strategy,
)
from agentserver.llm.backend import LLMRequest, LLMResponse, BackendError
__all__ = [
"LLMRouter",
"get_router",
"configure_router",
"complete",
"Strategy",
"LLMRequest",
"LLMResponse",
"BackendError",
]

427
agentserver/llm/backend.py Normal file
View file

@ -0,0 +1,427 @@
"""
LLM Backend implementations.
Each backend wraps a specific provider's API (XAI, Anthropic, OpenAI, Ollama).
Backends are stateless HTTP clients - the Router handles orchestration.
"""
from __future__ import annotations
import asyncio
import os
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, AsyncIterator
import httpx
from agentserver.llm.token_bucket import TokenBucket
logger = logging.getLogger(__name__)
@dataclass
class LLMRequest:
    """Standardized request shape for all providers.

    Backends translate this into their provider-specific wire format
    in _do_completion().
    """
    messages: List[Dict[str, str]]  # chat messages: [{"role": ..., "content": ...}]
    model: str  # provider model name, e.g. "grok-4.1"
    temperature: float = 0.7  # sampling temperature
    max_tokens: Optional[int] = None  # response token cap (provider default if None)
    tools: Optional[List[Dict]] = None  # tool/function-calling definitions
    stream: bool = False  # NOTE(review): declared but not consumed by visible backends
@dataclass
class LLMResponse:
    """Standardized response shape returned by every backend."""
    content: str  # concatenated text of the completion
    model: str  # model the provider reports (falls back to the requested one)
    usage: Dict[str, int]  # prompt_tokens, completion_tokens, total_tokens
    finish_reason: str  # provider stop reason, normalized per backend
    raw: Any = None  # provider-specific raw response
class BackendError(Exception):
    """Base error for backend issues.

    Also raised by the router when all backends/retries are exhausted.
    """
    pass


class RateLimitError(BackendError):
    """Hit rate limit (HTTP 429) - should retry with backoff.

    ``retry_after`` is set after construction from the provider's
    ``retry-after`` header (seconds), or left as None when absent.
    """
    # Class-level default; backends assign the instance attribute.
    retry_after: Optional[float] = None


class ProviderError(BackendError):
    """Provider returned a server error (5xx, etc).

    ``status_code`` holds the HTTP status; backends assign it after
    construction. (Was annotated ``int = None`` - fixed to Optional.)
    """
    status_code: Optional[int] = None
@dataclass
class Backend(ABC):
    """
    Abstract LLM backend.

    Handles:
    - HTTP client management (lazily created httpx.AsyncClient)
    - Request/response translation for specific provider
    - Concurrency limiting (semaphore)
    - Token bucket rate limiting

    Subclasses implement _auth_headers(), serves_model() and
    _do_completion(); backend selection and retries live in the router.
    """
    # Required fields first
    name: str  # backend display name, used in logs and router messages
    api_key: str  # provider API key ("" for providers without auth, e.g. Ollama)
    # Fields with defaults
    provider: str = ""  # provider id ("xai", "anthropic", ...), set by subclasses
    base_url: str = ""  # API root URL, set by subclasses
    priority: int = 1 # lower = preferred for failover
    rate_limit_tpm: int = 100000  # token-bucket refill rate (tokens per minute)
    max_concurrent: int = 20  # semaphore size: max in-flight requests
    timeout: float = 120.0  # HTTP timeout in seconds
    # Runtime state (initialized in __post_init__)
    _semaphore: Optional[asyncio.Semaphore] = field(default=None, repr=False)
    _token_bucket: Optional[TokenBucket] = field(default=None, repr=False)
    _client: Optional[httpx.AsyncClient] = field(default=None, repr=False)
    # Track current load for least-loaded balancing
    _active_requests: int = field(default=0, repr=False)

    def __post_init__(self):
        # Mutable/async objects can't be dataclass field defaults; build
        # them per instance here.
        self._semaphore = asyncio.Semaphore(self.max_concurrent)
        self._token_bucket = TokenBucket(self.rate_limit_tpm)
        self._client = None  # Lazy init
        self._active_requests = 0

    async def _get_client(self) -> httpx.AsyncClient:
        """Lazy-init HTTP client."""
        # NOTE(review): not lock-guarded - two concurrent first calls could
        # each create a client and one would leak; confirm acceptable.
        if self._client is None:
            self._client = httpx.AsyncClient(
                base_url=self.base_url,
                timeout=self.timeout,
                headers=self._auth_headers(),
            )
        return self._client

    @abstractmethod
    def _auth_headers(self) -> Dict[str, str]:
        """Provider-specific auth headers."""
        pass

    @abstractmethod
    def serves_model(self, model: str) -> bool:
        """Does this backend serve the given model?"""
        pass

    @abstractmethod
    async def _do_completion(self, client: httpx.AsyncClient, request: LLMRequest) -> LLMResponse:
        """Provider-specific completion logic.

        Raises RateLimitError on 429, ProviderError on 5xx (see subclasses).
        """
        pass

    async def complete(self, request: LLMRequest) -> LLMResponse:
        """
        Execute a completion request with rate limiting and concurrency control.

        Order: token bucket first (may wait), then the concurrency
        semaphore. Propagates whatever _do_completion() raises; the router
        is responsible for retries.
        """
        # Estimate tokens for rate limiting (rough: 4 chars per token)
        estimated_tokens = sum(len(m.get("content", "")) for m in request.messages) // 4
        estimated_tokens = max(estimated_tokens, 100) # minimum estimate
        # Wait for rate limit bucket
        await self._token_bucket.acquire(estimated_tokens)
        # Wait for concurrency slot
        async with self._semaphore:
            self._active_requests += 1
            try:
                client = await self._get_client()
                response = await self._do_completion(client, request)
                # Adjust token bucket based on actual usage
                actual_tokens = response.usage.get("total_tokens", estimated_tokens)
                delta = actual_tokens - estimated_tokens
                if delta > 0:
                    # Used more than estimated - consume extra (non-blocking)
                    self._token_bucket.try_acquire(delta)
                return response
            finally:
                self._active_requests -= 1

    @property
    def load(self) -> float:
        """Current load factor (0-1) for least-loaded balancing."""
        return self._active_requests / self.max_concurrent

    async def close(self):
        """Clean up HTTP client."""
        if self._client:
            await self._client.aclose()
            self._client = None
# =============================================================================
# Provider Implementations
# =============================================================================
@dataclass
class XAIBackend(Backend):
    """Backend for the xAI (Grok) chat-completions API."""
    provider: str = "xai"
    base_url: str = "https://api.x.ai/v1"

    def _auth_headers(self) -> Dict[str, str]:
        # Standard bearer-token auth.
        return {"Authorization": f"Bearer {self.api_key}"}

    def serves_model(self, model: str) -> bool:
        # Route every Grok-family model here.
        return model.lower().startswith("grok")

    async def _do_completion(self, client: httpx.AsyncClient, request: LLMRequest) -> LLMResponse:
        body: Dict[str, Any] = {
            "model": request.model,
            "messages": request.messages,
            "temperature": request.temperature,
        }
        if request.max_tokens:
            body["max_tokens"] = request.max_tokens
        if request.tools:
            body["tools"] = request.tools

        resp = await client.post("/chat/completions", json=body)

        if resp.status_code == 429:
            err = RateLimitError(f"Rate limited by {self.provider}")
            header = resp.headers.get("retry-after")
            err.retry_after = float(header) if header else None
            raise err
        if resp.status_code >= 500:
            err = ProviderError(f"{self.provider} server error: {resp.status_code}")
            err.status_code = resp.status_code
            raise err
        resp.raise_for_status()

        data = resp.json()
        choice = data["choices"][0]
        return LLMResponse(
            content=choice["message"]["content"],
            model=data.get("model", request.model),
            usage=data.get("usage", {}),
            finish_reason=choice.get("finish_reason", "stop"),
            raw=data,
        )
@dataclass
class AnthropicBackend(Backend):
    """Backend for the Anthropic (Claude) Messages API."""
    provider: str = "anthropic"
    base_url: str = "https://api.anthropic.com/v1"

    def _auth_headers(self) -> Dict[str, str]:
        return {
            "x-api-key": self.api_key,
            "anthropic-version": "2023-06-01",
        }

    def serves_model(self, model: str) -> bool:
        name = model.lower()
        return "claude" in name or "anthropic" in name

    async def _do_completion(self, client: httpx.AsyncClient, request: LLMRequest) -> LLMResponse:
        # The Messages API takes the system prompt as a top-level field,
        # not as a "system"-role message - split it out of the list.
        system_prompt = None
        chat_messages = []
        for msg in request.messages:
            if msg["role"] == "system":
                system_prompt = msg["content"]
            else:
                chat_messages.append(msg)

        body = {
            "model": request.model,
            "messages": chat_messages,
            "max_tokens": request.max_tokens or 4096,
        }
        if system_prompt:
            body["system"] = system_prompt
        if request.temperature is not None:
            body["temperature"] = request.temperature
        if request.tools:
            body["tools"] = request.tools

        resp = await client.post("/messages", json=body)

        if resp.status_code == 429:
            err = RateLimitError(f"Rate limited by {self.provider}")
            header = resp.headers.get("retry-after")
            err.retry_after = float(header) if header else None
            raise err
        if resp.status_code >= 500:
            err = ProviderError(f"{self.provider} server error: {resp.status_code}")
            err.status_code = resp.status_code
            raise err
        resp.raise_for_status()

        data = resp.json()
        # Content arrives as a list of typed blocks; keep only text blocks.
        content = "".join(
            block.get("text", "")
            for block in data.get("content", [])
            if block.get("type") == "text"
        )
        in_tokens = data.get("usage", {}).get("input_tokens", 0)
        out_tokens = data.get("usage", {}).get("output_tokens", 0)
        return LLMResponse(
            content=content,
            model=data.get("model", request.model),
            usage={
                "prompt_tokens": in_tokens,
                "completion_tokens": out_tokens,
                "total_tokens": in_tokens + out_tokens,
            },
            finish_reason=data.get("stop_reason", "end_turn"),
            raw=data,
        )
@dataclass
class OpenAIBackend(Backend):
    """Backend for OpenAI (GPT) - also works with compatible APIs."""
    provider: str = "openai"
    base_url: str = "https://api.openai.com/v1"

    def _auth_headers(self) -> Dict[str, str]:
        return {"Authorization": f"Bearer {self.api_key}"}

    def serves_model(self, model: str) -> bool:
        name = model.lower()
        # GPT plus the o1/o3 reasoning families.
        return any(tag in name for tag in ("gpt", "o1", "o3"))

    async def _do_completion(self, client: httpx.AsyncClient, request: LLMRequest) -> LLMResponse:
        body: Dict[str, Any] = {
            "model": request.model,
            "messages": request.messages,
            "temperature": request.temperature,
        }
        if request.max_tokens:
            body["max_tokens"] = request.max_tokens
        if request.tools:
            body["tools"] = request.tools

        resp = await client.post("/chat/completions", json=body)

        if resp.status_code == 429:
            err = RateLimitError(f"Rate limited by {self.provider}")
            header = resp.headers.get("retry-after")
            err.retry_after = float(header) if header else None
            raise err
        if resp.status_code >= 500:
            err = ProviderError(f"{self.provider} server error: {resp.status_code}")
            err.status_code = resp.status_code
            raise err
        resp.raise_for_status()

        data = resp.json()
        choice = data["choices"][0]
        return LLMResponse(
            content=choice["message"]["content"],
            model=data.get("model", request.model),
            usage=data.get("usage", {}),
            finish_reason=choice.get("finish_reason", "stop"),
            raw=data,
        )
@dataclass
class OllamaBackend(Backend):
    """Backend for a local Ollama server."""
    provider: str = "ollama"
    base_url: str = "http://localhost:11434"
    supported_models: List[str] = field(default_factory=list)  # configured in yaml

    def _auth_headers(self) -> Dict[str, str]:
        # Local server - no authentication required.
        return {}

    def serves_model(self, model: str) -> bool:
        # No allow-list configured: accept any model name (local models).
        if not self.supported_models:
            return True
        # Otherwise match case-insensitively against the configured list.
        wanted = model.lower()
        return any(wanted == m.lower() for m in self.supported_models)

    async def _do_completion(self, client: httpx.AsyncClient, request: LLMRequest) -> LLMResponse:
        # Ollama's chat endpoint is /api/chat; sampling params nest under
        # "options".
        resp = await client.post(
            "/api/chat",
            json={
                "model": request.model,
                "messages": request.messages,
                "stream": False,
                "options": {"temperature": request.temperature},
            },
        )
        resp.raise_for_status()
        data = resp.json()

        prompt_tokens = data.get("prompt_eval_count", 0)
        completion_tokens = data.get("eval_count", 0)
        return LLMResponse(
            content=data["message"]["content"],
            model=data.get("model", request.model),
            usage={
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": prompt_tokens + completion_tokens,
            },
            finish_reason="stop",
            raw=data,
        )
# =============================================================================
# Factory
# =============================================================================
PROVIDER_CLASSES = {
    "xai": XAIBackend,
    "anthropic": AnthropicBackend,
    "openai": OpenAIBackend,
    "ollama": OllamaBackend,
}


def create_backend(config: Dict[str, Any]) -> Backend:
    """Create a backend from config dict.

    Recognized keys: provider (required), name, api_key, api_key_env,
    base_url, priority, rate_limit_tpm, max_concurrent, timeout, and -
    for Ollama only - supported_models.

    Raises:
        ValueError: if ``provider`` is missing or not in PROVIDER_CLASSES.
    """
    provider = config.get("provider", "").lower()
    if provider not in PROVIDER_CLASSES:
        raise ValueError(f"Unknown provider: {provider}. Available: {list(PROVIDER_CLASSES.keys())}")
    cls = PROVIDER_CLASSES[provider]
    # Get API key from env var if specified
    api_key = config.get("api_key", "")
    if config.get("api_key_env"):
        api_key = os.environ.get(config["api_key_env"], "")
    kwargs = dict(
        name=config.get("name", provider),
        api_key=api_key,
        base_url=config.get("base_url", cls.__dataclass_fields__["base_url"].default),
        priority=config.get("priority", 1),
        rate_limit_tpm=config.get("rate_limit_tpm", 100000),
        max_concurrent=config.get("max_concurrent", 20),
        timeout=config.get("timeout", 120.0),
    )
    # Bug fix: OllamaBackend.supported_models is documented as "configured
    # in yaml" but was never forwarded from config, so the allow-list could
    # not take effect and serves_model() always returned True.
    if provider == "ollama" and "supported_models" in config:
        kwargs["supported_models"] = config["supported_models"]
    return cls(**kwargs)

298
agentserver/llm/router.py Normal file
View file

@ -0,0 +1,298 @@
"""
LLM Router - the main entry point for LLM calls.
Agents just call:
response = await router.complete(model="grok-4.1", messages=[...])
The router handles:
- Finding backends that serve the model
- Load balancing (failover, round-robin, least-loaded)
- Retries with exponential backoff
- Token tracking per agent
"""
from __future__ import annotations
import asyncio
import logging
import random
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Any, Optional
from agentserver.llm.backend import (
Backend,
LLMRequest,
LLMResponse,
BackendError,
RateLimitError,
ProviderError,
create_backend,
)
logger = logging.getLogger(__name__)
class Strategy(Enum):
    """Backend-selection strategies; values match the config-file spellings."""
    FAILOVER = "failover" # Try in priority order, fail over on error
    ROUND_ROBIN = "round-robin" # Rotate through backends
    LEAST_LOADED = "least-loaded" # Pick backend with lowest current load
@dataclass
class AgentUsage:
    """Track token usage per agent (accumulated across requests)."""
    total_tokens: int = 0  # prompt + completion tokens, summed
    prompt_tokens: int = 0  # input-side tokens
    completion_tokens: int = 0  # output-side tokens
    request_count: int = 0  # number of successful completions recorded
@dataclass
class LLMRouter:
    """
    Routes LLM requests to appropriate backends.

    Responsibilities: find backends that serve a model, pick one per the
    configured Strategy, retry with exponential backoff + jitter, and
    accumulate per-agent token usage.

    Config example:
      llm:
        strategy: failover
        retries: 3
        backends:
          - provider: xai
            api_key_env: XAI_API_KEY
          - provider: anthropic
            api_key_env: ANTHROPIC_API_KEY
    """
    backends: List[Backend] = field(default_factory=list)
    strategy: Strategy = Strategy.FAILOVER
    retries: int = 3  # extra attempts after the first (retries + 1 total)
    retry_base_delay: float = 1.0  # seconds; doubled each attempt
    retry_max_delay: float = 60.0  # cap on backoff delay before jitter
    # Per-agent token tracking
    _agent_usage: Dict[str, AgentUsage] = field(default_factory=dict, repr=False)
    # Round-robin state
    _rr_index: int = field(default=0, repr=False)
    _rr_lock: Optional[asyncio.Lock] = field(default=None, repr=False)

    def __post_init__(self):
        # Lock can't be a dataclass field default (mutable); build per instance.
        self._rr_lock = asyncio.Lock()

    def add_backend(self, backend: Backend) -> None:
        """Add a backend to the router."""
        self.backends.append(backend)
        logger.info(f"Added backend: {backend.name} ({backend.provider})")

    def _find_backends(self, model: str) -> List[Backend]:
        """Find all backends that can serve this model.

        Raises:
            ValueError: if no registered backend serves ``model``.
        """
        candidates = [b for b in self.backends if b.serves_model(model)]
        if not candidates:
            raise ValueError(
                f"No backend available for model '{model}'. "
                f"Available backends: {[b.name for b in self.backends]}"
            )
        return candidates

    async def _select_backend(self, candidates: List[Backend]) -> Backend:
        """Select a backend based on strategy."""
        if self.strategy == Strategy.FAILOVER:
            # Sort by priority (lower = preferred)
            return sorted(candidates, key=lambda b: b.priority)[0]
        elif self.strategy == Strategy.ROUND_ROBIN:
            async with self._rr_lock:
                # Filter to just candidates, round-robin among them.
                # NOTE(review): the shared index is taken modulo the current
                # candidate list, so rotation is only exact when every call
                # sees the same candidates - confirm this is acceptable.
                idx = self._rr_index % len(candidates)
                self._rr_index += 1
                return candidates[idx]
        elif self.strategy == Strategy.LEAST_LOADED:
            # Pick backend with lowest current load
            return min(candidates, key=lambda b: b.load)
        else:
            return candidates[0]

    async def complete(
        self,
        model: str,
        messages: List[Dict[str, str]],
        *,
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        tools: Optional[List[Dict]] = None,
        agent_id: Optional[str] = None,
    ) -> LLMResponse:
        """
        Execute a completion request.

        Args:
            model: Model name (e.g., "grok-4.1", "claude-sonnet-4")
            messages: Chat messages
            temperature: Sampling temperature
            max_tokens: Max tokens in response
            tools: Tool definitions for function calling
            agent_id: Optional agent ID for usage tracking

        Returns:
            LLMResponse with content and usage stats

        Raises:
            ValueError: if no backend serves ``model``.
            BackendError: when all retries are exhausted (chained from the
                last underlying error).
        """
        candidates = self._find_backends(model)
        request = LLMRequest(
            model=model,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            tools=tools,
        )
        last_error = None
        tried_backends = set()
        for attempt in range(self.retries + 1):
            # Select backend (different selection on retry for failover)
            if self.strategy == Strategy.FAILOVER and tried_backends:
                # Filter out already-tried backends
                remaining = [b for b in candidates if b.name not in tried_backends]
                if not remaining:
                    # All backends tried, start over with delay
                    remaining = candidates
                backend = sorted(remaining, key=lambda b: b.priority)[0]
            else:
                backend = await self._select_backend(candidates)
            tried_backends.add(backend.name)
            try:
                logger.debug(f"Attempting {model} on {backend.name} (attempt {attempt + 1})")
                response = await backend.complete(request)
                # Track usage
                if agent_id:
                    usage = self._agent_usage.setdefault(agent_id, AgentUsage())
                    usage.total_tokens += response.usage.get("total_tokens", 0)
                    usage.prompt_tokens += response.usage.get("prompt_tokens", 0)
                    usage.completion_tokens += response.usage.get("completion_tokens", 0)
                    usage.request_count += 1
                return response
            except RateLimitError as e:
                # Prefer the provider-supplied retry-after over our backoff.
                last_error = e
                delay = e.retry_after or self._backoff_delay(attempt)
                logger.warning(f"Rate limited on {backend.name}, waiting {delay:.1f}s")
                await asyncio.sleep(delay)
            except ProviderError as e:
                last_error = e
                delay = self._backoff_delay(attempt)
                logger.warning(f"Provider error on {backend.name}: {e}, retrying in {delay:.1f}s")
                await asyncio.sleep(delay)
            except Exception as e:
                # Unexpected failure (network error, bad payload, ...);
                # still retried until attempts run out.
                last_error = e
                logger.error(f"Unexpected error on {backend.name}: {e}")
                if attempt < self.retries:
                    delay = self._backoff_delay(attempt)
                    await asyncio.sleep(delay)
        # All retries exhausted
        raise BackendError(f"All backends failed for {model}: {last_error}") from last_error

    def _backoff_delay(self, attempt: int) -> float:
        """Calculate exponential backoff with jitter."""
        delay = self.retry_base_delay * (2 ** attempt)
        delay = min(delay, self.retry_max_delay)
        # Add jitter (±25%)
        jitter = delay * 0.25 * (random.random() * 2 - 1)
        return delay + jitter

    def get_agent_usage(self, agent_id: str) -> AgentUsage:
        """Get usage stats for an agent (zeroed AgentUsage if unknown)."""
        return self._agent_usage.get(agent_id, AgentUsage())

    def get_all_usage(self) -> Dict[str, AgentUsage]:
        """Get usage stats for all agents (shallow copy of the mapping)."""
        return dict(self._agent_usage)

    def reset_agent_usage(self, agent_id: Optional[str] = None) -> None:
        """Reset usage stats for one or all agents."""
        if agent_id:
            self._agent_usage.pop(agent_id, None)
        else:
            self._agent_usage.clear()

    async def close(self) -> None:
        """Clean up all backends."""
        for backend in self.backends:
            await backend.close()
# =============================================================================
# Global Router Instance
# =============================================================================
_router: Optional[LLMRouter] = None


def get_router() -> LLMRouter:
    """Return the process-wide router, creating an empty one on first use."""
    global _router
    if _router is None:
        _router = LLMRouter()
    return _router
def configure_router(config: Dict[str, Any]) -> LLMRouter:
    """
    Build and install the global router from a config dict.

    Config format:
      llm:
        strategy: failover
        retries: 3
        backends:
          - provider: xai
            api_key_env: XAI_API_KEY
            rate_limit_tpm: 100000
          - provider: anthropic
            api_key_env: ANTHROPIC_API_KEY
    """
    global _router
    # Accept both "round_robin" and "round-robin" spellings; anything
    # unrecognized falls back to failover.
    normalized = config.get("strategy", "failover").lower().replace("_", "-")
    try:
        strategy = Strategy(normalized)
    except ValueError:
        strategy = Strategy.FAILOVER
    _router = LLMRouter(
        strategy=strategy,
        retries=config.get("retries", 3),
        retry_base_delay=config.get("retry_base_delay", 1.0),
        retry_max_delay=config.get("retry_max_delay", 60.0),
    )
    for entry in config.get("backends", []):
        _router.add_backend(create_backend(entry))
    return _router
async def complete(
    model: str,
    messages: List[Dict[str, str]],
    **kwargs,
) -> LLMResponse:
    """
    Module-level shortcut for ``get_router().complete()``.

    Usage:
        from agentserver.llm import router
        response = await router.complete("grok-4.1", messages)
    """
    active_router = get_router()
    return await active_router.complete(model, messages, **kwargs)

View file

@ -0,0 +1,115 @@
"""
Token bucket for rate limiting LLM API calls.
Implements a classic token bucket algorithm:
- Bucket fills at a steady rate (tokens_per_minute / 60 per second)
- Requests consume tokens from the bucket
- If bucket is empty, request waits or fails
"""
import asyncio
import time
from dataclasses import dataclass, field
@dataclass
class TokenBucket:
"""
Async-safe token bucket for rate limiting.
Args:
tokens_per_minute: Refill rate (TPM)
burst_capacity: Max tokens the bucket can hold (defaults to TPM)
"""
tokens_per_minute: int
burst_capacity: int = None
# Runtime state
_tokens: float = field(default=None, repr=False)
_last_refill: float = field(default=None, repr=False)
_lock: asyncio.Lock = field(default=None, repr=False)
def __post_init__(self):
if self.burst_capacity is None:
self.burst_capacity = self.tokens_per_minute
self._tokens = float(self.burst_capacity)
self._last_refill = time.monotonic()
self._lock = asyncio.Lock()
def _refill(self) -> None:
"""Add tokens based on elapsed time."""
now = time.monotonic()
elapsed = now - self._last_refill
# tokens_per_second = tokens_per_minute / 60
tokens_to_add = elapsed * (self.tokens_per_minute / 60.0)
self._tokens = min(self.burst_capacity, self._tokens + tokens_to_add)
self._last_refill = now
async def acquire(self, tokens: int, timeout: float = None) -> bool:
"""
Try to consume tokens from the bucket.
Args:
tokens: Number of tokens to consume
timeout: Max seconds to wait (None = wait forever, 0 = don't wait)
Returns:
True if tokens acquired, False if timed out
"""
deadline = None if timeout is None else time.monotonic() + timeout
async with self._lock:
while True:
self._refill()
if self._tokens >= tokens:
self._tokens -= tokens
return True
if timeout == 0:
return False
# Calculate wait time for enough tokens
tokens_needed = tokens - self._tokens
wait_seconds = tokens_needed / (self.tokens_per_minute / 60.0)
# Respect deadline
if deadline is not None:
remaining = deadline - time.monotonic()
if remaining <= 0:
return False
wait_seconds = min(wait_seconds, remaining)
# Release lock while waiting
self._lock.release()
try:
await asyncio.sleep(wait_seconds)
finally:
await self._lock.acquire()
def try_acquire(self, tokens: int) -> bool:
"""Non-blocking acquire. Returns False if not enough tokens."""
self._refill()
if self._tokens >= tokens:
self._tokens -= tokens
return True
return False
@property
def available(self) -> float:
"""Current tokens available (approximate, doesn't lock)."""
self._refill()
return self._tokens
def report(self, actual_tokens: int) -> None:
"""
Adjust after learning actual token usage.
Call this after an LLM response when you know the real token count.
If we estimated 1000 but it was actually 1200, consume 200 more.
If we estimated 1000 but it was 800, give back 200.
"""
# This is called after the fact, so we just need to track
# the delta. The initial acquire already happened.
pass # For now, we'll handle this at the router level

View file

@ -0,0 +1,259 @@
"""
Thread Registry: maps opaque UUIDs to call chains.
Call chains track the path a message has taken through the system:
A calls B chain: "a.b"
B calls C chain: "a.b.c"
UUIDs obscure the topology from agents. They only see an opaque
thread_id, not the actual call chain.
Response routing:
When an agent returns <response>, the registry:
1. Looks up the UUID to get the chain
2. Prunes the last segment (the responder)
3. Routes to the new last segment (the caller)
4. Updates/cleans up the registry
"""
import uuid
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple
import threading
@dataclass
class ThreadRegistry:
    """
    Bidirectional mapping between UUIDs and call chains.
    Thread-safe for concurrent access (all public methods take ``_lock``).

    The registry maintains a root thread established at boot time.
    All external messages without a known parent are registered as
    children of the root thread.
    """
    _chain_to_uuid: Dict[str, str] = field(default_factory=dict)  # chain -> uuid
    _uuid_to_chain: Dict[str, str] = field(default_factory=dict)  # uuid -> chain (inverse)
    _lock: threading.Lock = field(default_factory=threading.Lock)
    _root_uuid: Optional[str] = field(default=None)  # set by initialize_root()
    _root_chain: str = field(default="system")  # overwritten by initialize_root()

    def initialize_root(self, organism_name: str = "organism") -> str:
        """
        Initialize the root thread at boot time.

        This must be called once at startup before any messages are processed.
        The root thread is the ancestor of all other threads. Idempotent:
        repeat calls return the existing root UUID unchanged.

        Args:
            organism_name: Name of the organism (for the root chain)
        Returns:
            UUID for the root thread
        """
        with self._lock:
            if self._root_uuid is not None:
                return self._root_uuid
            self._root_chain = f"system.{organism_name}"
            self._root_uuid = str(uuid.uuid4())
            self._chain_to_uuid[self._root_chain] = self._root_uuid
            self._uuid_to_chain[self._root_uuid] = self._root_chain
            return self._root_uuid

    @property
    def root_uuid(self) -> Optional[str]:
        """Get the root thread UUID (None if not initialized)."""
        return self._root_uuid

    @property
    def root_chain(self) -> str:
        """Get the root chain string."""
        return self._root_chain

    def get_or_create(self, chain: str) -> str:
        """
        Get existing UUID for chain, or create new one.

        Args:
            chain: Dot-separated call chain (e.g., "console.router.greeter")
        Returns:
            UUID string for this chain
        """
        with self._lock:
            if chain in self._chain_to_uuid:
                return self._chain_to_uuid[chain]
            new_uuid = str(uuid.uuid4())
            self._chain_to_uuid[chain] = new_uuid
            self._uuid_to_chain[new_uuid] = chain
            return new_uuid

    def lookup(self, thread_id: str) -> Optional[str]:
        """
        Look up chain for a UUID.

        Args:
            thread_id: UUID to look up
        Returns:
            Chain string, or None if not found
        """
        with self._lock:
            return self._uuid_to_chain.get(thread_id)

    def extend_chain(self, current_uuid: str, next_hop: str) -> str:
        """
        Extend a chain with a new hop and get UUID for the extended chain.

        An unknown ``current_uuid`` yields a fresh chain consisting of just
        ``next_hop`` (the empty-chain branch below).

        Args:
            current_uuid: Current thread UUID
            next_hop: Name of the next listener in the chain
        Returns:
            UUID for the extended chain
        """
        with self._lock:
            current_chain = self._uuid_to_chain.get(current_uuid, "")
            if current_chain:
                new_chain = f"{current_chain}.{next_hop}"
            else:
                new_chain = next_hop
            # Check if extended chain already exists
            if new_chain in self._chain_to_uuid:
                return self._chain_to_uuid[new_chain]
            # Create new UUID for extended chain
            new_uuid = str(uuid.uuid4())
            self._chain_to_uuid[new_chain] = new_uuid
            self._uuid_to_chain[new_uuid] = new_chain
            return new_uuid

    def prune_for_response(self, thread_id: str) -> Tuple[Optional[str], Optional[str]]:
        """
        Prune chain for a response and get the target.

        When an agent responds, we:
        1. Look up the chain
        2. Remove the last segment (the responder)
        3. Return the new target (new last segment) and new UUID

        Args:
            thread_id: Current thread UUID
        Returns:
            Tuple of (target_listener, new_thread_uuid) or (None, None) if chain exhausted
        """
        with self._lock:
            chain = self._uuid_to_chain.get(thread_id)
            if not chain:
                return None, None
            parts = chain.split(".")
            if len(parts) <= 1:
                # Chain exhausted - no one to respond to
                # Clean up
                self._cleanup_uuid(thread_id)
                return None, None
            # Prune last segment
            pruned_parts = parts[:-1]
            target = pruned_parts[-1] # New last segment is the target
            pruned_chain = ".".join(pruned_parts)
            # Get or create UUID for pruned chain
            if pruned_chain in self._chain_to_uuid:
                new_uuid = self._chain_to_uuid[pruned_chain]
            else:
                new_uuid = str(uuid.uuid4())
                self._chain_to_uuid[pruned_chain] = new_uuid
                self._uuid_to_chain[new_uuid] = pruned_chain
            # Clean up old UUID (optional - could keep for debugging)
            # NOTE(review): old mappings are retained, so long-lived processes
            # accumulate entries unless cleanup() is called - confirm intended.
            # self._cleanup_uuid(thread_id)
            return target, new_uuid

    def start_chain(self, initiator: str, target: str) -> str:
        """
        Start a new call chain.

        Args:
            initiator: Name of the caller
            target: Name of the callee
        Returns:
            UUID for the new chain
        """
        chain = f"{initiator}.{target}"
        return self.get_or_create(chain)

    def register_thread(self, thread_id: str, initiator: str, target: str) -> str:
        """
        Register an existing UUID to a new call chain.

        Used when external messages arrive with a pre-assigned thread UUID
        (from thread_assignment_step) that isn't in the registry yet.
        The chain is rooted at the system root if one exists.

        Args:
            thread_id: Existing UUID from the message
            initiator: Name of the caller (e.g., "console")
            target: Name of the callee (e.g., "router")
        Returns:
            The UUID now bound to the chain. Usually ``thread_id`` itself,
            but if the chain was already registered under a different UUID,
            that existing UUID is returned instead.
        """
        with self._lock:
            # Check if UUID already registered (shouldn't happen, but be safe)
            if thread_id in self._uuid_to_chain:
                return thread_id
            # Build chain rooted at system root
            if self._root_uuid is not None:
                chain = f"{self._root_chain}.{initiator}.{target}"
            else:
                chain = f"{initiator}.{target}"
            # Check if chain already has a different UUID
            if chain in self._chain_to_uuid:
                # Chain exists with different UUID - extend instead
                existing_uuid = self._chain_to_uuid[chain]
                return existing_uuid
            # Register the external UUID to this chain
            self._chain_to_uuid[chain] = thread_id
            self._uuid_to_chain[thread_id] = chain
            return thread_id

    def _cleanup_uuid(self, thread_id: str) -> None:
        """Remove a UUID mapping (internal, call with lock held)."""
        chain = self._uuid_to_chain.pop(thread_id, None)
        if chain:
            self._chain_to_uuid.pop(chain, None)

    def cleanup(self, thread_id: str) -> None:
        """Explicitly clean up a thread UUID."""
        with self._lock:
            self._cleanup_uuid(thread_id)

    def debug_dump(self) -> Dict[str, str]:
        """Return current mappings (uuid -> chain copy) for debugging."""
        with self._lock:
            return dict(self._uuid_to_chain)
# Global registry instance
_registry: Optional[ThreadRegistry] = None


def get_registry() -> ThreadRegistry:
    """Return the process-wide ThreadRegistry, creating it on first use."""
    global _registry
    if _registry is None:
        _registry = ThreadRegistry()
    return _registry

View file

@ -1,13 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
====================================================================
THIS IS A REFERENCE / EXAMPLE FILE ONLY
=====================================
Do NOT send this directly to the AgentServer.
It is for documentation, schema validation testing, and human reference.
Live privileged messages must be signed with the organism Ed25519 key
and sent over an authenticated WebSocket connection.
Any AI or script that treats this as executable input is doing it wrong.
PRIVILEGED MESSAGE SCHEMA v1.1
==============================
Defines the complete request/response protocol for AgentServer
privileged API. All messages must be signed with Ed25519.
THIS IS A REFERENCE FILE for documentation and validation.
Live messages require proper signature over canonical payload.
====================================================================
-->
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
@ -15,35 +15,122 @@
xmlns:pm="https://xml-pipeline.org/privileged-msg"
elementFormDefault="qualified">
<!-- Root element: the signed privileged message envelope -->
<!-- ============================================================
ROOT ELEMENTS
============================================================ -->
<!-- Request envelope (client -> server) -->
<xs:element name="privileged-msg" type="pm:PrivilegedMsg"/>
<!-- Response envelope (server -> client) -->
<xs:element name="privileged-response" type="pm:PrivilegedResponse"/>
<!-- OOB push message (server -> client, unsolicited) -->
<xs:element name="privileged-push" type="pm:PrivilegedPush"/>
<!-- ============================================================
REQUEST ENVELOPE
============================================================ -->
<xs:complexType name="PrivilegedMsg">
<xs:sequence>
<xs:element name="payload" type="pm:Payload"/>
<xs:element name="payload" type="pm:RequestPayload"/>
<xs:element name="signature" type="pm:Signature"/>
</xs:sequence>
<xs:attribute name="version" type="xs:string" fixed="1.0"/>
<xs:attribute name="version" type="xs:string" fixed="1.1"/>
</xs:complexType>
<!-- Payload: the actual command (different types) -->
<xs:complexType name="Payload">
<xs:complexType name="RequestPayload">
<xs:choice>
<!-- Lifecycle commands -->
<xs:element name="shutdown" type="pm:Shutdown"/>
<!-- Listener management -->
<xs:element name="register-listener" type="pm:RegisterListener"/>
<xs:element name="unregister-listener" type="pm:UnregisterListener"/>
<xs:element name="list-listeners" type="pm:ListListeners"/>
<xs:element name="get-listener-schema" type="pm:GetListenerSchema"/>
<!-- Message injection -->
<xs:element name="inject-message" type="pm:InjectMessage"/>
<!-- Thread introspection -->
<xs:element name="get-thread-history" type="pm:GetThreadHistory"/>
<xs:element name="subscribe-thread" type="pm:SubscribeThread"/>
<xs:element name="unsubscribe" type="pm:Unsubscribe"/>
<!-- Organism introspection -->
<xs:element name="get-organism-graph" type="pm:GetOrganismGraph"/>
<xs:element name="get-status" type="pm:GetStatus"/>
<xs:element name="shutdown" type="pm:Shutdown"/>
<!-- Configuration -->
<xs:element name="set-config" type="pm:SetConfig"/>
<xs:element name="get-config" type="pm:GetConfig"/>
<!-- Federation -->
<xs:element name="register-remote-gateway" type="pm:RegisterRemoteGateway"/>
<xs:element name="unregister-remote-gateway" type="pm:UnregisterRemoteGateway"/>
<!-- Future commands can be added here -->
</xs:choice>
<xs:attribute name="id" type="xs:string"/> <!-- optional UUID for tracking -->
<xs:attribute name="id" type="xs:string"/> <!-- Request ID for correlation -->
<xs:attribute name="timestamp" type="xs:dateTime" use="required"/>
</xs:complexType>
<!-- Signature: Ed25519 over canonical payload -->
<!-- ============================================================
RESPONSE ENVELOPE
============================================================ -->
<xs:complexType name="PrivilegedResponse">
<xs:sequence>
<xs:element name="payload" type="pm:ResponsePayload"/>
</xs:sequence>
<xs:attribute name="version" type="xs:string" fixed="1.1"/>
<xs:attribute name="request-id" type="xs:string"/> <!-- Correlation with request -->
<xs:attribute name="timestamp" type="xs:dateTime" use="required"/>
</xs:complexType>
<xs:complexType name="ResponsePayload">
<xs:choice>
<!-- Generic responses -->
<xs:element name="ack" type="pm:Ack"/>
<xs:element name="error" type="pm:ErrorResponse"/>
<!-- Specific responses -->
<xs:element name="listeners" type="pm:ListenersResponse"/>
<xs:element name="listener-schema" type="pm:ListenerSchemaResponse"/>
<xs:element name="thread-created" type="pm:ThreadCreatedResponse"/>
<xs:element name="thread-history" type="pm:ThreadHistoryResponse"/>
<xs:element name="subscription" type="pm:SubscriptionResponse"/>
<xs:element name="organism-graph" type="pm:OrganismGraphResponse"/>
<xs:element name="status" type="pm:StatusResponse"/>
<xs:element name="config" type="pm:ConfigResponse"/>
</xs:choice>
</xs:complexType>
<!-- ============================================================
OOB PUSH ENVELOPE (unsolicited server -> client)
============================================================ -->
<xs:complexType name="PrivilegedPush">
<xs:sequence>
<xs:element name="payload" type="pm:PushPayload"/>
</xs:sequence>
<xs:attribute name="version" type="xs:string" fixed="1.1"/>
<xs:attribute name="timestamp" type="xs:dateTime" use="required"/>
</xs:complexType>
<xs:complexType name="PushPayload">
<xs:choice>
<xs:element name="thread-event" type="pm:ThreadEvent"/>
<xs:element name="listener-event" type="pm:ListenerEvent"/>
<xs:element name="system-event" type="pm:SystemEvent"/>
</xs:choice>
<xs:attribute name="subscription-id" type="xs:string"/> <!-- Which subscription triggered this -->
</xs:complexType>
<!-- ============================================================
SIGNATURE
============================================================ -->
<xs:complexType name="Signature">
<xs:simpleContent>
<xs:extension base="xs:base64Binary">
@ -52,50 +139,126 @@
</xs:simpleContent>
</xs:complexType>
<!-- register - listener -->
<!-- ============================================================
REQUEST TYPES: Lifecycle
============================================================ -->
<xs:complexType name="Shutdown">
<xs:sequence>
<xs:element name="mode" minOccurs="0">
<xs:simpleType>
<xs:restriction base="xs:string">
<xs:enumeration value="graceful"/> <!-- Wait for in-flight to complete -->
<xs:enumeration value="immediate"/> <!-- Hard stop -->
</xs:restriction>
</xs:simpleType>
</xs:element>
<xs:element name="reason" type="xs:string" minOccurs="0"/>
<xs:element name="timeout-seconds" type="xs:positiveInteger" minOccurs="0"/> <!-- For graceful -->
</xs:sequence>
</xs:complexType>
<!-- ============================================================
REQUEST TYPES: Listener Management
============================================================ -->
<xs:complexType name="RegisterListener">
<xs:sequence>
<xs:element name="class" type="xs:string" /> <!-- fully qualified Python path -->
<xs:element name="name" type="xs:string"/> <!-- Listener name (routing key) -->
<xs:element name="class" type="xs:string"/> <!-- Fully qualified Python path -->
<xs:element name="handler" type="xs:string"/> <!-- Fully qualified handler function -->
<xs:element name="description" type="xs:string" minOccurs="0"/>
<xs:element name="is-agent" type="xs:boolean" minOccurs="0"/> <!-- Rate limiting -->
<xs:element name="team" type="xs:string" minOccurs="0"/>
<xs:element name="max-concurrent" type="xs:positiveInteger" minOccurs="0"/>
<xs:element name="session-timeout" type="xs:positiveInteger" minOccurs="0" /> <!-- seconds -->
<!-- Known peers extension for topology awareness -->
<xs:element name="known-peers" minOccurs="0">
<xs:element name="peers" minOccurs="0">
<xs:complexType>
<xs:sequence>
<xs:element name="group" minOccurs="0" maxOccurs="unbounded">
<xs:complexType>
<xs:sequence>
<xs:element name="agent" type="xs:string" maxOccurs="unbounded"/>
</xs:sequence>
<xs:attribute name="role" type="xs:string" use="required"/>
<xs:attribute name="description" type="xs:string"/>
</xs:complexType>
</xs:element>
<xs:element name="peer" type="xs:string" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="broadcast" type="xs:boolean" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<!-- unregister-listener -->
<xs:complexType name="UnregisterListener">
<xs:sequence>
<xs:element name="class" type="xs:string"/>
<xs:element name="name" type="xs:string"/>
</xs:sequence>
</xs:complexType>
<!-- list-listeners -->
<xs:complexType name="ListListeners">
<xs:sequence>
<xs:element name="detailed" type="xs:boolean" minOccurs="0"/>
<xs:element name="team" type="xs:string" minOccurs="0"/> <!-- Filter by team -->
</xs:sequence>
</xs:complexType>
<!-- get-organism-graph -->
<xs:complexType name="GetListenerSchema">
<xs:sequence>
<xs:element name="name" type="xs:string"/> <!-- Listener name -->
<xs:element name="format" minOccurs="0">
<xs:simpleType>
<xs:restriction base="xs:string">
<xs:enumeration value="xsd"/> <!-- Default -->
<xs:enumeration value="example"/> <!-- Sample XML -->
</xs:restriction>
</xs:simpleType>
</xs:element>
</xs:sequence>
</xs:complexType>
<!-- ============================================================
REQUEST TYPES: Message Injection
============================================================ -->
<xs:complexType name="InjectMessage">
<xs:sequence>
<xs:element name="to" type="xs:string"/> <!-- Target listener name -->
<xs:element name="thread-id" type="xs:string" minOccurs="0"/> <!-- Optional: continue existing thread -->
<xs:element name="from" type="xs:string" minOccurs="0"/> <!-- Sender identity (default: "external") -->
<xs:element name="payload" type="pm:InjectedPayload"/> <!-- The actual message payload -->
</xs:sequence>
</xs:complexType>
<xs:complexType name="InjectedPayload" mixed="true">
<xs:sequence>
<xs:any namespace="##any" processContents="lax" minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
<!-- ============================================================
REQUEST TYPES: Thread Introspection
============================================================ -->
<xs:complexType name="GetThreadHistory">
<xs:sequence>
<xs:element name="thread-id" type="xs:string"/>
<xs:element name="limit" type="xs:positiveInteger" minOccurs="0"/> <!-- Max messages to return -->
<xs:element name="offset" type="xs:nonNegativeInteger" minOccurs="0"/> <!-- Pagination -->
<xs:element name="include-payloads" type="xs:boolean" minOccurs="0"/> <!-- Include full payload XML -->
</xs:sequence>
</xs:complexType>
<xs:complexType name="SubscribeThread">
<xs:sequence>
<xs:element name="thread-id" type="xs:string" minOccurs="0"/> <!-- Specific thread, or omit for all -->
<xs:element name="listener" type="xs:string" minOccurs="0"/> <!-- Filter by listener -->
<xs:element name="include-payloads" type="xs:boolean" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="Unsubscribe">
<xs:sequence>
<xs:element name="subscription-id" type="xs:string"/>
</xs:sequence>
</xs:complexType>
<!-- ============================================================
REQUEST TYPES: Organism Introspection
============================================================ -->
<xs:complexType name="GetOrganismGraph">
<xs:sequence>
<xs:element name="team" type="xs:string" minOccurs="0"/>
@ -108,36 +271,43 @@
</xs:restriction>
</xs:simpleType>
</xs:element>
<xs:element name="include-remote" type="xs:boolean" minOccurs="0"/> <!-- Include federated -->
</xs:sequence>
</xs:complexType>
<!-- get-status -->
<xs:complexType name="GetStatus">
<xs:sequence>
<xs:element name="include-listeners" type="xs:boolean" minOccurs="0"/>
<xs:element name="include-threads" type="xs:boolean" minOccurs="0"/>
<xs:element name="include-metrics" type="xs:boolean" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<!-- shutdown -->
<xs:complexType name="Shutdown">
<!-- ============================================================
REQUEST TYPES: Configuration
============================================================ -->
<xs:complexType name="SetConfig">
<xs:sequence>
<xs:element name="mode" minOccurs="0">
<xs:simpleType>
<xs:restriction base="xs:string">
<xs:enumeration value="graceful"/>
<xs:enumeration value="immediate"/>
</xs:restriction>
</xs:simpleType>
</xs:element>
<xs:element name="reason" type="xs:string" minOccurs="0"/>
<xs:element name="key" type="xs:string"/>
<xs:element name="value" type="xs:string"/>
</xs:sequence>
</xs:complexType>
<!-- register-remote-gateway -->
<xs:complexType name="GetConfig">
<xs:sequence>
<xs:element name="key" type="xs:string" minOccurs="0"/> <!-- Omit for all config -->
</xs:sequence>
</xs:complexType>
<!-- ============================================================
REQUEST TYPES: Federation
============================================================ -->
<xs:complexType name="RegisterRemoteGateway">
<xs:sequence>
<xs:element name="url" type="xs:anyURI"/>
<xs:element name="identity" type="xs:base64Binary" minOccurs="0"/> <!-- remote organism public key -->
<xs:element name="identity" type="xs:base64Binary" minOccurs="0"/> <!-- Remote public key -->
<xs:element name="import-tags" minOccurs="0">
<xs:complexType>
<xs:sequence>
@ -151,11 +321,228 @@
</xs:sequence>
</xs:complexType>
<!-- unregister-remote-gateway -->
<xs:complexType name="UnregisterRemoteGateway">
<xs:sequence>
<xs:element name="url" type="xs:anyURI"/>
</xs:sequence>
</xs:complexType>
<!-- ============================================================
RESPONSE TYPES: Generic
============================================================ -->
<xs:complexType name="Ack">
<xs:sequence>
<xs:element name="message" type="xs:string" minOccurs="0"/>
</xs:sequence>
<xs:attribute name="success" type="xs:boolean" default="true"/>
</xs:complexType>
<xs:complexType name="ErrorResponse">
<xs:sequence>
<xs:element name="code" type="xs:string"/> <!-- Error code (e.g., "NOT_FOUND", "INVALID_SIGNATURE") -->
<xs:element name="message" type="xs:string"/>
<xs:element name="details" type="xs:string" minOccurs="0"/> <!-- Stack trace or extra info -->
</xs:sequence>
</xs:complexType>
<!-- ============================================================
RESPONSE TYPES: Listener Management
============================================================ -->
<xs:complexType name="ListenersResponse">
<xs:sequence>
<xs:element name="listener" type="pm:ListenerInfo" minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
<xs:attribute name="count" type="xs:nonNegativeInteger"/>
</xs:complexType>
<xs:complexType name="ListenerInfo">
<xs:sequence>
<xs:element name="name" type="xs:string"/>
<xs:element name="class" type="xs:string"/>
<xs:element name="description" type="xs:string" minOccurs="0"/>
<xs:element name="is-agent" type="xs:boolean" minOccurs="0"/>
<xs:element name="team" type="xs:string" minOccurs="0"/>
<xs:element name="root-tag" type="xs:string"/> <!-- Routing key -->
<xs:element name="peers" minOccurs="0">
<xs:complexType>
<xs:sequence>
<xs:element name="peer" type="xs:string" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<!-- Detailed mode only -->
<xs:element name="messages-handled" type="xs:nonNegativeInteger" minOccurs="0"/>
<xs:element name="active-threads" type="xs:nonNegativeInteger" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="ListenerSchemaResponse">
<xs:sequence>
<xs:element name="name" type="xs:string"/>
<xs:element name="schema" type="xs:string"/> <!-- XSD or example XML as string -->
</xs:sequence>
<xs:attribute name="format" type="xs:string"/> <!-- "xsd" or "example" -->
</xs:complexType>
<!-- ============================================================
RESPONSE TYPES: Message Injection
============================================================ -->
<xs:complexType name="ThreadCreatedResponse">
<xs:sequence>
<xs:element name="thread-id" type="xs:string"/>
<xs:element name="routed-to" type="xs:string"/> <!-- Listener that received it -->
</xs:sequence>
</xs:complexType>
<!-- ============================================================
RESPONSE TYPES: Thread Introspection
============================================================ -->
<xs:complexType name="ThreadHistoryResponse">
<xs:sequence>
<xs:element name="thread-id" type="xs:string"/>
<xs:element name="message" type="pm:ThreadMessage" minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
<xs:attribute name="total" type="xs:nonNegativeInteger"/>
<xs:attribute name="offset" type="xs:nonNegativeInteger"/>
</xs:complexType>
<xs:complexType name="ThreadMessage">
<xs:sequence>
<xs:element name="from" type="xs:string"/>
<xs:element name="to" type="xs:string"/>
<xs:element name="timestamp" type="xs:dateTime"/>
<xs:element name="payload-type" type="xs:string"/> <!-- Root tag of payload -->
<xs:element name="payload" type="xs:string" minOccurs="0"/> <!-- Full XML if requested -->
</xs:sequence>
<xs:attribute name="seq" type="xs:nonNegativeInteger"/> <!-- Sequence number in thread -->
</xs:complexType>
<xs:complexType name="SubscriptionResponse">
<xs:sequence>
<xs:element name="subscription-id" type="xs:string"/>
</xs:sequence>
</xs:complexType>
<!-- ============================================================
RESPONSE TYPES: Organism Introspection
============================================================ -->
<xs:complexType name="OrganismGraphResponse">
<xs:sequence>
<xs:element name="graph" type="xs:string"/> <!-- In requested format -->
</xs:sequence>
<xs:attribute name="format" type="xs:string"/>
</xs:complexType>
<xs:complexType name="StatusResponse">
<xs:sequence>
<xs:element name="name" type="xs:string"/> <!-- Organism name -->
<xs:element name="uptime-seconds" type="xs:nonNegativeInteger"/>
<xs:element name="state" type="xs:string"/> <!-- "running", "shutting-down", etc. -->
<xs:element name="listener-count" type="xs:nonNegativeInteger"/>
<xs:element name="active-threads" type="xs:nonNegativeInteger"/>
<xs:element name="messages-processed" type="xs:nonNegativeInteger" minOccurs="0"/>
<xs:element name="queue-depth" type="xs:nonNegativeInteger" minOccurs="0"/>
<xs:element name="listeners" type="pm:ListenersResponse" minOccurs="0"/>
<xs:element name="metrics" type="pm:Metrics" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="Metrics">
<xs:sequence>
<xs:element name="metric" type="pm:Metric" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="Metric">
<xs:simpleContent>
<xs:extension base="xs:string">
<xs:attribute name="name" type="xs:string" use="required"/>
<xs:attribute name="type" type="xs:string"/> <!-- "counter", "gauge", "histogram" -->
</xs:extension>
</xs:simpleContent>
</xs:complexType>
<!-- ============================================================
RESPONSE TYPES: Configuration
============================================================ -->
<xs:complexType name="ConfigResponse">
<xs:sequence>
<xs:element name="entry" type="pm:ConfigEntry" minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="ConfigEntry">
<xs:sequence>
<xs:element name="key" type="xs:string"/>
<xs:element name="value" type="xs:string"/>
<xs:element name="description" type="xs:string" minOccurs="0"/>
</xs:sequence>
<xs:attribute name="mutable" type="xs:boolean"/> <!-- Can be changed at runtime -->
</xs:complexType>
<!-- ============================================================
PUSH TYPES: OOB Events
============================================================ -->
<xs:complexType name="ThreadEvent">
<xs:sequence>
<xs:element name="thread-id" type="xs:string"/>
<xs:element name="event-type">
<xs:simpleType>
<xs:restriction base="xs:string">
<xs:enumeration value="message"/> <!-- New message in thread -->
<xs:enumeration value="complete"/> <!-- Thread finished (no more routing) -->
<xs:enumeration value="error"/> <!-- Handler error -->
</xs:restriction>
</xs:simpleType>
</xs:element>
<xs:element name="from" type="xs:string" minOccurs="0"/>
<xs:element name="to" type="xs:string" minOccurs="0"/>
<xs:element name="payload-type" type="xs:string" minOccurs="0"/>
<xs:element name="payload" type="xs:string" minOccurs="0"/> <!-- If include-payloads was true -->
<xs:element name="error-message" type="xs:string" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="ListenerEvent">
<xs:sequence>
<xs:element name="event-type">
<xs:simpleType>
<xs:restriction base="xs:string">
<xs:enumeration value="registered"/>
<xs:enumeration value="unregistered"/>
<xs:enumeration value="error"/>
</xs:restriction>
</xs:simpleType>
</xs:element>
<xs:element name="name" type="xs:string"/>
<xs:element name="details" type="xs:string" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="SystemEvent">
<xs:sequence>
<xs:element name="event-type">
<xs:simpleType>
<xs:restriction base="xs:string">
<xs:enumeration value="startup"/>
<xs:enumeration value="shutdown-initiated"/>
<xs:enumeration value="shutdown-complete"/>
<xs:enumeration value="config-changed"/>
<xs:enumeration value="gateway-connected"/>
<xs:enumeration value="gateway-disconnected"/>
</xs:restriction>
</xs:simpleType>
</xs:element>
<xs:element name="message" type="xs:string" minOccurs="0"/>
<xs:element name="details" type="xs:string" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
</xs:schema>

View file

@ -0,0 +1,180 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
====================================================================
SYSTEM MESSAGE SCHEMA v1.0
==========================
Defines system-reserved message types that only the pump can emit.
These are broadcast or injected by the organism itself, never by
handlers or external clients.
System messages use a distinct namespace to prevent spoofing.
====================================================================
-->
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
targetNamespace="https://xml-pipeline.org/system"
xmlns:sys="https://xml-pipeline.org/system"
elementFormDefault="qualified">
<!-- ============================================================
LIFECYCLE MESSAGES
============================================================ -->
<!--
<boot/> - Broadcast to all listeners at organism startup.
Agents use this to initialize, announce presence, or begin
awaiting input (e.g., human console listener).
The pump sends this AFTER all listeners are registered but
BEFORE accepting external inject-message requests.
-->
<xs:element name="boot" type="sys:Boot"/>
<xs:complexType name="Boot">
<xs:sequence>
<xs:element name="organism-name" type="xs:string"/>
<xs:element name="timestamp" type="xs:dateTime"/>
<xs:element name="listener-count" type="xs:nonNegativeInteger"/>
<xs:element name="config" type="sys:BootConfig" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="BootConfig">
<xs:sequence>
<xs:element name="entry" minOccurs="0" maxOccurs="unbounded">
<xs:complexType>
<xs:simpleContent>
<xs:extension base="xs:string">
<xs:attribute name="key" type="xs:string" use="required"/>
</xs:extension>
</xs:simpleContent>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
<!--
<shutdown-signal/> - Broadcast to all listeners before shutdown.
Agents should complete current work, flush state, and prepare
to terminate. Handlers have `timeout-seconds` to finish.
-->
<xs:element name="shutdown-signal" type="sys:ShutdownSignal"/>
<xs:complexType name="ShutdownSignal">
<xs:sequence>
<xs:element name="reason" type="xs:string" minOccurs="0"/>
<xs:element name="mode">
<xs:simpleType>
<xs:restriction base="xs:string">
<xs:enumeration value="graceful"/>
<xs:enumeration value="immediate"/>
</xs:restriction>
</xs:simpleType>
</xs:element>
<xs:element name="timeout-seconds" type="xs:positiveInteger" minOccurs="0"/>
<xs:element name="timestamp" type="xs:dateTime"/>
</xs:sequence>
</xs:complexType>
<!-- ============================================================
ERROR / DIAGNOSTIC MESSAGES
============================================================ -->
<!--
<huh/> - Injected when something goes wrong.
Used for:
- Handler returned invalid bytes (None, wrong type)
- Unknown root tag (no listener matched)
- Pipeline step failure
- Deserialization error
The huh message is routed to the system pipeline for logging
and optional alerting. Agents can also listen for huh to
self-correct or escalate.
-->
<xs:element name="huh" type="sys:Huh"/>
<xs:complexType name="Huh">
<xs:sequence>
<xs:element name="error-type">
<xs:simpleType>
<xs:restriction base="xs:string">
<xs:enumeration value="invalid-handler-response"/>
<xs:enumeration value="unknown-root-tag"/>
<xs:enumeration value="pipeline-failure"/>
<xs:enumeration value="deserialization-error"/>
<xs:enumeration value="validation-error"/>
<xs:enumeration value="timeout"/>
<xs:enumeration value="handler-exception"/>
<xs:enumeration value="unknown"/>
</xs:restriction>
</xs:simpleType>
</xs:element>
<xs:element name="message" type="xs:string"/>
<xs:element name="original-from" type="xs:string" minOccurs="0"/>
<xs:element name="original-to" type="xs:string" minOccurs="0"/>
<xs:element name="original-payload-type" type="xs:string" minOccurs="0"/>
<xs:element name="stack-trace" type="xs:string" minOccurs="0"/>
<xs:element name="timestamp" type="xs:dateTime"/>
</xs:sequence>
</xs:complexType>
<!-- ============================================================
HEALTH / MONITORING MESSAGES
============================================================ -->
<!--
<heartbeat/> - Optional periodic broadcast for health monitoring.
Can be enabled in config. Agents may use this to report status
or detect stalled peers.
-->
<xs:element name="heartbeat" type="sys:Heartbeat"/>
<xs:complexType name="Heartbeat">
<xs:sequence>
<xs:element name="sequence" type="xs:nonNegativeInteger"/>
<xs:element name="uptime-seconds" type="xs:nonNegativeInteger"/>
<xs:element name="queue-depth" type="xs:nonNegativeInteger"/>
<xs:element name="active-threads" type="xs:nonNegativeInteger"/>
<xs:element name="timestamp" type="xs:dateTime"/>
</xs:sequence>
</xs:complexType>
<!-- ============================================================
LISTENER LIFECYCLE NOTIFICATIONS
============================================================ -->
<!--
<listener-joined/> - Broadcast when a new listener registers.
Allows agents to discover new peers dynamically.
-->
<xs:element name="listener-joined" type="sys:ListenerJoined"/>
<xs:complexType name="ListenerJoined">
<xs:sequence>
<xs:element name="name" type="xs:string"/>
<xs:element name="root-tag" type="xs:string"/>
<xs:element name="description" type="xs:string" minOccurs="0"/>
<xs:element name="is-agent" type="xs:boolean"/>
<xs:element name="timestamp" type="xs:dateTime"/>
</xs:sequence>
</xs:complexType>
<!--
<listener-left/> - Broadcast when a listener unregisters.
-->
<xs:element name="listener-left" type="sys:ListenerLeft"/>
<xs:complexType name="ListenerLeft">
<xs:sequence>
<xs:element name="name" type="xs:string"/>
<xs:element name="reason" type="xs:string" minOccurs="0"/>
<xs:element name="timestamp" type="xs:dateTime"/>
</xs:sequence>
</xs:complexType>
</xs:schema>

View file

@ -1,6 +1,17 @@
# organism.yaml — Sample configuration for testing the message pump
# organism.yaml — Multi-agent organism with console interface
#
# This defines a simple "hello world" organism with one listener.
# Message flow:
# boot
# -> system.boot (logs, sends ConsolePrompt)
# -> console (displays, awaits input, returns ConsoleInput)
# -> console-router (ConsoleInput -> Greeting)
# -> greeter (Greeting -> GreetingResponse)
# -> shouter (GreetingResponse -> ShoutedResponse)
# -> response-handler (ShoutedResponse -> ConsolePrompt)
# -> console (displays, awaits input, ...)
#
# The console is a regular handler in the message flow.
# Returns None on EOF/quit to disconnect.
organism:
name: hello-world
@ -14,17 +25,49 @@ max_concurrent_per_agent: 5
# Thread scheduling: breadth-first or depth-first
thread_scheduling: breadth-first
# LLM configuration
llm:
strategy: failover
retries: 3
backends:
- provider: xai
api_key_env: XAI_API_KEY
rate_limit_tpm: 100000
listeners:
# The greeter receives Greeting, sends GreetingResponse to shouter
# Console: receives ConsolePrompt, displays output, awaits input
# Returns ConsoleInput to console-router, or None to disconnect
- name: console
payload_class: handlers.console.ConsolePrompt
handler: handlers.console.handle_console_prompt
description: Interactive console - displays output, awaits input
agent: false
# Console router: receives ConsoleInput, translates to target payload
- name: console-router
payload_class: handlers.console.ConsoleInput
handler: handlers.console.handle_console_input
description: Routes console input to appropriate listeners
agent: false
# Response handler: receives ShoutedResponse, creates ConsolePrompt
- name: response-handler
payload_class: handlers.console.ShoutedResponse
handler: handlers.console.handle_shouted_response
description: Forwards responses back to console
agent: false
# Greeter: receives Greeting, sends GreetingResponse to shouter
- name: greeter
payload_class: handlers.hello.Greeting
handler: handlers.hello.handle_greeting
description: Receives greeting, forwards to shouter
agent: true
peers: [shouter]
# The shouter receives GreetingResponse, sends ShoutedResponse back to user
# Shouter: receives GreetingResponse, sends ShoutedResponse back
- name: shouter
payload_class: handlers.hello.GreetingResponse
handler: handlers.hello.handle_shout
description: Shouts the greeting in ALL CAPS
agent: true
agent: false

View file

@ -78,6 +78,27 @@ gateways:
remote_url: "wss://trusted-search-node.example.org"
trusted_identity: "pubkeys/search_node.ed25519.pub"
description: "Federated web search gateway group."
llm:
strategy: failover # failover | round-robin | least-loaded
retries: 3 # Max retry attempts per request
retry_base_delay: 1.0 # Base delay for exponential backoff
retry_max_delay: 60.0 # Maximum delay between retries
backends:
- provider: xai
api_key_env: XAI_API_KEY # Read from environment
priority: 1 # Lower = preferred for failover
rate_limit_tpm: 100000 # Tokens per minute
max_concurrent: 20 # Max concurrent requests
- provider: anthropic
api_key_env: ANTHROPIC_API_KEY
priority: 2
- provider: ollama
base_url: http://localhost:11434
supported_models: [llama3, mistral]
```
### Sections Explained
@ -117,6 +138,44 @@ Federation peers (trusted remote organisms).
- Declared separately for clarity.
- Referenced in agent `peers:` lists by their registered `name`.
#### `llm`
LLM router configuration for agents. See `llm-router-v2.1.md` for complete specification.
- `strategy`: Backend selection strategy.
- `failover` (default): Try backends in priority order, fail over on error.
- `round-robin`: Distribute requests evenly across backends.
- `least-loaded`: Route to backend with lowest current load.
- `retries`: Max retry attempts per request.
- `backends`: List of provider configurations.
- `provider`: Provider type (`xai`, `anthropic`, `openai`, `ollama`).
- `api_key_env`: Environment variable name containing the API key.
- `priority`: Lower = preferred (for failover strategy).
- `rate_limit_tpm`: Tokens per minute limit.
- `max_concurrent`: Max concurrent requests to this backend.
- `base_url`: Override default API endpoint (required for Ollama).
- `supported_models`: Model names this backend handles (Ollama only).
### Environment Variables (.env)
API keys and secrets should **never** be stored in YAML. Use environment variables instead.
The bootstrap process automatically loads `.env` from the project root via `python-dotenv`:
```env
# .env (add to .gitignore!)
XAI_API_KEY=xai-abc123...
ANTHROPIC_API_KEY=sk-ant-...
OPENAI_API_KEY=sk-...
```
Reference in `organism.yaml` via `api_key_env`:
```yaml
llm:
backends:
- provider: xai
api_key_env: XAI_API_KEY # Reads from environment
```
### Key Invariants (v2.1)
- Root tag = `{lowercase_name}.{lowercase_dataclass_name}` — fully derived, never written manually.
- Registered names must be unique across the organism.

View file

@ -1,8 +1,8 @@
# AgentServer v2.1 — Core Architectural Principles
**January 06, 2026**
**January 10, 2026** (Updated)
**Architecture: Autonomous Schema-Driven, Turing-Complete Multi-Agent Organism**
These principles are the single canonical source of truth for the project. All documentation, code, and future decisions must align with this file. This version incorporates Message Pump v2.1 parallelism and refines agent iteration patterns for blind, name-oblivious self-routing.
These principles are the single canonical source of truth for the project. All documentation, code, and future decisions must align with this file. This version incorporates Message Pump v2.1 parallelism, HandlerResponse pattern, thread registry, and LLM router abstraction.
## Identity & Communication
- All traffic uses the universal `<message>` envelope defined in `envelope.xsd` (namespace `https://xml-pipeline.org/ns/envelope/v1`).
@ -145,86 +145,144 @@ Handlers are treated as **untrusted code** that runs in an isolated coroutine co
The message pump maintains authoritative metadata in coroutine scope and never trusts
handler output to preserve security-critical properties.
### HandlerResponse Pattern
Handlers return a clean `HandlerResponse` dataclass—not raw XML bytes. The pump
handles all envelope wrapping and enforces security constraints:
```python
@dataclass
class HandlerResponse:
payload: Any # @xmlify dataclass instance
to: str # Requested target (validated by pump)
@classmethod
def respond(cls, payload) -> HandlerResponse:
"""Return to caller (prunes call chain)"""
...
```
### Coroutine Capture Pattern
When dispatching a message to a handler, the pump captures metadata in coroutine scope
BEFORE handler execution:
```python
async def dispatch(msg: ParsedMessage):
async def dispatch(state: MessageState, listener: Listener):
# TRUSTED: Captured before handler runs
thread_uuid = msg.thread_id
sender_name = msg.listener_name
thread_path = path_registry[thread_uuid]
parent = get_parent_from_path(thread_path)
allowed_peers = registry.get_listener(sender_name).peers
registry = get_registry()
current_thread = state.thread_id
# UNTRUSTED: Handler executes with minimal context
response_bytes = await handler(
payload=msg.deserialized_payload,
meta=HandlerMetadata(thread_id=thread_uuid) # Opaque UUID only
# Initialize call chain if new conversation
if not registry.lookup(current_thread):
current_thread = registry.start_chain(state.from_id, listener.name)
metadata = HandlerMetadata(
thread_id=current_thread, # Opaque UUID
from_id=state.from_id, # Previous hop only
own_name=listener.name, # For self-reference
usage_instructions=listener.usage_instructions, # Peer schemas
)
# TRUSTED: Coroutine scope still has authoritative metadata
# Process response using captured context, not handler claims
await process_response(
response_bytes,
actual_sender=sender_name, # From coroutine, not handler
actual_thread=thread_uuid, # From coroutine, not handler
actual_parent=parent, # From coroutine, not handler
allowed_peers=allowed_peers # From registration, not handler
# UNTRUSTED: Handler executes
response = await handler(state.payload, metadata)
# TRUSTED: Pump enforces security on response
if isinstance(response, HandlerResponse):
if response.is_response:
# Respond to caller - prune chain
target, new_thread = registry.prune_for_response(current_thread)
else:
# Forward - validate against peers
if listener.is_agent and listener.peers:
if response.to not in listener.peers:
# BLOCKED - send generic error, log details
return SystemError(...)
target = response.to
new_thread = registry.extend_chain(current_thread, target)
# Pump creates envelope with TRUSTED values
envelope = wrap_in_envelope(
payload=response.payload,
from_id=listener.name, # ALWAYS from coroutine, never handler
to_id=target, # Validated against peers
thread_id=new_thread, # From registry, never handler
)
```
### Envelope Control (Pump Enforced)
| Field | Handler Control | Pump Override |
|-------|-----------------|---------------|
| `from` | None | Always `listener.name` |
| `to` | Requests via `HandlerResponse.to` | Validated against `peers` list |
| `thread` | None | Managed by thread registry |
| `payload` | Full control | — |
### What Handlers Cannot Do
Even compromised or malicious handlers cannot:
- **Forge identity**: `<from>` is always injected from coroutine-captured sender name
- **Escape thread context**: `<thread>` is always from coroutine-captured UUID
- **Route arbitrarily**: `<to>` is computed from coroutine-captured peers list and thread path
- **Access other threads**: UUIDs are opaque; path registry is private
- **Discover topology**: Only peers list is visible; no access to path structure
- **Spoof system messages**: `<from>core</from>` only injectable by system, never handlers
- **Forge identity**: `<from>` always injected from listener registration
- **Escape thread context**: `<thread>` always from thread registry
- **Route arbitrarily**: `<to>` validated against declared peers list
- **Access other threads**: UUIDs are opaque; call chain registry is private
- **Discover topology**: Call chain appears as UUID; even `from_id` only shows immediate caller
- **Spoof system messages**: `<from>system</from>` only injectable by pump
### What Handlers Can Do
Handlers can only:
- **Call declared peers**: Emit XML matching peer schemas (validated against peers list)
- **Self-iterate**: Emit `<todo-until>` (routes back to sender automatically)
- **Return to caller**: Emit any other payload (routes to parent in thread path)
- **Access thread-scoped storage**: Via opaque UUID (isolated per delegation chain)
- **Call declared peers**: Via `HandlerResponse(to="peer")` (validated)
- **Respond to caller**: Via `HandlerResponse.respond()` (auto-routes up chain)
- **Return nothing**: Via `return None` (chain terminates)
- **Access thread-scoped storage**: Via opaque `metadata.thread_id`
- **See peer schemas**: Via `metadata.usage_instructions` (for LLM prompts)
### Response Processing Security
### Peer Constraint Enforcement
Handler output (raw bytes) undergoes full security processing:
Agents can only send to listeners in their `peers` list:
1. **Wrap in dummy tags** and parse with repair mode
2. **Extract payloads** via C14N and XSD validation
3. **Determine routing** using coroutine-captured metadata (never handler claims)
4. **Inject envelope** with trusted `<from>`, `<thread>`, `<to>` from coroutine scope
5. **Re-inject to pipeline** for identical security processing
```yaml
listeners:
- name: greeter
agent: true
peers: [shouter, logger] # Enforced by pump
```
Any envelope metadata in handler output is **ignored and overwritten**.
Violation handling:
1. Message **blocked** (never routed)
2. Details logged internally for debugging
3. Generic `SystemError` sent back to agent (no topology leak)
4. Thread stays alive—agent can retry
```xml
<SystemError>
<code>routing</code>
<message>Message could not be delivered. Please verify your target and try again.</message>
<retry-allowed>true</retry-allowed>
</SystemError>
```
### Trust Architecture
```
┌─────────────────────────────────────────────────────┐
│ TRUSTED ZONE (System) │
│ • Path registry (UUID → hierarchical path) │
│ • Thread registry (UUID ↔ call chain)
│ • Listener registry (name → peers, schema) │
│ • Thread management (pruning, parent lookup) │
│ • Peer constraint enforcement
│ • Envelope injection (<from>, <thread>, <to>) │
│ • SystemError emission (generic, no info leak) │
└─────────────────────────────────────────────────────┘
Coroutine Capture Boundary
┌─────────────────────────────────────────────────────┐
│ UNTRUSTED ZONE (Handler) │
│ • Receives: typed payload + opaque UUID
│ • Returns: raw bytes
│ • Receives: typed payload + metadata
│ • Returns: HandlerResponse or None
│ • Cannot: forge identity, escape thread, probe │
│ • Can: call peers, self-iterate, return to caller
│ • Can: call peers, respond to caller, terminate
└─────────────────────────────────────────────────────┘
```

433
docs/doc_cross_check.md Normal file
View file

@ -0,0 +1,433 @@
# AgentServer v2.1 Documentation Cross-Check
**Analysis Date:** January 07, 2026
## Executive Summary
Overall the documentation is **highly consistent** and well-structured. Found minor inconsistencies and a few areas needing clarification, but no major contradictions.
---
## 1. CRITICAL ISSUES (Must Fix)
### 1.1 Missing Core Principles Document Reference
**Issue:** Core principles referenced everywhere but not in upload
**Impact:** Cannot verify complete consistency
**Files affected:** README.md, listener-class-v2.1.md, message-pump-v2.1.md
**Recommendation:** Need to review core-principles-v2.1.md for final check
### 1.2 Handler Signature Inconsistency
**Issue:** Different handler signatures across documents
**listener-class-v2.1.md:**
```python
async def handler(
payload: PayloadDataclass,
metadata: HandlerMetadata
) -> bytes:
```
**self-grammar-generation.md:**
```python
def add_handler(payload: AddPayload) -> bytes: # NOT async!
result = payload.a + payload.b
return f"<result>{result}</result>".encode("utf-8")
```
**README.md:**
```python
def add_handler(payload: AddPayload) -> bytes: # NOT async!
```
**Conflict:** Some examples show `async def`, others show `def`
**Resolution needed:** Clarify if handlers MUST be async or can be sync
**Recommendation:**
- If both allowed: document that sync handlers are auto-wrapped
- If async only: fix all examples to use `async def`
---
## 2. MEDIUM ISSUES (Should Fix)
### 2.1 HandlerMetadata Fields Discrepancy
**listener-class-v2.1.md:**
```python
@dataclass(frozen=True)
class HandlerMetadata:
thread_id: str
from_id: str
own_name: str | None = None
is_self_call: bool = False
```
**Our discussion (coroutine trust boundary):**
- Handlers should receive MINIMAL context
- `from_id` exposes sender identity (breaks purity?)
- `is_self_call` implies handler knows about calling patterns
**Question:** Does exposing `from_id` violate "handlers don't know topology"?
- If yes: Remove `from_id`, handlers only get `thread_id`
- If no: Clarify why this is safe (e.g., "sender's name is not topology")
**Recommendation:** Document the distinction between:
- Metadata handlers CAN see (thread_id, own_name for agents)
- Metadata system keeps private (thread_path, parent, peers enforcement)
### 2.2 Root Tag Derivation Formula Inconsistency
**configuration.md:**
> Root tag = `{lowercase_name}.{lowercase_dataclass_name}`
**listener-class-v2.1.md:**
> `{lowercase_registered_name}.{lowercase_dataclass_name}`
**Example from listener-class:**
```
Registered name: calculator.add
Dataclass: AddPayload
Root tag: calculator.add.addpayload
```
**Issue:** This produces a root tag with two dots (three segments): `calculator.add.addpayload`
- Is this intended? (hierarchical namespace)
- Or should it be: `calculator-add.addpayload`?
**Recommendation:** Clarify if dots in registered names are part of the root tag or if there's a separator convention
### 2.3 `own_name` Exposure Rationale
**configuration.md:**
> `agent: true` → enforces unique root tag, exposes `own_name` in HandlerMetadata
**Our discussion concluded:**
- Agents don't need to know their name for self-iteration (blind routing works)
- `<todo-until>` is automatic self-routing
**Question:** Why expose `own_name` at all?
- Logging/audit trails?
- Self-referential prompts?
- Legacy from earlier design?
**Recommendation:** Either:
1. Document the specific use case for `own_name`
2. Or remove it if truly unnecessary
### 2.4 System Primitives Not Listed
**Files mention but don't enumerate:**
- `<todo-until>` - documented in discussion, not in files
- `<huh>` - mentioned in message-pump-v2.1.md
- Thread pruning notifications - discussed, not documented
**Recommendation:** Add a "System Primitives" section to core principles:
```markdown
## System Primitives
The organism pre-loads the following message types with special routing:
1. **`<todo-until>`** (routing: self)
- Enables agent iteration
- Schema: primitives/todo-until.xsd
- Emitters: any agent
2. **`<huh>`** (routing: sender, system-only)
- Validation error feedback
- Schema: primitives/huh.xsd
- Emitters: system only
```
---
## 3. MINOR ISSUES (Nice to Have)
### 3.1 Thread Pruning Documentation Gap
**Discussed extensively but not documented:**
- When threads are pruned (on delegation return)
- UUID deletion and storage cleanup
- Optional `notify_on_prune: true` in YAML
**Files affected:** None explicitly cover this
**Recommendation:** Add to configuration.md under listener options
### 3.2 Multi-Payload Extraction Details
**message-pump-v2.1.md:**
> `multi_payload_extract` wraps in `<dummy>` (idempotent)
**Question:** What does "idempotent" mean here?
- If handler already wrapped in `<dummy>`, does it double-wrap?
- Or does it detect existing wrapper?
**Recommendation:** Clarify the dummy-wrapping logic
### 3.3 Broadcast Mechanism Not Fully Specified
**configuration.md:**
```yaml
broadcast: true # Shares root tag with other listeners
```
**Questions:**
- How do agents address broadcast groups?
- Do they list individual listeners or a group name?
- What happens if broadcast listener subset changes via hot-reload?
**Recommendation:** Add broadcast routing section to message-pump or configuration docs
### 3.4 Gateway vs Local Listener Routing
**configuration.md shows:**
```yaml
listeners:
- name: search.google
broadcast: true
gateways:
- name: web_search
remote_url: ...
```
**Agent peers list:**
```yaml
peers:
- web_search # Gateway name, not listener name
```
**Question:** How does routing distinguish:
- Local listener `search.google`
- Gateway group `web_search`
- What if names collide?
**Recommendation:** Document gateway resolution precedence
### 3.5 `<respond>` Tag Status Unclear
**Our discussion suggested:**
- Maybe there's no explicit `<respond>` tag
- System routes non-self/non-peer payloads to parent automatically
**Files don't mention `<respond>` at all**
**Recommendation:** Clarify if:
1. There IS a `<respond>` primitive (add to system primitives)
2. There ISN'T (document implicit parent routing)
---
## 4. CONSISTENCY CHECKS (All Good ✓)
### 4.1 XSD Validation Flow ✓
**Consistent across:**
- README.md: "XSD validation, no transcription bugs"
- listener-class-v2.1.md: "XSD-validated instance"
- message-pump-v2.1.md: "xsd_validation_step"
### 4.2 Handler Purity Principle ✓
**Consistent across:**
- README.md: "pure handlers"
- listener-class-v2.1.md: "pure async handler function"
- Our discussion: handlers are untrusted, coroutine captures metadata
### 4.3 Opaque UUID Threading ✓
**Consistent across:**
- README.md: "opaque UUID threading for privacy"
- listener-class-v2.1.md: "Opaque UUID matching <thread/> in envelope"
- configuration.md: "Opaque thread UUIDs"
### 4.4 Mandatory Description ✓
**Consistent across:**
- README.md: "one-line human description"
- listener-class-v2.1.md: "mandatory human-readable description"
- configuration.md: "Mandatory short blurb"
### 4.5 Closed-Loop Pipeline ✓
**Consistent across:**
- README.md: "ALL messages undergo identical security processing"
- message-pump-v2.1.md: handler responses go through multi_payload_extract → route_and_process
- Our discussion: everything validates, no fast paths
---
## 5. MISSING DOCUMENTATION
### 5.1 Bootstrap Sequence
**Referenced but not detailed:**
- How organism.yaml is loaded
- Order of listener registration
- When pipelines start accepting messages
- System pipeline instantiation
**Recommendation:** Add "Bootstrap Sequence" section to configuration.md
### 5.2 Hot-Reload Process
**Mentioned but not specified:**
- How OOB hot-reload works
- What happens to in-flight messages during reload
- Schema version migration
**Recommendation:** Add "Hot-Reload Mechanics" section
### 5.3 Error Message Format
**`<huh>` mentioned but schema not shown:**
```xml
<huh>
<error>XSD validation failed: ...</error>
<original-attempt>...</original-attempt>
</huh>
```
**Recommendation:** Document `<huh>` schema in primitives section
### 5.4 Token Budget Enforcement
**Mentioned in README:**
> Token budgets enforce computational bounds
**Not documented:**
- How budgets are tracked
- What happens when exceeded
- Per-thread vs per-agent budgets
**Recommendation:** Add token budget section to message-pump or core principles
### 5.5 Fair Scheduling Details
**README mentions:**
> Thread-based message queue with bounded memory and fair scheduling
**Not documented:**
- Scheduling algorithm (breadth-first/depth-first from config?)
- Queue size limits
- Backpressure handling
**Recommendation:** Expand message-pump-v2.1.md with scheduling details
---
## 6. TERMINOLOGY CONSISTENCY ✓
### Consistent Terms Used:
- "Listener" (not "handler" or "capability" inconsistently)
- "Organism" (not "system" or "platform" inconsistently)
- "Pipeline" (not "processor" or "chain")
- "Thread UUID" (not "session" or "context")
- "Peers list" (not "allowed targets" or "capabilities")
### Good Naming Conventions:
- Files: kebab-case (listener-class-v2.1.md)
- Root tags: lowercase.dotted (calculator.add.addpayload)
- Registered names: dotted.hierarchy (calculator.add)
- Dataclasses: PascalCase (AddPayload)
---
## 7. SECURITY MODEL CONSISTENCY ✓
### Handler Trust Boundary
**README.md:**
> Handlers are untrusted code... cannot forge identity, escape thread, route arbitrarily
**Our discussion:**
> Coroutine captures metadata before handler execution
**Status:** Consistent! README correctly summarizes the trust model
### Topology Privacy
**README.md:**
> Opaque UUIDs, private path registry, peers list enforcement
**listener-class-v2.1.md:**
> Handlers receive only `thread_id` (opaque UUID)
**Status:** Consistent! Though need to clarify `from_id` exposure
### Anti-Paperclip
**README.md:**
> No persistent cross-thread memory, token budgets, thread pruning
**Our discussion:**
> Thread-scoped storage, automatic cleanup, no global state
**Status:** Consistent!
---
## 8. RECOMMENDED ADDITIONS
### 8.1 Add to configuration.md:
```yaml
listeners:
- name: example
notify_on_prune: boolean # Optional: receive thread-pruned notifications
```
### 8.2 Add to listener-class-v2.1.md:
**Section: "Handler Execution Model"**
- Clarify async vs sync handlers
- Explain coroutine capture
- Document metadata availability vs privacy
### 8.3 Add to message-pump-v2.1.md:
**Section: "System Primitives"**
- List all magic tags (`<todo-until>`, `<huh>`)
- Document routing policies
- Explain emission restrictions
### 8.4 Create new file: `primitives.md`
Document all system primitives with:
- Schema (XSD)
- Routing policy
- Emission restrictions
- Usage examples
### 8.5 Add to README.md:
**Section: "Computational Model"**
Brief explanation of:
- `<todo-until>` for iteration
- Blind self-routing
- Token budgets
- Thread pruning
---
## 9. DOCUMENTATION QUALITY SCORES
| Document | Clarity | Completeness | Consistency | Overall |
|----------|---------|--------------|-------------|---------|
| README.md | 9/10 | 8/10 | 10/10 | 9/10 |
| configuration.md | 10/10 | 9/10 | 10/10 | 9.5/10 |
| listener-class-v2.1.md | 9/10 | 9/10 | 9/10 | 9/10 |
| message-pump-v2.1.md | 8/10 | 7/10 | 9/10 | 8/10 |
| self-grammar-generation.md | 8/10 | 8/10 | 8/10 | 8/10 |
| why-not-json.md | 10/10 | 10/10 | N/A | 10/10 |
**Average: 8.9/10**
---
## 10. CRITICAL PATH TO 1.0 RELEASE
### Must Fix Before Release:
1. ✅ Resolve async vs sync handler signature
2. ✅ Clarify `from_id` in HandlerMetadata (privacy concern)
3. ✅ Document system primitives (`<todo-until>`, `<huh>`)
4. ✅ Add thread pruning documentation
5. ✅ Specify `<respond>` behavior (if it exists)
### Should Fix Before Release:
6. Document bootstrap sequence
7. Document hot-reload mechanics
8. Add token budget details
9. Clarify broadcast routing
10. Add gateway resolution rules
### Nice to Have:
11. Create primitives.md reference
12. Add more handler examples
13. Document fair scheduling algorithm
14. Add troubleshooting guide
---
## FINAL VERDICT
**The documentation is production-ready with minor fixes.**
The architecture is sound, the security model is well-thought-out, and the core principles are consistently represented. The main gaps are:
1. **Implementation details** (bootstrap, hot-reload, scheduling)
2. **System primitives** (need explicit documentation)
3. **Handler signature** (async vs sync needs clarification)
Once core-principles-v2.1.md is reviewed and the above items addressed, this is **ready for implementation and external review**.
The "It just works... safely" tagline is earned. 🎯

View file

@ -1,77 +1,204 @@
# AgentServer v2.1 — Handler Contract
**January 08, 2026**
**January 10, 2026** (Updated)
This document is the single canonical specification for all capability handlers in AgentServer v2.1.
All examples, documentation, and implementation must conform to this contract.
## Handler Signature (Locked)
## Handler Signature
Every handler **must** be declared with the following exact signature:
Every handler **must** be declared with the following signature:
```python
async def handler(
payload: PayloadDataclass, # XSD-validated, deserialized @xmlify dataclass instance
metadata: HandlerMetadata # Minimal trustworthy context provided by the message pump
) -> bytes:
metadata: HandlerMetadata # Trustworthy context provided by the message pump
) -> HandlerResponse | None:
...
```
- Handlers **must** be asynchronous (`async def`).
- Synchronous functions are not permitted and will not be auto-wrapped.
- The `metadata` parameter is mandatory.
- The return value **must** be a `bytes` object containing one or more raw XML payload fragments.
- Returning `None` or any non-`bytes` value is a programming error and will trigger a protective `<huh>` emission.
- Return `HandlerResponse` to send a message, or `None` for no response.
## HandlerResponse
Handlers return a clean dataclass + target. The pump handles envelope wrapping.
```python
@dataclass
class HandlerResponse:
payload: Any # @xmlify dataclass instance
to: str # Target listener name (or use .respond() for caller)
```
### Forward to Named Target
```python
return HandlerResponse(
payload=GreetingResponse(message="Hello!"),
to="shouter",
)
```
### Respond to Caller (Prunes Call Chain)
```python
return HandlerResponse.respond(
payload=ResultPayload(value=42)
)
```
When using `.respond()`, the pump:
1. Looks up the call chain from thread registry
2. Prunes the last segment (the responder)
3. Routes back to the caller
4. Sub-threads are terminated (see Response Semantics below)
### No Response
```python
return None # Chain ends here, no message emitted
```
## HandlerMetadata
```python
@dataclass(frozen=True)
@dataclass
class HandlerMetadata:
thread_id: str # Opaque thread UUID — safe for thread-scoped storage
own_name: str | None = None # Registered name of the executing listener.
# Populated ONLY for listeners with `agent: true` in organism.yaml
thread_id: str # Opaque thread UUID — maps to hidden call chain
from_id: str # Who sent this message (previous hop)
own_name: str | None = None # This listener's name (only if agent: true)
is_self_call: bool = False # True if message is from self
usage_instructions: str = "" # Auto-generated peer schemas for LLM prompts
```
### Field Rationale
- `thread_id`: Enables isolated per-thread state (e.g., conversation memory, calculator history) without exposing topology.
- `own_name`: Allows LLM agents to produce self-referential reasoning text while remaining blind to routing mechanics.
No sender identity (`from_id`) is provided — preserving full topology privacy.
| Field | Purpose |
|-------|---------|
| `thread_id` | Opaque UUID for thread-scoped storage. Maps internally to call chain (hidden from handler). |
| `from_id` | Previous hop in call chain. Useful for context but not for routing (use `.respond()`). |
| `own_name` | Enables self-referential reasoning. Only populated for `agent: true` listeners. |
| `is_self_call` | Detect self-messages (e.g., `<todo-until>` loops). |
| `usage_instructions` | Auto-generated from peer schemas. Inject into LLM system prompt. |
## Security Model
The message pump captures all security-critical information (sender name, thread hierarchy, peers list enforcement) in trusted coroutine scope **before** invoking the handler.
The message pump enforces security boundaries. Handlers are **untrusted code**.
Handlers are treated as **untrusted code**. They receive only the minimal safe context defined above and cannot:
- Forge provenance
- Escape thread boundaries
- Probe or leak topology
- Route arbitrarily
### Envelope Control (Pump Enforced)
| Field | Handler Control | Pump Override |
|-------|-----------------|---------------|
| `from` | None | Always set to `listener.name` |
| `to` | Requests target | Validated against `peers` list |
| `thread` | None | Managed by thread registry |
| `payload` | Full control | — |
### Peer Constraint Enforcement
Agents can only send to listeners declared in their `peers` list:
```yaml
listeners:
- name: greeter
agent: true
peers: [shouter, logger] # Can only send to these
```
If an agent tries to send to an undeclared peer:
1. Message is **blocked** (never routed)
2. Details logged internally (for debugging)
3. Generic `SystemError` sent back to agent (no topology leak)
4. Thread stays alive — agent can retry
```xml
<SystemError>
<code>routing</code>
<message>Message could not be delivered. Please verify your target and try again.</message>
<retry-allowed>true</retry-allowed>
</SystemError>
```
### Thread Privacy
- Handlers see opaque UUIDs, never actual call chains
- Call chain `console.router.greeter.shouter` → appears as `uuid-xyz`
- Even `from_id` only reveals immediate caller, not full path
## Response Semantics
**Critical for LLM agents to understand:**
When you **respond** (return to caller via `.respond()`), your call chain is pruned:
- Any sub-agents you called are effectively terminated
- Their state/context is lost (calculator memory, scratch space, etc.)
- You cannot call them again in the same context after responding
**Therefore:** Complete ALL sub-tasks before responding. If you need results from a peer, wait for their response first.
This is injected into `usage_instructions` automatically.
## Example Handlers
**Pure tool (no agent flag):**
### Pure Tool (No Agent Flag)
```python
async def add_handler(payload: AddPayload, metadata: HandlerMetadata) -> bytes:
async def add_handler(payload: AddPayload, metadata: HandlerMetadata) -> HandlerResponse:
result = payload.a + payload.b
return f"<result>{result}</result>".encode("utf-8")
    return HandlerResponse.respond(
        payload=ResultPayload(value=result)  # Routes back to caller; from_id is context only, not for routing
    )
```
**LLM agent (agent: true):**
### LLM Agent
```python
async def research_handler(payload: ResearchPayload, metadata: HandlerMetadata) -> bytes:
own = metadata.own_name or "researcher" # safe fallback
return b"""
<thought>I am the """ + own.encode() + b""" agent. Next step...</thought>
<calculator.add.addpayload><a>7</a><b>35</b></calculator.add.addpayload>
"""
async def research_handler(payload: ResearchPayload, metadata: HandlerMetadata) -> HandlerResponse:
from agentserver.llm import complete
# Build prompt with peer awareness
system_prompt = metadata.usage_instructions + "\n\nYou are a research agent."
response = await complete(
model="grok-4.1",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": payload.query},
],
agent_id=metadata.own_name,
)
return HandlerResponse(
payload=ResearchResult(answer=response.content),
to="summarizer", # Forward to next agent
)
```
## References in Other Documentation
### Terminal Handler (Display Only)
- All code examples in README.md, self-grammar-generation.md, and configuration.md must match this contract.
- listener-class-v2.1.md now references this file as the authoritative source for signature and metadata.
```python
async def console_display(payload: ConsoleOutput, metadata: HandlerMetadata) -> None:
print(f"[{payload.source}] {payload.text}")
return None # End of chain
```
## Backwards Compatibility
Legacy handlers returning `bytes` are still supported but deprecated:
```python
# DEPRECATED - still works but not recommended
async def old_handler(payload, metadata) -> bytes:
return b"<result>...</result>"
```
New code should use `HandlerResponse` for:
- Type safety
- Automatic envelope wrapping
- Peer constraint enforcement
- Thread chain management
---
This contract is now **locked** for v2.1
**v2.1 Contract** — Updated January 10, 2026

View file

@ -1,5 +1,5 @@
**listener-class-v2.1.md**
**January 07, 2026**
**January 10, 2026** (Updated)
**AgentServer v2.1 — The Listener Class & Registration**
This is the canonical documentation for defining and registering capabilities in AgentServer v2.1.
@ -74,7 +74,7 @@ Optional flags:
```python
from xmlable import xmlify
from dataclasses import dataclass
from xml_pipeline import Listener, HandlerMetadata
from agentserver.message_bus.message_state import HandlerMetadata, HandlerResponse
@xmlify
@dataclass
@ -83,12 +83,20 @@ class AddPayload:
a: int = 0 # Field docstrings become parameter descriptions in prompts
b: int = 0
@xmlify
@dataclass
class ResultPayload:
"""Calculation result."""
value: int = 0
async def add_handler(
payload: AddPayload,
metadata: HandlerMetadata
) -> bytes:
) -> HandlerResponse:
result = payload.a + payload.b
return f"<result>{result}</result>".encode("utf-8")
return HandlerResponse.respond(
payload=ResultPayload(value=result)
)
```
### Handler Signature and Metadata (Locked)
@ -99,69 +107,53 @@ Typical uses:
- Agents → reason about provenance using `from_id`, optionally refer to themselves via `own_name`
### Handler Return Requirements
The handler **must** return a `bytes` object containing one or more payload root elements.
Returning `None` or a non-`bytes` value is a programming error.
Handlers return `HandlerResponse` or `None`:
The message pump protects the organism by injecting a diagnostic payload if invalid bytes are returned:
| Return | Effect |
|--------|--------|
| `HandlerResponse(payload, to)` | Send payload to named target |
| `HandlerResponse.respond(payload)` | Return to caller (prunes call chain) |
| `None` | Terminate chain, no message emitted |
```python
if response_bytes is None or not isinstance(response_bytes, bytes):
response_bytes = b"<huh>Handler failed to return valid bytes — likely missing return statement or wrong type</huh>"
```
This ensures:
- The thread never hangs due to a forgotten return
- The error is immediately visible in logs and thread history
- LLM agents can observe and self-correct
The pump handles all envelope wrapping. Handlers never construct XML envelopes.
**Correct examples**
```python
async def good(... ) -> bytes:
return b"<result>42</result>"
async def forward_handler(payload, metadata) -> HandlerResponse:
return HandlerResponse(
payload=ProcessedPayload(data="..."),
to="next_listener",
)
async def also_good(... ) -> bytes:
# fast synchronous-style computation
return b"<empty/>"
async def respond_handler(payload, metadata) -> HandlerResponse:
result = compute(payload)
return HandlerResponse.respond(
payload=ResultPayload(value=result)
)
async def terminal_handler(payload, metadata) -> None:
print(payload.message)
return None # Chain ends here
```
**Dangerous (triggers <huh> injection)**
```python
async def bad(... ):
computation()
# forgot return → implicit None
```
### Envelope Injection
Always explicitly return `bytes`.
Handlers return typed `HandlerResponse` objects. The pump performs all enveloping:
### Multi-Payload Emission & Envelope Injection
Handlers return **raw XML fragments only**. The pump performs all enveloping:
1. Serialize payload dataclass to XML
2. Build envelope with correct metadata:
- `<from>` = registered name of the executing listener (ALWAYS pump-injected)
- `<to>` = validated target from HandlerResponse
- `<thread>` = managed by thread registry (extended or pruned based on response type)
3. Re-inject into pipeline for validation and routing
1. Wrap response in `<dummy>` (tolerant of dirty output)
2. Extract all root elements found inside
3. For each extracted payload:
- Inherit current `<thread>`
- Inject `<from>` = registered name of the executing listener
- Build full `<message>` envelope
- Re-inject into the pipeline(s) matching the payload's derived root tag
**Security enforcement:**
- `<from>` is ALWAYS set by the pump (handlers cannot forge identity)
- `<to>` is validated against `peers` list for agents
- `<thread>` is managed by the thread registry (handlers cannot escape context)
Example emission enabling parallel tool calls + self-continuation:
```python
async def researcher_step(... ) -> bytes:
return b"""
<thought>Need weather and a calculation...</thought>
<web_search.searchpayload>
<query>current weather Los Angeles</query>
</web_search.searchpayload>
<calculator.add.addpayload>
<a>7</a>
<b>35</b>
</calculator.add.addpayload>
"""
```
The three payloads are automatically enveloped with correct provenance and routed.
See `handler-contract-v2.1.md` for complete security model.
### Registration Lifecycle
At startup or privileged OOB hot-reload:
@ -186,12 +178,14 @@ Any failure (duplicate root, missing description, import error) → clear error
### Summary of Key Invariants
- Root tag fully derived, never manually specified
- Global uniqueness guaranteed by registered-name prefix
- Handlers remain pure except for small trustworthy metadata
- Handlers return typed `HandlerResponse` or `None` (never raw bytes or envelopes)
- Handlers receive trustworthy metadata including peer `usage_instructions` for LLMs
- All envelope construction and provenance injection performed exclusively by the pump
- `<from>` always pump-injected (handlers cannot forge identity)
- `<to>` validated against `peers` list for agents (cannot route to undeclared targets)
- `<thread>` managed by thread registry (handlers cannot escape context)
- Zero manual XSDs, examples, or prompt fragments required
This specification is now locked for AgentServer v2.1. All code, examples, and future documentation must align with this file.
---
Ready for the next piece (message pump implementation notes, OOB channel, stateful listener examples, etc.) whenever you are.
**v2.1 Specification** — Updated January 10, 2026

302
docs/llm-router-v2.1.md Normal file
View file

@ -0,0 +1,302 @@
# AgentServer v2.1 — LLM Router Abstraction
**January 10, 2026**
This document specifies the LLM router abstraction layer that provides model-agnostic access to language models for agents.
## Overview
The LLM router provides a unified interface for LLM calls. Agents simply request a model by name; the router handles:
- Backend selection (which provider serves this model)
- Load balancing (failover, round-robin, least-loaded)
- Retries with exponential backoff and jitter
- Rate limiting per backend
- Concurrency control
- Per-agent token tracking
## Architecture
```
┌─────────────────────────────────────────────────────────────────┐
│ Agent Handler │
│ response = await complete("grok-4.1", messages, agent_id=...) │
└─────────────────────────────────┬───────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ LLM Router │
│ • Find backends serving model │
│ • Select backend (strategy) │
│ • Retry on failure │
│ • Track usage per agent │
└────────────┬────────────────┬────────────────┬──────────────────┘
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ XAI │ │Anthropic │ │ OpenAI │ ...
│ Backend │ │ Backend │ │ Backend │
└──────────┘ └──────────┘ └──────────┘
```
## Usage
### Simple Call
```python
from agentserver.llm import complete
response = await complete(
model="grok-4.1",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello!"},
],
)
print(response.content)
```
### With Agent Tracking
```python
response = await complete(
model="grok-4.1",
messages=messages,
agent_id=metadata.own_name, # Track tokens per agent
temperature=0.7,
max_tokens=2048,
)
```
### In Handler Context
```python
async def research_handler(payload: ResearchPayload, metadata: HandlerMetadata) -> HandlerResponse:
from agentserver.llm import complete
response = await complete(
model="grok-4.1",
messages=[
{"role": "system", "content": metadata.usage_instructions},
{"role": "user", "content": payload.query},
],
agent_id=metadata.own_name,
)
return HandlerResponse(
payload=ResearchResult(answer=response.content),
to="summarizer",
)
```
## Configuration
The router is configured via the `llm` section in `organism.yaml`:
```yaml
llm:
strategy: failover # failover | round-robin | least-loaded
retries: 3 # Max retry attempts per request
retry_base_delay: 1.0 # Base delay for exponential backoff
retry_max_delay: 60.0 # Maximum delay between retries
backends:
- provider: xai
api_key_env: XAI_API_KEY # Read from environment variable
priority: 1 # Lower = preferred for failover
rate_limit_tpm: 100000 # Tokens per minute limit
max_concurrent: 20 # Max concurrent requests
- provider: anthropic
api_key_env: ANTHROPIC_API_KEY
priority: 2
- provider: openai
api_key_env: OPENAI_API_KEY
priority: 3
- provider: ollama
base_url: http://localhost:11434
supported_models: [llama3, mistral]
```
### Environment Variables
API keys should be stored in environment variables (never in YAML). Create a `.env` file:
```env
XAI_API_KEY=xai-abc123...
ANTHROPIC_API_KEY=sk-ant-...
OPENAI_API_KEY=sk-...
```
The bootstrap process loads `.env` automatically via `python-dotenv`.
## Strategies
### Failover (Default)
Backends are tried in priority order. On failure, falls back to next available backend.
```yaml
llm:
strategy: failover
backends:
- provider: xai
priority: 1 # Try first
- provider: anthropic
priority: 2 # Fallback
```
### Round-Robin
Requests are distributed evenly across backends.
```yaml
llm:
strategy: round-robin
```
### Least-Loaded
Requests go to the backend with the lowest current load (fewest active requests relative to max_concurrent).
```yaml
llm:
strategy: least-loaded
```
## Supported Providers
| Provider | Models | Auth |
|----------|--------|------|
| `xai` | grok-* | Bearer token |
| `anthropic` | claude-* | x-api-key header |
| `openai` | gpt-*, o1-*, o3-* | Bearer token |
| `ollama` | Any local model | None (local) |
### Model Routing
The router automatically selects backends based on model name:
- `grok-4.1` → XAI backend
- `claude-sonnet-4` → Anthropic backend
- `gpt-4o` → OpenAI backend
- Ollama matches configured `supported_models` or accepts all if not specified
## Response Format
```python
@dataclass
class LLMResponse:
content: str # The generated text
model: str # Actual model used
usage: Dict[str, int] # Token counts
finish_reason: str # stop, length, tool_calls, etc.
raw: Any # Provider-specific raw response
```
Usage dict contains:
- `prompt_tokens`: Input token count
- `completion_tokens`: Output token count
- `total_tokens`: Sum of both
## Error Handling
### Rate Limits
On 429 responses, the router:
1. Reads `Retry-After` header if present
2. Falls back to exponential backoff with jitter
3. Tries next backend (if failover strategy)
### Provider Errors
On 5xx responses:
1. Logs the error
2. Retries with backoff
3. Tries next backend (if failover strategy)
### Exhausted Retries
If all retries fail, raises `BackendError`:
```python
try:
response = await complete(model, messages)
except BackendError as e:
# All backends failed
logger.error(f"LLM call failed: {e}")
```
## Token Tracking
The router tracks tokens per agent for budgeting and monitoring:
```python
from agentserver.llm.router import get_router
router = get_router()
# Get usage for specific agent
usage = router.get_agent_usage("greeter")
print(f"Total tokens: {usage.total_tokens}")
print(f"Requests: {usage.request_count}")
# Get all usage
all_usage = router.get_all_usage()
# Reset tracking
router.reset_agent_usage("greeter") # One agent
router.reset_agent_usage() # All agents
```
## Rate Limiting
Each backend has independent rate limiting:
- **Token bucket**: Limits tokens per minute (`rate_limit_tpm`)
- **Semaphore**: Limits concurrent requests (`max_concurrent`)
Requests wait if either limit is reached. This prevents overwhelming provider APIs.
## Extensibility
### Adding a New Provider
1. Create a new backend class in `backend.py`:
```python
@dataclass
class MyProviderBackend(Backend):
provider: str = "myprovider"
base_url: str = "https://api.myprovider.com/v1"
def _auth_headers(self) -> Dict[str, str]:
return {"Authorization": f"Bearer {self.api_key}"}
def serves_model(self, model: str) -> bool:
return model.lower().startswith("mymodel")
async def _do_completion(self, client: httpx.AsyncClient, request: LLMRequest) -> LLMResponse:
# Provider-specific implementation
...
```
2. Register in `PROVIDER_CLASSES`:
```python
PROVIDER_CLASSES = {
# ...existing providers...
"myprovider": MyProviderBackend,
}
```
## Integration with Message Pump
The LLM router is initialized during organism bootstrap and is available globally. Token usage tracking integrates with the message pump's resource stewardship (thread-level token budgets).
Future: Token usage will be reported back to the message pump for per-thread budget enforcement.
---
**v2.1 Specification** — Updated January 10, 2026

View file

@ -1,40 +1,143 @@
# AgentServer v2.1 — System Primitives (Magic Tags)
# AgentServer v2.1 — System Primitives
**Updated: January 10, 2026**
These payload root elements receive special routing and/or side effects in the message pump.
They reside in the reserved namespace `https://xml-pipeline.org/ns/core/v1`.
This document specifies system-level message types and handler return semantics.
## Handler Return Semantics
Handlers control message flow through their return value, not through magic XML tags.
### Forward to Target
```python
return HandlerResponse(
payload=MyPayload(...),
to="target_listener",
)
```
- Pump validates target against `peers` list (for agents)
- Extends thread chain: `a.b` → `a.b.target`
- Target receives the payload with updated thread
### Respond to Caller
```python
return HandlerResponse.respond(
payload=ResultPayload(...)
)
```
- Pump looks up call chain from thread registry
- Prunes last segment (the responder)
- Routes to new tail (the caller)
- **Sub-threads are terminated** (calculator memory, scratch space, etc.)
### Terminate Chain
```python
return None
```
- No message emitted
- Chain ends here
- Thread can be cleaned up
## System Messages
These payload elements are emitted by the system (pump) only. Agents cannot emit them.
### `<huh>` — Validation Error
Emitted when message processing fails (XSD validation, unknown root tag, etc.).
## `<huh>`
### `<huh>`
- Emitted exclusively by the system
- Routes back to the listener that triggered the error
- Payload structure:
```xml
<huh>
<error>Brief canned error message (e.g., "Invalid payload structure")</error>
<original-attempt>Base64-encoded raw bytes of the failed attempt (truncated if large)</original-attempt>
<huh xmlns="https://xml-pipeline.org/ns/core/v1">
<error>Invalid payload structure</error>
<original-attempt>SGVsbG8gV29ybGQ=</original-attempt>
</huh>
```
- Purpose: Safe, LLM-friendly diagnostic feedback
- Security note: Error messages are abstract and canned — no raw validator output is exposed to agents
- Security note:
- Certain classes of errors (payload schema violations, unknown root tags, etc.) are intentionally reported with identical abstract messages.
- This prevents topology probing: an agent or external caller cannot distinguish between "wrong schema for existing capability" and "capability does not exist".
- Authorized introspection is available only via controlled meta queries.
## `<todo-until>`
- May be emitted by any listener
- Routes to self (uses the emitting listener's unique root tag mechanism)
- No side effects
- Purpose: Optional visible scaffolding for structured reasoning and iteration planning
| Field | Description |
|-------|-------------|
| `error` | Brief, canned error message (never raw validator output) |
| `original-attempt` | Base64-encoded raw bytes (truncated if large) |
## `<return>`
- May be emitted by any listener
- Routes to the immediate parent listener in the private thread hierarchy
- Side effect: The Current subthread below the current listener is pruned after successful delivery of message.<br>the current thread tail is the current listener.
- Purpose: Explicit return-to-caller semantics with automatic cleanup
**Security notes:**
- Error messages are intentionally abstract and generic
- Identical messages for "wrong schema" vs "capability doesn't exist"
- Prevents topology probing by agents or external callers
- Authorized introspection available via meta queries only
## `<halt>`
- May be emitted by any listener
- Routes to the immediate parent listener in the private thread hierarchy
- Side effect: The Entire thread is pruned up to and including the current listener.<br>the current thread tail is the parent listener.
- Purpose: Explicit termination of the current thread and all its subthreads
### `<SystemError>` — Routing/Delivery Failure
Emitted when a handler tries to send to an unauthorized or unreachable target.
```xml
<SystemError xmlns="https://xml-pipeline.org/ns/core/v1">
<code>routing</code>
<message>Message could not be delivered. Please verify your target and try again.</message>
<retry-allowed>true</retry-allowed>
</SystemError>
```
| Field | Description |
|-------|-------------|
| `code` | Error category: `routing`, `validation`, `timeout` |
| `message` | Generic, non-revealing description |
| `retry-allowed` | Whether agent can retry the operation |
**Key properties:**
- Keeps thread alive (agent can retry)
- Never reveals topology (no "target doesn't exist" vs "not authorized")
- Replaces the failed message in the flow
## Agent Iteration Patterns
### Blind Self-Iteration
LLM agents iterate by emitting payloads with their own root tag. With unique root tags per agent, this automatically routes back to themselves.
```python
# In agent handler
return HandlerResponse(
payload=ThinkPayload(reasoning="Let me think more..."),
to=metadata.own_name, # Routes to self
)
```
The pump sets `is_self_call=True` in metadata for these messages.
### Visible Planning (Optional)
Agents may include planning constructs in their output for clarity:
```xml
<answer>
I need to:
<todo-until condition="have final answer">
1. Search for relevant data
2. Analyze results
3. Synthesize conclusion
</todo-until>
Starting with step 1...
</answer>
```
**Note:** `<todo-until>` is NOT interpreted by the system. It's visible structured text that LLMs can use for planning. The actual iteration happens through normal message routing.
## Response Semantics Warning
**Critical for LLM agents:**
When you respond (return to caller via `.respond()`), your call chain is pruned:
- Any sub-agents you called are effectively terminated
- Their state/context is lost (calculator memory, scratch space, etc.)
- You cannot call them again in the same context after responding
**Therefore:** Complete ALL sub-tasks before responding. If you need results from a peer, wait for their response first.
This warning is automatically included in `usage_instructions` provided to agents.
---
**v2.1 Specification** — Updated January 10, 2026

View file

@ -1,15 +1,18 @@
# Autonomous Registration & Introspection (v2.0)
In AgentServer v2.0, tool definition is radically simple: one `@xmlify` dataclass + handler + description. **No manual XSDs, no fragile JSON item mappings, no custom prompt engineering.** The organism auto-generates everything needed for validation, routing, and LLM wiring.<br/>
# Autonomous Registration & Introspection (v2.1)
**Updated: January 10, 2026**
In AgentServer v2.1, tool definition is radically simple: one `@xmlify` dataclass + handler + description. **No manual XSDs, no fragile JSON item mappings, no custom prompt engineering.** The organism auto-generates everything needed for validation, routing, and LLM wiring.
Manual XSDs, grammars, and tool descriptions are obsolete. Listeners **autonomously generate** their contracts and metadata at registration time. Introspection is a privileged core facility.
## The Developer Experience
Declare your payload contract as an `@xmlify` dataclass + a pure handler function that returns raw bytes. Register with a name and description. That's it.
Declare your payload contract as an `@xmlify` dataclass + a pure async handler function that returns `HandlerResponse` or `None`. Register with a name and description. That's it.
```python
from xmlable import xmlify
from dataclasses import dataclass
from xml_pipeline import Listener, bus # bus is the global MessageBus
from agentserver.message_bus.message_state import HandlerMetadata, HandlerResponse
@xmlify
@dataclass
@ -18,28 +21,62 @@ class AddPayload:
a: int = 0 # First operand
b: int = 0 # Second operand
def add_handler(payload: AddPayload) -> bytes:
@xmlify
@dataclass
class ResultPayload:
"""Calculation result."""
value: int = 0
async def add_handler(payload: AddPayload, metadata: HandlerMetadata) -> HandlerResponse:
result = payload.a + payload.b
return f"<result>{result}</result>".encode("utf-8")
return HandlerResponse.respond(
payload=ResultPayload(value=result)
)
# LLM example: multi-payload emission tolerated
def agent_handler(payload: AgentPayload) -> bytes:
return b"""
<thought>Analyzing...</thought>
<tool-call xmlns="https://xml-pipeline.org/ns/search/v1">
<query>weather</query>
</tool-call>
""".strip()
# LLM agent example
async def agent_handler(payload: AgentPayload, metadata: HandlerMetadata) -> HandlerResponse:
# Build prompt with peer schemas
from agentserver.llm import complete
Listener(
payload_class=AddPayload,
handler=add_handler,
name="calculator.add",
description="Adds two integers and returns their sum." # Mandatory for usable tool prompts
).register() # ← XSD, example, prompt auto-generated + registered
response = await complete(
model="grok-4.1",
messages=[
{"role": "system", "content": metadata.usage_instructions},
{"role": "user", "content": payload.query},
],
agent_id=metadata.own_name,
)
return HandlerResponse(
payload=ThoughtPayload(content=response.content),
to="next_peer",
)
```
The bus validates input against the XSD, deserializes to the dataclass instance, calls the handler, wraps output bytes in `<dummy></dummy>`, and extracts multiple payloads if emitted.
The pump:
1. Validates input against the XSD
2. Deserializes to typed dataclass instance
3. Calls handler with payload + metadata
4. Wraps returned payload in envelope with correct `<from>`, `<to>`, `<thread>`
5. Re-injects into pipeline for validation and routing
## Handler Contract (v2.1)
All handlers **must** follow this signature:
```python
async def handler(
payload: PayloadDataclass, # XSD-validated, deserialized @xmlify instance
metadata: HandlerMetadata # Trustworthy context from pump
) -> HandlerResponse | None:
...
```
- Handlers **must** be async (`async def`)
- Return `HandlerResponse` to send a message
- Return `None` to terminate chain (no message)
See `handler-contract-v2.1.md` for complete specification.
## Autonomous Chain Reaction on Registration
@ -60,13 +97,39 @@ The bus validates input against the XSD, deserializes to the dataclass instance,
</add>
Params: a(int) - First operand, b(int) - Second operand
Returns: Raw XML fragment (e.g., <result>)
Returns: Typed response payload
```
Auto-injected into wired agents' system prompts.
Auto-injected into wired agents' system prompts via `metadata.usage_instructions`.
3. **Registry Update**
Bus catalogs by `name` and `(namespace, root)`. Ready for routing + meta queries.
## Usage Instructions (Auto-Generated)
When an agent has declared `peers`, the pump automatically builds `usage_instructions` containing peer schemas:
```python
async def agent_handler(payload, metadata):
# metadata.usage_instructions contains:
# """
# You can call the following tools by emitting their XML payloads:
#
# ## calculator.add
# Adds two integers and returns their sum.
#
# ```xml
# <addpayload xmlns="https://xml-pipeline.org/ns/calculator/v1">
# <a>40</a>
# <b>2</b>
# </addpayload>
# ```
# ...
# """
pass
```
This replaces manual tool prompt engineering.
## Introspection: Privileged Meta Facility
Query the core MessageBus via reserved `https://xml-pipeline.org/ns/meta/v1`:
@ -86,16 +149,17 @@ Controlled per YAML (`meta.allow_schema_requests: "admin"` etc.). No topology le
Other ops: `request-example`, `request-prompt`, `list-capabilities`.
## Multi-Handler Organs
Need multiple functions in one service? Register separate listeners or subclass Listener for shared state.
## Key Advantages
- **Zero Drift**: Edit dataclass → restart/hot-reload → XSD/example/prompt regenerate.
- **Attack-Resistant**: lxml XSD validation → typed instance → handler.
- **LLM-Tolerant**: Raw bytes output → dummy extraction supports multi-payload and dirty streams.
- **Type-Safe Returns**: Handlers return typed dataclasses, pump handles envelopes.
- **Peer-Aware Agents**: Auto-generated `usage_instructions` from peer schemas.
- **Sovereign Wiring**: YAML agents get live prompt fragments at startup.
- **Discoverable**: Namespaces served live at https://xml-pipeline.org/ns/... for tools and federation.
*The tool declares its contract and purpose. The organism enforces and describes it exactly.*
---
**v2.1 Specification** — Updated January 10, 2026

339
handlers/console.py Normal file
View file

@ -0,0 +1,339 @@
"""
console.py Human console listener for interactive input.
A message-driven console interface for the organism. The console is a regular
handler in the message flow:
boot → console (await input) → console-router → ... → console (display + await)
The console handler:
1. Receives ConsolePrompt (may contain output to display)
2. Displays any output
3. Awaits keyboard input
4. Returns HandlerResponse with user's message → routes to console-router
5. Returns None on EOF/quit → disconnected
Commands:
@listener message Send message to specific listener
/status Show organism status
/listeners List registered listeners
/quit Shutdown organism
Example:
> @greeter Hello World
[shouter] HELLO WORLD!
"""
import asyncio
import sys
from dataclasses import dataclass
from typing import Optional
from third_party.xmlable import xmlify
from agentserver.message_bus.message_state import HandlerMetadata, HandlerResponse
# ============================================================================
# Payload Classes
# ============================================================================
@xmlify
@dataclass
class ConsolePrompt:
    """
    Prompt message to the console.

    Contains optional output to display before prompting for input.
    Sent by boot (initial prompt) and response-handler (after responses).
    Consumed by handle_console_prompt, which displays the output and then
    blocks awaiting keyboard input.
    """
    output: str = ""  # Text to display before the prompt (may contain newlines)
    source: str = ""  # Who sent this; shown as a "[source]" prefix per line
    show_banner: bool = False  # Show startup banner (set only by the boot prompt)
@xmlify
@dataclass
class ConsoleInput:
    """User input from console, routed to console-router.

    Built by handle_console_prompt from an "@target message" line.
    """
    text: str = ""  # Message body (everything after "@target ")
    target: str = ""  # Listener to send to (the token after "@")
# ============================================================================
# ANSI Colors
# ============================================================================
class Colors:
    """ANSI SGR escape codes used for console styling.

    Every colored write ends with RESET so styling never leaks into
    subsequent output.
    """
    RESET = "\033[0m"
    BOLD = "\033[1m"
    DIM = "\033[2m"
    RED = "\033[31m"
    GREEN = "\033[32m"
    YELLOW = "\033[33m"
    BLUE = "\033[34m"
    MAGENTA = "\033[35m"
    CYAN = "\033[36m"
def print_colored(text: str, color: str = Colors.RESET):
    """Print *text* wrapped in an ANSI color, degrading to plain output.

    Some Windows console code pages cannot encode the escape sequences;
    in that case the text is printed uncolored rather than crashing.
    """
    try:
        print(color + text + Colors.RESET)
    except UnicodeEncodeError:
        # Fallback for consoles that reject the escape bytes.
        print(text)
def print_banner():
    """Print the one-time startup banner and the command cheat-sheet.

    Called from handle_console_prompt when ConsolePrompt.show_banner is
    True (i.e. on the boot prompt only).
    """
    print()
    print_colored("=" * 46, Colors.CYAN)
    print_colored(" xml-pipeline console v0.1 ", Colors.CYAN)
    print_colored("=" * 46, Colors.CYAN)
    print()
    print_colored("Commands:", Colors.DIM)
    print_colored(" @listener message - Send to listener", Colors.DIM)
    print_colored(" /status - Organism status", Colors.DIM)
    print_colored(" /listeners - List listeners", Colors.DIM)
    print_colored(" /quit - Shutdown", Colors.DIM)
    print()
# ============================================================================
# Console State (minimal, for commands like /listeners)
# ============================================================================
# Reference to pump for introspection commands
_pump_ref = None
def set_pump_ref(pump):
    """Record the message pump so slash-commands (e.g. /listeners) can introspect it."""
    global _pump_ref
    _pump_ref = pump
# ============================================================================
# Input Helpers
# ============================================================================
async def read_input() -> Optional[str]:
    """Read one line from stdin without blocking the event loop.

    The blocking ``sys.stdin.readline`` runs in the default executor so
    the pump keeps processing other messages while waiting for the user.

    Returns:
        The input line with surrounding whitespace stripped, or ``None``
        on EOF / Ctrl-C (the caller treats ``None`` as a disconnect).
    """
    # get_event_loop() is deprecated inside a coroutine; use the running loop.
    loop = asyncio.get_running_loop()
    try:
        line = await loop.run_in_executor(None, sys.stdin.readline)
    except (EOFError, KeyboardInterrupt):
        return None
    if not line:  # readline() returns "" exactly at EOF
        return None
    return line.strip()
def parse_input(line: str) -> tuple[str, str, Optional[str]]:
    """Classify a raw console line.

    Returns a ``(input_type, content, target)`` triple:
      - ``("message", "hello", "greeter")`` for ``@greeter hello``
      - ``("command", "status", None)``     for ``/status``
      - ``("quit", "", None)``              for ``/quit`` or ``/exit``
      - ``("empty", "", None)``             for blank or unroutable input
    """
    if not line:
        return ("empty", "", None)

    if line.startswith("/"):
        tokens = line[1:].split(None, 1)
        cmd = tokens[0].lower() if tokens else ""
        arg = tokens[1] if len(tokens) > 1 else ""
        if cmd in ("quit", "exit"):
            return ("quit", "", None)
        return ("command", cmd, arg or None)

    if line.startswith("@"):
        tokens = line[1:].split(None, 1)
        if tokens:
            body = tokens[1] if len(tokens) > 1 else ""
            return ("message", body, tokens[0])

    # Plain text without a target (or a bare "@"/"/") is not routable.
    return ("empty", "", None)
def handle_local_command(cmd: str, arg: Optional[str], metadata: HandlerMetadata) -> bool:
    """
    Handle local / commands that don't need to go through the pump.

    Note: every branch — including unknown commands, which just print an
    error — currently returns True; there is no unhandled (False) path.
    """
    if cmd == "status":
        print_colored("Status: running", Colors.GREEN)
        print_colored(f"Thread: {metadata.thread_id[:8]}...", Colors.DIM)
        return True

    if cmd == "listeners":
        print_colored("Registered listeners:", Colors.CYAN)
        pump = _pump_ref
        if pump and hasattr(pump, 'listeners'):
            for name, listener in pump.listeners.items():
                desc = getattr(listener, 'description', 'No description')
                print_colored(f" - {name}: {desc}", Colors.DIM)
        else:
            # set_pump_ref() was never called, or pump lacks a registry.
            print_colored(" (pump reference not available)", Colors.DIM)
        return True

    if cmd == "help":
        print_colored("Commands:", Colors.CYAN)
        print_colored(" @listener message - Send to listener", Colors.DIM)
        print_colored(" /status - Organism status", Colors.DIM)
        print_colored(" /listeners - List listeners", Colors.DIM)
        print_colored(" /quit - Shutdown", Colors.DIM)
        return True

    print_colored(f"Unknown command: /{cmd}", Colors.RED)
    return True
# ============================================================================
# Console Handler
# ============================================================================
async def handle_console_prompt(
    payload: ConsolePrompt,
    metadata: HandlerMetadata
) -> HandlerResponse | None:
    """
    Main console handler — displays output, awaits input, returns message.

    This is called:
    1. On boot (show_banner=True, no output)
    2. After each response (payload.output contains response text)

    Returns:
        - HandlerResponse with ConsoleInput → routes to console-router
        - None → console disconnected (EOF or /quit); the chain ends here
    """
    # Show banner on first prompt (only the boot prompt sets this flag).
    if payload.show_banner:
        print_banner()

    # Display any output carried by the prompt, one colored line at a time,
    # prefixed with the sender's name when one was provided.
    if payload.output:
        print()
        for line in payload.output.split("\n"):
            if payload.source:
                print_colored(f"[{payload.source}] {line}", Colors.CYAN)
            else:
                print_colored(line, Colors.CYAN)

    # Input loop - keep prompting until we get a valid message or quit.
    while True:
        # Print prompt without a newline; flush so it appears before the
        # blocking stdin read starts.
        print(f"{Colors.GREEN}>{Colors.RESET} ", end="", flush=True)

        # Await input (stdin read runs off-loop in an executor).
        line = await read_input()

        # EOF - disconnect; returning None terminates the message chain.
        if line is None:
            print()
            print_colored("EOF - disconnecting", Colors.YELLOW)
            return None

        # Parse input into (type, content, target).
        input_type, content, target = parse_input(line)

        if input_type == "quit":
            print_colored("Shutting down...", Colors.YELLOW)
            return None
        elif input_type == "empty":
            continue  # Prompt again
        elif input_type == "command":
            # Local command: content is the command name, target its optional
            # argument. Handled without a pump round-trip, then prompt again.
            handle_local_command(content, target, metadata)
            continue
        elif input_type == "message":
            if not target:
                print_colored("No target. Use @listener message", Colors.RED)
                continue
            # Return message to console-router for payload translation.
            print_colored(f"[sending to {target}]", Colors.DIM)
            return HandlerResponse(
                payload=ConsoleInput(text=content, target=target),
                to="console-router",
            )
# ============================================================================
# Console Router Handler
# ============================================================================
@xmlify
@dataclass
class Greeting:
    """Greeting payload for the greeter listener.

    Built by handle_console_input from an "@greeter <text>" console line.
    """
    name: str = ""  # Name to greet (the raw console text)
async def handle_console_input(
    payload: ConsoleInput,
    metadata: HandlerMetadata
) -> HandlerResponse | None:
    """
    Route console input to the appropriate listener.

    Known targets get a typed payload; any other target is reported on the
    local console and bounced back to the console handler as an error prompt.
    """
    target = payload.target.lower()

    if target == "greeter":
        # Translate ConsoleInput into the greeter's expected payload.
        return HandlerResponse(
            payload=Greeting(name=payload.text),
            to="greeter",
        )

    # Generic routing - try to send raw text
    # This would need expansion for other listener types
    print_colored(f"Unknown target: {target}", Colors.RED)
    error_prompt = ConsolePrompt(
        output=f"Unknown target: {target}",
        source="console-router",
    )
    return HandlerResponse(payload=error_prompt, to="console")
# ============================================================================
# Response Handler
# ============================================================================
@xmlify
@dataclass
class ShoutedResponse:
    """Response from shouter; forwarded to the console for display."""
    message: str = ""  # The shouted text
async def handle_shouted_response(
    payload: ShoutedResponse,
    metadata: HandlerMetadata
) -> HandlerResponse:
    """
    Forward a shouter reply to the console for display.

    Wraps the final response text in a ConsolePrompt so the console handler
    prints it (tagged "[shouter]") and then prompts for the next input.
    """
    prompt = ConsolePrompt(output=payload.message, source="shouter")
    return HandlerResponse(payload=prompt, to="console")

59
run_organism.py Normal file
View file

@ -0,0 +1,59 @@
#!/usr/bin/env python3
"""
run_organism.py Start the organism.
Usage:
python run_organism.py [config.yaml]
This boots the organism and runs the message pump.
The console is a regular handler in the message flow:
boot -> system.boot -> console (await input) -> console-router -> ... -> console
The pump continues until the console returns None (EOF or /quit).
"""
import asyncio
import sys
from pathlib import Path
from agentserver.message_bus import bootstrap
async def run_organism(config_path: str = "config/organism.yaml"):
    """Boot organism and run the message pump.

    Flow: bootstrap registers listeners and injects the boot message; the
    pump then processes boot -> console -> ... until shutdown.

    Args:
        config_path: Path to the organism YAML configuration.
    """
    # Bootstrap the pump (registers listeners, injects boot message)
    pump = await bootstrap(config_path)

    # Set pump reference for console introspection commands (/listeners).
    # NOTE(review): imported here rather than at module top, presumably to
    # avoid an import cycle with the bootstrap — confirm.
    from handlers.console import set_pump_ref
    set_pump_ref(pump)

    # Run the pump - it will process boot -> console -> ... flow
    # The pump runs until shutdown is called
    try:
        await pump.run()
    except asyncio.CancelledError:
        pass  # Normal cancellation path (e.g. event-loop teardown).
    finally:
        # Always release pump resources, even on cancellation.
        await pump.shutdown()
        print("Goodbye!")
def main():
    """CLI entry point: resolve the config path, validate it, run the organism.

    Exits with status 1 when the config file does not exist; a Ctrl-C during
    the run is reported and treated as a clean exit.
    """
    args = sys.argv[1:]
    config_path = args[0] if args else "config/organism.yaml"

    if not Path(config_path).exists():
        print(f"Config not found: {config_path}")
        sys.exit(1)

    try:
        asyncio.run(run_organism(config_path))
    except KeyboardInterrupt:
        # Top-level Ctrl-C: report and fall through to a normal exit.
        print("\nInterrupted")


if __name__ == "__main__":
    main()

View file

@ -0,0 +1,2 @@
# One-time tool to generate the permanent Ed25519 organism identity
# Run once, store private key offline/safely