diff --git a/agentserver/console/client.py b/agentserver/console/client.py index fafe293..2cdd151 100644 --- a/agentserver/console/client.py +++ b/agentserver/console/client.py @@ -133,12 +133,13 @@ class ConsoleClient: Available commands: /help - Show this help /status - Show server status - /listeners - List active listeners + /listeners - List available targets + /targets - Alias for /listeners /quit - Disconnect and exit Send messages: - @listener message - Send message to a listener - message - Send to default listener + @target message - Send message to a target listener + Example: @greeter Hello there! """) async def handle_command(self, line: str) -> bool: @@ -161,22 +162,31 @@ Send messages: if resp: threads = resp.get("threads", 0) print(f"Active threads: {threads}") - elif line == "/listeners": + elif line == "/listeners" or line == "/targets": resp = await self.send_command({"type": "listeners"}) if resp: listeners = resp.get("listeners", []) if listeners: - print("Active listeners:") + print("Available targets:") for name in listeners: print(f" - {name}") else: - print("No active listeners") + print("No targets available (pipeline not running)") elif line.startswith("/"): print(f"Unknown command: {line}") + elif line.startswith("@"): + # Send message to target: @target message + resp = await self.send_command({"type": "send", "raw": line}) + if resp: + if resp.get("type") == "sent": + thread_id = resp.get("thread_id", "")[:8] + target = resp.get("target", "unknown") + print(f"Sent to {target} (thread: {thread_id}...)") + elif resp.get("type") == "error": + print(f"Error: {resp.get('error')}") else: - # Send as message - # TODO: Implement message sending when pump is connected - print(f"Message sending not yet implemented: {line}") + print("Use @target message to send. Example: @greeter Hello!") + print("Type /listeners to see available targets.") return True diff --git a/agentserver/message_bus/__init__.py b/agentserver/message_bus/__init__.py index d391572..1353298 100644 --- a/agentserver/message_bus/__init__.py +++ b/agentserver/message_bus/__init__.py @@ -6,15 +6,20 @@ The message pump handles message flow through the organism: Key classes: StreamPump Main pump class (queue-backed, aiostream-powered) + SystemPipeline Entry point for external messages (console, webhook) ConfigLoader Load organism.yaml and resolve imports Listener Runtime listener with handler and routing info MessageState Message flowing through pipeline steps Usage: - from agentserver.message_bus import StreamPump, bootstrap + from agentserver.message_bus import StreamPump, SystemPipeline, bootstrap pump = await bootstrap("config/organism.yaml") - await pump.inject(initial_message, thread_id, from_id) + system = SystemPipeline(pump) + + # Inject from console + thread_id = await system.inject_console("@greeter Dan", user="admin") + await pump.run() """ @@ -32,6 +37,11 @@ from agentserver.message_bus.message_state import ( HandlerMetadata, ) +from agentserver.message_bus.system_pipeline import ( + SystemPipeline, + ExternalMessage, +) + __all__ = [ "StreamPump", "ConfigLoader", @@ -41,4 +51,6 @@ __all__ = [ "MessageState", "HandlerMetadata", "bootstrap", + "SystemPipeline", + "ExternalMessage", ] diff --git a/agentserver/message_bus/system_pipeline.py b/agentserver/message_bus/system_pipeline.py new file mode 100644 index 0000000..b35cba7 --- /dev/null +++ b/agentserver/message_bus/system_pipeline.py @@ -0,0 +1,333 @@ +""" +SystemPipeline — Entry point for external messages. + +All messages from the outside world flow through this pipeline: +- Console input (@target message) +- Webhook/API calls +- Boot sequence + +The system pipeline transforms raw input into proper XML envelopes +and injects them into the main message pump. + +Architecture: + ┌─────────────────────────────────────────────────────────┐ + │ System Pipeline │ + │ [ingress] → [validate] → [envelope] → [route] │ + └─────────────────────────────────────────────────────────┘ + ↑ ↑ ↑ ↓ + console webhook boot StreamPump +""" + +from __future__ import annotations + +import re +import uuid +import logging +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import TYPE_CHECKING, Optional, Callable, Any + +if TYPE_CHECKING: + from .stream_pump import StreamPump + +from agentserver.primitives.text_input import TextInput, TextOutput + +logger = logging.getLogger(__name__) + + +@dataclass +class ExternalMessage: + """ + Raw input from external source before processing. + + This is the intermediate representation in the system pipeline. + """ + content: str + target: Optional[str] = None # Listener name (from @target or explicit) + source: str = "console" # console, webhook, api, boot + user: Optional[str] = None # Authenticated user + timestamp: Optional[datetime] = None + metadata: dict = None + + def __post_init__(self): + if self.timestamp is None: + self.timestamp = datetime.now(timezone.utc) + if self.metadata is None: + self.metadata = {} + + +class SystemPipeline: + """ + Entry point for all external messages. + + Transforms raw input into XML envelopes and injects into the pump. + + Usage: + pump = StreamPump(config) + system = SystemPipeline(pump) + + # From console + thread_id = await system.inject_console("@greeter Dan", user="admin") + + # From webhook + thread_id = await system.inject_webhook(json_data, source="github") + + # Track responses + system.subscribe(thread_id, callback) + """ + + # Pattern for @target message format + TARGET_PATTERN = re.compile(r'^@(\S+)\s+(.+)$', re.DOTALL) + + def __init__(self, pump: 'StreamPump'): + self.pump = pump + + # Response callbacks by thread_id + self._subscribers: dict[str, list[Callable]] = {} + + # Validation rules + self._rate_limits: dict[str, int] = {} # user -> count + self._max_rate: int = 100 # messages per minute + + # ------------------------------------------------------------------ + # Ingress: Accept raw input + # ------------------------------------------------------------------ + + async def inject_console( + self, + raw: str, + user: str, + default_target: Optional[str] = None, + ) -> str: + """ + Inject console input into the pipeline. + + Format: @target message + Or just: message (uses default_target) + + Args: + raw: Raw console input + user: Authenticated username + default_target: Default listener if no @target specified + + Returns: + thread_id for tracking the conversation + """ + raw = raw.strip() + if not raw: + raise ValueError("Empty message") + + # Parse @target format + match = self.TARGET_PATTERN.match(raw) + if match: + target = match.group(1) + content = match.group(2).strip() + elif default_target: + target = default_target + content = raw + else: + raise ValueError("No target specified. Use @target message format.") + + msg = ExternalMessage( + content=content, + target=target, + source="console", + user=user, + ) + + return await self._process(msg) + + async def inject_webhook( + self, + data: dict, + source: str = "webhook", + user: Optional[str] = None, + ) -> str: + """ + Inject webhook/API payload into the pipeline. + + Expected format: + { + "target": "listener_name", + "content": "message text", + "metadata": {...} # optional + } + + Returns: + thread_id for tracking + """ + target = data.get("target") + content = data.get("content", data.get("text", data.get("message", ""))) + + if not target: + raise ValueError("Webhook missing 'target' field") + if not content: + raise ValueError("Webhook missing 'content' field") + + msg = ExternalMessage( + content=content, + target=target, + source=source, + user=user, + metadata=data.get("metadata", {}), + ) + + return await self._process(msg) + + async def inject_raw( + self, + target: str, + content: str, + source: str = "api", + user: Optional[str] = None, + ) -> str: + """ + Direct injection with explicit target and content. + + Useful for programmatic access. + """ + msg = ExternalMessage( + content=content, + target=target, + source=source, + user=user, + ) + return await self._process(msg) + + # ------------------------------------------------------------------ + # Processing Pipeline + # ------------------------------------------------------------------ + + async def _process(self, msg: ExternalMessage) -> str: + """ + Process external message through system pipeline. + + Steps: + 1. Validate (permissions, rate limits) + 2. Create payload (TextInput) + 3. Wrap in envelope + 4. Inject into pump + """ + # Step 1: Validate + await self._validate(msg) + + # Step 2: Create payload + payload = self._create_payload(msg) + + # Step 3: Generate thread ID + thread_id = self._generate_thread_id() + + # Step 4: Wrap in envelope + envelope = self._wrap_envelope(payload, msg.target, thread_id, msg.source, msg.user) + + # Step 5: Inject into pump + from_id = f"{msg.source}:{msg.user}" if msg.user else msg.source + await self.pump.inject(envelope, thread_id=thread_id, from_id=from_id) + + logger.info(f"Injected {msg.source} message to {msg.target}: {thread_id[:8]}...") + return thread_id + + async def _validate(self, msg: ExternalMessage) -> None: + """ + Validate message. + + Checks: + - Target listener exists + - User has permission (if applicable) + - Rate limits not exceeded + """ + # Check target exists + if msg.target not in self.pump.listeners: + available = list(self.pump.listeners.keys()) + raise ValueError(f"Unknown target: {msg.target}. Available: {available}") + + # Rate limiting (simple per-user counter) + if msg.user: + count = self._rate_limits.get(msg.user, 0) + if count >= self._max_rate: + raise ValueError(f"Rate limit exceeded for user {msg.user}") + self._rate_limits[msg.user] = count + 1 + + def _create_payload(self, msg: ExternalMessage) -> TextInput: + """Create TextInput payload from external message.""" + return TextInput( + text=msg.content, + source=msg.source, + user=msg.user, + ) + + def _generate_thread_id(self) -> str: + """Generate unique thread ID for external conversation.""" + return str(uuid.uuid4()) + + def _wrap_envelope( + self, + payload: TextInput, + target: str, + thread_id: str, + source: str, + user: Optional[str], + ) -> bytes: + """Wrap payload in XML envelope.""" + # Use pump's envelope wrapper + from_id = f"{source}:{user}" if user else source + return self.pump._wrap_in_envelope( + payload=payload, + from_id=from_id, + to_id=target, + thread_id=thread_id, + ) + + # ------------------------------------------------------------------ + # Response Tracking + # ------------------------------------------------------------------ + + def subscribe(self, thread_id: str, callback: Callable[[Any], None]) -> None: + """ + Subscribe to responses for a thread. + + The callback will be called when messages are sent back + to the originating source (console, webhook, etc). + """ + if thread_id not in self._subscribers: + self._subscribers[thread_id] = [] + self._subscribers[thread_id].append(callback) + + def unsubscribe(self, thread_id: str, callback: Callable[[Any], None]) -> None: + """Remove subscription.""" + if thread_id in self._subscribers: + self._subscribers[thread_id] = [ + cb for cb in self._subscribers[thread_id] if cb != callback + ] + + async def notify_response(self, thread_id: str, payload: Any) -> None: + """ + Notify subscribers of a response. + + Called by the pump when a message is routed back to an external source. + """ + callbacks = self._subscribers.get(thread_id, []) + for cb in callbacks: + try: + if asyncio.iscoroutinefunction(cb): + await cb(payload) + else: + cb(payload) + except Exception as e: + logger.error(f"Subscriber callback error: {e}") + + # ------------------------------------------------------------------ + # Utilities + # ------------------------------------------------------------------ + + def list_targets(self) -> list[str]: + """List available target listeners.""" + return list(self.pump.listeners.keys()) + + def reset_rate_limits(self) -> None: + """Reset rate limit counters (call periodically).""" + self._rate_limits.clear() + + +# Need asyncio for notify_response +import asyncio diff --git a/agentserver/primitives/__init__.py b/agentserver/primitives/__init__.py index 5d95412..538a5f4 100644 --- a/agentserver/primitives/__init__.py +++ b/agentserver/primitives/__init__.py @@ -14,6 +14,7 @@ from agentserver.primitives.todo import ( handle_todo_until, handle_todo_complete, ) +from agentserver.primitives.text_input import TextInput, TextOutput __all__ = [ "Boot", @@ -24,4 +25,6 @@ __all__ = [ "TodoClosed", "handle_todo_until", "handle_todo_complete", + "TextInput", + "TextOutput", ] diff --git a/agentserver/primitives/text_input.py b/agentserver/primitives/text_input.py new file mode 100644 index 0000000..681cf5c --- /dev/null +++ b/agentserver/primitives/text_input.py @@ -0,0 +1,45 @@ +""" +TextInput — Generic text message for external/human input. + +This primitive allows external sources (console, webhook, API) to send +simple text messages to listeners without needing to know their schema. + +Listeners that want to accept human input should handle TextInput. +""" + +# Note: Do NOT use `from __future__ import annotations` here +# as it breaks the xmlify decorator which needs concrete types + +from dataclasses import dataclass, field +from typing import Optional + +from third_party.xmlable import xmlify + + +@xmlify +@dataclass +class TextInput: + """ + Generic text input from external sources. + + Attributes: + text: The message content + source: Origin of the message (console, webhook, api) + user: Authenticated user who sent it (if any) + """ + text: str + source: str = "console" + user: str = "" # Empty string instead of Optional for xmlify compatibility + + +@xmlify +@dataclass +class TextOutput: + """ + Generic text output for responses to external sources. + + Used when a listener wants to send a simple text response + back to the console/webhook/api. + """ + text: str + status: str = "ok" # ok, error, pending diff --git a/agentserver/server/app.py b/agentserver/server/app.py index daf3903..cd3a83c 100644 --- a/agentserver/server/app.py +++ b/agentserver/server/app.py @@ -1,5 +1,10 @@ """ aiohttp-based HTTP/WebSocket server. + +Provides: +- REST API for authentication +- WebSocket for console/GUI message sending +- Integration with SystemPipeline for message injection """ from __future__ import annotations @@ -22,6 +27,7 @@ from ..auth.sessions import get_session_manager, SessionManager, Session if TYPE_CHECKING: from ..message_bus.stream_pump import StreamPump + from ..message_bus.system_pipeline import SystemPipeline logger = logging.getLogger(__name__) @@ -31,20 +37,20 @@ def auth_middleware(): async def middleware(request, handler): if request.path in ("/auth/login", "/health"): return await handler(request) - + auth_header = request.headers.get("Authorization", "") if not auth_header.startswith("Bearer "): return web.json_response({"error": "Missing Authorization"}, status=401) - + token = auth_header[7:] session = request.app["session_manager"].validate(token) - + if not session: return web.json_response({"error": "Invalid token"}, status=401) - + request["session"] = session return await handler(request) - + return middleware @@ -53,17 +59,17 @@ async def handle_login(request): data = await request.json() except: return web.json_response({"error": "Invalid JSON"}, status=400) - + username = data.get("username", "") password = data.get("password", "") - + if not username or not password: return web.json_response({"error": "Credentials required"}, status=400) - + user = request.app["user_store"].authenticate(username, password) if not user: return web.json_response({"error": "Invalid credentials"}, status=401) - + session = request.app["session_manager"].create(user.username, user.role) return web.json_response(session.to_dict()) @@ -90,69 +96,167 @@ async def handle_health(request): async def handle_websocket(request): session = request["session"] pump = request.app.get("pump") - + system_pipeline = request.app.get("system_pipeline") + ws = web.WebSocketResponse() await ws.prepare(request) - + + # Track this WebSocket for response delivery + ws_id = id(ws) + request.app["websockets"][ws_id] = { + "ws": ws, + "user": session.username, + "threads": set(), # Thread IDs this client is subscribed to + } + await ws.send_json({"type": "connected", "username": session.username}) - - async for msg in ws: - if msg.type == WSMsgType.TEXT: - try: - data = json.loads(msg.data) - resp = await handle_ws_msg(data, session, pump) - await ws.send_json(resp) - except Exception as e: - await ws.send_json({"type": "error", "error": str(e)}) - + + try: + async for msg in ws: + if msg.type == WSMsgType.TEXT: + try: + data = json.loads(msg.data) + resp = await handle_ws_msg( + data, session, pump, system_pipeline, + request.app["websockets"][ws_id] + ) + await ws.send_json(resp) + except Exception as e: + logger.exception(f"WebSocket error: {e}") + await ws.send_json({"type": "error", "error": str(e)}) + finally: + # Cleanup on disconnect + del request.app["websockets"][ws_id] + return ws -async def handle_ws_msg(data, session, pump): +async def handle_ws_msg(data, session, pump, system_pipeline, ws_state): + """ + Handle WebSocket message. + + Message types: + ping - Keepalive + status - Get server status + listeners - List available listeners + targets - Alias for listeners + send - Send message to pipeline (@target or explicit) + """ t = data.get("type", "") - + if t == "ping": return {"type": "pong"} + elif t == "status": from ..memory import get_context_buffer stats = get_context_buffer().get_stats() return {"type": "status", "threads": stats["thread_count"]} - elif t == "listeners": + + elif t == "listeners" or t == "targets": if not pump: return {"type": "listeners", "listeners": []} return {"type": "listeners", "listeners": list(pump.listeners.keys())} - - return {"type": "error", "error": f"Unknown: {t}"} + + elif t == "send": + # Send message to pipeline + if not system_pipeline: + return {"type": "error", "error": "Pipeline not available"} + + # Support two formats: + # 1. {"type": "send", "raw": "@greeter Dan"} + # 2. {"type": "send", "target": "greeter", "content": "Dan"} + raw = data.get("raw") + if raw: + # Parse @target message format + try: + thread_id = await system_pipeline.inject_console( + raw=raw, + user=session.username, + ) + except ValueError as e: + return {"type": "error", "error": str(e)} + else: + target = data.get("target") + content = data.get("content", data.get("text", data.get("message", ""))) + + if not target: + return {"type": "error", "error": "Missing target"} + if not content: + return {"type": "error", "error": "Missing content"} + + try: + thread_id = await system_pipeline.inject_raw( + target=target, + content=content, + source="websocket", + user=session.username, + ) + except ValueError as e: + return {"type": "error", "error": str(e)} + + # Track thread for response delivery + ws_state["threads"].add(thread_id) + + return { + "type": "sent", + "thread_id": thread_id, + "target": data.get("target") or raw.split()[0].lstrip("@") if raw else None, + } + + return {"type": "error", "error": f"Unknown message type: {t}"} -def create_app(pump=None): +def create_app(pump=None, system_pipeline=None): + """ + Create the aiohttp application. + + Args: + pump: StreamPump instance (optional) + system_pipeline: SystemPipeline instance (optional, created from pump if not provided) + """ if not AIOHTTP_AVAILABLE: raise RuntimeError("aiohttp not installed") - + app = web.Application(middlewares=[auth_middleware()]) app["user_store"] = get_user_store() app["session_manager"] = get_session_manager() app["pump"] = pump - + app["websockets"] = {} # Track connected WebSocket clients + + # Create SystemPipeline if pump provided but system_pipeline not + if pump and not system_pipeline: + from ..message_bus.system_pipeline import SystemPipeline + system_pipeline = SystemPipeline(pump) + + app["system_pipeline"] = system_pipeline + app.router.add_post("/auth/login", handle_login) app.router.add_post("/auth/logout", handle_logout) app.router.add_get("/auth/me", handle_me) app.router.add_get("/health", handle_health) app.router.add_get("/ws", handle_websocket) - + return app async def run_server(pump=None, host="127.0.0.1", port=8765): + """ + Run the server. + + Args: + pump: StreamPump instance for message handling + host: Bind address + port: Port number + """ app = create_app(pump) runner = web.AppRunner(app) await runner.setup() - + site = web.TCPSite(runner, host, port) await site.start() - + print(f"Server on http://{host}:{port}") - + try: while True: await asyncio.sleep(3600)