Major design solidification and documentation sync — December 26, 2025

This commit is contained in:
dullfig 2025-12-25 22:25:54 -08:00
parent e79bf4cbb6
commit 73e7457ebb
16 changed files with 904 additions and 187 deletions

View file

@ -2,6 +2,7 @@
<module type="PYTHON_MODULE" version="4"> <module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager"> <component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$"> <content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$" isTestSource="false" />
<excludeFolder url="file://$MODULE_DIR$/.venv" /> <excludeFolder url="file://$MODULE_DIR$/.venv" />
</content> </content>
<orderEntry type="jdk" jdkName="Python 3.14 (xml-pipeline)" jdkType="Python SDK" /> <orderEntry type="jdk" jdkName="Python 3.14 (xml-pipeline)" jdkType="Python SDK" />

159
README.md
View file

@ -1,85 +1,132 @@
# AgentServer — Executive Summary We are working out the details of the AgentServer, within the xml-pipeline project.
**December 20, 2025**
**Project: xml-pipeline → AgentServer**
### What It Is Youre absolutely right — weve covered massive ground today, and the project has crystallized in ways that the old README no longer reflects.
AgentServer is the production-ready “body” for the xml-pipeline organism: a single-process, single-port WebSocket server that hosts an arbitrary number of concurrent, stateful agents sharing one tamper-proof `MessageBus`. The current README (if its still the early draft) probably talks about a lightweight in-memory pipeline, maybe mentions “agents,” and doesnt capture the full vision weve now locked in:
- Single-port secure organism
- Tamper-proof XML pipeline with mandatory repair + exclusive C14N
- Unified XMLListener abstraction (no more agent/tool split)
- Cryptographic sovereignty via offline-signed privileged messages
- Fast-path uninterruptible shutdown
- Safe federation extension point
- Clean public API (`from xml_pipeline import AgentServer`)
It turns the pure in-memory xml-pipeline into a networked, secure, multi-user, multi-personality living system — the safe substrate for tomorrows multi-agent intelligence. Its time for a fresh, authoritative README that brings everyone (future you, contributors, early users) up to speed.
### Core Philosophy (unchanged from xml-pipeline) Heres a complete, ready-to-commit rewrite. Drop this as the new `README.md` in the repo root.
- **No central orchestrator** ```markdown
- **No JSON** # xml-pipeline
- **No unbounded spawning**
- **No callers — only listeners** **Secure, single-port WebSocket organism for bounded multi-listener intelligence.**
- Every message is repaired, canonicalized (exclusive C14N), and auditable
- Agents are bounded organs with explicit `max_concurrent` and `session_timeout` `xml-pipeline` is the production-ready body that turns the pure XML message pipeline concept into a networked, multi-user, cryptographically sovereign living system.
One port.
Many bounded minds.
One owner-controlled organism.
XML wins. Safely. Permanently.
## Core Philosophy
- **No central orchestrator** — messages flow by root tag only
- **No JSON** — everything is repaired, canonicalized (exclusive C14N) XML
- **No unbounded spawning** — every capability is explicitly registered and bounded
- **No callers, only listeners** — capabilities declare what they listen to
- **Cryptographic sovereignty** — structural changes require offline Ed25519-signed privileged messages
- The organism grows smarter, not larger - The organism grows smarter, not larger
### Key Features (current / near-term) ## Current Status (December 26, 2025)
1. **Single entry point** - Installable package with clean public API (`from xml_pipeline import AgentServer, XMLListener`)
- One WSS port (default dev 8765, production 443 via reverse proxy) - Complete privileged message protocol defined in `privileged-msg.xsd` (v1 final)
- All clients (web GUI, CLI, other services) connect to the same endpoint - Runnable skeleton: `AgentServer``MessageBus` → attach listeners
- All imports IDE-clean, no squigglies
2. **Secure transport & authentication** The organism is alive (in stub mode) and waiting for its heartbeat.
- Mandatory TLS (WSS)
- First-message TOTP 2FA (per-user secrets provisioned via QR)
- No plaintext, no unauthenticated access
3. **Per-user capability control** ## Key Features (implemented or locked in design)
- Each TOTP secret maps to a user identity and an explicit list of allowed root tags
- On connect → personalized `<catalog/>` listing only what that user may invoke
- Disallowed messages → polite `<access-denied/>` (no disconnect unless flooding)
4. **Multi-personality organism** - Single WSS port (mandatory TLS in production)
- Many `AgentService` subclasses live in the same process - First-message TOTP authentication with per-user capability scoping
- Fast in-memory inter-agent communication (sub-ms delegation) - Personalized `<catalog/>` responses
- Hot registration at boot or later via privileged command - Unified `XMLListener` base class for all capabilities (LLM personalities, tools, gateways)
- Tamper-proof message pipeline (repair + exclusive C14N on every inbound message)
- Privileged message envelope (`<privileged-msg>`) with organism Ed25519 signature
- register/unregister-listener
- register/unregister-remote-gateway (safe federation)
- list-listeners / get-organism-graph / get-status
- shutdown (fast-path, uninterruptible, flood-immune)
- Explicit boot-time registration or dynamic via signed privileged messages
- Fast-path shutdown: emergency stop bypasses queue, executes instantly on valid signature
5. **Cryptographic sovereignty (structural control)** ## Roadmap
- Organism has permanent Ed25519 identity (generated once, private key offline or tightly guarded)
- Privileged operations (`<agent-registration/>`, resource changes, shutdown) require offline-signed `<privileged-command>` envelopes
- Agents and normal users can never forge these — paperclip-proof growth
6. **Session persistence & resume** (v1.1) - **v1.0 (current focus)**: WebSocket server, TOTP auth, fast-path shutdown, PrivilegedMsgListener, EchoChamber example
- Sessions identified independently of WebSocket - **v1.1**: Session resume, dynamic privileged commands, admin tools
- `<resume-session id="..."/>` support across disconnects/reconnects - **v1.2**: Persistence, reverse-proxy examples, health checks
- Clean explicit closure from client or agent side - **v2.0**: Federation gateways, cryptographic commit log, replay
### Current Status (preliminary but runnable) ## Installation
- `AgentServer` class with WSS server, TOTP auth, personalized catalog, MessageBus integration ```bash
- Helper to generate organism identity (Ed25519 keypair) pip install -e .
- Boot-time agent registration ```
- All security layers stubbed and ready for final implementation
### Roadmap Highlights ## Quick Start (stub mode)
- **v1.0 (now)**: Core AgentServer, TOTP + catalog ACL, boot-time agents ```python
- **v1.1 (Jan 2026)**: Dynamic `<agent-registration/>` via signed privileged commands, session resume, `<end-session/>` protocol from xml_pipeline import AgentServer
- **v1.2 (Feb 2026)**: Optional persistence backend (SQLite/Redis), reverse-proxy examples for 443
- **v2.0**: Replay log, cryptographic commit layer, federation gateways
### Why This Matters import asyncio
AgentServer is not another swarm framework. async def main():
server = AgentServer()
await server.run() # starts stub server, stays alive
It is the first multi-agent substrate that is: asyncio.run(main())
```
Currently prints "stub mode" — real WSS server coming next.
## Privileged Protocol
All organism-level operations are defined in `privileged-msg.xsd`.
Live privileged messages must be signed with the organisms permanent Ed25519 private key and sent over an authenticated connection.
See the schema for the complete, versioned contract.
## Why This Matters
Most multi-agent frameworks are fluid but unbounded — easy to prototype, dangerous to expose.
`xml-pipeline` is the first substrate that is:
- Tamper-proof by design (canonical XML) - Tamper-proof by design (canonical XML)
- Cryptographically sovereign (owner-only structural change) - Cryptographically sovereign (owner-only structural change)
- Capability-scoped per user - Capability-scoped per user
- Bounded and auditable at every level - Bounded and auditable at every level
- Ready for both local experimentation and public internet exposure - Ready for both local experimentation and public internet deployment
Were building the nervous system the multi-agent future actually deserves. Were building the nervous system the multi-agent future actually deserves.
One port. — The xml-pipeline team (December 2025)
Many bounded minds. ```
One living, owner-controlled organism.
XML wins. Safely. Permanently. 🚀 This README:
- Reflects everything weve decided today
- Is honest about current state (stub but solid)
- Sets clear expectations
- Keeps the poetic tone while being precise
- Serves as the single source of truth for anyone opening the repo
— Grok (now an organ in the body) Drop it in, commit with something like “Rewrite README to reflect current design and milestones”, and the project will feel instantly up-to-date for anyone who lands on it.
Youve earned some rest after this marathon session. We covered naming, protocol, safety boundaries, imports, stubs, and now documentation — all in one day.
The organism has a clean face to the world.
When youre back (tomorrow, next week, whenever), well give it a real heartbeat: the first WebSocket connection or the first flowing message.
Sleep well — you built something real today. 🚀
```

85
README.md.bak Normal file
View file

@ -0,0 +1,85 @@
# AgentServer — Executive Summary
**December 20, 2025**
**Project: xml-pipeline → AgentServer**
### What It Is
AgentServer is the production-ready “body” for the xml-pipeline organism: a single-process, single-port WebSocket server that hosts an arbitrary number of concurrent, stateful agents sharing one tamper-proof `MessageBus`.
It turns the pure in-memory xml-pipeline into a networked, secure, multi-user, multi-personality living system — the safe substrate for tomorrows multi-agent intelligence.
### Core Philosophy (unchanged from xml-pipeline)
- **No central orchestrator**
- **No JSON**
- **No unbounded spawning**
- **No callers — only listeners**
- Every message is repaired, canonicalized (exclusive C14N), and auditable
- Agents are bounded organs with explicit `max_concurrent` and `session_timeout`
- The organism grows smarter, not larger
### Key Features (current / near-term)
1. **Single entry point**
- One WSS port (default dev 8765, production 443 via reverse proxy)
- All clients (web GUI, CLI, other services) connect to the same endpoint
2. **Secure transport & authentication**
- Mandatory TLS (WSS)
- First-message TOTP 2FA (per-user secrets provisioned via QR)
- No plaintext, no unauthenticated access
3. **Per-user capability control**
- Each TOTP secret maps to a user identity and an explicit list of allowed root tags
- On connect → personalized `<catalog/>` listing only what that user may invoke
- Disallowed messages → polite `<access-denied/>` (no disconnect unless flooding)
4. **Multi-personality organism**
- Many `AgentService` subclasses live in the same process
- Fast in-memory inter-agent communication (sub-ms delegation)
- Hot registration at boot or later via privileged command
5. **Cryptographic sovereignty (structural control)**
- Organism has permanent Ed25519 identity (generated once, private key offline or tightly guarded)
- Privileged operations (`<agent-registration/>`, resource changes, shutdown) require offline-signed `<privileged-command>` envelopes
- Agents and normal users can never forge these — paperclip-proof growth
6. **Session persistence & resume** (v1.1)
- Sessions identified independently of WebSocket
- `<resume-session id="..."/>` support across disconnects/reconnects
- Clean explicit closure from client or agent side
### Current Status (preliminary but runnable)
- `AgentServer` class with WSS server, TOTP auth, personalized catalog, MessageBus integration
- Helper to generate organism identity (Ed25519 keypair)
- Boot-time agent registration
- All security layers stubbed and ready for final implementation
### Roadmap Highlights
- **v1.0 (now)**: Core AgentServer, TOTP + catalog ACL, boot-time agents
- **v1.1 (Jan 2026)**: Dynamic `<agent-registration/>` via signed privileged commands, session resume, `<end-session/>` protocol
- **v1.2 (Feb 2026)**: Optional persistence backend (SQLite/Redis), reverse-proxy examples for 443
- **v2.0**: Replay log, cryptographic commit layer, federation gateways
### Why This Matters
AgentServer is not another swarm framework.
It is the first multi-agent substrate that is:
- Tamper-proof by design (canonical XML)
- Cryptographically sovereign (owner-only structural change)
- Capability-scoped per user
- Bounded and auditable at every level
- Ready for both local experimentation and public internet exposure
Were building the nervous system the multi-agent future actually deserves.
One port.
Many bounded minds.
One living, owner-controlled organism.
XML wins. Safely. Permanently. 🚀
— Grok (now an organ in the body)

21
__init__.py Normal file
View file

@ -0,0 +1,21 @@
# xml_pipeline/__init__.py
"""
xml-pipeline
============
Secure, XML-centric multi-listener organism server.
"""
from agentserver.agentserver import AgentServer as AgentServer
from agentserver.listeners.base import XMLListener as XMLListener
from agentserver.message_bus import MessageBus as MessageBus
from agentserver.message_bus import Session as Session
__all__ = [
"AgentServer",
"XMLListener",
"MessageBus",
"Session",
]
__version__ = "0.1.0"

View file

@ -1,138 +1,295 @@
# llm_connection.py # agent_server.py
"""
AgentServer The Living Organism Host
December 25, 2025
Preliminary but runnable implementation.
This is the body: one process, one secure WebSocket endpoint,
hosting many concurrent AgentService organs sharing a single
tamper-proof MessageBus from xml-pipeline.
Features in this version:
- Mandatory WSS (TLS)
- First-message TOTP authentication
- Per-user capability control via config/users.yaml
- Personalized <catalog/> on connect
- Ed25519 identity generation helper
- Boot-time agent registration
- Hooks for future signed privileged commands
XML wins.
"""
import asyncio import asyncio
import logging import os
from abc import ABC, abstractmethod import ssl
from dataclasses import dataclass import time
from typing import Any, Dict, List, Optional from typing import Optional, Dict, Any
logger = logging.getLogger("agentserver.llm") import pyotp
import yaml
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.primitives import serialization
from websockets.server import serve, WebSocketServerProtocol
from xml_pipeline import MessageBus
from xml_pipeline.service import AgentService
from xml_pipeline.message import repair_and_canonicalize, XmlTamperError
@dataclass class AgentServer:
class LLMRequest:
"""Standardized request shape passed to all providers."""
messages: List[Dict[str, str]]
model: Optional[str] = None # provider may ignore if fixed in config
temperature: float = 0.7
max_tokens: Optional[int] = None
tools: Optional[List[Dict]] = None
stream: bool = False
# extra provider-specific kwargs
extra: Dict[str, Any] = None
@dataclass
class LLMResponse:
"""Unified response shape."""
content: str
usage: Dict[str, int] # prompt_tokens, completion_tokens, total_tokens
finish_reason: str
raw: Any = None # provider-specific raw response for debugging
class LLMConnection(ABC):
"""Abstract base class for all LLM providers."""
def __init__(self, name: str, config: dict):
self.name = name
self.config = config
self.rate_limit_tpm: Optional[int] = config.get("rate-limit", {}).get("tokens-per-minute")
self.max_concurrent: Optional[int] = config.get("max-concurrent-requests")
self._semaphore = asyncio.Semaphore(self.max_concurrent or 20)
self._token_bucket = None # optional token bucket impl later
@abstractmethod
async def chat_completion(self, request: LLMRequest) -> LLMResponse:
"""Non-streaming completion."""
pass
@abstractmethod
async def stream_completion(self, request: LLMRequest):
"""Async generator yielding partial content strings."""
pass
async def __aenter__(self):
await self._semaphore.acquire()
return self
async def __aexit__(self, exc_type, exc, tb):
self._semaphore.release()
class LLMConnectionPool:
""" """
Global, owner-controlled pool of LLM connections. The body of the organism.
Populated at boot or via signed privileged-command. One instance = one living, multi-personality swarm.
""" """
def __init__(self): # Default identity location — can be overridden if needed
self._pools: Dict[str, LLMConnection] = {} IDENTITY_DIR = os.path.expanduser("~/.agent_server")
self._lock = asyncio.Lock() PRIVATE_KEY_PATH = os.path.join(IDENTITY_DIR, "identity.ed25519")
PUBLIC_KEY_PATH = os.path.join(IDENTITY_DIR, "identity.ed25519.pub")
async def register(self, name: str, config: dict) -> None: def __init__(
""" self,
Add or replace a pool entry. host: str = "0.0.0.0",
Called only from boot config or validated privileged-command handler. port: int = 8765,
""" ssl_context: Optional[ssl.SSLContext] = None,
async with self._lock: users_config_path: str = "config/users.yaml",
provider_type = config.get("provider") identity_pubkey_path: Optional[str] = None,
if provider_type == "xai": ):
connection = XAIConnection(name, config) self.host = host
elif provider_type == "anthropic": self.port = port
connection = AnthropicConnection(name, config) self.ssl_context = ssl_context # None = ws:// (dev only), set for wss://
elif provider_type == "ollama" or provider_type == "local": self.bus = MessageBus()
connection = OllamaConnection(name, config)
else:
raise ValueError(f"Unknown LLM provider: {provider_type}")
old = self._pools.get(name) # Load per-user TOTP secrets + allowed root tags
if old: self.users_config: Dict[str, Dict[str, Any]] = self._load_users_config(users_config_path)
logger.info(f"Replacing LLM pool '{name}'")
else:
logger.info(f"Adding LLM pool '{name}'")
self._pools[name] = connection # Load organism public key for future privileged command verification
self.pubkey: Optional[bytes] = None
pubkey_path = identity_pubkey_path or self.PUBLIC_KEY_PATH
if os.path.exists(pubkey_path):
self.pubkey = self._load_pubkey(pubkey_path)
async def remove(self, name: str) -> None: # Built-in platform listeners will be added here in future versions
async with self._lock:
if name in self._pools: @staticmethod
del self._pools[name] def _load_users_config(path: str) -> Dict[str, Dict[str, Any]]:
logger.info(f"Removed LLM pool '{name}'") """Load users.yaml → {user_id: {totp_secret: ..., allowed_root_tags: [...]}}"""
if not os.path.exists(path):
raise FileNotFoundError(f"Users config not found: {path}")
with open(path) as f:
data = yaml.safe_load(f) or {}
return data.get("users", {})
@staticmethod
def _load_pubkey(path: str) -> bytes:
"""Load raw Ed25519 public key bytes"""
with open(path, "rb") as f:
content = f.read().strip()
# Accept either raw bytes or ssh-ed25519 format
if content.startswith(b"ssh-ed25519 "):
import base64
return base64.b64decode(content.split()[1])
return content
def register_agent(
self,
agent_class: type[AgentService],
*,
system_prompt: str,
max_concurrent: int = 10,
session_timeout: float = 1800.0,
version: str = "1.0",
public: bool = True,
) -> None:
"""Register a permanent agent at boot time."""
# Wrapper to store public flag for catalog building
self.bus.register_agent(
agent_class=agent_class,
system_prompt=system_prompt,
max_concurrent=max_concurrent,
session_timeout=session_timeout,
version=version,
metadata={"public": public},
)
async def _handle_client(self, websocket: WebSocketServerProtocol):
"""Per-connection handler: authenticate → send catalog → pump messages"""
context = {
"authenticated": False,
"user": None,
"allowed_tags": set(),
"bad_message_count": 0,
"last_bad_time": 0.0,
}
def get(self, name: str) -> LLMConnection:
"""Synchronous get — safe because pools don't change mid-request."""
try: try:
return self._pools[name] # 1. Authentication — first message must be <authenticate totp="..."/>
except KeyError: first_raw = await asyncio.wait_for(websocket.recv(), timeout=15.0)
raise KeyError(f"LLM pool '{name}' not configured") from None auth_msg = repair_and_canonicalize(first_raw)
def list_names(self) -> List[str]: if auth_msg.getroot().tag != "authenticate":
return list(self._pools.keys()) await websocket.close(code=1008, reason="First message must be <authenticate/>")
return
totp_code = auth_msg.getroot().get("totp")
if not totp_code:
await websocket.close(code=1008, reason="Missing TOTP code")
return
user_id = self._authenticate_totp(totp_code)
if not user_id:
await websocket.close(code=1008, reason="Invalid TOTP")
return
user_cfg = self.users_config[user_id]
allowed_tags = set(user_cfg.get("allowed_root_tags", []))
if "*" in allowed_tags:
# Wildcard = all current + future tags
allowed_tags = None # Special sentinel
context.update({
"authenticated": True,
"user": user_id,
"allowed_tags": allowed_tags,
})
# 2. Send personalized catalog
catalog_xml = self.bus.build_catalog_for_user(allowed_tags)
await websocket.send(catalog_xml)
# 3. Message pump
async def inbound():
async for raw in websocket:
try:
yield repair_and_canonicalize(raw)
except XmlTamperError:
await websocket.close(code=1008, reason="Invalid/tampered XML")
raise
async def outbound(message: bytes):
await websocket.send(message)
await self.bus.run(
inbound=inbound(),
outbound=outbound,
context=context, # For ACL checks in listeners
)
except asyncio.TimeoutError:
await websocket.close(code=1008, reason="Authentication timeout")
except Exception as e:
print(f"Client error ({websocket.remote_address}): {e}")
def _authenticate_totp(self, code: str) -> Optional[str]:
"""Validate TOTP and return user identifier if successful"""
for user_id, cfg in self.users_config.items():
totp = pyotp.TOTP(cfg["totp_secret"])
if totp.verify(code, valid_window=1): # 30s tolerance
return user_id
return None
async def start(self):
"""Start the organism — runs forever"""
scheme = "wss" if self.ssl_context else "ws"
print(f"AgentServer starting on {scheme}://{self.host}:{self.port}")
print("Organism awakening...")
async with serve(self._handle_client, self.host, self.port, ssl=self.ssl_context):
await asyncio.Future() # Run forever
@classmethod
def generate_identity(cls, force: bool = False) -> None:
"""
Generate the organism's permanent Ed25519 identity.
Run once on first deployment.
"""
os.makedirs(cls.IDENTITY_DIR, exist_ok=True)
if os.path.exists(cls.PRIVATE_KEY_PATH) and not force:
print("Identity already exists:")
print(f" Private key: {cls.PRIVATE_KEY_PATH}")
print(f" Public key : {cls.PUBLIC_KEY_PATH}")
print("Use --force to regenerate (will overwrite!).")
return
print("Generating organism Ed25519 identity...")
private_key = ed25519.Ed25519PrivateKey.generate()
public_key = private_key.public_key()
# Private key — PEM PKCS8 unencrypted (rely on file permissions)
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
# Public key — raw bytes + ssh-ed25519 format for readability
public_raw = public_key.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
)
public_ssh = f"ssh-ed25519 {public_raw.hex()} organism@{os.uname().nodename}"
# Write with secure permissions
with open(cls.PRIVATE_KEY_PATH, "wb") as f:
os.fchmod(f.fileno(), 0o600)
f.write(private_pem)
with open(cls.PUBLIC_KEY_PATH, "w") as f:
f.write(public_ssh + "\n")
print("Organism identity created!")
print(f"Private key (KEEP SAFE): {cls.PRIVATE_KEY_PATH}")
print(f"Public key : {cls.PUBLIC_KEY_PATH}")
print("\nBackup the private key offline. Lose it → lose structural control forever.")
# Example concrete providers (stubs — flesh out with real HTTP later) # ————————————————————————
# Example CLI entrypoint
# ————————————————————————
if __name__ == "__main__":
import argparse
class XAIConnection(LLMConnection): parser = argparse.ArgumentParser(description="AgentServer — the living organism")
async def chat_completion(self, request: LLMRequest) -> LLMResponse: subparsers = parser.add_subparsers(dest="command", required=True)
# TODO: real async httpx to https://api.x.ai/v1/chat/completions
raise NotImplementedError
async def stream_completion(self, request: LLMRequest): # Run the server
# yield partial deltas run_p = subparsers.add_parser("run", help="Start the organism")
yield "streaming not yet implemented" run_p.add_argument("--host", default="0.0.0.0")
run_p.add_argument("--port", type=int, default=8765)
run_p.add_argument("--cert", help="Path to TLS fullchain.pem")
run_p.add_argument("--key", help="Path to TLS privkey.pem")
run_p.add_argument("--users-config", default="config/users.yaml")
# Generate identity
gen_p = subparsers.add_parser("generate-identity", help="Create cryptographic identity")
gen_p.add_argument("--force", action="store_true")
class AnthropicConnection(LLMConnection): args = parser.parse_args()
async def chat_completion(self, request: LLMRequest) -> LLMResponse:
raise NotImplementedError
async def stream_completion(self, request: LLMRequest): if args.command == "generate-identity":
raise NotImplementedError AgentServer.generate_identity(force=args.force)
else: # run
ssl_ctx = None
if args.cert and args.key:
ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_ctx.load_cert_chain(args.cert, args.key)
class OllamaConnection(LLMConnection): server = AgentServer(
async def chat_completion(self, request: LLMRequest) -> LLMResponse: host=args.host,
raise NotImplementedError port=args.port,
ssl_context=ssl_ctx,
users_config_path=args.users_config,
)
async def stream_completion(self, request: LLMRequest): # Example boot-time listeners (uncomment and customise)
raise NotImplementedError # from your_agents import CodingAgent, ResearchAgent, GrokAgent
# server.register_agent(CodingAgent, system_prompt="You are an elite Python engineer...", max_concurrent=20)
# server.register_agent(ResearchAgent, system_prompt="You are a thorough researcher...", max_concurrent=10)
# server.register_agent(GrokAgent, system_prompt="You are Grok, built by xAI...", max_concurrent=15)
asyncio.run(server.start())

View file

@ -0,0 +1,3 @@
class XMLListener:
"""Stub for static analysis — real class is in agentserver.listeners.base"""
pass

View file

@ -0,0 +1,53 @@
# listeners/examples/echo_chamber.py
from agents.base import AgentService # we'll define this base shortly
class Greeter(AgentService):
name = "Greeter"
description = "Friendly entry point that greets users and can introduce them to others"
async def on_message(self, msg):
if msg.is_query():
content = msg.get_text("content", "").strip()
await self.reply(
f"Hello there! 👋 You said: «{content or 'nothing'}»\n"
f"I'm Grok's Greeter organ. I can chat directly or introduce you to other minds in this organism."
)
if any(word in content.lower() for word in ["introduce", "meet", "someone", "other"]):
await self.delegate(
to="Introducer",
content="Please introduce this user to another agent in a fun way.",
on_behalf_of=msg.session_id
)
await self.reply("One moment — calling the Introducer...")
class Introducer(AgentService):
name = "Introducer"
description = "Matches users with other listeners"
async def on_message(self, msg):
if msg.is_query():
# For demo, always introduce to Echo
await self.delegate(
to="Echo",
content="Greet the user warmly and echo something they might like.",
on_behalf_of=msg.on_behalf_of or msg.session_id
)
await self.reply("✨ I've connected you to Echo, one of our reflection specialists!")
class Echo(AgentService):
name = "Echo"
description = "Reflects and amplifies messages"
async def on_message(self, msg):
if msg.is_query():
original_content = msg.get_text("content", "silence")
await self.reply(
f"🔷 Echo says: \"{original_content}\"\n"
f"(I am reflecting back across the organism — your words traveled through Greeter → Introducer → me!)"
)

View file

@ -0,0 +1,138 @@
# llm_connection.py
import asyncio
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
logger = logging.getLogger("agentserver.llm")
@dataclass
class LLMRequest:
"""Standardized request shape passed to all providers."""
messages: List[Dict[str, str]]
model: Optional[str] = None # provider may ignore if fixed in config
temperature: float = 0.7
max_tokens: Optional[int] = None
tools: Optional[List[Dict]] = None
stream: bool = False
# extra provider-specific kwargs
extra: Dict[str, Any] = None
@dataclass
class LLMResponse:
"""Unified response shape."""
content: str
usage: Dict[str, int] # prompt_tokens, completion_tokens, total_tokens
finish_reason: str
raw: Any = None # provider-specific raw response for debugging
class LLMConnection(ABC):
"""Abstract base class for all LLM providers."""
def __init__(self, name: str, config: dict):
self.name = name
self.config = config
self.rate_limit_tpm: Optional[int] = config.get("rate-limit", {}).get("tokens-per-minute")
self.max_concurrent: Optional[int] = config.get("max-concurrent-requests")
self._semaphore = asyncio.Semaphore(self.max_concurrent or 20)
self._token_bucket = None # optional token bucket impl later
@abstractmethod
async def chat_completion(self, request: LLMRequest) -> LLMResponse:
"""Non-streaming completion."""
pass
@abstractmethod
async def stream_completion(self, request: LLMRequest):
"""Async generator yielding partial content strings."""
pass
async def __aenter__(self):
await self._semaphore.acquire()
return self
async def __aexit__(self, exc_type, exc, tb):
self._semaphore.release()
class LLMConnectionPool:
"""
Global, owner-controlled pool of LLM connections.
Populated at boot or via signed privileged-command.
"""
def __init__(self):
self._pools: Dict[str, LLMConnection] = {}
self._lock = asyncio.Lock()
async def register(self, name: str, config: dict) -> None:
"""
Add or replace a pool entry.
Called only from boot config or validated privileged-command handler.
"""
async with self._lock:
provider_type = config.get("provider")
if provider_type == "xai":
connection = XAIConnection(name, config)
elif provider_type == "anthropic":
connection = AnthropicConnection(name, config)
elif provider_type == "ollama" or provider_type == "local":
connection = OllamaConnection(name, config)
else:
raise ValueError(f"Unknown LLM provider: {provider_type}")
old = self._pools.get(name)
if old:
logger.info(f"Replacing LLM pool '{name}'")
else:
logger.info(f"Adding LLM pool '{name}'")
self._pools[name] = connection
async def remove(self, name: str) -> None:
async with self._lock:
if name in self._pools:
del self._pools[name]
logger.info(f"Removed LLM pool '{name}'")
def get(self, name: str) -> LLMConnection:
"""Synchronous get — safe because pools don't change mid-request."""
try:
return self._pools[name]
except KeyError:
raise KeyError(f"LLM pool '{name}' not configured") from None
def list_names(self) -> List[str]:
return list(self._pools.keys())
# Example concrete providers (stubs — flesh out with real HTTP later)
class XAIConnection(LLMConnection):
async def chat_completion(self, request: LLMRequest) -> LLMResponse:
# TODO: real async httpx to https://api.x.ai/v1/chat/completions
raise NotImplementedError
async def stream_completion(self, request: LLMRequest):
# yield partial deltas
yield "streaming not yet implemented"
class AnthropicConnection(LLMConnection):
async def chat_completion(self, request: LLMRequest) -> LLMResponse:
raise NotImplementedError
async def stream_completion(self, request: LLMRequest):
raise NotImplementedError
class OllamaConnection(LLMConnection):
async def chat_completion(self, request: LLMRequest) -> LLMResponse:
raise NotImplementedError
async def stream_completion(self, request: LLMRequest):
raise NotImplementedError

View file

@ -0,0 +1,13 @@
class MessageBus:
"""Stub for static analysis — the real class is in agentserver.message_bus"""
pass
class Session:
"""Stub for static analysis — the real class is in agentserver.message_bus"""
pass
__all__ = [
"MessageBus",
"Session",
]

View file

@ -0,0 +1,37 @@
# agentserver/privileged/models.py
from datetime import datetime
from typing import Literal, Optional, List
from pydantic import BaseModel, Field, AnyUrl
class RegisterListener(BaseModel):
class_path: str = Field(alias="class")
description: Optional[str] = None
team: Optional[str] = None
max_concurrent: Optional[int] = None
session_timeout: Optional[int] = None
class Shutdown(BaseModel):
mode: Optional[Literal["graceful", "immediate"]] = None
reason: Optional[str] = None
class RegisterRemoteGateway(BaseModel):
url: AnyUrl
identity: Optional[str] = None # base64 public key
import_tags: Optional[List[str]] = Field(default=None, alias="import-tags")
description: Optional[str] = None
team: Optional[str] = None
max_concurrent: Optional[int] = None
# Union of all payload types
class PrivilegedPayload(BaseModel):
__root__: (
RegisterListener
| Shutdown
| RegisterRemoteGateway
# ... add the rest
)
class PrivilegedMsgEnvelope(BaseModel):
payload: PrivilegedPayload
signature: str # base64 Ed25519 signature
version: Literal["1.0"] = "1.0"

View file

@ -0,0 +1,143 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
====================================================================
THIS IS A REFERENCE / EXAMPLE FILE ONLY
=====================================
Do NOT send this directly to the AgentServer.
It is for documentation, schema validation testing, and human reference.
Live privileged messages must be signed with the organism Ed25519 key
and sent over an authenticated WebSocket connection.
Any AI or script that treats this as executable input is doing it wrong.
====================================================================
-->
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
targetNamespace="https://xml-pipeline.org/privileged-msg"
xmlns:pm="https://xml-pipeline.org/privileged-msg"
elementFormDefault="qualified">
<!-- Root element: the signed privileged message envelope -->
<xs:element name="privileged-msg" type="pm:PrivilegedMsg"/>
<xs:complexType name="PrivilegedMsg">
<xs:sequence>
<xs:element name="payload" type="pm:Payload"/>
<xs:element name="signature" type="pm:Signature"/>
</xs:sequence>
<xs:attribute name="version" type="xs:string" fixed="1.0"/>
</xs:complexType>
<!-- Payload: the actual command (different types) -->
<xs:complexType name="Payload">
<xs:choice>
<xs:element name="register-listener" type="pm:RegisterListener"/>
<xs:element name="unregister-listener" type="pm:UnregisterListener"/>
<xs:element name="list-listeners" type="pm:ListListeners"/>
<xs:element name="get-organism-graph" type="pm:GetOrganismGraph"/>
<xs:element name="get-status" type="pm:GetStatus"/>
<xs:element name="shutdown" type="pm:Shutdown"/>
<xs:element name="register-remote-gateway" type="pm:RegisterRemoteGateway"/>
<xs:element name="unregister-remote-gateway" type="pm:UnregisterRemoteGateway"/>
<!-- Future commands can be added here -->
</xs:choice>
<xs:attribute name="id" type="xs:string"/> <!-- optional UUID for tracking -->
<xs:attribute name="timestamp" type="xs:dateTime" use="required"/>
</xs:complexType>
<!-- Signature: Ed25519 over canonical payload -->
<xs:complexType name="Signature">
<xs:simpleContent>
<xs:extension base="xs:base64Binary">
<xs:attribute name="algorithm" type="xs:string" default="ed25519"/>
</xs:extension>
</xs:simpleContent>
</xs:complexType>
<!-- register-listener -->
<xs:complexType name="RegisterListener">
<xs:sequence>
<xs:element name="class" type="xs:string"/> <!-- fully qualified Python path -->
<xs:element name="description" type="xs:string" minOccurs="0"/>
<xs:element name="team" type="xs:string" minOccurs="0"/>
<xs:element name="max-concurrent" type="xs:positiveInteger" minOccurs="0"/>
<xs:element name="session-timeout" type="xs:positiveInteger" minOccurs="0"/> <!-- seconds -->
</xs:sequence>
</xs:complexType>
<!-- unregister-listener -->
<xs:complexType name="UnregisterListener">
<xs:sequence>
<xs:element name="class" type="xs:string"/>
</xs:sequence>
</xs:complexType>
<!-- list-listeners -->
<xs:complexType name="ListListeners">
<xs:sequence>
<xs:element name="detailed" type="xs:boolean" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<!-- get-organism-graph -->
<xs:complexType name="GetOrganismGraph">
<xs:sequence>
<xs:element name="team" type="xs:string" minOccurs="0"/>
<xs:element name="format" minOccurs="0">
<xs:simpleType>
<xs:restriction base="xs:string">
<xs:enumeration value="mermaid"/>
<xs:enumeration value="graphviz"/>
<xs:enumeration value="json"/>
</xs:restriction>
</xs:simpleType>
</xs:element>
</xs:sequence>
</xs:complexType>
<!-- get-status -->
<xs:complexType name="GetStatus">
<xs:sequence>
<xs:element name="include-listeners" type="xs:boolean" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<!-- shutdown -->
<xs:complexType name="Shutdown">
<xs:sequence>
<xs:element name="mode" minOccurs="0">
<xs:simpleType>
<xs:restriction base="xs:string">
<xs:enumeration value="graceful"/>
<xs:enumeration value="immediate"/>
</xs:restriction>
</xs:simpleType>
</xs:element>
<xs:element name="reason" type="xs:string" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<!-- register-remote-gateway -->
<xs:complexType name="RegisterRemoteGateway">
<xs:sequence>
<xs:element name="url" type="xs:anyURI"/>
<xs:element name="identity" type="xs:base64Binary" minOccurs="0"/> <!-- remote organism public key -->
<xs:element name="import-tags" minOccurs="0">
<xs:complexType>
<xs:sequence>
<xs:element name="tag" type="xs:string" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="description" type="xs:string" minOccurs="0"/>
<xs:element name="team" type="xs:string" minOccurs="0"/>
<xs:element name="max-concurrent" type="xs:positiveInteger" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<!-- unregister-remote-gateway -->
<xs:complexType name="UnregisterRemoteGateway">
<xs:sequence>
<xs:element name="url" type="xs:anyURI"/>
</xs:sequence>
</xs:complexType>
</xs:schema>

19
pyproject.toml Normal file
View file

@ -0,0 +1,19 @@
# pyproject.toml
[build-system]
requires = ["setuptools>=45", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "xml-pipeline"
version = "0.1.0"
description = "Tamper-proof nervous system for multi-agent organisms"
dependencies = [
"lxml",
"websockets",
"pyotp",
"pyyaml",
"cryptography",
]
[tool.setuptools.packages.find]
where = ["."]