diff --git a/agentserver/console/tui_console.py b/agentserver/console/tui_console.py new file mode 100644 index 0000000..c5ad4bd --- /dev/null +++ b/agentserver/console/tui_console.py @@ -0,0 +1,564 @@ +""" +tui_console.py — Split-screen TUI console using prompt_toolkit. + +Layout: + ┌────────────────────────────────────────────┐ + │ [greeter] Hello! Welcome! │ ← Scrolling output + │ [shouter] HELLO! WELCOME! │ + │ [system] Thread completed │ + │ │ + ├──────────── hello-world ───────────────────┤ ← Status bar + │ > @greeter hi │ ← Input area + └────────────────────────────────────────────┘ + +Features: +- Output scrolls up, input stays at bottom +- Status bar shows organism name +- Color-coded messages by source +- Command history with up/down arrows +""" + +from __future__ import annotations + +import asyncio +from datetime import datetime +from pathlib import Path +from typing import TYPE_CHECKING, List, Optional + +try: + from prompt_toolkit import Application + from prompt_toolkit.buffer import Buffer + from prompt_toolkit.document import Document + from prompt_toolkit.formatted_text import FormattedText, HTML + from prompt_toolkit.key_binding import KeyBindings + from prompt_toolkit.layout import ( + Layout, + HSplit, + Window, + FormattedTextControl, + BufferControl, + ScrollablePane, + ) + from prompt_toolkit.layout.dimension import Dimension + from prompt_toolkit.layout.margins import ScrollbarMargin + from prompt_toolkit.styles import Style + from prompt_toolkit.history import FileHistory + from prompt_toolkit.output.win32 import NoConsoleScreenBufferError + PROMPT_TOOLKIT_AVAILABLE = True +except ImportError: + PROMPT_TOOLKIT_AVAILABLE = False + NoConsoleScreenBufferError = Exception + +if TYPE_CHECKING: + from agentserver.message_bus.stream_pump import StreamPump + + +# ============================================================================ +# Constants +# ============================================================================ + +CONFIG_DIR = Path.home() / ".xml-pipeline" +HISTORY_FILE = CONFIG_DIR / "history" + + +# ============================================================================ +# Style +# ============================================================================ + +STYLE = Style.from_dict({ + "output": "#ffffff", + "output.system": "#888888 italic", + "output.greeter": "#00ff00", + "output.shouter": "#ffff00", + "output.response": "#00ffff", + "output.error": "#ff0000", + "output.dim": "#666666", + "separator": "#444444", + "separator.text": "#888888", + "input": "#ffffff", + "prompt": "#00ff00 bold", +}) + + +# ============================================================================ +# Output Buffer +# ============================================================================ + +class OutputBuffer: + """Manages scrolling output history.""" + + def __init__(self, max_lines: int = 1000): + self.lines: List[tuple] = [] # (style_class, text) + self.max_lines = max_lines + + def append(self, text: str, style: str = "output"): + """Add a line to output.""" + timestamp = datetime.now().strftime("%H:%M:%S") + self.lines.append((style, f"[{timestamp}] {text}")) + if len(self.lines) > self.max_lines: + self.lines = self.lines[-self.max_lines:] + + def append_raw(self, text: str, style: str = "output"): + """Add without timestamp.""" + self.lines.append((style, text)) + if len(self.lines) > self.max_lines: + self.lines = self.lines[-self.max_lines:] + + def get_formatted_text(self) -> FormattedText: + """Get formatted text for display.""" + result = [] + for style, text in self.lines: + result.append((f"class:{style}", text + "\n")) + return FormattedText(result) + + def clear(self): + """Clear output.""" + self.lines.clear() + + +# ============================================================================ +# TUI Console +# ============================================================================ + +class TUIConsole: + """Split-screen terminal UI console.""" + + def __init__(self, pump: StreamPump): + self.pump = pump + self.output = OutputBuffer() + self.running = False + self.attached = True + self.use_simple_mode = False + + # Ensure config dir exists + CONFIG_DIR.mkdir(parents=True, exist_ok=True) + + # Try to build the TUI, fallback to simple mode if needed + try: + if not PROMPT_TOOLKIT_AVAILABLE: + raise ImportError("prompt_toolkit not available") + + # Input buffer with history + self.input_buffer = Buffer( + history=FileHistory(str(HISTORY_FILE)), + multiline=False, + ) + + # Build the UI + self._build_ui() + except (NoConsoleScreenBufferError, ImportError, Exception) as e: + # Fallback to simple mode + self.use_simple_mode = True + self.app = None + print(f"\033[2mNote: Using simple mode ({type(e).__name__})\033[0m") + + def _build_ui(self): + """Build the prompt_toolkit layout.""" + + # Key bindings + kb = KeyBindings() + + @kb.add("enter") + def handle_enter(event): + """Handle enter key - process input.""" + text = self.input_buffer.text.strip() + if text: + # Schedule processing (can't await in key handler) + asyncio.create_task(self._process_input(text)) + self.input_buffer.reset() + + @kb.add("c-c") + def handle_ctrl_c(event): + """Handle Ctrl+C - quit.""" + self.running = False + event.app.exit() + + @kb.add("c-d") + def handle_ctrl_d(event): + """Handle Ctrl+D - quit.""" + self.running = False + event.app.exit() + + @kb.add("c-l") + def handle_ctrl_l(event): + """Handle Ctrl+L - clear output.""" + self.output.clear() + + # Output area (scrolling) + output_control = FormattedTextControl( + text=lambda: self.output.get_formatted_text(), + focusable=False, + ) + + output_window = Window( + content=output_control, + wrap_lines=True, + right_margins=[ScrollbarMargin(display_arrows=True)], + ) + + # Separator line with status + def get_separator(): + name = self.pump.config.name + width = 60 + padding = "─" * ((width - len(name) - 4) // 2) + return FormattedText([ + ("class:separator", padding), + ("class:separator.text", f" {name} "), + ("class:separator", padding), + ]) + + separator = Window( + content=FormattedTextControl(text=get_separator), + height=1, + ) + + # Input area - single window with buffer control + input_control = BufferControl(buffer=self.input_buffer) + input_window = Window( + content=input_control, + height=1, + ) + + # Prompt + input row + from prompt_toolkit.layout import VSplit + + input_row = VSplit([ + Window( + content=FormattedTextControl(text=lambda: FormattedText([("class:prompt", "> ")])), + width=2, + ), + input_window, + ]) + + # Main layout + root = HSplit([ + output_window, + separator, + input_row, + ]) + + self.layout = Layout(root, focused_element=input_window) + + self.app = Application( + layout=self.layout, + key_bindings=kb, + style=STYLE, + full_screen=True, + mouse_support=True, + ) + + def print(self, text: str, style: str = "output"): + """Print to output area.""" + if self.use_simple_mode: + self._print_simple(text, style) + else: + self.output.append(text, style) + if self.app and self.app.is_running: + self.app.invalidate() + + def print_raw(self, text: str, style: str = "output"): + """Print without timestamp.""" + if self.use_simple_mode: + self._print_simple(text, style) + else: + self.output.append_raw(text, style) + if self.app and self.app.is_running: + self.app.invalidate() + + def _print_simple(self, text: str, style: str = "output"): + """Print in simple mode with ANSI colors.""" + colors = { + "output.system": "\033[2m", # Dim + "output.error": "\033[31m", # Red + "output.dim": "\033[2m", # Dim + "output.greeter": "\033[32m", # Green + "output.shouter": "\033[33m", # Yellow + "output.response": "\033[36m", # Cyan + } + color = colors.get(style, "") + reset = "\033[0m" if color else "" + print(f"{color}{text}{reset}") + + def print_system(self, text: str): + """Print system message.""" + self.print(text, "output.system") + + def print_error(self, text: str): + """Print error message.""" + self.print(text, "output.error") + + async def run(self): + """Run the console.""" + self.running = True + + if self.use_simple_mode: + await self._run_simple() + return + + # Welcome message + self.print_raw(f"xml-pipeline console v3.0", "output.system") + self.print_raw(f"Organism: {self.pump.config.name}", "output.system") + self.print_raw(f"Listeners: {len(self.pump.listeners)}", "output.system") + self.print_raw(f"Type /help for commands, @listener message to chat", "output.dim") + self.print_raw("", "output") + + try: + await self.app.run_async() + except Exception as e: + print(f"Console error: {e}") + finally: + self.running = False + + async def _run_simple(self): + """Run in simple mode (fallback for non-TUI terminals).""" + print(f"\033[36mxml-pipeline console v3.0 (simple mode)\033[0m") + print(f"Organism: {self.pump.config.name}") + print(f"Listeners: {len(self.pump.listeners)}") + print(f"\033[2mType /help for commands, @listener message to chat\033[0m") + print() + + while self.running: + try: + line = await asyncio.get_event_loop().run_in_executor( + None, lambda: input("> ") + ) + line = line.strip() + if line: + await self._process_input(line) + except EOFError: + break + except KeyboardInterrupt: + break + + self.running = False + + async def _process_input(self, line: str): + """Process user input.""" + # Echo input to output (only in TUI mode, simple mode already shows it) + if not self.use_simple_mode: + self.print_raw(f"> {line}", "output.dim") + + if line.startswith("/"): + await self._handle_command(line) + elif line.startswith("@"): + await self._handle_message(line) + else: + self.print("Use @listener message or /command", "output.dim") + + # ------------------------------------------------------------------ + # Command Handling + # ------------------------------------------------------------------ + + async def _handle_command(self, line: str): + """Handle /command.""" + parts = line[1:].split(None, 1) + cmd = parts[0].lower() if parts else "" + args = parts[1] if len(parts) > 1 else "" + + handler = getattr(self, f"_cmd_{cmd}", None) + if handler: + await handler(args) + else: + self.print_error(f"Unknown command: /{cmd}") + self.print("Type /help for available commands.", "output.dim") + + async def _cmd_help(self, args: str): + """Show help.""" + self.print_raw("Commands:", "output.system") + self.print_raw(" /help Show this help", "output.dim") + self.print_raw(" /status Show organism status", "output.dim") + self.print_raw(" /listeners List registered listeners", "output.dim") + self.print_raw(" /threads List active threads", "output.dim") + self.print_raw(" /monitor Show messages from thread", "output.dim") + self.print_raw(" /monitor * Show messages from all threads", "output.dim") + self.print_raw(" /clear Clear output", "output.dim") + self.print_raw(" /quit Exit console", "output.dim") + self.print_raw("", "output") + self.print_raw("Messages:", "output.system") + self.print_raw(" @listener message Send message to listener", "output.dim") + self.print_raw("", "output") + self.print_raw("Shortcuts:", "output.system") + self.print_raw(" Ctrl+C / Ctrl+D Quit", "output.dim") + self.print_raw(" Ctrl+L Clear output", "output.dim") + self.print_raw(" Up/Down Command history", "output.dim") + + async def _cmd_status(self, args: str): + """Show status.""" + from agentserver.memory import get_context_buffer + from agentserver.platform import get_prompt_registry + + buffer = get_context_buffer() + registry = get_prompt_registry() + stats = buffer.get_stats() + reg_stats = registry.get_stats() + + self.print_raw(f"Organism: {self.pump.config.name}", "output.system") + self.print_raw(f"Listeners: {len(self.pump.listeners)}", "output.dim") + self.print_raw(f"Agents: {reg_stats['agent_count']} (prompts registered)", "output.dim") + self.print_raw(f"Threads: {stats['thread_count']} active", "output.dim") + self.print_raw(f"Buffer: {stats['total_slots']} slots", "output.dim") + + async def _cmd_listeners(self, args: str): + """List listeners.""" + self.print_raw("Listeners:", "output.system") + for name, listener in self.pump.listeners.items(): + tag = "[agent]" if listener.is_agent else "[handler]" + self.print_raw(f" {name:20} {tag} {listener.description}", "output.dim") + + async def _cmd_threads(self, args: str): + """List threads.""" + from agentserver.memory import get_context_buffer + + buffer = get_context_buffer() + stats = buffer.get_stats() + + if stats["thread_count"] == 0: + self.print_raw("No active threads.", "output.dim") + return + + self.print_raw(f"Active threads ({stats['thread_count']}):", "output.system") + for thread_id in stats.get("threads", [])[:10]: + slots = buffer.get_thread(thread_id) + self.print_raw(f" {thread_id[:8]}... ({len(slots)} slots)", "output.dim") + + async def _cmd_monitor(self, args: str): + """Show messages from thread.""" + from agentserver.memory import get_context_buffer + + buffer = get_context_buffer() + + if args == "*": + # Show all threads + stats = buffer.get_stats() + for thread_id in stats.get("threads", [])[:5]: + self.print_raw(f"Thread {thread_id[:8]}...:", "output.system") + slots = buffer.get_thread(thread_id) + for slot in slots[-5:]: + payload_type = type(slot.payload).__name__ + self.print_raw(f" [{slot.from_id}→{slot.to_id}] {payload_type}", "output.dim") + elif args: + # Find thread by prefix + stats = buffer.get_stats() + matches = [t for t in stats.get("threads", []) if t.startswith(args)] + if not matches: + self.print_error(f"No thread matching: {args}") + return + + thread_id = matches[0] + slots = buffer.get_thread(thread_id) + self.print_raw(f"Thread {thread_id[:8]}... ({len(slots)} slots):", "output.system") + for slot in slots: + payload_type = type(slot.payload).__name__ + preview = str(slot.payload)[:50] + self.print_raw(f" [{slot.from_id}→{slot.to_id}] {payload_type}: {preview}", "output.dim") + else: + self.print("Usage: /monitor or /monitor *", "output.dim") + + async def _cmd_clear(self, args: str): + """Clear output.""" + self.output.clear() + + async def _cmd_quit(self, args: str): + """Quit console.""" + self.print_system("Shutting down...") + self.running = False + if self.app: + self.app.exit() + + # ------------------------------------------------------------------ + # Message Handling + # ------------------------------------------------------------------ + + async def _handle_message(self, line: str): + """Handle @listener message.""" + parts = line[1:].split(None, 1) + if not parts: + self.print("Usage: @listener message", "output.dim") + return + + target = parts[0].lower() + message = parts[1] if len(parts) > 1 else "" + + if target not in self.pump.listeners: + self.print_error(f"Unknown listener: {target}") + self.print("Use /listeners to see available listeners.", "output.dim") + return + + self.print(f"Sending to {target}...", "output.dim") + + # Create payload + listener = self.pump.listeners[target] + payload = self._create_payload(listener, message) + if payload is None: + self.print_error(f"Cannot create payload for {target}") + return + + # Create thread and inject + import uuid + thread_id = str(uuid.uuid4()) + + envelope = self.pump._wrap_in_envelope( + payload=payload, + from_id="console", + to_id=target, + thread_id=thread_id, + ) + + await self.pump.inject(envelope, thread_id=thread_id, from_id="console") + + def _create_payload(self, listener, message: str): + """Create payload instance for listener.""" + payload_class = listener.payload_class + + if hasattr(payload_class, '__dataclass_fields__'): + fields = payload_class.__dataclass_fields__ + field_names = list(fields.keys()) + + if len(field_names) == 1: + return payload_class(**{field_names[0]: message}) + elif 'name' in field_names: + return payload_class(name=message) + elif 'message' in field_names: + return payload_class(message=message) + elif 'text' in field_names: + return payload_class(text=message) + + try: + return payload_class() + except Exception: + return None + + # ------------------------------------------------------------------ + # External Output Hook + # ------------------------------------------------------------------ + + def on_response(self, from_id: str, payload): + """Called when a response arrives (hook for response-handler).""" + payload_type = type(payload).__name__ + + # Determine style based on source + if from_id == "shouter": + style = "output.shouter" + elif from_id == "greeter": + style = "output.greeter" + elif from_id == "response-handler": + style = "output.response" + else: + style = "output" + + # Format the response + if hasattr(payload, 'message'): + text = f"[{from_id}] {payload.message}" + else: + text = f"[{from_id}] {payload}" + + self.print_raw(text, style) + + +# ============================================================================ +# Factory +# ============================================================================ + +def create_tui_console(pump: StreamPump) -> TUIConsole: + """Create a TUI console for the pump.""" + return TUIConsole(pump) diff --git a/handlers/hello.py b/handlers/hello.py index 3f1f84d..0fb086e 100644 --- a/handlers/hello.py +++ b/handlers/hello.py @@ -122,11 +122,19 @@ async def handle_shout(payload: GreetingResponse, metadata: HandlerMetadata) -> async def handle_response_print(payload: ShoutedResponse, metadata: HandlerMetadata) -> None: """ - Print the final response to stdout. + Print the final response to the console. - This is a simple terminal handler for the SecureConsole flow. + Routes output to the TUI console if available, otherwise prints to stdout. """ - # Print on fresh line with color formatting, then reprint prompt + try: + from run_organism import get_console + console = get_console() + if console and hasattr(console, 'on_response'): + console.on_response("response", payload) + return None + except ImportError: + pass + + # Fallback: print to stdout print(f"\n\033[36m[response] {payload.message}\033[0m") - print("> ", end="", flush=True) # Reprint prompt return None diff --git a/run_organism.py b/run_organism.py index fc29c9c..eeed289 100644 --- a/run_organism.py +++ b/run_organism.py @@ -1,17 +1,20 @@ #!/usr/bin/env python3 """ -run_organism.py — Start the organism with secure console. +run_organism.py — Start the organism with TUI console. Usage: python run_organism.py [config.yaml] + python run_organism.py --simple [config.yaml] # Use simple console -This boots the organism with a password-protected console. -The secure console handles privileged operations via local keyboard only. +This boots the organism with a split-screen terminal UI: +- Scrolling output area above +- Status bar separator +- Input area below Flow: - 1. Password authentication - 2. Pump starts processing messagest - 3. Console handles commands and @messages + 1. Bootstrap organism + 2. Start pump in background + 3. Run TUI console 4. /quit shuts down gracefully """ @@ -20,50 +23,78 @@ import sys from pathlib import Path from agentserver.message_bus import bootstrap -from agentserver.console import SecureConsole -async def run_organism(config_path: str = "config/organism.yaml"): - """Boot organism with secure console.""" +# Global console reference for response handler +_console = None - # Bootstrap the pump (registers listeners, but DON'T start yet) + +def get_console(): + """Get the current console instance.""" + return _console + + +async def run_organism(config_path: str = "config/organism.yaml", use_simple: bool = False): + """Boot organism with TUI console.""" + global _console + + # Bootstrap the pump pump = await bootstrap(config_path) - # Create secure console and authenticate FIRST - console = SecureConsole(pump) + if use_simple: + # Use old SecureConsole for compatibility + from agentserver.console import SecureConsole + console = SecureConsole(pump) + if not await console.authenticate(): + print("Authentication failed.") + return + _console = None - # Authenticate before starting pump - if not await console.authenticate(): - print("Authentication failed.") - return - - # Now start the pump in background - pump_task = asyncio.create_task(pump.run()) - - try: - # Run console command loop (already authenticated) - await console.run_command_loop() - finally: - # Ensure pump is shut down - pump_task.cancel() + pump_task = asyncio.create_task(pump.run()) try: - await pump_task - except asyncio.CancelledError: - pass - await pump.shutdown() + await console.run_command_loop() + finally: + pump_task.cancel() + try: + await pump_task + except asyncio.CancelledError: + pass + await pump.shutdown() + print("Goodbye!") + else: + # Use new TUI console + from agentserver.console.tui_console import TUIConsole + console = TUIConsole(pump) + _console = console - print("Goodbye!") + # Start pump in background + pump_task = asyncio.create_task(pump.run()) + + try: + await console.run() + finally: + pump_task.cancel() + try: + await pump_task + except asyncio.CancelledError: + pass + await pump.shutdown() def main(): - config_path = sys.argv[1] if len(sys.argv) > 1 else "config/organism.yaml" + args = sys.argv[1:] + use_simple = "--simple" in args + if use_simple: + args.remove("--simple") + + config_path = args[0] if args else "config/organism.yaml" if not Path(config_path).exists(): print(f"Config not found: {config_path}") sys.exit(1) try: - asyncio.run(run_organism(config_path)) + asyncio.run(run_organism(config_path, use_simple=use_simple)) except KeyboardInterrupt: print("\nInterrupted")