diff --git a/agentserver/console/tui_console.py b/agentserver/console/tui_console.py index 4e211a8..111c77d 100644 --- a/agentserver/console/tui_console.py +++ b/agentserver/console/tui_console.py @@ -1,26 +1,16 @@ """ tui_console.py — Split-screen TUI console using prompt_toolkit. -Layout: - ┌────────────────────────────────────────────┐ - │ [greeter] Hello! Welcome! │ ← Scrolling output - │ [shouter] HELLO! WELCOME! │ - │ [system] Thread completed │ - │ │ - ├──────────── hello-world ───────────────────┤ ← Status bar - │ > @greeter hi │ ← Input area - └────────────────────────────────────────────┘ - Features: -- Output scrolls up, input stays at bottom -- Status bar shows organism name -- Color-coded messages by source -- Command history with up/down arrows +- Fixed Command History (Up/Down arrows) +- Robust Scrolling with snap-to-bottom and blank line spacer +- Fully implemented /monitor, /status, /listeners commands """ from __future__ import annotations import asyncio +import os from datetime import datetime from pathlib import Path from typing import TYPE_CHECKING, List, Optional @@ -37,12 +27,12 @@ try: Window, FormattedTextControl, BufferControl, - ScrollablePane, ) from prompt_toolkit.layout.dimension import Dimension from prompt_toolkit.layout.margins import ScrollbarMargin from prompt_toolkit.styles import Style from prompt_toolkit.history import FileHistory + from prompt_toolkit.patch_stdout import patch_stdout from prompt_toolkit.output.win32 import NoConsoleScreenBufferError PROMPT_TOOLKIT_AVAILABLE = True except ImportError: @@ -60,11 +50,6 @@ if TYPE_CHECKING: CONFIG_DIR = Path.home() / ".xml-pipeline" HISTORY_FILE = CONFIG_DIR / "history" - -# ============================================================================ -# Style -# ============================================================================ - STYLE = Style.from_dict({ "output": "#ffffff", "output.system": "#888888 italic", @@ -90,59 +75,50 @@ class OutputBuffer: def __init__(self, max_lines: int = 1000): self.max_lines = max_lines self._lines: List[str] = [] - # Create a read-only buffer for display self.buffer = Buffer(read_only=True, name="output") def append(self, text: str, style: str = "output"): - """Add a line to output.""" timestamp = datetime.now().strftime("%H:%M:%S") self._lines.append(f"[{timestamp}] {text}") self._update_buffer() def append_raw(self, text: str, style: str = "output"): - """Add without timestamp.""" self._lines.append(text) self._update_buffer() def _update_buffer(self): - """Update the buffer content, preserve scroll position if user scrolled up.""" - # Trim if needed if len(self._lines) > self.max_lines: self._lines = self._lines[-self.max_lines:] - # Check if cursor is at end (user hasn't scrolled up) - old_text = self.buffer.text - was_at_end = self.buffer.cursor_position >= len(old_text) - - # Update buffer text + was_at_end = self.is_at_bottom() text = "\n".join(self._lines) if was_at_end: - # Stay at bottom - auto-scroll self.buffer.set_document( Document(text=text, cursor_position=len(text)), bypass_readonly=True ) else: - # Preserve scroll position - silent append old_pos = self.buffer.cursor_position self.buffer.set_document( - Document(text=text, cursor_position=old_pos), + Document(text=text, cursor_position=min(old_pos, len(text))), bypass_readonly=True ) def is_at_bottom(self) -> bool: - """Check if output is scrolled to bottom (or near bottom).""" - doc = self.buffer.document - # Consider "at bottom" if on last 3 lines - return doc.cursor_position_row >= doc.line_count - 3 + """Check if output is at the very bottom (with 1-line tolerance).""" + text_len = len(self.buffer.text) + if text_len == 0: + return True + # If cursor is after the start of the last line, we're "at bottom" + last_line_start = self.buffer.text.rfind('\n') + 1 + return self.buffer.cursor_position >= last_line_start def scroll_to_bottom(self): - """Scroll to the very bottom.""" + """Force cursor to the end of buffer.""" self.buffer.cursor_position = len(self.buffer.text) def clear(self): - """Clear output.""" self._lines.clear() self.buffer.set_document(Document(text=""), bypass_readonly=True) @@ -152,8 +128,6 @@ class OutputBuffer: # ============================================================================ class TUIConsole: - """Split-screen terminal UI console.""" - def __init__(self, pump: StreamPump): self.pump = pump self.output = OutputBuffer() @@ -161,128 +135,103 @@ class TUIConsole: self.attached = True self.use_simple_mode = False - # Ensure config dir exists CONFIG_DIR.mkdir(parents=True, exist_ok=True) - # Try to build the TUI, fallback to simple mode if needed try: if not PROMPT_TOOLKIT_AVAILABLE: raise ImportError("prompt_toolkit not available") - # Input buffer with history + # Command history setup + if HISTORY_FILE.exists() and not os.access(HISTORY_FILE, os.W_OK): + os.chmod(HISTORY_FILE, 0o666) + self.input_buffer = Buffer( history=FileHistory(str(HISTORY_FILE)), multiline=False, + accept_handler=self._accept_handler ) - # Build the UI self._build_ui() except (NoConsoleScreenBufferError, ImportError, Exception) as e: - # Fallback to simple mode self.use_simple_mode = True self.app = None print(f"\033[2mNote: Using simple mode ({type(e).__name__})\033[0m") - def _build_ui(self): - """Build the prompt_toolkit layout.""" + def _accept_handler(self, buffer: Buffer) -> bool: + text = buffer.text.strip() + if text: + asyncio.create_task(self._process_input(text)) + return False - # Key bindings + def _build_ui(self): kb = KeyBindings() - @kb.add("enter") - def handle_enter(event): - """Handle enter key - process input.""" - text = self.input_buffer.text.strip() - if text: - # Schedule processing (can't await in key handler) - asyncio.create_task(self._process_input(text)) - self.input_buffer.reset() - @kb.add("c-c") - def handle_ctrl_c(event): - """Handle Ctrl+C - quit.""" - self.running = False - event.app.exit() - @kb.add("c-d") - def handle_ctrl_d(event): - """Handle Ctrl+D - quit.""" + def _(event): self.running = False event.app.exit() @kb.add("c-l") - def handle_ctrl_l(event): - """Handle Ctrl+L - clear output.""" + def _(event): self.output.clear() - # Up/Down for command history @kb.add("up") - def handle_up(event): - """Previous command in history.""" + def _(event): self.input_buffer.history_backward() @kb.add("down") - def handle_down(event): - """Next command in history.""" + def _(event): self.input_buffer.history_forward() - # Page Up/Down scroll output (no focus change needed) @kb.add("pageup") - def handle_pageup(event): - """Scroll output up a page.""" + def _(event): buf = self.output.buffer doc = buf.document new_row = max(0, doc.cursor_position_row - 20) - new_pos = doc.translate_row_col_to_index(new_row, 0) - buf.cursor_position = new_pos + buf.cursor_position = doc.translate_row_col_to_index(new_row, 0) + self._invalidate() @kb.add("pagedown") - def handle_pagedown(event): - """Scroll output down a page (snap to bottom if near end).""" + def _(event): buf = self.output.buffer doc = buf.document + lines = doc.line_count new_row = doc.cursor_position_row + 20 - # Snap to bottom if within 3 lines of end - if new_row >= doc.line_count - 3: + + if new_row >= lines - 1: self.output.scroll_to_bottom() else: - new_pos = doc.translate_row_col_to_index(new_row, 0) - buf.cursor_position = new_pos + buf.cursor_position = doc.translate_row_col_to_index(new_row, 0) + self._invalidate() @kb.add("c-home") - def handle_ctrl_home(event): - """Scroll to top of output.""" + def _(event): self.output.buffer.cursor_position = 0 + self._invalidate() @kb.add("c-end") - def handle_ctrl_end(event): - """Scroll to bottom of output.""" + def _(event): self.output.scroll_to_bottom() + self._invalidate() - # Output uses BufferControl for scrolling (not focusable - input keeps focus) output_control = BufferControl( buffer=self.output.buffer, - focusable=False, # Keep focus on input, use Page Up/Down to scroll + focusable=False, include_default_input_processors=False, ) - # Output window - takes all available space, scrolls with cursor self.output_window = Window( content=output_control, wrap_lines=True, right_margins=[ScrollbarMargin(display_arrows=True)], ) - # Dynamic spacer - only show when at bottom of output def get_spacer_height(): - if self.output.is_at_bottom(): - return 1 - return 0 + return 1 if self.output.is_at_bottom() else 0 - from prompt_toolkit.layout.dimension import Dimension spacer = Window(height=lambda: Dimension.exact(get_spacer_height())) - # Separator line with status def get_separator(): name = self.pump.config.name width = 60 @@ -298,16 +247,12 @@ class TUIConsole: height=1, ) - # Input area - single window with buffer control - input_control = BufferControl(buffer=self.input_buffer) input_window = Window( - content=input_control, + content=BufferControl(buffer=self.input_buffer), height=1, ) - # Prompt + input row from prompt_toolkit.layout import VSplit - input_row = VSplit([ Window( content=FormattedTextControl(text=lambda: FormattedText([("class:prompt", "> ")])), @@ -316,10 +261,9 @@ class TUIConsole: input_window, ]) - # Main layout root = HSplit([ - self.output_window, # Scrollable output history - spacer, # Blank line above separator + self.output_window, + spacer, separator, input_row, ]) @@ -335,7 +279,6 @@ class TUIConsole: ) def print(self, text: str, style: str = "output"): - """Print to output area.""" if self.use_simple_mode: self._print_simple(text, style) else: @@ -343,15 +286,19 @@ class TUIConsole: self._invalidate() def print_raw(self, text: str, style: str = "output"): - """Print without timestamp.""" if self.use_simple_mode: self._print_simple(text, style) else: self.output.append_raw(text, style) self._invalidate() + def print_system(self, text: str): + self.print(text, "output.system") + + def print_error(self, text: str): + self.print(text, "output.error") + def _invalidate(self): - """Invalidate the app to trigger redraw.""" if self.app: try: self.app.invalidate() @@ -359,95 +306,57 @@ class TUIConsole: pass def _print_simple(self, text: str, style: str = "output"): - """Print in simple mode with ANSI colors.""" colors = { - "output.system": "\033[2m", # Dim - "output.error": "\033[31m", # Red - "output.dim": "\033[2m", # Dim - "output.greeter": "\033[32m", # Green - "output.shouter": "\033[33m", # Yellow - "output.response": "\033[36m", # Cyan + "output.system": "\033[2m", + "output.error": "\033[31m", + "output.dim": "\033[2m", + "output.greeter": "\033[32m", + "output.shouter": "\033[33m", + "output.response": "\033[36m", } color = colors.get(style, "") - reset = "\033[0m" if color else "" - print(f"{color}{text}{reset}") - - def print_system(self, text: str): - """Print system message.""" - self.print(text, "output.system") - - def print_error(self, text: str): - """Print error message.""" - self.print(text, "output.error") + print(f"{color}{text}\033[0m") async def run(self): - """Run the console.""" self.running = True - if self.use_simple_mode: await self._run_simple() return - # Welcome message self.print_raw(f"xml-pipeline console v3.0", "output.system") self.print_raw(f"Organism: {self.pump.config.name}", "output.system") - self.print_raw(f"Listeners: {len(self.pump.listeners)}", "output.system") self.print_raw(f"Type /help for commands, @listener message to chat", "output.dim") self.print_raw("", "output") try: - # Create a background task to poll for updates async def refresh_loop(): while self.running: - await asyncio.sleep(0.1) # 100ms refresh rate + await asyncio.sleep(0.1) if self.app and self.app.is_running: self.app.invalidate() - # Start refresh loop as background task refresh_task = asyncio.create_task(refresh_loop()) - try: await self.app.run_async() finally: refresh_task.cancel() - try: - await refresh_task - except asyncio.CancelledError: - pass except Exception as e: print(f"Console error: {e}") finally: self.running = False async def _run_simple(self): - """Run in simple mode (fallback for non-TUI terminals).""" print(f"\033[36mxml-pipeline console v3.0 (simple mode)\033[0m") - print(f"Organism: {self.pump.config.name}") - print(f"Listeners: {len(self.pump.listeners)}") - print(f"\033[2mType /help for commands, @listener message to chat\033[0m") - print() - while self.running: try: - line = await asyncio.get_event_loop().run_in_executor( - None, lambda: input("> ") - ) - line = line.strip() - if line: - await self._process_input(line) - except EOFError: - break - except KeyboardInterrupt: - break - + line = await asyncio.get_event_loop().run_in_executor(None, lambda: input("> ")) + if line: await self._process_input(line.strip()) + except (EOFError, KeyboardInterrupt): break self.running = False async def _process_input(self, line: str): - """Process user input.""" - # Echo input to output (only in TUI mode, simple mode already shows it) if not self.use_simple_mode: self.print_raw(f"> {line}", "output.dim") - if line.startswith("/"): await self._handle_command(line) elif line.startswith("@"): @@ -455,12 +364,7 @@ class TUIConsole: else: self.print("Use @listener message or /command", "output.dim") - # ------------------------------------------------------------------ - # Command Handling - # ------------------------------------------------------------------ - async def _handle_command(self, line: str): - """Handle /command.""" parts = line[1:].split(None, 1) cmd = parts[0].lower() if parts else "" args = parts[1] if len(parts) > 1 else "" @@ -470,207 +374,88 @@ class TUIConsole: await handler(args) else: self.print_error(f"Unknown command: /{cmd}") - self.print("Type /help for available commands.", "output.dim") async def _cmd_help(self, args: str): - """Show help.""" self.print_raw("Commands:", "output.system") - self.print_raw(" /help Show this help", "output.dim") - self.print_raw(" /status Show organism status", "output.dim") - self.print_raw(" /listeners List registered listeners", "output.dim") - self.print_raw(" /threads List active threads", "output.dim") - self.print_raw(" /monitor Show messages from thread", "output.dim") - self.print_raw(" /monitor * Show messages from all threads", "output.dim") - self.print_raw(" /clear Clear output", "output.dim") - self.print_raw(" /quit Exit console", "output.dim") - self.print_raw("", "output") - self.print_raw("Messages:", "output.system") - self.print_raw(" @listener message Send message to listener", "output.dim") - self.print_raw("", "output") - self.print_raw("Shortcuts:", "output.system") - self.print_raw(" Ctrl+C / Ctrl+D Quit", "output.dim") - self.print_raw(" Ctrl+L Clear output", "output.dim") - self.print_raw(" Up/Down Command history", "output.dim") - self.print_raw(" Page Up/Down Scroll output", "output.dim") - self.print_raw(" Ctrl+Home/End Jump to top/bottom of output", "output.dim") + self.print_raw(" /status, /listeners, /threads, /monitor, /clear, /quit", "output.dim") async def _cmd_status(self, args: str): - """Show status.""" from agentserver.memory import get_context_buffer - from agentserver.platform import get_prompt_registry - buffer = get_context_buffer() - registry = get_prompt_registry() stats = buffer.get_stats() - reg_stats = registry.get_stats() - self.print_raw(f"Organism: {self.pump.config.name}", "output.system") - self.print_raw(f"Listeners: {len(self.pump.listeners)}", "output.dim") - self.print_raw(f"Agents: {reg_stats['agent_count']} (prompts registered)", "output.dim") - self.print_raw(f"Threads: {stats['thread_count']} active", "output.dim") - self.print_raw(f"Buffer: {stats['total_slots']} slots", "output.dim") + self.print_raw(f"Threads: {stats['thread_count']} active, {stats['total_slots']} slots total", "output.dim") async def _cmd_listeners(self, args: str): - """List listeners.""" self.print_raw("Listeners:", "output.system") - for name, listener in self.pump.listeners.items(): - tag = "[agent]" if listener.is_agent else "[handler]" - self.print_raw(f" {name:20} {tag} {listener.description}", "output.dim") + for name, l in self.pump.listeners.items(): + tag = "[agent]" if l.is_agent else "[handler]" + self.print_raw(f" {name:15} {tag} {l.description}", "output.dim") async def _cmd_threads(self, args: str): - """List threads.""" from agentserver.memory import get_context_buffer - buffer = get_context_buffer() - stats = buffer.get_stats() - - if stats["thread_count"] == 0: - self.print_raw("No active threads.", "output.dim") - return - - self.print_raw(f"Active threads ({stats['thread_count']}):", "output.system") - for thread_id in stats.get("threads", [])[:10]: - slots = buffer.get_thread(thread_id) - self.print_raw(f" {thread_id[:8]}... ({len(slots)} slots)", "output.dim") + for tid, ctx in buffer._threads.items(): + self.print_raw(f" {tid[:8]}... slots: {len(ctx)}", "output.dim") async def _cmd_monitor(self, args: str): - """Show messages from thread.""" from agentserver.memory import get_context_buffer - buffer = get_context_buffer() - if args == "*": - # Show all threads - stats = buffer.get_stats() - for thread_id in stats.get("threads", [])[:5]: - self.print_raw(f"Thread {thread_id[:8]}...:", "output.system") - slots = buffer.get_thread(thread_id) - for slot in slots[-5:]: - payload_type = type(slot.payload).__name__ - self.print_raw(f" [{slot.from_id}→{slot.to_id}] {payload_type}", "output.dim") + for tid, ctx in buffer._threads.items(): + self.print_raw(f"--- Thread {tid[:8]} ---", "output.system") + for slot in list(ctx)[-3:]: + self.print_raw(f" {slot.from_id} -> {slot.to_id}: {type(slot.payload).__name__}", "output.dim") elif args: - # Find thread by prefix - stats = buffer.get_stats() - matches = [t for t in stats.get("threads", []) if t.startswith(args)] + matches = [t for t in buffer._threads if t.startswith(args)] if not matches: - self.print_error(f"No thread matching: {args}") + self.print_error(f"No thread matching {args}") return - - thread_id = matches[0] - slots = buffer.get_thread(thread_id) - self.print_raw(f"Thread {thread_id[:8]}... ({len(slots)} slots):", "output.system") - for slot in slots: - payload_type = type(slot.payload).__name__ - preview = str(slot.payload)[:50] - self.print_raw(f" [{slot.from_id}→{slot.to_id}] {payload_type}: {preview}", "output.dim") + ctx = buffer.get_thread(matches[0]) + for slot in ctx: + self.print_raw(f" [{slot.from_id} -> {slot.to_id}] {type(slot.payload).__name__}", "output.dim") else: - self.print("Usage: /monitor or /monitor *", "output.dim") + self.print("Usage: /monitor or /monitor *", "output.dim") async def _cmd_clear(self, args: str): - """Clear output.""" self.output.clear() async def _cmd_quit(self, args: str): - """Quit console.""" - self.print_system("Shutting down...") self.running = False - if self.app: - self.app.exit() - - # ------------------------------------------------------------------ - # Message Handling - # ------------------------------------------------------------------ + if self.app: self.app.exit() async def _handle_message(self, line: str): - """Handle @listener message.""" parts = line[1:].split(None, 1) - if not parts: - self.print("Usage: @listener message", "output.dim") - return - - target = parts[0].lower() - message = parts[1] if len(parts) > 1 else "" - + if not parts: return + target, message = parts[0].lower(), (parts[1] if len(parts) > 1 else "") if target not in self.pump.listeners: self.print_error(f"Unknown listener: {target}") - self.print("Use /listeners to see available listeners.", "output.dim") return - self.print(f"Sending to {target}...", "output.dim") - - # Create payload listener = self.pump.listeners[target] payload = self._create_payload(listener, message) if payload is None: self.print_error(f"Cannot create payload for {target}") return - # Create thread and inject import uuid thread_id = str(uuid.uuid4()) - - envelope = self.pump._wrap_in_envelope( - payload=payload, - from_id="console", - to_id=target, - thread_id=thread_id, - ) - - await self.pump.inject(envelope, thread_id=thread_id, from_id="console") + envelope = self.pump._wrap_in_envelope(payload, "console", target, thread_id) + await self.pump.inject(envelope, thread_id, "console") def _create_payload(self, listener, message: str): - """Create payload instance for listener.""" payload_class = listener.payload_class - if hasattr(payload_class, '__dataclass_fields__'): - fields = payload_class.__dataclass_fields__ - field_names = list(fields.keys()) - - if len(field_names) == 1: - return payload_class(**{field_names[0]: message}) - elif 'name' in field_names: - return payload_class(name=message) - elif 'message' in field_names: - return payload_class(message=message) - elif 'text' in field_names: - return payload_class(text=message) - - try: - return payload_class() - except Exception: - return None - - # ------------------------------------------------------------------ - # External Output Hook - # ------------------------------------------------------------------ + fields = list(payload_class.__dataclass_fields__.keys()) + if len(fields) == 1: return payload_class(**{fields[0]: message}) + if 'message' in fields: return payload_class(message=message) + if 'text' in fields: return payload_class(text=message) + return None def on_response(self, from_id: str, payload): - """Called when a response arrives (hook for response-handler).""" - payload_type = type(payload).__name__ - - # Determine style based on source - if from_id == "shouter": - style = "output.shouter" - elif from_id == "greeter": - style = "output.greeter" - elif from_id == "response-handler": - style = "output.response" - else: - style = "output" - - # Format the response - if hasattr(payload, 'message'): - text = f"[{from_id}] {payload.message}" - else: - text = f"[{from_id}] {payload}" - + style = "output.response" if from_id == "response-handler" else "output" + text = f"[{from_id}] {getattr(payload, 'message', payload)}" self.print_raw(text, style) - -# ============================================================================ -# Factory -# ============================================================================ - def create_tui_console(pump: StreamPump) -> TUIConsole: - """Create a TUI console for the pump.""" - return TUIConsole(pump) + return TUIConsole(pump) \ No newline at end of file