Add platform-managed PromptRegistry and LLM API

Platform architecture for trusted orchestration:
- PromptRegistry: immutable system prompts per agent, loaded at bootstrap
- platform.complete(): assembles LLM calls (prompt + history + user msg)
- Handlers use platform API, cannot see/modify prompts
- organism.yaml now supports prompt field per listener
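
Handler-side usage, for illustration (a sketch; the full docstring is in llm_api.py below):

    from agentserver.platform import complete

    async def handle_greeting(payload, metadata):
        # The platform injects the system prompt from organism.yaml;
        # the handler never sees or builds it.
        text = await complete(
            agent_name=metadata.own_name,
            thread_id=metadata.thread_id,
            user_message=f"Greet {payload.name}",
        )
        return HandlerResponse(payload=GreetingResponse(message=text), to="shouter")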

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
dullfig 2026-01-11 13:57:51 -08:00
parent d7825335eb
commit 987c6aa214
6 changed files with 420 additions and 20 deletions


@@ -48,6 +48,7 @@ class ListenerConfig:
     is_agent: bool = False
     peers: List[str] = field(default_factory=list)
     broadcast: bool = False
+    prompt: str = ""  # System prompt for LLM agents (loaded into PromptRegistry)
     payload_class: type = field(default=None, repr=False)
     handler: Callable = field(default=None, repr=False)
@@ -760,6 +761,7 @@ class ConfigLoader:
             is_agent=raw.get("agent", False),
             peers=raw.get("peers", []),
             broadcast=raw.get("broadcast", False),
+            prompt=raw.get("prompt", ""),
         )
 
     @classmethod
@@ -784,6 +786,7 @@ async def bootstrap(config_path: str = "config/organism.yaml") -> StreamPump:
         TodoUntil, TodoComplete,
         handle_todo_until, handle_todo_complete,
     )
+    from agentserver.platform import get_prompt_registry
 
     # Load .env file if present
     load_dotenv()
@@ -833,6 +836,27 @@ async def bootstrap(config_path: str = "config/organism.yaml") -> StreamPump:
     # Register all user-defined listeners
     pump.register_all()
 
+    # Load prompts into PromptRegistry (platform-managed, immutable)
+    prompt_registry = get_prompt_registry()
+    prompt_count = 0
+    for listener in pump.listeners.values():
+        if listener.is_agent:
+            # Get prompt from config (may be empty)
+            lc = next((l for l in config.listeners if l.name == listener.name), None)
+            system_prompt = lc.prompt if lc else ""
+            # Register prompt with peer schemas (usage_instructions)
+            prompt_registry.register(
+                agent_name=listener.name,
+                system_prompt=system_prompt,
+                peer_schemas=listener.usage_instructions,
+            )
+            prompt_count += 1
+
+    # Freeze registry - no more registrations allowed
+    prompt_registry.freeze()
+    print(f"Prompts: {prompt_count} agents registered, registry frozen")
+
     # Configure LLM router if llm section present
     if config.llm_config:
         from agentserver.llm import configure_router
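
For illustration, once bootstrap has frozen the registry, its state can be checked via get_stats() (defined in prompt_registry.py below); the agent names here are hypothetical:

    from agentserver.platform import get_prompt_registry

    stats = get_prompt_registry().get_stats()
    print(stats)
    # e.g. {'agent_count': 2, 'frozen': True, 'agents': ['greeter', 'shouter']}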


@@ -0,0 +1,30 @@
"""
platform - Trusted orchestration layer for the agent swarm.

The platform manages:
- Prompt registry: immutable system prompts per agent
- LLM call assembly: platform controls what goes to the LLM
- Context buffer access: controlled by platform

Agents are sandboxed. They receive messages and return responses.
They cannot see or modify prompts, and cannot directly access the LLM.
"""
from agentserver.platform.prompt_registry import (
    PromptRegistry,
    AgentPrompt,
    get_prompt_registry,
)
from agentserver.platform.llm_api import (
    complete,
    platform_complete,
)

__all__ = [
    "PromptRegistry",
    "AgentPrompt",
    "get_prompt_registry",
    "complete",
    "platform_complete",
]


@@ -0,0 +1,171 @@
"""
llm_api.py - Platform-controlled LLM interface.

The platform controls all LLM calls. Agents request completions via this API.
The platform assembles the full prompt (system + history + user message)
and enforces rate limits, caching, and cost controls.

Design principles:
- Agent-invisible prompts: agents never see their system prompt
- Thread-scoped history: only messages from the current thread
- Auditable: all calls can be logged/traced
- Rate-limited: platform controls costs

Usage (from handler):
    from agentserver.platform import complete

    async def handle_greeting(payload, metadata):
        response = await complete(
            agent_name=metadata.own_name,
            thread_id=metadata.thread_id,
            user_message=f"Greet {payload.name}",
            temperature=0.9,
        )
        return HandlerResponse(
            payload=GreetingResponse(message=response),
            to="shouter",
        )
"""
from __future__ import annotations

import logging
from typing import Any, Dict, List, Optional

from agentserver.platform.prompt_registry import get_prompt_registry
from agentserver.memory import get_context_buffer

logger = logging.getLogger(__name__)


async def complete(
    agent_name: str,
    thread_id: str,
    user_message: str,
    *,
    temperature: float = 0.7,
    max_tokens: int = 1024,
    include_history: bool = True,
    **kwargs: Any,
) -> str:
    """
    Request an LLM completion for an agent.

    The platform assembles the full prompt:
    1. System prompt from PromptRegistry (invisible to agent)
    2. Peer schemas (what messages agent can send)
    3. Thread history from ContextBuffer
    4. User's message

    Args:
        agent_name: The calling agent's name (for prompt lookup)
        thread_id: Current thread UUID (for history lookup)
        user_message: The user/task message to complete
        temperature: LLM temperature (0.0-1.0)
        max_tokens: Maximum tokens in response
        include_history: Whether to include thread history
        **kwargs: Additional LLM parameters

    Returns:
        The LLM's text response

    Raises:
        KeyError: If agent has no registered prompt
        RuntimeError: If LLM call fails
    """
    # Get agent's prompt (agent cannot see this)
    prompt_registry = get_prompt_registry()
    prompt = prompt_registry.get_required(agent_name)

    # Build messages array
    messages: List[Dict[str, str]] = []

    # System prompt (from registry)
    if prompt.system_prompt:
        messages.append({
            "role": "system",
            "content": prompt.system_prompt,
        })

    # Peer schemas (what messages agent can send)
    if prompt.peer_schemas:
        messages.append({
            "role": "system",
            "content": prompt.peer_schemas,
        })

    # Thread history (agent can read, not modify)
    if include_history and thread_id:
        context_buffer = get_context_buffer()
        history = context_buffer.get_thread(thread_id)
        for slot in history:
            # Determine role: assistant if from this agent, user otherwise
            role = "assistant" if slot.from_id == agent_name else "user"
            # Serialize payload for LLM context
            content = _serialize_for_llm(slot.payload, slot.from_id)
            messages.append({
                "role": role,
                "content": content,
            })

    # Current user message
    messages.append({
        "role": "user",
        "content": user_message,
    })

    # Make LLM call via router
    try:
        from agentserver.llm import generate

        response = await generate(
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            **kwargs,
        )
        logger.debug(
            f"platform.complete: agent={agent_name} thread={thread_id[:8]}... "
            f"messages={len(messages)} response_len={len(response)}"
        )
        return response
    except Exception as e:
        logger.error(f"LLM call failed for {agent_name}: {e}")
        raise RuntimeError(f"LLM completion failed: {e}") from e


def _serialize_for_llm(payload: Any, from_id: str) -> str:
    """
    Serialize a payload for LLM context.

    Converts structured payloads to a readable format for the LLM.
    """
    # Try XML serialization first (for xmlify classes)
    if hasattr(payload, 'xml_value'):
        from lxml import etree
        try:
            class_name = type(payload).__name__
            tree = payload.xml_value(class_name)
            xml_str = etree.tostring(tree, encoding='unicode', pretty_print=True)
            return f"[From {from_id}]\n{xml_str}"
        except Exception:
            pass

    # Try to_xml for custom classes
    if hasattr(payload, 'to_xml'):
        try:
            return f"[From {from_id}]\n{payload.to_xml()}"
        except Exception:
            pass

    # Fallback to repr
    return f"[From {from_id}] {repr(payload)}"


# Alias for cleaner imports
platform_complete = complete
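
For illustration, a call like complete(agent_name="greeter", thread_id=tid, user_message="Greet Ada enthusiastically.") would assemble a messages array along these lines (contents hypothetical; roles follow the from_id rule above):

    messages = [
        {"role": "system", "content": "You are a friendly greeter agent. ..."},  # from PromptRegistry
        {"role": "system", "content": "<peer schemas: shouter>"},                # usage_instructions
        {"role": "user", "content": "[From shouter] <ShoutedResponse>...</ShoutedResponse>"},  # history
        {"role": "user", "content": "Greet Ada enthusiastically."},              # user_message
    ]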


@@ -0,0 +1,179 @@
"""
prompt_registry.py - Immutable prompt storage for agents.

The PromptRegistry is the trusted store for agent system prompts.
Prompts are loaded at startup and cannot be modified at runtime.

Design principles:
- Immutable: prompts set at startup, never modified
- Invisible: agents cannot see their own prompts
- Auditable: prompts hashed for tracking
- Per-agent: each agent has one system prompt

Usage:
    registry = get_prompt_registry()

    # At startup (from config loader)
    registry.register("greeter", "You are a friendly greeter...")

    # At LLM call assembly (platform only)
    prompt = registry.get("greeter")
    messages = [{"role": "system", "content": prompt.system_prompt}, ...]
"""
from __future__ import annotations

import hashlib
import threading
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, Optional


@dataclass(frozen=True)
class AgentPrompt:
    """
    Immutable prompt for an agent.

    frozen=True ensures prompts cannot be modified after creation.
    """
    agent_name: str
    system_prompt: str
    prompt_hash: str  # SHA256 of system_prompt
    peer_schemas: str = ""  # Generated from peer XSDs
    created_at: str = ""  # ISO timestamp

    @classmethod
    def create(
        cls,
        agent_name: str,
        system_prompt: str,
        peer_schemas: str = "",
    ) -> AgentPrompt:
        """Create a new AgentPrompt with computed hash."""
        prompt_hash = hashlib.sha256(system_prompt.encode()).hexdigest()[:16]
        created_at = datetime.now(timezone.utc).isoformat()
        return cls(
            agent_name=agent_name,
            system_prompt=system_prompt,
            prompt_hash=prompt_hash,
            peer_schemas=peer_schemas,
            created_at=created_at,
        )

    @property
    def full_prompt(self) -> str:
        """Combined system prompt + peer schemas."""
        if self.peer_schemas:
            return f"{self.system_prompt}\n\n{self.peer_schemas}"
        return self.system_prompt


class PromptRegistry:
    """
    Immutable registry of agent prompts.

    Thread-safe. Prompts can only be registered once per agent.
    """

    def __init__(self):
        self._prompts: Dict[str, AgentPrompt] = {}
        self._lock = threading.Lock()
        self._frozen = False

    def register(
        self,
        agent_name: str,
        system_prompt: str,
        peer_schemas: str = "",
    ) -> AgentPrompt:
        """
        Register a prompt for an agent.

        Can only be called during startup (before freeze).
        Raises RuntimeError if called after freeze or if agent already registered.
        """
        with self._lock:
            if self._frozen:
                raise RuntimeError(
                    f"Cannot register prompt for '{agent_name}': registry is frozen"
                )
            if agent_name in self._prompts:
                raise RuntimeError(
                    f"Cannot register prompt for '{agent_name}': already registered"
                )
            prompt = AgentPrompt.create(
                agent_name=agent_name,
                system_prompt=system_prompt,
                peer_schemas=peer_schemas,
            )
            self._prompts[agent_name] = prompt
            return prompt

    def freeze(self) -> None:
        """
        Freeze the registry. No more registrations allowed.

        Call this after all prompts are loaded from config.
        """
        with self._lock:
            self._frozen = True

    def get(self, agent_name: str) -> Optional[AgentPrompt]:
        """Get prompt for an agent (None if not registered)."""
        with self._lock:
            return self._prompts.get(agent_name)

    def get_required(self, agent_name: str) -> AgentPrompt:
        """Get prompt for an agent (raises if not found)."""
        prompt = self.get(agent_name)
        if prompt is None:
            raise KeyError(f"No prompt registered for agent: {agent_name}")
        return prompt

    def has(self, agent_name: str) -> bool:
        """Check if an agent has a registered prompt."""
        with self._lock:
            return agent_name in self._prompts

    def list_agents(self) -> list[str]:
        """List all agents with registered prompts."""
        with self._lock:
            return list(self._prompts.keys())

    def get_stats(self) -> dict:
        """Get registry statistics."""
        with self._lock:
            return {
                "agent_count": len(self._prompts),
                "frozen": self._frozen,
                "agents": list(self._prompts.keys()),
            }

    def clear(self) -> None:
        """Clear all prompts (for testing only)."""
        with self._lock:
            self._prompts.clear()
            self._frozen = False


# ============================================================================
# Singleton
# ============================================================================

_registry: Optional[PromptRegistry] = None
_registry_lock = threading.Lock()


def get_prompt_registry() -> PromptRegistry:
    """Get the global PromptRegistry singleton."""
    global _registry
    if _registry is None:
        with _registry_lock:
            if _registry is None:
                _registry = PromptRegistry()
    return _registry
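
A minimal sketch of the startup lifecycle and both immutability guarantees (RuntimeError after freeze; frozen dataclass on mutation):

    from agentserver.platform import get_prompt_registry

    registry = get_prompt_registry()
    prompt = registry.register("greeter", "You are a friendly greeter agent.")
    print(prompt.prompt_hash)  # 16-char SHA256 prefix, usable in audit logs
    registry.freeze()

    try:
        registry.register("shouter", "...")  # too late
    except RuntimeError as e:
        print(e)  # Cannot register prompt for 'shouter': registry is frozen

    try:
        registry.get("greeter").system_prompt = "tampered"
    except Exception as e:  # dataclasses.FrozenInstanceError
        print(type(e).__name__)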


@@ -38,6 +38,10 @@ listeners:
     description: Greeting agent - forwards to shouter
     agent: true
     peers: [shouter]
+    prompt: |
+      You are a friendly greeter agent.
+      When someone sends you a greeting, respond enthusiastically.
+      Keep your responses short (1-2 sentences).
 
   # Shouter: receives GreetingResponse, sends ShoutedResponse back
   - name: shouter

@@ -62,8 +62,12 @@ async def handle_greeting(payload: Greeting, metadata: HandlerMetadata) -> HandlerResponse:
     2. Send GreetingResponse to shouter
     3. When ShoutedResponse appears, eyebrow is raised
     4. On next invocation, greeter sees nudge and can close the todo
+
+    NOTE: This handler uses platform.complete() for LLM calls.
+    The system prompt is managed by the platform (from organism.yaml).
+    The handler cannot see or modify the prompt.
     """
-    from agentserver.llm import complete
+    from agentserver.platform import complete
     from agentserver.message_bus.todo_registry import get_todo_registry
 
     # Check for any raised todos and close them
@@ -73,10 +77,8 @@ async def handle_greeting(payload: Greeting, metadata: HandlerMetadata) -> HandlerResponse:
     raised = todo_registry.get_raised_for(metadata.thread_id, metadata.own_name or "greeter")
     for watcher in raised:
         todo_registry.close(watcher.id)
-        # In a real scenario, we might log or react to the completed todo
 
     # Register a todo watcher - we want to know when shouter responds
-    # This demonstrates the "await confirmation" pattern
     todo_registry.register(
         thread_id=metadata.thread_id,
         issuer=metadata.own_name or "greeter",
@@ -85,30 +87,20 @@ async def handle_greeting(payload: Greeting, metadata: HandlerMetadata) -> HandlerResponse:
         description=f"waiting for shouter to process greeting for {payload.name}",
     )
 
-    # Build system prompt with peer awareness
-    system_prompt = "You are a friendly greeter. Respond with ONLY a single short enthusiastic greeting sentence. No XML, no markup, just the greeting text."
-    if metadata.usage_instructions:
-        system_prompt = metadata.usage_instructions + "\n\n" + system_prompt
-
-    # Include any todo nudges in the prompt (for LLM awareness)
-    if metadata.todo_nudge:
-        system_prompt = system_prompt + "\n\n" + metadata.todo_nudge
-
-    # Use LLM to generate a creative greeting
+    # Use platform.complete() for LLM call
+    # The platform assembles: system prompt (from registry) + peer schemas + history + user message
+    # The handler only provides the user message - no prompt building!
     llm_response = await complete(
-        model="grok-3-mini-beta",
-        messages=[
-            {"role": "system", "content": system_prompt},
-            {"role": "user", "content": f"Greet {payload.name} enthusiastically."},
-        ],
-        agent_id=metadata.own_name,
+        agent_name=metadata.own_name or "greeter",
+        thread_id=metadata.thread_id,
+        user_message=f"Greet {payload.name} enthusiastically. Respond with ONLY a short greeting sentence.",
         temperature=0.9,
     )
 
     # Return clean dataclass + target - pump handles envelope
     return HandlerResponse(
         payload=GreetingResponse(
-            message=llm_response.content,
+            message=llm_response,
             original_sender="response-handler",
         ),
         to="shouter",