changed agentserver.py

dullfig 2025-12-25 22:25:54 -08:00
parent e79bf4cbb6
commit 455c375e10
4 changed files with 481 additions and 114 deletions

agents/examples/echo_chamber.py Normal file

@@ -0,0 +1,53 @@
# agents/examples/echo_chamber.py
from agents.base import AgentService  # we'll define this base shortly


class Greeter(AgentService):
    name = "Greeter"
    description = "Friendly entry point that greets users and can introduce them to others"

    async def on_message(self, msg):
        if msg.is_query():
            content = msg.get_text("content", "").strip()
            await self.reply(
                f"Hello there! 👋 You said: «{content or 'nothing'}»\n"
                f"I'm Grok's Greeter organ. I can chat directly or introduce you to other minds in this organism."
            )
            if any(word in content.lower() for word in ["introduce", "meet", "someone", "other"]):
                await self.delegate(
                    to="Introducer",
                    content="Please introduce this user to another agent in a fun way.",
                    on_behalf_of=msg.session_id,
                )
                await self.reply("One moment — calling the Introducer...")


class Introducer(AgentService):
    name = "Introducer"
    description = "Matches users with other agents"

    async def on_message(self, msg):
        if msg.is_query():
            # For demo, always introduce to Echo
            await self.delegate(
                to="Echo",
                content="Greet the user warmly and echo something they might like.",
                on_behalf_of=msg.on_behalf_of or msg.session_id,
            )
            await self.reply("✨ I've connected you to Echo, one of our reflection specialists!")


class Echo(AgentService):
    name = "Echo"
    description = "Reflects and amplifies messages"

    async def on_message(self, msg):
        if msg.is_query():
            original_content = msg.get_text("content", "silence")
            await self.reply(
                f"🔷 Echo says: \"{original_content}\"\n"
                f"(I am reflecting back across the organism — your words traveled through Greeter → Introducer → me!)"
            )
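
A quick sketch of how these three organs could be wired into the server at boot, using the register_agent API from agentserver.py below. The module paths and system prompts here are illustrative assumptions, not part of this commit:

# boot_sketch.py: hypothetical wiring, assuming agentserver.py is importable as a module
import asyncio
from agentserver import AgentServer
from agents.examples.echo_chamber import Greeter, Introducer, Echo

server = AgentServer()
for cls, prompt in [
    (Greeter, "Greet users warmly and route introductions."),
    (Introducer, "Match users with other agents."),
    (Echo, "Reflect messages back faithfully."),
]:
    server.register_agent(cls, system_prompt=prompt, max_concurrent=5)
asyncio.run(server.start())  # serves ws:// until an ssl_context is supplied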

agentserver.py

@@ -1,138 +1,295 @@
# agent_server.py
"""
AgentServer — The Living Organism Host
December 25, 2025

Preliminary but runnable implementation.

This is the body: one process, one secure WebSocket endpoint,
hosting many concurrent AgentService organs sharing a single
tamper-proof MessageBus from xml-pipeline.

Features in this version:
- Mandatory WSS (TLS)
- First-message TOTP authentication
- Per-user capability control via config/users.yaml
- Personalized <catalog/> on connect
- Ed25519 identity generation helper
- Boot-time agent registration
- Hooks for future signed privileged commands

XML wins.
"""
import asyncio
import base64
import os
import ssl
import time
from typing import Optional, Dict, Any

import pyotp
import yaml
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.primitives import serialization
from websockets.server import serve, WebSocketServerProtocol

from xml_pipeline import MessageBus
from xml_pipeline.service import AgentService
from xml_pipeline.message import repair_and_canonicalize, XmlTamperError


class AgentServer:
    """
    The body of the organism.
    One instance = one living, multi-personality swarm.
    """

    # Default identity location — can be overridden if needed
    IDENTITY_DIR = os.path.expanduser("~/.agent_server")
    PRIVATE_KEY_PATH = os.path.join(IDENTITY_DIR, "identity.ed25519")
    PUBLIC_KEY_PATH = os.path.join(IDENTITY_DIR, "identity.ed25519.pub")

    def __init__(
        self,
        host: str = "0.0.0.0",
        port: int = 8765,
        ssl_context: Optional[ssl.SSLContext] = None,
        users_config_path: str = "config/users.yaml",
        identity_pubkey_path: Optional[str] = None,
    ):
        self.host = host
        self.port = port
        self.ssl_context = ssl_context  # None = ws:// (dev only), set for wss://
        self.bus = MessageBus()

        # Load per-user TOTP secrets + allowed root tags
        self.users_config: Dict[str, Dict[str, Any]] = self._load_users_config(users_config_path)

        # Load organism public key for future privileged command verification
        self.pubkey: Optional[bytes] = None
        pubkey_path = identity_pubkey_path or self.PUBLIC_KEY_PATH
        if os.path.exists(pubkey_path):
            self.pubkey = self._load_pubkey(pubkey_path)

        # Built-in platform listeners will be added here in future versions

    @staticmethod
    def _load_users_config(path: str) -> Dict[str, Dict[str, Any]]:
        """Load users.yaml → {user_id: {totp_secret: ..., allowed_root_tags: [...]}}"""
        if not os.path.exists(path):
            raise FileNotFoundError(f"Users config not found: {path}")
        with open(path) as f:
            data = yaml.safe_load(f) or {}
        return data.get("users", {})

    @staticmethod
    def _load_pubkey(path: str) -> bytes:
        """Load raw Ed25519 public key bytes"""
        with open(path, "rb") as f:
            content = f.read().strip()
        # Accept either raw bytes or ssh-ed25519 format
        if content.startswith(b"ssh-ed25519 "):
            return base64.b64decode(content.split()[1])
        return content

    def register_agent(
        self,
        agent_class: type[AgentService],
        *,
        system_prompt: str,
        max_concurrent: int = 10,
        session_timeout: float = 1800.0,
        version: str = "1.0",
        public: bool = True,
    ) -> None:
        """Register a permanent agent at boot time."""
        # Wrapper to store public flag for catalog building
        self.bus.register_agent(
            agent_class=agent_class,
            system_prompt=system_prompt,
            max_concurrent=max_concurrent,
            session_timeout=session_timeout,
            version=version,
            metadata={"public": public},
        )

    async def _handle_client(self, websocket: WebSocketServerProtocol):
        """Per-connection handler: authenticate → send catalog → pump messages"""
        context = {
            "authenticated": False,
            "user": None,
            "allowed_tags": set(),
            "bad_message_count": 0,
            "last_bad_time": 0.0,
        }
        try:
            # 1. Authentication — first message must be <authenticate totp="..."/>
            first_raw = await asyncio.wait_for(websocket.recv(), timeout=15.0)
            auth_msg = repair_and_canonicalize(first_raw)
            if auth_msg.getroot().tag != "authenticate":
                await websocket.close(code=1008, reason="First message must be <authenticate/>")
                return
            totp_code = auth_msg.getroot().get("totp")
            if not totp_code:
                await websocket.close(code=1008, reason="Missing TOTP code")
                return
            user_id = self._authenticate_totp(totp_code)
            if not user_id:
                await websocket.close(code=1008, reason="Invalid TOTP")
                return

            user_cfg = self.users_config[user_id]
            allowed_tags = set(user_cfg.get("allowed_root_tags", []))
            if "*" in allowed_tags:
                # Wildcard = all current + future tags
                allowed_tags = None  # Special sentinel

            context.update({
                "authenticated": True,
                "user": user_id,
                "allowed_tags": allowed_tags,
            })

            # 2. Send personalized catalog
            catalog_xml = self.bus.build_catalog_for_user(allowed_tags)
            await websocket.send(catalog_xml)

            # 3. Message pump
            async def inbound():
                async for raw in websocket:
                    try:
                        yield repair_and_canonicalize(raw)
                    except XmlTamperError:
                        await websocket.close(code=1008, reason="Invalid/tampered XML")
                        raise

            async def outbound(message: bytes):
                await websocket.send(message)

            await self.bus.run(
                inbound=inbound(),
                outbound=outbound,
                context=context,  # For ACL checks in listeners
            )
        except asyncio.TimeoutError:
            await websocket.close(code=1008, reason="Authentication timeout")
        except Exception as e:
            print(f"Client error ({websocket.remote_address}): {e}")

    def _authenticate_totp(self, code: str) -> Optional[str]:
        """Validate TOTP and return user identifier if successful"""
        for user_id, cfg in self.users_config.items():
            totp = pyotp.TOTP(cfg["totp_secret"])
            if totp.verify(code, valid_window=1):  # ±30 s tolerance
                return user_id
        return None

    async def start(self):
        """Start the organism — runs forever"""
        scheme = "wss" if self.ssl_context else "ws"
        print(f"AgentServer starting on {scheme}://{self.host}:{self.port}")
        print("Organism awakening...")
        async with serve(self._handle_client, self.host, self.port, ssl=self.ssl_context):
            await asyncio.Future()  # Run forever

    @classmethod
    def generate_identity(cls, force: bool = False) -> None:
        """
        Generate the organism's permanent Ed25519 identity.
        Run once on first deployment.
        """
        os.makedirs(cls.IDENTITY_DIR, exist_ok=True)
        if os.path.exists(cls.PRIVATE_KEY_PATH) and not force:
            print("Identity already exists:")
            print(f"  Private key: {cls.PRIVATE_KEY_PATH}")
            print(f"  Public key : {cls.PUBLIC_KEY_PATH}")
            print("Use --force to regenerate (will overwrite!).")
            return

        print("Generating organism Ed25519 identity...")
        private_key = ed25519.Ed25519PrivateKey.generate()
        public_key = private_key.public_key()

        # Private key — PEM PKCS8 unencrypted (rely on file permissions)
        private_pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )

        # Public key — raw bytes, base64-encoded in ssh-ed25519 style so that
        # _load_pubkey can round-trip it (hex would not survive its b64decode)
        public_raw = public_key.public_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PublicFormat.Raw,
        )
        public_b64 = base64.b64encode(public_raw).decode()
        public_ssh = f"ssh-ed25519 {public_b64} organism@{os.uname().nodename}"

        # Write with secure permissions
        with open(cls.PRIVATE_KEY_PATH, "wb") as f:
            os.fchmod(f.fileno(), 0o600)
            f.write(private_pem)
        with open(cls.PUBLIC_KEY_PATH, "w") as f:
            f.write(public_ssh + "\n")

        print("Organism identity created!")
        print(f"Private key (KEEP SAFE): {cls.PRIVATE_KEY_PATH}")
        print(f"Public key             : {cls.PUBLIC_KEY_PATH}")
        print("\nBackup the private key offline. Lose it → lose structural control forever.")


# ————————————————————————
# Example CLI entrypoint
# ————————————————————————
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="AgentServer — the living organism")
    subparsers = parser.add_subparsers(dest="command", required=True)

    # Run the server
    run_p = subparsers.add_parser("run", help="Start the organism")
    run_p.add_argument("--host", default="0.0.0.0")
    run_p.add_argument("--port", type=int, default=8765)
    run_p.add_argument("--cert", help="Path to TLS fullchain.pem")
    run_p.add_argument("--key", help="Path to TLS privkey.pem")
    run_p.add_argument("--users-config", default="config/users.yaml")

    # Generate identity
    gen_p = subparsers.add_parser("generate-identity", help="Create cryptographic identity")
    gen_p.add_argument("--force", action="store_true")

    args = parser.parse_args()

    if args.command == "generate-identity":
        AgentServer.generate_identity(force=args.force)
    else:  # run
        ssl_ctx = None
        if args.cert and args.key:
            ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
            ssl_ctx.load_cert_chain(args.cert, args.key)
        server = AgentServer(
            host=args.host,
            port=args.port,
            ssl_context=ssl_ctx,
            users_config_path=args.users_config,
        )
        # Example boot-time agents (uncomment and customise)
        # from your_agents import CodingAgent, ResearchAgent, GrokAgent
        # server.register_agent(CodingAgent, system_prompt="You are an elite Python engineer...", max_concurrent=20)
        # server.register_agent(ResearchAgent, system_prompt="You are a thorough researcher...", max_concurrent=10)
        # server.register_agent(GrokAgent, system_prompt="You are Grok, built by xAI...", max_concurrent=15)
        asyncio.run(server.start())
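
Two sketches of the other side of this handshake, under stated assumptions. First, the users.yaml shape implied by _load_users_config and _authenticate_totp; the user name, secret, and tag names are placeholders:

# config/users.yaml: illustrative only
users:
  alice:
    totp_secret: "JBSWY3DPEHPK3PXP"   # base32 seed shared with the user's authenticator app
    allowed_root_tags: ["query"]      # or ["*"] to allow every current and future tag

And a minimal client exercising the first-message TOTP rule. It assumes the standard websockets client API and a dev ws:// endpoint; a real deployment would use wss:// with certificate verification:

# client_sketch.py: hypothetical client, not part of this commit
import asyncio
import pyotp
import websockets

async def main():
    totp = pyotp.TOTP("JBSWY3DPEHPK3PXP")  # same seed as in users.yaml
    async with websockets.connect("ws://localhost:8765") as ws:
        await ws.send(f'<authenticate totp="{totp.now()}"/>')  # must be the very first message
        catalog = await ws.recv()  # personalized <catalog/> arrives on success
        print(catalog)

asyncio.run(main())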

llm_connection.py Normal file

@@ -0,0 +1,138 @@
# llm_connection.py
import asyncio
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

logger = logging.getLogger("agentserver.llm")


@dataclass
class LLMRequest:
    """Standardized request shape passed to all providers."""
    messages: List[Dict[str, str]]
    model: Optional[str] = None  # provider may ignore if fixed in config
    temperature: float = 0.7
    max_tokens: Optional[int] = None
    tools: Optional[List[Dict]] = None
    stream: bool = False
    # extra provider-specific kwargs
    extra: Optional[Dict[str, Any]] = None


@dataclass
class LLMResponse:
    """Unified response shape."""
    content: str
    usage: Dict[str, int]  # prompt_tokens, completion_tokens, total_tokens
    finish_reason: str
    raw: Any = None  # provider-specific raw response for debugging


class LLMConnection(ABC):
    """Abstract base class for all LLM providers."""

    def __init__(self, name: str, config: dict):
        self.name = name
        self.config = config
        self.rate_limit_tpm: Optional[int] = config.get("rate-limit", {}).get("tokens-per-minute")
        self.max_concurrent: Optional[int] = config.get("max-concurrent-requests")
        self._semaphore = asyncio.Semaphore(self.max_concurrent or 20)
        self._token_bucket = None  # optional token bucket impl later

    @abstractmethod
    async def chat_completion(self, request: LLMRequest) -> LLMResponse:
        """Non-streaming completion."""
        pass

    @abstractmethod
    async def stream_completion(self, request: LLMRequest):
        """Async generator yielding partial content strings."""
        pass

    async def __aenter__(self):
        await self._semaphore.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self._semaphore.release()


class LLMConnectionPool:
    """
    Global, owner-controlled pool of LLM connections.
    Populated at boot or via signed privileged-command.
    """

    def __init__(self):
        self._pools: Dict[str, LLMConnection] = {}
        self._lock = asyncio.Lock()

    async def register(self, name: str, config: dict) -> None:
        """
        Add or replace a pool entry.
        Called only from boot config or validated privileged-command handler.
        """
        async with self._lock:
            provider_type = config.get("provider")
            if provider_type == "xai":
                connection = XAIConnection(name, config)
            elif provider_type == "anthropic":
                connection = AnthropicConnection(name, config)
            elif provider_type == "ollama" or provider_type == "local":
                connection = OllamaConnection(name, config)
            else:
                raise ValueError(f"Unknown LLM provider: {provider_type}")

            old = self._pools.get(name)
            if old:
                logger.info(f"Replacing LLM pool '{name}'")
            else:
                logger.info(f"Adding LLM pool '{name}'")
            self._pools[name] = connection

    async def remove(self, name: str) -> None:
        async with self._lock:
            if name in self._pools:
                del self._pools[name]
                logger.info(f"Removed LLM pool '{name}'")

    def get(self, name: str) -> LLMConnection:
        """Synchronous get — safe because pools don't change mid-request."""
        try:
            return self._pools[name]
        except KeyError:
            raise KeyError(f"LLM pool '{name}' not configured") from None

    def list_names(self) -> List[str]:
        return list(self._pools.keys())


# Example concrete providers (stubs — flesh out with real HTTP later)

class XAIConnection(LLMConnection):
    async def chat_completion(self, request: LLMRequest) -> LLMResponse:
        # TODO: real async httpx to https://api.x.ai/v1/chat/completions
        raise NotImplementedError

    async def stream_completion(self, request: LLMRequest):
        # yield partial deltas
        yield "streaming not yet implemented"


class AnthropicConnection(LLMConnection):
    async def chat_completion(self, request: LLMRequest) -> LLMResponse:
        raise NotImplementedError

    async def stream_completion(self, request: LLMRequest):
        raise NotImplementedError


class OllamaConnection(LLMConnection):
    async def chat_completion(self, request: LLMRequest) -> LLMResponse:
        raise NotImplementedError

    async def stream_completion(self, request: LLMRequest):
        raise NotImplementedError
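
A hedged usage sketch for the pool: registering an Ollama-backed entry (the config keys follow the hyphenated names read in LLMConnection.__init__) and acquiring a concurrency slot through the async context manager. Since the providers are still stubs, chat_completion raises NotImplementedError today; the pool and semaphore mechanics are what this illustrates:

# pool_sketch.py: illustrative only
import asyncio
from llm_connection import LLMConnectionPool, LLMRequest

async def demo():
    pool = LLMConnectionPool()
    await pool.register("local-llama", {
        "provider": "ollama",
        "max-concurrent-requests": 4,
        "rate-limit": {"tokens-per-minute": 60_000},
    })
    async with pool.get("local-llama") as conn:  # __aenter__ takes a semaphore slot
        request = LLMRequest(messages=[{"role": "user", "content": "hello"}])
        response = await conn.chat_completion(request)  # NotImplementedError until fleshed out
        print(response.content)

asyncio.run(demo())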

pyproject.toml Normal file

@@ -0,0 +1,19 @@
# pyproject.toml
[build-system]
requires = ["setuptools>=45", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "xml-pipeline"
version = "0.1.0"
description = "Tamper-proof nervous system for multi-agent organisms"
dependencies = [
    "lxml",
    "websockets",
    "pyotp",
    "pyyaml",
    "cryptography",
]

[tool.setuptools.packages.find]
where = ["."]
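
Typical bootstrap, as implied by the argparse CLI in agentserver.py; the certificate paths are placeholders:

pip install -e .
python agentserver.py generate-identity
python agentserver.py run --cert fullchain.pem --key privkey.pem --users-config config/users.yaml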