From 987c6aa2144bc757e32d637cc5b0d299341d3e9e Mon Sep 17 00:00:00 2001
From: dullfig
Date: Sun, 11 Jan 2026 13:57:51 -0800
Subject: [PATCH] Add platform-managed PromptRegistry and LLM API

Platform architecture for trusted orchestration:
- PromptRegistry: immutable system prompts per agent, loaded at bootstrap
- platform.complete(): assembles LLM calls (prompt + history + user msg)
- Handlers use platform API, cannot see/modify prompts
- organism.yaml now supports prompt field per listener

Co-Authored-By: Claude Opus 4.5
---
 agentserver/message_bus/stream_pump.py  |  24 ++++
 agentserver/platform/__init__.py        |  30 ++++
 agentserver/platform/llm_api.py         | 171 ++++++++++++++++++++++
 agentserver/platform/prompt_registry.py | 179 ++++++++++++++++++++++++
 config/organism.yaml                    |   4 +
 handlers/hello.py                       |  32 ++---
 6 files changed, 420 insertions(+), 20 deletions(-)
 create mode 100644 agentserver/platform/__init__.py
 create mode 100644 agentserver/platform/llm_api.py
 create mode 100644 agentserver/platform/prompt_registry.py

diff --git a/agentserver/message_bus/stream_pump.py b/agentserver/message_bus/stream_pump.py
index 7773601..e43b7f7 100644
--- a/agentserver/message_bus/stream_pump.py
+++ b/agentserver/message_bus/stream_pump.py
@@ -48,6 +48,7 @@ class ListenerConfig:
     is_agent: bool = False
     peers: List[str] = field(default_factory=list)
     broadcast: bool = False
+    prompt: str = ""  # System prompt for LLM agents (loaded into PromptRegistry)
     payload_class: type = field(default=None, repr=False)
     handler: Callable = field(default=None, repr=False)
@@ -760,6 +761,7 @@ class ConfigLoader:
             is_agent=raw.get("agent", False),
             peers=raw.get("peers", []),
             broadcast=raw.get("broadcast", False),
+            prompt=raw.get("prompt", ""),
         )
 
     @classmethod
@@ -784,6 +786,7 @@ async def bootstrap(config_path: str = "config/organism.yaml") -> StreamPump:
         TodoUntil, TodoComplete, handle_todo_until, handle_todo_complete,
     )
 
+    from agentserver.platform import get_prompt_registry
 
     # Load .env file if present
     load_dotenv()
@@ -833,6 +836,27 @@ async def bootstrap(config_path: str = "config/organism.yaml") -> StreamPump:
     # Register all user-defined listeners
     pump.register_all()
 
+    # Load prompts into PromptRegistry (platform-managed, immutable)
+    prompt_registry = get_prompt_registry()
+    prompt_count = 0
+    for listener in pump.listeners.values():
+        if listener.is_agent:
+            # Get prompt from config (may be empty)
+            lc = next((l for l in config.listeners if l.name == listener.name), None)
+            system_prompt = lc.prompt if lc else ""
+
+            # Register prompt with peer schemas (usage_instructions)
+            prompt_registry.register(
+                agent_name=listener.name,
+                system_prompt=system_prompt,
+                peer_schemas=listener.usage_instructions,
+            )
+            prompt_count += 1
+
+    # Freeze registry - no more registrations allowed
+    prompt_registry.freeze()
+    print(f"Prompts: {prompt_count} agents registered, registry frozen")
+
     # Configure LLM router if llm section present
     if config.llm_config:
         from agentserver.llm import configure_router
diff --git a/agentserver/platform/__init__.py b/agentserver/platform/__init__.py
new file mode 100644
index 0000000..f25e1c6
--- /dev/null
+++ b/agentserver/platform/__init__.py
@@ -0,0 +1,30 @@
+"""
+platform — Trusted orchestration layer for the agent swarm.
+
+The platform manages:
+- Prompt registry: immutable system prompts per agent
+- LLM call assembly: platform controls what goes to the LLM
+- Context buffer access: controlled by platform
+
+Agents are sandboxed. They receive messages and return responses.
+They cannot see or modify prompts, and cannot directly access the LLM.
+""" + +from agentserver.platform.prompt_registry import ( + PromptRegistry, + AgentPrompt, + get_prompt_registry, +) + +from agentserver.platform.llm_api import ( + complete, + platform_complete, +) + +__all__ = [ + "PromptRegistry", + "AgentPrompt", + "get_prompt_registry", + "complete", + "platform_complete", +] diff --git a/agentserver/platform/llm_api.py b/agentserver/platform/llm_api.py new file mode 100644 index 0000000..0ba46e8 --- /dev/null +++ b/agentserver/platform/llm_api.py @@ -0,0 +1,171 @@ +""" +llm_api.py — Platform-controlled LLM interface. + +The platform controls all LLM calls. Agents request completions via this API. +The platform assembles the full prompt (system + history + user message) +and enforces rate limits, caching, and cost controls. + +Design principles: +- Agent-invisible prompts: agents never see their system prompt +- Thread-scoped history: only messages from the current thread +- Auditable: all calls can be logged/traced +- Rate-limited: platform controls costs + +Usage (from handler): + from agentserver.platform import complete + + async def handle_greeting(payload, metadata): + response = await complete( + agent_name=metadata.own_name, + thread_id=metadata.thread_id, + user_message=f"Greet {payload.name}", + temperature=0.9, + ) + return HandlerResponse( + payload=GreetingResponse(message=response), + to="shouter", + ) +""" + +from __future__ import annotations + +import logging +from typing import Any, Dict, List, Optional + +from agentserver.platform.prompt_registry import get_prompt_registry +from agentserver.memory import get_context_buffer + +logger = logging.getLogger(__name__) + + +async def complete( + agent_name: str, + thread_id: str, + user_message: str, + *, + temperature: float = 0.7, + max_tokens: int = 1024, + include_history: bool = True, + **kwargs: Any, +) -> str: + """ + Request an LLM completion for an agent. + + The platform assembles the full prompt: + 1. 
System prompt from PromptRegistry (invisible to agent) + 2. Peer schemas (what messages agent can send) + 3. Thread history from ContextBuffer + 4. User's message + + Args: + agent_name: The calling agent's name (for prompt lookup) + thread_id: Current thread UUID (for history lookup) + user_message: The user/task message to complete + temperature: LLM temperature (0.0-1.0) + max_tokens: Maximum tokens in response + include_history: Whether to include thread history + **kwargs: Additional LLM parameters + + Returns: + The LLM's text response + + Raises: + KeyError: If agent has no registered prompt + RuntimeError: If LLM call fails + """ + # Get agent's prompt (agent cannot see this) + prompt_registry = get_prompt_registry() + prompt = prompt_registry.get_required(agent_name) + + # Build messages array + messages: List[Dict[str, str]] = [] + + # System prompt (from registry) + if prompt.system_prompt: + messages.append({ + "role": "system", + "content": prompt.system_prompt, + }) + + # Peer schemas (what messages agent can send) + if prompt.peer_schemas: + messages.append({ + "role": "system", + "content": prompt.peer_schemas, + }) + + # Thread history (agent can read, not modify) + if include_history and thread_id: + context_buffer = get_context_buffer() + history = context_buffer.get_thread(thread_id) + + for slot in history: + # Determine role: assistant if from this agent, user otherwise + role = "assistant" if slot.from_id == agent_name else "user" + + # Serialize payload for LLM context + content = _serialize_for_llm(slot.payload, slot.from_id) + messages.append({ + "role": role, + "content": content, + }) + + # Current user message + messages.append({ + "role": "user", + "content": user_message, + }) + + # Make LLM call via router + try: + from agentserver.llm import generate + + response = await generate( + messages=messages, + temperature=temperature, + max_tokens=max_tokens, + **kwargs, + ) + + logger.debug( + f"platform.complete: agent={agent_name} 
thread={thread_id[:8]}... " + f"messages={len(messages)} response_len={len(response)}" + ) + + return response + + except Exception as e: + logger.error(f"LLM call failed for {agent_name}: {e}") + raise RuntimeError(f"LLM completion failed: {e}") from e + + +def _serialize_for_llm(payload: Any, from_id: str) -> str: + """ + Serialize a payload for LLM context. + + Converts structured payloads to a readable format for the LLM. + """ + # Try XML serialization first (for xmlify classes) + if hasattr(payload, 'xml_value'): + from lxml import etree + try: + class_name = type(payload).__name__ + tree = payload.xml_value(class_name) + xml_str = etree.tostring(tree, encoding='unicode', pretty_print=True) + return f"[From {from_id}]\n{xml_str}" + except Exception: + pass + + # Try to_xml for custom classes + if hasattr(payload, 'to_xml'): + try: + return f"[From {from_id}]\n{payload.to_xml()}" + except Exception: + pass + + # Fallback to repr + return f"[From {from_id}] {repr(payload)}" + + +# Alias for cleaner imports +platform_complete = complete diff --git a/agentserver/platform/prompt_registry.py b/agentserver/platform/prompt_registry.py new file mode 100644 index 0000000..779a177 --- /dev/null +++ b/agentserver/platform/prompt_registry.py @@ -0,0 +1,179 @@ +""" +prompt_registry.py — Immutable prompt storage for agents. + +The PromptRegistry is the trusted store for agent system prompts. +Prompts are loaded at startup and cannot be modified at runtime. + +Design principles: +- Immutable: prompts set at startup, never modified +- Invisible: agents cannot see their own prompts +- Auditable: prompts hashed for tracking +- Per-agent: each agent has one system prompt + +Usage: + registry = get_prompt_registry() + + # At startup (from config loader) + registry.register("greeter", "You are a friendly greeter...") + + # At LLM call assembly (platform only) + prompt = registry.get("greeter") + messages = [{"role": "system", "content": prompt.system_prompt}, ...] 
+""" + +from __future__ import annotations + +import hashlib +import threading +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Dict, Optional + + +@dataclass(frozen=True) +class AgentPrompt: + """ + Immutable prompt for an agent. + + frozen=True ensures prompts cannot be modified after creation. + """ + agent_name: str + system_prompt: str + prompt_hash: str # SHA256 of system_prompt + peer_schemas: str = "" # Generated from peer XSDs + created_at: str = "" # ISO timestamp + + @classmethod + def create( + cls, + agent_name: str, + system_prompt: str, + peer_schemas: str = "", + ) -> AgentPrompt: + """Create a new AgentPrompt with computed hash.""" + prompt_hash = hashlib.sha256(system_prompt.encode()).hexdigest()[:16] + created_at = datetime.now(timezone.utc).isoformat() + + return cls( + agent_name=agent_name, + system_prompt=system_prompt, + prompt_hash=prompt_hash, + peer_schemas=peer_schemas, + created_at=created_at, + ) + + @property + def full_prompt(self) -> str: + """Combined system prompt + peer schemas.""" + if self.peer_schemas: + return f"{self.system_prompt}\n\n{self.peer_schemas}" + return self.system_prompt + + +class PromptRegistry: + """ + Immutable registry of agent prompts. + + Thread-safe. Prompts can only be registered once per agent. + """ + + def __init__(self): + self._prompts: Dict[str, AgentPrompt] = {} + self._lock = threading.Lock() + self._frozen = False + + def register( + self, + agent_name: str, + system_prompt: str, + peer_schemas: str = "", + ) -> AgentPrompt: + """ + Register a prompt for an agent. + + Can only be called during startup (before freeze). + Raises RuntimeError if called after freeze or if agent already registered. 
+        """
+        with self._lock:
+            if self._frozen:
+                raise RuntimeError(
+                    f"Cannot register prompt for '{agent_name}': registry is frozen"
+                )
+
+            if agent_name in self._prompts:
+                raise RuntimeError(
+                    f"Cannot register prompt for '{agent_name}': already registered"
+                )
+
+            prompt = AgentPrompt.create(
+                agent_name=agent_name,
+                system_prompt=system_prompt,
+                peer_schemas=peer_schemas,
+            )
+
+            self._prompts[agent_name] = prompt
+            return prompt
+
+    def freeze(self) -> None:
+        """
+        Freeze the registry. No more registrations allowed.
+
+        Call this after all prompts are loaded from config.
+        """
+        with self._lock:
+            self._frozen = True
+
+    def get(self, agent_name: str) -> Optional[AgentPrompt]:
+        """Get prompt for an agent (None if not registered)."""
+        with self._lock:
+            return self._prompts.get(agent_name)
+
+    def get_required(self, agent_name: str) -> AgentPrompt:
+        """Get prompt for an agent (raises if not found)."""
+        prompt = self.get(agent_name)
+        if prompt is None:
+            raise KeyError(f"No prompt registered for agent: {agent_name}")
+        return prompt
+
+    def has(self, agent_name: str) -> bool:
+        """Check if an agent has a registered prompt."""
+        with self._lock:
+            return agent_name in self._prompts
+
+    def list_agents(self) -> list[str]:
+        """List all agents with registered prompts."""
+        with self._lock:
+            return list(self._prompts.keys())
+
+    def get_stats(self) -> dict:
+        """Get registry statistics."""
+        with self._lock:
+            return {
+                "agent_count": len(self._prompts),
+                "frozen": self._frozen,
+                "agents": list(self._prompts.keys()),
+            }
+
+    def clear(self) -> None:
+        """Clear all prompts (for testing only)."""
+        with self._lock:
+            self._prompts.clear()
+            self._frozen = False
+
+
+# ============================================================================
+# Singleton
+# ============================================================================
+
+_registry: Optional[PromptRegistry] = None
+_registry_lock = threading.Lock()
+
+
+def get_prompt_registry() -> PromptRegistry:
+    """Get the global PromptRegistry singleton."""
+    global _registry
+    if _registry is None:
+        with _registry_lock:
+            if _registry is None:
+                _registry = PromptRegistry()
+    return _registry
diff --git a/config/organism.yaml b/config/organism.yaml
index 8c2e2f8..6e095c6 100644
--- a/config/organism.yaml
+++ b/config/organism.yaml
@@ -38,6 +38,10 @@ listeners:
     description: Greeting agent - forwards to shouter
     agent: true
     peers: [shouter]
+    prompt: |
+      You are a friendly greeter agent.
+      When someone sends you a greeting, respond enthusiastically.
+      Keep your responses short (1-2 sentences).
 
   # Shouter: receives GreetingResponse, sends ShoutedResponse back
   - name: shouter
diff --git a/handlers/hello.py b/handlers/hello.py
index b07adc3..3f1f84d 100644
--- a/handlers/hello.py
+++ b/handlers/hello.py
@@ -62,8 +62,12 @@ async def handle_greeting(payload: Greeting, metadata: HandlerMetadata) -> Handl
     2. Send GreetingResponse to shouter
     3. When ShoutedResponse appears, eyebrow is raised
     4. On next invocation, greeter sees nudge and can close the todo
+
+    NOTE: This handler uses platform.complete() for LLM calls.
+    The system prompt is managed by the platform (from organism.yaml).
+    The handler cannot see or modify the prompt.
""" - from agentserver.llm import complete + from agentserver.platform import complete from agentserver.message_bus.todo_registry import get_todo_registry # Check for any raised todos and close them @@ -73,10 +77,8 @@ async def handle_greeting(payload: Greeting, metadata: HandlerMetadata) -> Handl raised = todo_registry.get_raised_for(metadata.thread_id, metadata.own_name or "greeter") for watcher in raised: todo_registry.close(watcher.id) - # In a real scenario, we might log or react to the completed todo # Register a todo watcher - we want to know when shouter responds - # This demonstrates the "await confirmation" pattern todo_registry.register( thread_id=metadata.thread_id, issuer=metadata.own_name or "greeter", @@ -85,30 +87,20 @@ async def handle_greeting(payload: Greeting, metadata: HandlerMetadata) -> Handl description=f"waiting for shouter to process greeting for {payload.name}", ) - # Build system prompt with peer awareness - system_prompt = "You are a friendly greeter. Respond with ONLY a single short enthusiastic greeting sentence. No XML, no markup, just the greeting text." - if metadata.usage_instructions: - system_prompt = metadata.usage_instructions + "\n\n" + system_prompt - - # Include any todo nudges in the prompt (for LLM awareness) - if metadata.todo_nudge: - system_prompt = system_prompt + "\n\n" + metadata.todo_nudge - - # Use LLM to generate a creative greeting + # Use platform.complete() for LLM call + # The platform assembles: system prompt (from registry) + peer schemas + history + user message + # The handler only provides the user message - no prompt building! llm_response = await complete( - model="grok-3-mini-beta", - messages=[ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": f"Greet {payload.name} enthusiastically."}, - ], - agent_id=metadata.own_name, + agent_name=metadata.own_name or "greeter", + thread_id=metadata.thread_id, + user_message=f"Greet {payload.name} enthusiastically. 
Respond with ONLY a short greeting sentence.", temperature=0.9, ) # Return clean dataclass + target - pump handles envelope return HandlerResponse( payload=GreetingResponse( - message=llm_response.content, + message=llm_response, original_sender="response-handler", ), to="shouter",