From f87d9f80e9151ac14980736d9ba885eca4a46748 Mon Sep 17 00:00:00 2001 From: dullfig Date: Mon, 19 Jan 2026 22:37:21 -0800 Subject: [PATCH] Move console, auth, server to Nextra (v0.4.0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit These modules are now proprietary and live in the Nextra SaaS product. xml-pipeline remains the OSS core with: - Message pump and pipeline steps - Handler contract and responses - LLM router abstraction - Native tools - Config loading - Memory/context buffer Removed: - xml_pipeline/console/ → nextra/console/ - xml_pipeline/auth/ → nextra/auth/ - xml_pipeline/server/ → nextra/server/ - Legacy files: agentserver.py, main.py, xml_listener.py The simple console example remains in examples/console/. Co-Authored-By: Claude Opus 4.5 --- pyproject.toml | 2 +- xml_pipeline/__init__.py | 2 +- xml_pipeline/agentserver.py | 295 ------- xml_pipeline/auth/__init__.py | 19 - xml_pipeline/auth/sessions.py | 197 ----- xml_pipeline/auth/totp.py | 0 xml_pipeline/auth/users.py | 227 ------ xml_pipeline/config/features.py | 15 +- xml_pipeline/console/__init__.py | 12 - xml_pipeline/console/client.py | 367 --------- xml_pipeline/console/console_registry.py | 19 - xml_pipeline/console/editor.py | 752 ----------------- xml_pipeline/console/lsp/__init__.py | 63 -- xml_pipeline/console/lsp/asls_client.py | 527 ------------ xml_pipeline/console/lsp/bridge.py | 314 -------- xml_pipeline/console/lsp/client.py | 538 ------------- xml_pipeline/console/lsp/manager.py | 211 ----- xml_pipeline/console/secure_console.py | 980 ----------------------- xml_pipeline/console/tui_console.py | 469 ----------- xml_pipeline/main.py | 0 xml_pipeline/server/__init__.py | 11 - xml_pipeline/server/app.py | 266 ------ xml_pipeline/xml_listener.py | 83 -- 23 files changed, 5 insertions(+), 5364 deletions(-) delete mode 100644 xml_pipeline/agentserver.py delete mode 100644 xml_pipeline/auth/__init__.py delete mode 100644 xml_pipeline/auth/sessions.py delete mode 100644 xml_pipeline/auth/totp.py delete mode 100644 xml_pipeline/auth/users.py delete mode 100644 xml_pipeline/console/__init__.py delete mode 100644 xml_pipeline/console/client.py delete mode 100644 xml_pipeline/console/console_registry.py delete mode 100644 xml_pipeline/console/editor.py delete mode 100644 xml_pipeline/console/lsp/__init__.py delete mode 100644 xml_pipeline/console/lsp/asls_client.py delete mode 100644 xml_pipeline/console/lsp/bridge.py delete mode 100644 xml_pipeline/console/lsp/client.py delete mode 100644 xml_pipeline/console/lsp/manager.py delete mode 100644 xml_pipeline/console/secure_console.py delete mode 100644 xml_pipeline/console/tui_console.py delete mode 100644 xml_pipeline/main.py delete mode 100644 xml_pipeline/server/__init__.py delete mode 100644 xml_pipeline/server/app.py delete mode 100644 xml_pipeline/xml_listener.py diff --git a/pyproject.toml b/pyproject.toml index 48ee1bc..7fc5ff4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ build-backend = "setuptools.build_meta" [project] name = "xml-pipeline" -version = "0.3.1" +version = "0.4.0" description = "Schema-driven XML message bus for multi-agent systems" readme = "README.md" requires-python = ">=3.11" diff --git a/xml_pipeline/__init__.py b/xml_pipeline/__init__.py index 286c785..f00b2e4 100644 --- a/xml_pipeline/__init__.py +++ b/xml_pipeline/__init__.py @@ -2,4 +2,4 @@ xml-pipeline: Tamper-proof nervous system for multi-agent organisms. """ -__version__ = "0.3.1" +__version__ = "0.4.0" diff --git a/xml_pipeline/agentserver.py b/xml_pipeline/agentserver.py deleted file mode 100644 index 2de3779..0000000 --- a/xml_pipeline/agentserver.py +++ /dev/null @@ -1,295 +0,0 @@ -# agent_server.py -""" -AgentServer — The Living Organism Host -December 25, 2025 - -Preliminary but runnable implementation. - -This is the body: one process, one secure WebSocket endpoint, -hosting many concurrent AgentService organs sharing a single -tamper-proof MessageBus from xml-pipeline. - -Features in this version: -- Mandatory WSS (TLS) -- First-message TOTP authentication -- Per-user capability control via config/users.yaml -- Personalized on connect -- Ed25519 identity generation helper -- Boot-time agent registration -- Hooks for future signed privileged commands - -XML wins. -""" - -import asyncio -import os -import ssl -import time -from typing import Optional, Dict, Any - -import pyotp -import yaml -from cryptography.hazmat.primitives.asymmetric import ed25519 -from cryptography.hazmat.primitives import serialization -from websockets.server import serve, WebSocketServerProtocol - -from xml_pipeline import MessageBus -from xml_pipeline.service import AgentService -from xml_pipeline.message import repair_and_canonicalize, XmlTamperError - - -class AgentServer: - """ - The body of the organism. - One instance = one living, multi-personality swarm. - """ - - # Default identity location — can be overridden if needed - IDENTITY_DIR = os.path.expanduser("~/.agent_server") - PRIVATE_KEY_PATH = os.path.join(IDENTITY_DIR, "identity.ed25519") - PUBLIC_KEY_PATH = os.path.join(IDENTITY_DIR, "identity.ed25519.pub") - - def __init__( - self, - host: str = "0.0.0.0", - port: int = 8765, - ssl_context: Optional[ssl.SSLContext] = None, - users_config_path: str = "config/users.yaml", - identity_pubkey_path: Optional[str] = None, - ): - self.host = host - self.port = port - self.ssl_context = ssl_context # None = ws:// (dev only), set for wss:// - self.bus = MessageBus() - - # Load per-user TOTP secrets + allowed root tags - self.users_config: Dict[str, Dict[str, Any]] = self._load_users_config(users_config_path) - - # Load organism public key for future privileged command verification - self.pubkey: Optional[bytes] = None - pubkey_path = identity_pubkey_path or self.PUBLIC_KEY_PATH - if os.path.exists(pubkey_path): - self.pubkey = self._load_pubkey(pubkey_path) - - # Built-in platform listeners will be added here in future versions - - @staticmethod - def _load_users_config(path: str) -> Dict[str, Dict[str, Any]]: - """Load users.yaml → {user_id: {totp_secret: ..., allowed_root_tags: [...]}}""" - if not os.path.exists(path): - raise FileNotFoundError(f"Users config not found: {path}") - with open(path) as f: - data = yaml.safe_load(f) or {} - return data.get("users", {}) - - @staticmethod - def _load_pubkey(path: str) -> bytes: - """Load raw Ed25519 public key bytes""" - with open(path, "rb") as f: - content = f.read().strip() - # Accept either raw bytes or ssh-ed25519 format - if content.startswith(b"ssh-ed25519 "): - import base64 - return base64.b64decode(content.split()[1]) - return content - - def register_agent( - self, - agent_class: type[AgentService], - *, - system_prompt: str, - max_concurrent: int = 10, - session_timeout: float = 1800.0, - version: str = "1.0", - public: bool = True, - ) -> None: - """Register a permanent agent at boot time.""" - # Wrapper to store public flag for catalog building - self.bus.register_agent( - agent_class=agent_class, - system_prompt=system_prompt, - max_concurrent=max_concurrent, - session_timeout=session_timeout, - version=version, - metadata={"public": public}, - ) - - async def _handle_client(self, websocket: WebSocketServerProtocol): - """Per-connection handler: authenticate → send catalog → pump messages""" - context = { - "authenticated": False, - "user": None, - "allowed_tags": set(), - "bad_message_count": 0, - "last_bad_time": 0.0, - } - - try: - # 1. Authentication — first message must be - first_raw = await asyncio.wait_for(websocket.recv(), timeout=15.0) - auth_msg = repair_and_canonicalize(first_raw) - - if auth_msg.getroot().tag != "authenticate": - await websocket.close(code=1008, reason="First message must be ") - return - - totp_code = auth_msg.getroot().get("totp") - if not totp_code: - await websocket.close(code=1008, reason="Missing TOTP code") - return - - user_id = self._authenticate_totp(totp_code) - if not user_id: - await websocket.close(code=1008, reason="Invalid TOTP") - return - - user_cfg = self.users_config[user_id] - allowed_tags = set(user_cfg.get("allowed_root_tags", [])) - if "*" in allowed_tags: - # Wildcard = all current + future tags - allowed_tags = None # Special sentinel - - context.update({ - "authenticated": True, - "user": user_id, - "allowed_tags": allowed_tags, - }) - - # 2. Send personalized catalog - catalog_xml = self.bus.build_catalog_for_user(allowed_tags) - await websocket.send(catalog_xml) - - # 3. Message pump - async def inbound(): - async for raw in websocket: - try: - yield repair_and_canonicalize(raw) - except XmlTamperError: - await websocket.close(code=1008, reason="Invalid/tampered XML") - raise - - async def outbound(message: bytes): - await websocket.send(message) - - await self.bus.run( - inbound=inbound(), - outbound=outbound, - context=context, # For ACL checks in listeners - ) - - except asyncio.TimeoutError: - await websocket.close(code=1008, reason="Authentication timeout") - except Exception as e: - print(f"Client error ({websocket.remote_address}): {e}") - - def _authenticate_totp(self, code: str) -> Optional[str]: - """Validate TOTP and return user identifier if successful""" - for user_id, cfg in self.users_config.items(): - totp = pyotp.TOTP(cfg["totp_secret"]) - if totp.verify(code, valid_window=1): # 30s tolerance - return user_id - return None - - async def start(self): - """Start the organism — runs forever""" - scheme = "wss" if self.ssl_context else "ws" - print(f"AgentServer starting on {scheme}://{self.host}:{self.port}") - print("Organism awakening...") - - async with serve(self._handle_client, self.host, self.port, ssl=self.ssl_context): - await asyncio.Future() # Run forever - - @classmethod - def generate_identity(cls, force: bool = False) -> None: - """ - Generate the organism's permanent Ed25519 identity. - Run once on first deployment. - """ - os.makedirs(cls.IDENTITY_DIR, exist_ok=True) - - if os.path.exists(cls.PRIVATE_KEY_PATH) and not force: - print("Identity already exists:") - print(f" Private key: {cls.PRIVATE_KEY_PATH}") - print(f" Public key : {cls.PUBLIC_KEY_PATH}") - print("Use --force to regenerate (will overwrite!).") - return - - print("Generating organism Ed25519 identity...") - - private_key = ed25519.Ed25519PrivateKey.generate() - public_key = private_key.public_key() - - # Private key — PEM PKCS8 unencrypted (rely on file permissions) - private_pem = private_key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption(), - ) - - # Public key — raw bytes + ssh-ed25519 format for readability - public_raw = public_key.public_bytes( - encoding=serialization.Encoding.Raw, - format=serialization.PublicFormat.Raw, - ) - public_ssh = f"ssh-ed25519 {public_raw.hex()} organism@{os.uname().nodename}" - - # Write with secure permissions - with open(cls.PRIVATE_KEY_PATH, "wb") as f: - os.fchmod(f.fileno(), 0o600) - f.write(private_pem) - - with open(cls.PUBLIC_KEY_PATH, "w") as f: - f.write(public_ssh + "\n") - - print("Organism identity created!") - print(f"Private key (KEEP SAFE): {cls.PRIVATE_KEY_PATH}") - print(f"Public key : {cls.PUBLIC_KEY_PATH}") - print("\nBackup the private key offline. Lose it → lose structural control forever.") - - -# ———————————————————————— -# Example CLI entrypoint -# ———————————————————————— -if __name__ == "__main__": - import argparse - - parser = argparse.ArgumentParser(description="AgentServer — the living organism") - subparsers = parser.add_subparsers(dest="command", required=True) - - # Run the server - run_p = subparsers.add_parser("run", help="Start the organism") - run_p.add_argument("--host", default="0.0.0.0") - run_p.add_argument("--port", type=int, default=8765) - run_p.add_argument("--cert", help="Path to TLS fullchain.pem") - run_p.add_argument("--key", help="Path to TLS privkey.pem") - run_p.add_argument("--users-config", default="config/users.yaml") - - # Generate identity - gen_p = subparsers.add_parser("generate-identity", help="Create cryptographic identity") - gen_p.add_argument("--force", action="store_true") - - args = parser.parse_args() - - if args.command == "generate-identity": - AgentServer.generate_identity(force=args.force) - - else: # run - ssl_ctx = None - if args.cert and args.key: - ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) - ssl_ctx.load_cert_chain(args.cert, args.key) - - server = AgentServer( - host=args.host, - port=args.port, - ssl_context=ssl_ctx, - users_config_path=args.users_config, - ) - - # Example boot-time listeners (uncomment and customise) - # from your_agents import CodingAgent, ResearchAgent, GrokAgent - # server.register_agent(CodingAgent, system_prompt="You are an elite Python engineer...", max_concurrent=20) - # server.register_agent(ResearchAgent, system_prompt="You are a thorough researcher...", max_concurrent=10) - # server.register_agent(GrokAgent, system_prompt="You are Grok, built by xAI...", max_concurrent=15) - - asyncio.run(server.start()) diff --git a/xml_pipeline/auth/__init__.py b/xml_pipeline/auth/__init__.py deleted file mode 100644 index 90e34f2..0000000 --- a/xml_pipeline/auth/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -""" -Authentication and authorization for xml-pipeline. - -Provides: -- UserStore: User management with Argon2id password hashing -- SessionManager: Token-based session management -""" - -from .users import User, UserStore, get_user_store -from .sessions import Session, SessionManager, get_session_manager - -__all__ = [ - "User", - "UserStore", - "get_user_store", - "Session", - "SessionManager", - "get_session_manager", -] diff --git a/xml_pipeline/auth/sessions.py b/xml_pipeline/auth/sessions.py deleted file mode 100644 index 994abd8..0000000 --- a/xml_pipeline/auth/sessions.py +++ /dev/null @@ -1,197 +0,0 @@ -""" -Session management with token-based authentication. - -Tokens are random hex strings stored in memory with expiry. -""" - -from __future__ import annotations - -import secrets -import threading -from dataclasses import dataclass -from datetime import datetime, timedelta, timezone -from typing import Optional - - -# Default session lifetime -DEFAULT_SESSION_LIFETIME = timedelta(hours=8) - -# Token length in bytes (32 bytes = 64 hex chars) -TOKEN_BYTES = 32 - - -@dataclass -class Session: - """An authenticated session.""" - token: str - username: str - role: str - created_at: datetime - expires_at: datetime - last_activity: datetime - - def is_expired(self) -> bool: - """Check if session has expired.""" - return datetime.now(timezone.utc) > self.expires_at - - def touch(self) -> None: - """Update last activity time.""" - self.last_activity = datetime.now(timezone.utc) - - def to_dict(self) -> dict: - """Convert to dict for API responses.""" - return { - "token": self.token, - "username": self.username, - "role": self.role, - "expires_at": self.expires_at.isoformat(), - } - - -class SessionManager: - """ - Manages authenticated sessions. - - Thread-safe for concurrent access. - - Usage: - manager = SessionManager() - - # Create session after successful login - session = manager.create("admin", "admin") - - # Validate token on subsequent requests - session = manager.validate(token) - if session: - print(f"Welcome back {session.username}") - - # Logout - manager.revoke(token) - """ - - def __init__(self, lifetime: timedelta = DEFAULT_SESSION_LIFETIME): - self.lifetime = lifetime - self._sessions: dict[str, Session] = {} - self._lock = threading.Lock() - - def create( - self, - username: str, - role: str, - lifetime: Optional[timedelta] = None, - ) -> Session: - """ - Create a new session. - - Args: - username: Authenticated username - role: User's role - lifetime: Optional custom lifetime - - Returns: - New Session with token - """ - token = secrets.token_hex(TOKEN_BYTES) - now = datetime.now(timezone.utc) - expires = now + (lifetime or self.lifetime) - - session = Session( - token=token, - username=username, - role=role, - created_at=now, - expires_at=expires, - last_activity=now, - ) - - with self._lock: - self._sessions[token] = session - self._cleanup_expired() - - return session - - def validate(self, token: str) -> Optional[Session]: - """ - Validate a session token. - - Args: - token: Session token from client - - Returns: - Session if valid, None if invalid/expired - """ - with self._lock: - session = self._sessions.get(token) - if not session: - return None - - if session.is_expired(): - del self._sessions[token] - return None - - session.touch() - return session - - def revoke(self, token: str) -> bool: - """ - Revoke a session (logout). - - Returns: - True if session was revoked, False if not found - """ - with self._lock: - if token in self._sessions: - del self._sessions[token] - return True - return False - - def revoke_user(self, username: str) -> int: - """ - Revoke all sessions for a user. - - Returns: - Number of sessions revoked - """ - with self._lock: - to_revoke = [ - token for token, session in self._sessions.items() - if session.username == username - ] - for token in to_revoke: - del self._sessions[token] - return len(to_revoke) - - def get_user_sessions(self, username: str) -> list[Session]: - """Get all active sessions for a user.""" - with self._lock: - return [ - s for s in self._sessions.values() - if s.username == username and not s.is_expired() - ] - - def _cleanup_expired(self) -> None: - """Remove expired sessions. Must hold lock.""" - expired = [ - token for token, session in self._sessions.items() - if session.is_expired() - ] - for token in expired: - del self._sessions[token] - - def active_count(self) -> int: - """Count active sessions.""" - with self._lock: - self._cleanup_expired() - return len(self._sessions) - - -# Global instance -_manager: Optional[SessionManager] = None - - -def get_session_manager() -> SessionManager: - """Get the global session manager.""" - global _manager - if _manager is None: - _manager = SessionManager() - return _manager diff --git a/xml_pipeline/auth/totp.py b/xml_pipeline/auth/totp.py deleted file mode 100644 index e69de29..0000000 diff --git a/xml_pipeline/auth/users.py b/xml_pipeline/auth/users.py deleted file mode 100644 index bdbd9a0..0000000 --- a/xml_pipeline/auth/users.py +++ /dev/null @@ -1,227 +0,0 @@ -""" -User store with Argon2id password hashing. - -Users are stored in ~/.xml-pipeline/users.yaml with hashed passwords. -""" - -from __future__ import annotations - -import os -import stat -import sys -from dataclasses import dataclass, field -from datetime import datetime, timezone -from pathlib import Path -from typing import Optional - -import yaml -from argon2 import PasswordHasher -from argon2.exceptions import VerifyMismatchError - - -CONFIG_DIR = Path.home() / ".xml-pipeline" -USERS_FILE = CONFIG_DIR / "users.yaml" - - -@dataclass -class User: - """A user account.""" - username: str - password_hash: str - role: str = "operator" # admin, operator, viewer - created_at: str = "" - last_login: Optional[str] = None - - def to_dict(self) -> dict: - return { - "username": self.username, - "password_hash": self.password_hash, - "role": self.role, - "created_at": self.created_at, - "last_login": self.last_login, - } - - @classmethod - def from_dict(cls, data: dict) -> User: - return cls( - username=data["username"], - password_hash=data["password_hash"], - role=data.get("role", "operator"), - created_at=data.get("created_at", ""), - last_login=data.get("last_login"), - ) - - -class UserStore: - """ - Manages user accounts with secure password storage. - - Usage: - store = UserStore() - store.create_user("admin", "secretpass", role="admin") - - user = store.authenticate("admin", "secretpass") - if user: - print(f"Welcome {user.username}!") - """ - - def __init__(self, users_file: Path = USERS_FILE): - self.users_file = users_file - self.hasher = PasswordHasher() - self._users: dict[str, User] = {} - self._load() - - def _ensure_dir(self) -> None: - """Create config directory if needed.""" - self.users_file.parent.mkdir(parents=True, exist_ok=True) - - def _load(self) -> None: - """Load users from file.""" - if not self.users_file.exists(): - return - try: - with open(self.users_file) as f: - data = yaml.safe_load(f) or {} - for username, user_data in data.get("users", {}).items(): - user_data["username"] = username - self._users[username] = User.from_dict(user_data) - except Exception: - pass - - def _save(self) -> None: - """Save users to file.""" - self._ensure_dir() - - data = { - "users": { - username: { - "password_hash": user.password_hash, - "role": user.role, - "created_at": user.created_at, - "last_login": user.last_login, - } - for username, user in self._users.items() - } - } - - with open(self.users_file, "w") as f: - yaml.dump(data, f, default_flow_style=False) - - # Set file permissions to 600 - if sys.platform != "win32": - os.chmod(self.users_file, stat.S_IRUSR | stat.S_IWUSR) - - def has_users(self) -> bool: - """Check if any users exist.""" - return len(self._users) > 0 - - def get_user(self, username: str) -> Optional[User]: - """Get user by username.""" - return self._users.get(username) - - def list_users(self) -> list[str]: - """List all usernames.""" - return list(self._users.keys()) - - def create_user( - self, - username: str, - password: str, - role: str = "operator", - ) -> User: - """ - Create a new user. - - Args: - username: Unique username - password: Plain text password (will be hashed) - role: User role (admin, operator, viewer) - - Returns: - The created User - - Raises: - ValueError: If username already exists - """ - if username in self._users: - raise ValueError(f"User already exists: {username}") - - if len(password) < 4: - raise ValueError("Password must be at least 4 characters") - - user = User( - username=username, - password_hash=self.hasher.hash(password), - role=role, - created_at=datetime.now(timezone.utc).isoformat(), - ) - - self._users[username] = user - self._save() - return user - - def authenticate(self, username: str, password: str) -> Optional[User]: - """ - Authenticate user with password. - - Returns: - User if authentication successful, None otherwise - """ - user = self._users.get(username) - if not user: - return None - - try: - self.hasher.verify(user.password_hash, password) - - # Update last login - user.last_login = datetime.now(timezone.utc).isoformat() - self._save() - - return user - except VerifyMismatchError: - return None - - def change_password(self, username: str, new_password: str) -> bool: - """Change user's password.""" - user = self._users.get(username) - if not user: - return False - - if len(new_password) < 4: - raise ValueError("Password must be at least 4 characters") - - user.password_hash = self.hasher.hash(new_password) - self._save() - return True - - def delete_user(self, username: str) -> bool: - """Delete a user.""" - if username not in self._users: - return False - - del self._users[username] - self._save() - return True - - def set_role(self, username: str, role: str) -> bool: - """Change user's role.""" - user = self._users.get(username) - if not user: - return False - - user.role = role - self._save() - return True - - -# Global instance -_store: Optional[UserStore] = None - - -def get_user_store() -> UserStore: - """Get the global user store.""" - global _store - if _store is None: - _store = UserStore() - return _store diff --git a/xml_pipeline/config/features.py b/xml_pipeline/config/features.py index 55211a7..0ed6cd6 100644 --- a/xml_pipeline/config/features.py +++ b/xml_pipeline/config/features.py @@ -16,14 +16,13 @@ def _check_import(module: str) -> bool: # Feature registry: feature_name -> (check_function, description) +# Note: auth, server, lsp moved to Nextra (proprietary) FEATURES: dict[str, tuple[Callable[[], bool], str]] = { "anthropic": (lambda: _check_import("anthropic"), "Anthropic Claude SDK"), "openai": (lambda: _check_import("openai"), "OpenAI SDK"), "redis": (lambda: _check_import("redis"), "Redis for distributed keyvalue"), "search": (lambda: _check_import("duckduckgo_search"), "DuckDuckGo search"), - "auth": (lambda: _check_import("pyotp") and _check_import("argon2"), "TOTP auth"), - "server": (lambda: _check_import("websockets"), "WebSocket server"), - "lsp": (lambda: _check_import("lsp_client"), "LSP client for config editor"), + "console": (lambda: _check_import("prompt_toolkit"), "Interactive console example"), } @@ -80,15 +79,7 @@ def check_features(config) -> FeatureCheck: # This would need more sophisticated detection based on tool config pass - # Check if auth is needed (multi-tenant mode) - if getattr(config, "auth", None): - if not result.available.get("auth"): - result.missing["auth"] = "Config has auth enabled" - - # Check if websocket server is needed - if getattr(config, "server", None): - if not result.available.get("server"): - result.missing["server"] = "Config has server enabled" + # Note: auth/server config sections are read but implemented in Nextra return result diff --git a/xml_pipeline/console/__init__.py b/xml_pipeline/console/__init__.py deleted file mode 100644 index 2f275ad..0000000 --- a/xml_pipeline/console/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -""" -console — Console interfaces for xml-pipeline. - -Provides: -- SecureConsole: Local keyboard-only console (no network) -- ConsoleClient: Network client connecting to server with auth -""" - -from xml_pipeline.console.secure_console import SecureConsole, PasswordManager -from xml_pipeline.console.client import ConsoleClient - -__all__ = ["SecureConsole", "PasswordManager", "ConsoleClient"] diff --git a/xml_pipeline/console/client.py b/xml_pipeline/console/client.py deleted file mode 100644 index 65cbfc0..0000000 --- a/xml_pipeline/console/client.py +++ /dev/null @@ -1,367 +0,0 @@ -""" -Console client that connects to the agent server. - -Provides SSH-style login with username/password authentication. -""" - -from __future__ import annotations - -import asyncio -import getpass -import json -import sys -from pathlib import Path -from typing import Optional - -try: - import aiohttp - AIOHTTP_AVAILABLE = True -except ImportError: - AIOHTTP_AVAILABLE = False - -try: - from prompt_toolkit import PromptSession - from prompt_toolkit.history import InMemoryHistory - from prompt_toolkit.styles import Style - PROMPT_TOOLKIT_AVAILABLE = True -except ImportError: - PROMPT_TOOLKIT_AVAILABLE = False - -from ..config import get_agent_config_store, CONFIG_DIR - - -DEFAULT_HOST = "127.0.0.1" -DEFAULT_PORT = 8765 -MAX_LOGIN_ATTEMPTS = 3 - -# Default organism config path -DEFAULT_ORGANISM_CONFIG = Path("config/organism.yaml") - - -class ConsoleClient: - """ - Text-based console client for the agent server. - - Usage: - client = ConsoleClient() - asyncio.run(client.run()) - """ - - def __init__(self, host: str = DEFAULT_HOST, port: int = DEFAULT_PORT): - self.host = host - self.port = port - self.base_url = f"http://{host}:{port}" - self.ws_url = f"ws://{host}:{port}/ws" - self.token: Optional[str] = None - self.username: Optional[str] = None - self.session: Optional[aiohttp.ClientSession] = None - self.ws: Optional[aiohttp.ClientWebSocketResponse] = None - self.running = False - - async def login(self) -> bool: - """ - Perform SSH-style login. - - Returns: - True if login successful, False otherwise - """ - print(f"Connecting to {self.host}:{self.port}...") - - for attempt in range(1, MAX_LOGIN_ATTEMPTS + 1): - try: - username = input("Username: ") - password = getpass.getpass("Password: ") - except (EOFError, KeyboardInterrupt): - print("\nLogin cancelled.") - return False - - if not username or not password: - print("Username and password required.") - continue - - try: - async with aiohttp.ClientSession() as session: - async with session.post( - f"{self.base_url}/auth/login", - json={"username": username, "password": password}, - ) as resp: - data = await resp.json() - - if resp.status == 200: - self.token = data["token"] - self.username = username - print(f"Welcome, {username}!") - return True - else: - error = data.get("error", "Authentication failed") - remaining = MAX_LOGIN_ATTEMPTS - attempt - if remaining > 0: - print(f"{error}. {remaining} attempt(s) remaining.") - else: - print(f"{error}. No attempts remaining.") - except aiohttp.ClientError as e: - print(f"Connection error: {e}") - return False - - return False - - async def connect_ws(self) -> bool: - """Connect to WebSocket after authentication.""" - if not self.token: - return False - - try: - self.session = aiohttp.ClientSession( - headers={"Authorization": f"Bearer {self.token}"} - ) - self.ws = await self.session.ws_connect(self.ws_url) - - # Wait for connected message - msg = await self.ws.receive_json() - if msg.get("type") == "connected": - return True - return False - except Exception as e: - print(f"WebSocket connection failed: {e}") - return False - - async def send_command(self, cmd: str) -> Optional[dict]: - """Send a command via WebSocket and get response.""" - if not self.ws: - return None - - await self.ws.send_json(cmd) - return await self.ws.receive_json() - - def print_help(self): - """Print available commands.""" - print(""" -Available commands: - /help - Show this help - /status - Show server status - /listeners - List available targets - /targets - Alias for /listeners - /configure - Edit organism.yaml (swarm wiring) - /configure @agent - Edit agent config (prompt, model, etc.) - /quit - Disconnect and exit - -Send messages: - @target message - Send message to a target listener - Example: @greeter Hello there! -""") - - async def handle_command(self, line: str) -> bool: - """ - Handle a command line. - - Returns: - False if should quit, True otherwise - """ - line = line.strip() - if not line: - return True - - if line == "/help": - self.print_help() - elif line == "/quit" or line == "/exit": - return False - elif line == "/status": - resp = await self.send_command({"type": "status"}) - if resp: - threads = resp.get("threads", 0) - print(f"Active threads: {threads}") - elif line == "/listeners" or line == "/targets": - resp = await self.send_command({"type": "listeners"}) - if resp: - listeners = resp.get("listeners", []) - if listeners: - print("Available targets:") - for name in listeners: - print(f" - {name}") - else: - print("No targets available (pipeline not running)") - elif line == "/configure": - # Edit organism.yaml - await self._configure_organism() - elif line.startswith("/configure @"): - # Edit agent config: /configure @agent - agent_name = line[12:].strip() - if agent_name: - await self._configure_agent(agent_name) - else: - print("Usage: /configure @agent_name") - elif line.startswith("/configure "): - # Also support /configure agent without @ - agent_name = line[11:].strip() - if agent_name: - await self._configure_agent(agent_name) - else: - print("Usage: /configure @agent_name") - elif line.startswith("/"): - print(f"Unknown command: {line}") - elif line.startswith("@"): - # Send message to target: @target message - resp = await self.send_command({"type": "send", "raw": line}) - if resp: - if resp.get("type") == "sent": - thread_id = resp.get("thread_id", "")[:8] - target = resp.get("target", "unknown") - print(f"Sent to {target} (thread: {thread_id}...)") - elif resp.get("type") == "error": - print(f"Error: {resp.get('error')}") - else: - print("Use @target message to send. Example: @greeter Hello!") - print("Type /listeners to see available targets.") - - return True - - async def _configure_organism(self): - """Open organism.yaml in editor.""" - from .editor import edit_text, PROMPT_TOOLKIT_AVAILABLE as EDITOR_AVAILABLE - - # Find organism.yaml - organism_path = DEFAULT_ORGANISM_CONFIG - if not organism_path.exists(): - # Try absolute path in config dir - organism_path = CONFIG_DIR / "organism.yaml" - - if not organism_path.exists(): - print(f"organism.yaml not found at {organism_path}") - print("Create one or specify path in configuration.") - return - - # Load content - content = organism_path.read_text() - - if EDITOR_AVAILABLE: - # Use built-in editor - edited, saved = await asyncio.get_event_loop().run_in_executor( - None, - lambda: edit_text(content, title=f"organism.yaml") - ) - - if saved and edited is not None: - organism_path.write_text(edited) - print(f"Saved {organism_path}") - print("Note: Restart server to apply changes.") - else: - print("Cancelled.") - else: - print(f"Edit manually: {organism_path}") - - async def _configure_agent(self, agent_name: str): - """Open agent config in editor.""" - from .editor import edit_text, PROMPT_TOOLKIT_AVAILABLE as EDITOR_AVAILABLE - - store = get_agent_config_store() - - # Load or create config content - content = store.load_yaml(agent_name) - config_path = store.path_for(agent_name) - - if EDITOR_AVAILABLE: - # Use built-in editor - edited, saved = await asyncio.get_event_loop().run_in_executor( - None, - lambda: edit_text(content, title=f"Agent: {agent_name}") - ) - - if saved and edited is not None: - try: - store.save_yaml(agent_name, edited) - print(f"Saved {config_path}") - except Exception as e: - print(f"Error saving: {e}") - else: - print("Cancelled.") - else: - # Fallback: show path - if not config_path.exists(): - # Create default - store.save_yaml(agent_name, content) - print(f"Edit manually: {config_path}") - - async def run(self): - """Main client loop.""" - if not AIOHTTP_AVAILABLE: - print("Error: aiohttp not installed") - sys.exit(1) - - # Login - if not await self.login(): - print("Authentication failed.") - sys.exit(1) - - # Connect WebSocket - if not await self.connect_ws(): - print("Failed to connect to server.") - sys.exit(1) - - print("Connected. Type /help for commands, /quit to exit.") - - self.running = True - - try: - if PROMPT_TOOLKIT_AVAILABLE: - await self._run_prompt_toolkit() - else: - await self._run_simple() - finally: - await self.cleanup() - - async def _run_prompt_toolkit(self): - """Run with prompt_toolkit for better UX.""" - style = Style.from_dict({ - "prompt": "ansicyan bold", - }) - - session = PromptSession( - history=InMemoryHistory(), - style=style, - ) - - while self.running: - try: - line = await asyncio.get_event_loop().run_in_executor( - None, - lambda: session.prompt(f"{self.username}> ") - ) - if not await self.handle_command(line): - break - except (EOFError, KeyboardInterrupt): - break - - async def _run_simple(self): - """Run with simple input (fallback).""" - while self.running: - try: - line = input(f"{self.username}> ") - if not await self.handle_command(line): - break - except (EOFError, KeyboardInterrupt): - break - - async def cleanup(self): - """Clean up connections.""" - if self.ws: - await self.ws.close() - if self.session: - await self.session.close() - print("Disconnected.") - - -def main(): - """Entry point.""" - import argparse - - parser = argparse.ArgumentParser(description="XML Pipeline Console") - parser.add_argument("--host", default=DEFAULT_HOST, help="Server host") - parser.add_argument("--port", type=int, default=DEFAULT_PORT, help="Server port") - args = parser.parse_args() - - client = ConsoleClient(host=args.host, port=args.port) - asyncio.run(client.run()) - - -if __name__ == "__main__": - main() diff --git a/xml_pipeline/console/console_registry.py b/xml_pipeline/console/console_registry.py deleted file mode 100644 index 6136f05..0000000 --- a/xml_pipeline/console/console_registry.py +++ /dev/null @@ -1,19 +0,0 @@ -""" -console_registry.py — Global console reference for handlers. - -This module provides a central place to register and retrieve -the active console instance, avoiding Python module import issues. -""" - -_console = None - - -def set_console(console): - """Set the active console instance.""" - global _console - _console = console - - -def get_console(): - """Get the active console instance (or None).""" - return _console diff --git a/xml_pipeline/console/editor.py b/xml_pipeline/console/editor.py deleted file mode 100644 index f87395b..0000000 --- a/xml_pipeline/console/editor.py +++ /dev/null @@ -1,752 +0,0 @@ -""" -Full-screen text editor using prompt_toolkit. - -Provides a vim-like editing experience for configuration files. -Supports optional LSP integration for YAML files. -""" - -from __future__ import annotations - -import asyncio -import logging -from pathlib import Path -from typing import Optional, Tuple, TYPE_CHECKING - -if TYPE_CHECKING: - from xml_pipeline.console.lsp import YAMLLSPClient, ASLSClient - from typing import Union - LSPClientType = Union[YAMLLSPClient, ASLSClient] - -logger = logging.getLogger(__name__) - -try: - from prompt_toolkit import Application - from prompt_toolkit.buffer import Buffer - from prompt_toolkit.layout import Layout, HSplit, VSplit - from prompt_toolkit.layout.containers import Window, ConditionalContainer, Float, FloatContainer - from prompt_toolkit.layout.controls import BufferControl, FormattedTextControl - from prompt_toolkit.layout.menus import CompletionsMenu - from prompt_toolkit.key_binding import KeyBindings - from prompt_toolkit.filters import Condition - from prompt_toolkit.styles import Style - from prompt_toolkit.lexers import PygmentsLexer - from prompt_toolkit.completion import Completer, Completion - from prompt_toolkit.document import Document - PROMPT_TOOLKIT_AVAILABLE = True -except ImportError: - PROMPT_TOOLKIT_AVAILABLE = False - -try: - from pygments.lexers.data import YamlLexer - from pygments.lexers.javascript import TypeScriptLexer - PYGMENTS_AVAILABLE = True -except ImportError: - PYGMENTS_AVAILABLE = False - - -# Supported syntax types and their lexers -SYNTAX_LEXERS = { - "yaml": "YamlLexer", - "typescript": "TypeScriptLexer", - "assemblyscript": "TypeScriptLexer", # AS uses TS syntax - "ts": "TypeScriptLexer", - "as": "TypeScriptLexer", -} - - -def get_lexer_for_syntax(syntax: str) -> Optional[object]: - """ - Get a Pygments lexer for the given syntax type. - - Args: - syntax: Syntax name ("yaml", "typescript", "ts", "as", "assemblyscript") - - Returns: - PygmentsLexer instance or None - """ - if not PYGMENTS_AVAILABLE: - return None - - syntax_lower = syntax.lower() - - if syntax_lower in ("yaml", "yml"): - return PygmentsLexer(YamlLexer) - elif syntax_lower in ("typescript", "ts", "assemblyscript", "as"): - return PygmentsLexer(TypeScriptLexer) - else: - return None - - -def detect_syntax_from_path(path: str | Path) -> str: - """ - Detect syntax type from file extension. - - Returns: - Syntax name for use with get_lexer_for_syntax() - """ - ext = Path(path).suffix.lower() - - extension_map = { - ".yaml": "yaml", - ".yml": "yaml", - ".ts": "typescript", - ".as": "assemblyscript", - } - - return extension_map.get(ext, "text") - - -def edit_text( - initial_text: str, - title: str = "Editor", - syntax: str = "yaml", -) -> Tuple[Optional[str], bool]: - """ - Open full-screen editor for text. - - Args: - initial_text: Text to edit - title: Title shown in header - syntax: Syntax highlighting ("yaml", "typescript", "ts", "as", "text") - - Returns: - (edited_text, saved) - edited_text is None if cancelled - """ - if not PROMPT_TOOLKIT_AVAILABLE: - print("Error: prompt_toolkit not installed") - return None, False - - # State - result = {"text": None, "saved": False} - - # Create buffer with initial text - buffer = Buffer( - multiline=True, - name="editor", - ) - buffer.text = initial_text - - # Key bindings - kb = KeyBindings() - - @kb.add("c-s") # Ctrl+S to save - def save(event): - result["text"] = buffer.text - result["saved"] = True - event.app.exit() - - @kb.add("c-q") # Ctrl+Q to quit without saving - def quit_nosave(event): - result["text"] = None - result["saved"] = False - event.app.exit() - - @kb.add("escape") # Escape to quit - def escape(event): - result["text"] = None - result["saved"] = False - event.app.exit() - - # Syntax highlighting - lexer = get_lexer_for_syntax(syntax) - - # Layout - header = Window( - height=1, - content=FormattedTextControl( - lambda: [ - ("class:header", f" {title} "), - ("class:header.key", " Ctrl+S"), - ("class:header", "=Save "), - ("class:header.key", " Ctrl+Q"), - ("class:header", "=Quit "), - ] - ), - style="class:header", - ) - - editor_window = Window( - content=BufferControl( - buffer=buffer, - lexer=lexer, - ), - ) - - # Status bar showing cursor position - def get_status(): - row = buffer.document.cursor_position_row + 1 - col = buffer.document.cursor_position_col + 1 - lines = len(buffer.text.split("\n")) - return [ - ("class:status", f" Line {row}/{lines}, Col {col} "), - ] - - status_bar = Window( - height=1, - content=FormattedTextControl(get_status), - style="class:status", - ) - - layout = Layout( - HSplit([ - header, - editor_window, - status_bar, - ]) - ) - - # Styles - style = Style.from_dict({ - "header": "bg:#005f87 #ffffff", - "header.key": "bg:#005f87 #ffff00 bold", - "status": "bg:#444444 #ffffff", - }) - - # Create and run application - app = Application( - layout=layout, - key_bindings=kb, - style=style, - full_screen=True, - mouse_support=True, - ) - - app.run() - - return result["text"], result["saved"] - - -def edit_file(filepath: str, title: Optional[str] = None) -> bool: - """ - Edit a file in the full-screen editor. - - Args: - filepath: Path to file - title: Optional title (defaults to filename) - - Returns: - True if saved, False if cancelled - """ - from pathlib import Path - - path = Path(filepath) - title = title or path.name - - # Load existing content or empty - if path.exists(): - initial_text = path.read_text() - else: - initial_text = "" - - # Edit - edited_text, saved = edit_text(initial_text, title=title, syntax="yaml") - - # Save if requested - if saved and edited_text is not None: - path.parent.mkdir(parents=True, exist_ok=True) - path.write_text(edited_text) - return True - - return False - - -# Fallback: use system editor via subprocess -def edit_with_system_editor(filepath: str) -> bool: - """ - Edit file using system's default editor ($EDITOR or fallback). - - Returns True if file was modified. - """ - import os - import subprocess - from pathlib import Path - - path = Path(filepath) - - # Get editor from environment - editor = os.environ.get("EDITOR", os.environ.get("VISUAL", "")) - - if not editor: - # Fallback based on platform - import platform - if platform.system() == "Windows": - editor = "notepad" - else: - editor = "nano" # Most likely available - - # Get modification time before edit - mtime_before = path.stat().st_mtime if path.exists() else None - - # Open editor - try: - subprocess.run([editor, str(path)], check=True) - except subprocess.CalledProcessError: - return False - except FileNotFoundError: - print(f"Editor not found: {editor}") - return False - - # Check if modified - if path.exists(): - mtime_after = path.stat().st_mtime - return mtime_before is None or mtime_after > mtime_before - - return False - - -# ============================================================================= -# LSP-Enhanced Editor -# ============================================================================= - - -class LSPEditor: - """ - Full-screen editor with optional LSP integration. - - Provides: - - Syntax highlighting (YAML, TypeScript/AssemblyScript via Pygments) - - Autocompletion (LSP when available) - - Inline diagnostics (LSP when available) - - Hover documentation on F1 (LSP when available) - - Usage: - # YAML config editing - editor = LSPEditor(schema_type="listener") - edited_text, saved = await editor.edit(initial_text, title="greeter.yaml") - - # AssemblyScript listener editing - editor = LSPEditor(syntax="assemblyscript") - edited_text, saved = await editor.edit(source_code, title="handler.ts") - """ - - def __init__( - self, - schema_type: Optional[str] = None, - schema_uri: Optional[str] = None, - syntax: str = "yaml", - ): - """ - Initialize the LSP editor. - - Args: - schema_type: Schema type ("organism" or "listener") for YAML modeline - schema_uri: Explicit schema URI to use - syntax: Syntax highlighting ("yaml", "typescript", "assemblyscript", "ts", "as") - """ - self.schema_type = schema_type - self.schema_uri = schema_uri - self.syntax = syntax - self._lsp_client: Optional["LSPClientType"] = None - self._lsp_type: Optional[str] = None # "yaml" or "assemblyscript" - self._diagnostics_text = "" - self._hover_text = "" - self._show_hover = False - self._document_version = 0 - - async def edit( - self, - initial_text: str, - title: str = "Editor", - document_uri: Optional[str] = None, - ) -> Tuple[Optional[str], bool]: - """ - Open the editor with LSP support. - - Args: - initial_text: Initial content to edit - title: Title shown in header - document_uri: URI for LSP (auto-generated if not provided) - - Returns: - (edited_text, saved) - edited_text is None if cancelled - """ - if not PROMPT_TOOLKIT_AVAILABLE: - print("Error: prompt_toolkit not installed") - return None, False - - # Determine LSP type based on syntax - syntax_lower = self.syntax.lower() - if syntax_lower in ("yaml", "yml"): - self._lsp_type = "yaml" - elif syntax_lower in ("typescript", "ts", "assemblyscript", "as"): - self._lsp_type = "assemblyscript" - else: - self._lsp_type = None - - # Try to get appropriate LSP client - try: - from xml_pipeline.console.lsp import get_lsp_manager, LSPServerType - manager = get_lsp_manager() - - if self._lsp_type == "yaml": - self._lsp_client = await manager.get_yaml_client() - elif self._lsp_type == "assemblyscript": - self._lsp_client = await manager.get_asls_client() - else: - self._lsp_client = None - - except Exception as e: - logger.debug(f"LSP not available: {e}") - self._lsp_client = None - - # Generate document URI with appropriate extension - if document_uri is None: - ext = ".yaml" if self._lsp_type == "yaml" else ".ts" - document_uri = f"file:///temp/{title.replace(' ', '_')}{ext}" - - # Ensure schema modeline is present (YAML only) - if self._lsp_type == "yaml": - initial_text = self._ensure_modeline(initial_text) - - # Open document in LSP - if self._lsp_client: - await self._lsp_client.did_open(document_uri, initial_text) - - try: - result = await self._run_editor(initial_text, title, document_uri) - finally: - # Close document in LSP - if self._lsp_client: - await self._lsp_client.did_close(document_uri) - try: - from xml_pipeline.console.lsp import get_lsp_manager, LSPServerType - manager = get_lsp_manager() - if self._lsp_type == "yaml": - await manager.release_client(LSPServerType.YAML) - elif self._lsp_type == "assemblyscript": - await manager.release_client(LSPServerType.ASSEMBLYSCRIPT) - except Exception: - pass - - return result - - def _ensure_modeline(self, text: str) -> str: - """Ensure YAML has schema modeline if schema_type is set.""" - if self.schema_type is None: - return text - - modeline = f"# yaml-language-server: $schema=~/.xml-pipeline/schemas/{self.schema_type}.schema.json" - - # Check if modeline already exists - lines = text.split("\n") - for line in lines[:3]: # Check first 3 lines - if "yaml-language-server" in line and "$schema" in line: - return text - - # Add modeline at the top - return modeline + "\n" + text - - async def _run_editor( - self, - initial_text: str, - title: str, - uri: str, - ) -> Tuple[Optional[str], bool]: - """Run the editor application.""" - result = {"text": None, "saved": False} - - # Create buffer - buffer = Buffer(multiline=True, name="editor") - buffer.text = initial_text - - # Key bindings - kb = KeyBindings() - - @kb.add("c-s") # Ctrl+S to save - def save(event): - result["text"] = buffer.text - result["saved"] = True - event.app.exit() - - @kb.add("c-q") # Ctrl+Q to quit without saving - def quit_nosave(event): - result["text"] = None - result["saved"] = False - event.app.exit() - - @kb.add("escape") # Escape to quit - def escape(event): - result["text"] = None - result["saved"] = False - event.app.exit() - - @kb.add("c-space") # Ctrl+Space for completion - async def trigger_completion(event): - if self._lsp_client: - doc = buffer.document - line = doc.cursor_position_row - col = doc.cursor_position_col - completions = await self._lsp_client.completion(uri, line, col) - if completions: - # Show first completion as hint - self._diagnostics_text = f"Completions: {', '.join(c.label for c in completions[:5])}" - event.app.invalidate() - - @kb.add("f1") # F1 for hover - async def show_hover(event): - if self._lsp_client: - doc = buffer.document - line = doc.cursor_position_row - col = doc.cursor_position_col - hover = await self._lsp_client.hover(uri, line, col) - if hover: - self._hover_text = hover.contents[:200] # Truncate - self._show_hover = True - else: - self._hover_text = "" - self._show_hover = False - event.app.invalidate() - - @kb.add("escape", filter=Condition(lambda: self._show_hover)) - def hide_hover(event): - self._show_hover = False - event.app.invalidate() - - @kb.add("c-p") # Ctrl+P for signature help (ASLS only) - async def show_signature_help(event): - # Only available for ASLS - if self._lsp_client and self._lsp_type == "assemblyscript": - doc = buffer.document - line = doc.cursor_position_row - col = doc.cursor_position_col - try: - sig_help = await self._lsp_client.signature_help(uri, line, col) - if sig_help and sig_help.get("signatures"): - sig = sig_help["signatures"][0] - label = sig.get("label", "") - self._hover_text = f"Signature: {label}" - self._show_hover = True - else: - self._hover_text = "" - self._show_hover = False - except Exception: - pass - event.app.invalidate() - - # Syntax highlighting - lexer = get_lexer_for_syntax(self.syntax) - - # Header - def get_header(): - if self._lsp_client: - if self._lsp_type == "yaml": - lsp_status = " [YAML LSP]" - elif self._lsp_type == "assemblyscript": - lsp_status = " [ASLS]" - else: - lsp_status = " [LSP]" - else: - lsp_status = "" - - parts = [ - ("class:header", f" {title}{lsp_status} "), - ("class:header.key", " Ctrl+S"), - ("class:header", "=Save "), - ("class:header.key", " Ctrl+Q"), - ("class:header", "=Quit "), - ("class:header.key", " F1"), - ("class:header", "=Hover "), - ] - - # Add Ctrl+P hint for AssemblyScript - if self._lsp_type == "assemblyscript" and self._lsp_client: - parts.extend([ - ("class:header.key", " Ctrl+P"), - ("class:header", "=Sig "), - ]) - - return parts - - header = Window( - height=1, - content=FormattedTextControl(get_header), - style="class:header", - ) - - # Editor window - editor_window = Window( - content=BufferControl( - buffer=buffer, - lexer=lexer, - ), - ) - - # Status bar - def get_status(): - row = buffer.document.cursor_position_row + 1 - col = buffer.document.cursor_position_col + 1 - lines = len(buffer.text.split("\n")) - - parts = [("class:status", f" Line {row}/{lines}, Col {col} ")] - - if self._diagnostics_text: - parts.append(("class:status.diag", f" | {self._diagnostics_text}")) - - return parts - - status_bar = Window( - height=1, - content=FormattedTextControl(get_status), - style="class:status", - ) - - # Hover popup (shown conditionally) - def get_hover_text(): - if self._show_hover and self._hover_text: - return [("class:hover", self._hover_text)] - return [] - - hover_window = ConditionalContainer( - Window( - height=3, - content=FormattedTextControl(get_hover_text), - style="class:hover", - ), - filter=Condition(lambda: self._show_hover and bool(self._hover_text)), - ) - - # Layout - layout = Layout( - HSplit([ - header, - editor_window, - hover_window, - status_bar, - ]) - ) - - # Styles - style = Style.from_dict({ - "header": "bg:#005f87 #ffffff", - "header.key": "bg:#005f87 #ffff00 bold", - "status": "bg:#444444 #ffffff", - "status.diag": "bg:#444444 #ff8800", - "hover": "bg:#333333 #ffffff italic", - "diagnostic.error": "bg:#5f0000 #ffffff", - "diagnostic.warning": "bg:#5f5f00 #ffffff", - }) - - # Set up diagnostics callback - async def on_text_changed(buff): - if self._lsp_client: - self._document_version += 1 - diagnostics = await self._lsp_client.did_change( - uri, buff.text, self._document_version - ) - if diagnostics: - errors = sum(1 for d in diagnostics if d.severity == "error") - warnings = sum(1 for d in diagnostics if d.severity == "warning") - parts = [] - if errors: - parts.append(f"{errors} error{'s' if errors > 1 else ''}") - if warnings: - parts.append(f"{warnings} warning{'s' if warnings > 1 else ''}") - self._diagnostics_text = " | ".join(parts) if parts else "" - else: - self._diagnostics_text = "" - - buffer.on_text_changed += lambda buff: asyncio.create_task(on_text_changed(buff)) - - # Create and run application - app: Application = Application( - layout=layout, - key_bindings=kb, - style=style, - full_screen=True, - mouse_support=True, - ) - - await app.run_async() - - return result["text"], result["saved"] - - -async def edit_text_async( - initial_text: str, - title: str = "Editor", - schema_type: Optional[str] = None, - syntax: str = "yaml", -) -> Tuple[Optional[str], bool]: - """ - Async wrapper for LSP-enabled text editing. - - Args: - initial_text: Text to edit - title: Title shown in header - schema_type: "organism" or "listener" for YAML schema modeline - syntax: Syntax highlighting ("yaml", "typescript", "assemblyscript", "ts", "as") - - Returns: - (edited_text, saved) - edited_text is None if cancelled - """ - editor = LSPEditor(schema_type=schema_type, syntax=syntax) - return await editor.edit(initial_text, title=title) - - -async def edit_file_async( - filepath: str, - title: Optional[str] = None, - schema_type: Optional[str] = None, - syntax: Optional[str] = None, -) -> bool: - """ - Edit a file with LSP support. - - Args: - filepath: Path to file - title: Optional title (defaults to filename) - schema_type: "organism" or "listener" for YAML schema modeline - syntax: Syntax highlighting (auto-detected from extension if not specified) - - Returns: - True if saved, False if cancelled - """ - path = Path(filepath) - title = title or path.name - - # Auto-detect syntax from extension if not specified - if syntax is None: - syntax = detect_syntax_from_path(path) - - # Load existing content or empty - if path.exists(): - initial_text = path.read_text() - else: - initial_text = "" - - # Edit - edited_text, saved = await edit_text_async( - initial_text, - title=title, - schema_type=schema_type, - syntax=syntax, - ) - - # Save if requested - if saved and edited_text is not None: - path.parent.mkdir(parents=True, exist_ok=True) - path.write_text(edited_text) - return True - - return False - - -async def edit_assemblyscript_source( - filepath: str, - title: Optional[str] = None, -) -> bool: - """ - Edit an AssemblyScript listener source file with ASLS support. - - Args: - filepath: Path to .ts or .as file - title: Optional title (defaults to filename) - - Returns: - True if saved, False if cancelled - """ - return await edit_file_async( - filepath, - title=title, - syntax="assemblyscript", - ) diff --git a/xml_pipeline/console/lsp/__init__.py b/xml_pipeline/console/lsp/__init__.py deleted file mode 100644 index 2ff9a70..0000000 --- a/xml_pipeline/console/lsp/__init__.py +++ /dev/null @@ -1,63 +0,0 @@ -""" -LSP (Language Server Protocol) integration for the editor. - -Provides: -- YAMLLSPClient: Wrapper for yaml-language-server communication -- ASLSClient: Wrapper for AssemblyScript language server communication -- LSPServerManager: Server lifecycle management -- LSPBridge: Integration with prompt_toolkit editor - -Supported Language Servers: -- yaml-language-server: npm install -g yaml-language-server -- asls (AssemblyScript): npm install -g assemblyscript-lsp -""" - -from __future__ import annotations - -from .client import ( - YAMLLSPClient, - LSPCompletion, - LSPDiagnostic, - LSPHover, - is_lsp_available, -) -from .asls_client import ( - ASLSClient, - ASLSConfig, - is_asls_available, - is_assemblyscript_file, - ASSEMBLYSCRIPT_EXTENSIONS, -) -from .manager import ( - LSPServerManager, - LSPServerType, - get_lsp_manager, - ensure_lsp_stopped, -) -from .bridge import ( - LSPCompleter, - DiagnosticsProcessor, -) - -__all__ = [ - # YAML Client - "YAMLLSPClient", - "LSPCompletion", - "LSPDiagnostic", - "LSPHover", - "is_lsp_available", - # AssemblyScript Client - "ASLSClient", - "ASLSConfig", - "is_asls_available", - "is_assemblyscript_file", - "ASSEMBLYSCRIPT_EXTENSIONS", - # Manager - "LSPServerManager", - "LSPServerType", - "get_lsp_manager", - "ensure_lsp_stopped", - # Bridge - "LSPCompleter", - "DiagnosticsProcessor", -] diff --git a/xml_pipeline/console/lsp/asls_client.py b/xml_pipeline/console/lsp/asls_client.py deleted file mode 100644 index 30ebde8..0000000 --- a/xml_pipeline/console/lsp/asls_client.py +++ /dev/null @@ -1,527 +0,0 @@ -""" -AssemblyScript Language Server Protocol client. - -Wraps communication with asls (AssemblyScript Language Server) for: -- Autocompletion for AgentServer SDK types -- Type checking and diagnostics -- Hover documentation - -Install: npm install -g assemblyscript-lsp - -Used for editing WASM listener source files written in AssemblyScript. -""" - -from __future__ import annotations - -import asyncio -import shutil -import logging -from dataclasses import dataclass -from pathlib import Path -from typing import Optional, Any - -from .client import ( - LSPCompletion, - LSPDiagnostic, - LSPHover, -) - -logger = logging.getLogger(__name__) - - -def _check_asls() -> bool: - """Check if asls (AssemblyScript Language Server) is installed.""" - return shutil.which("asls") is not None - - -def is_asls_available() -> tuple[bool, str]: - """ - Check if AssemblyScript LSP support is available. - - Returns (available, reason) tuple. - """ - if not _check_asls(): - return False, "asls not found (npm install -g assemblyscript-lsp)" - - return True, "AssemblyScript LSP available" - - -# File extensions handled by ASLS -ASSEMBLYSCRIPT_EXTENSIONS = {".ts", ".as"} - - -def is_assemblyscript_file(path: str | Path) -> bool: - """Check if a file should use the AssemblyScript LSP.""" - return Path(path).suffix.lower() in ASSEMBLYSCRIPT_EXTENSIONS - - -@dataclass -class ASLSConfig: - """ - Configuration for the AssemblyScript Language Server. - - These settings are passed during initialization. - """ - - # Path to asconfig.json (AssemblyScript project config) - asconfig_path: Optional[str] = None - - # Path to AgentServer SDK type definitions - sdk_types_path: Optional[str] = None - - # Enable strict null checks - strict_null_checks: bool = True - - # Enable additional diagnostics - verbose_diagnostics: bool = False - - -class ASLSClient: - """ - Client for communicating with the AssemblyScript Language Server. - - Uses stdio for communication with the language server process. - - Usage: - client = ASLSClient() - if await client.start(): - await client.did_open(uri, content) - completions = await client.completion(uri, line, col) - await client.stop() - """ - - def __init__(self, config: Optional[ASLSConfig] = None): - """ - Initialize the ASLS client. - - Args: - config: Optional ASLS configuration - """ - self.config = config or ASLSConfig() - self._process: Optional[asyncio.subprocess.Process] = None - self._reader_task: Optional[asyncio.Task] = None - self._request_id = 0 - self._pending_requests: dict[int, asyncio.Future] = {} - self._diagnostics: dict[str, list[LSPDiagnostic]] = {} - self._initialized = False - self._lock = asyncio.Lock() - - async def start(self) -> bool: - """ - Start the AssemblyScript language server. - - Returns True if started successfully. - """ - available, reason = is_asls_available() - if not available: - logger.warning(f"ASLS not available: {reason}") - return False - - try: - self._process = await asyncio.create_subprocess_exec( - "asls", "--stdio", - stdin=asyncio.subprocess.PIPE, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - - # Start reader task - self._reader_task = asyncio.create_task(self._read_messages()) - - # Initialize LSP - await self._initialize() - self._initialized = True - logger.info("AssemblyScript language server started") - return True - - except Exception as e: - logger.error(f"Failed to start asls: {e}") - await self.stop() - return False - - async def stop(self) -> None: - """Stop the language server.""" - self._initialized = False - - if self._reader_task: - self._reader_task.cancel() - try: - await self._reader_task - except asyncio.CancelledError: - pass - self._reader_task = None - - if self._process: - self._process.terminate() - try: - await asyncio.wait_for(self._process.wait(), timeout=2) - except asyncio.TimeoutError: - self._process.kill() - self._process = None - - # Cancel pending requests - for future in self._pending_requests.values(): - if not future.done(): - future.cancel() - self._pending_requests.clear() - - async def _initialize(self) -> None: - """Send LSP initialize request.""" - init_options: dict[str, Any] = {} - - if self.config.asconfig_path: - init_options["asconfigPath"] = self.config.asconfig_path - - if self.config.sdk_types_path: - init_options["sdkTypesPath"] = self.config.sdk_types_path - - result = await self._request( - "initialize", - { - "processId": None, - "rootUri": None, - "capabilities": { - "textDocument": { - "completion": { - "completionItem": { - "snippetSupport": True, - "documentationFormat": ["markdown", "plaintext"], - } - }, - "hover": { - "contentFormat": ["markdown", "plaintext"], - }, - "publishDiagnostics": { - "relatedInformation": True, - }, - "signatureHelp": { - "signatureInformation": { - "documentationFormat": ["markdown", "plaintext"], - } - }, - } - }, - "initializationOptions": init_options, - }, - ) - logger.debug(f"ASLS initialized: {result}") - - # Send initialized notification - await self._notify("initialized", {}) - - async def did_open(self, uri: str, content: str) -> None: - """Notify server that a document was opened.""" - if not self._initialized: - return - - # Determine language ID based on extension - language_id = "assemblyscript" - if uri.endswith(".ts"): - language_id = "typescript" # ASLS may prefer this - - await self._notify( - "textDocument/didOpen", - { - "textDocument": { - "uri": uri, - "languageId": language_id, - "version": 1, - "text": content, - } - }, - ) - - async def did_change( - self, uri: str, content: str, version: int = 1 - ) -> list[LSPDiagnostic]: - """ - Notify server of document change. - - Returns current diagnostics for the document. - """ - if not self._initialized: - return [] - - await self._notify( - "textDocument/didChange", - { - "textDocument": {"uri": uri, "version": version}, - "contentChanges": [{"text": content}], - }, - ) - - # Wait briefly for diagnostics - await asyncio.sleep(0.2) # ASLS may need more time than YAML - - return self._diagnostics.get(uri, []) - - async def did_close(self, uri: str) -> None: - """Notify server that a document was closed.""" - if not self._initialized: - return - - await self._notify( - "textDocument/didClose", - {"textDocument": {"uri": uri}}, - ) - - # Clear diagnostics - self._diagnostics.pop(uri, None) - - async def completion( - self, uri: str, line: int, column: int - ) -> list[LSPCompletion]: - """ - Request completions at a position. - - Args: - uri: Document URI - line: 0-indexed line number - column: 0-indexed column number - - Returns list of completion items. - """ - if not self._initialized: - return [] - - try: - result = await self._request( - "textDocument/completion", - { - "textDocument": {"uri": uri}, - "position": {"line": line, "character": column}, - }, - ) - - if result is None: - return [] - - items = result.get("items", []) if isinstance(result, dict) else result - return [LSPCompletion.from_lsp(item) for item in items] - - except Exception as e: - logger.debug(f"ASLS completion request failed: {e}") - return [] - - async def hover(self, uri: str, line: int, column: int) -> Optional[LSPHover]: - """ - Request hover information at a position. - - Args: - uri: Document URI - line: 0-indexed line number - column: 0-indexed column number - """ - if not self._initialized: - return None - - try: - result = await self._request( - "textDocument/hover", - { - "textDocument": {"uri": uri}, - "position": {"line": line, "character": column}, - }, - ) - - return LSPHover.from_lsp(result) if result else None - - except Exception as e: - logger.debug(f"ASLS hover request failed: {e}") - return None - - async def signature_help( - self, uri: str, line: int, column: int - ) -> Optional[dict[str, Any]]: - """ - Request signature help at a position. - - Useful when typing function arguments. - """ - if not self._initialized: - return None - - try: - result = await self._request( - "textDocument/signatureHelp", - { - "textDocument": {"uri": uri}, - "position": {"line": line, "character": column}, - }, - ) - return result - - except Exception as e: - logger.debug(f"ASLS signature help request failed: {e}") - return None - - async def go_to_definition( - self, uri: str, line: int, column: int - ) -> Optional[list[dict[str, Any]]]: - """ - Request go-to-definition at a position. - - Returns list of location objects. - """ - if not self._initialized: - return None - - try: - result = await self._request( - "textDocument/definition", - { - "textDocument": {"uri": uri}, - "position": {"line": line, "character": column}, - }, - ) - - if result is None: - return None - - # Normalize to list - if isinstance(result, dict): - return [result] - return result - - except Exception as e: - logger.debug(f"ASLS go-to-definition failed: {e}") - return None - - def get_diagnostics(self, uri: str) -> list[LSPDiagnostic]: - """Get current diagnostics for a document.""" - return self._diagnostics.get(uri, []) - - # ------------------------------------------------------------------------- - # LSP Protocol Implementation (shared pattern with YAMLLSPClient) - # ------------------------------------------------------------------------- - - async def _request(self, method: str, params: dict[str, Any]) -> Any: - """Send a request and wait for response.""" - async with self._lock: - self._request_id += 1 - req_id = self._request_id - - message = { - "jsonrpc": "2.0", - "id": req_id, - "method": method, - "params": params, - } - - future: asyncio.Future = asyncio.Future() - self._pending_requests[req_id] = future - - try: - await self._send_message(message) - return await asyncio.wait_for(future, timeout=10.0) # Longer timeout for ASLS - except asyncio.TimeoutError: - logger.warning(f"ASLS request timed out: {method}") - return None - finally: - self._pending_requests.pop(req_id, None) - - async def _notify(self, method: str, params: dict[str, Any]) -> None: - """Send a notification (no response expected).""" - message = { - "jsonrpc": "2.0", - "method": method, - "params": params, - } - await self._send_message(message) - - async def _send_message(self, message: dict[str, Any]) -> None: - """Send a JSON-RPC message to the server.""" - if not self._process or not self._process.stdin: - return - - import json - content = json.dumps(message) - header = f"Content-Length: {len(content)}\r\n\r\n" - - try: - self._process.stdin.write(header.encode()) - self._process.stdin.write(content.encode()) - await self._process.stdin.drain() - except (BrokenPipeError, OSError, ConnectionResetError) as e: - logger.error(f"Failed to send ASLS message: {e}") - - async def _read_messages(self) -> None: - """Read messages from the server.""" - if not self._process or not self._process.stdout: - return - - import json - - try: - while True: - # Read header - header = b"" - while b"\r\n\r\n" not in header: - chunk = await self._process.stdout.read(1) - if not chunk: - return # EOF - header += chunk - - # Parse content length - content_length = 0 - for line in header.decode().split("\r\n"): - if line.startswith("Content-Length:"): - content_length = int(line.split(":")[1].strip()) - break - - if content_length == 0: - continue - - # Read content - content = await self._process.stdout.read(content_length) - - if not content: - return - - # Parse and handle message - try: - message = json.loads(content.decode()) - await self._handle_message(message) - except json.JSONDecodeError as e: - logger.error(f"Failed to parse ASLS message: {e}") - - except asyncio.CancelledError: - pass - except Exception as e: - logger.error(f"ASLS reader error: {e}") - - async def _handle_message(self, message: dict[str, Any]) -> None: - """Handle an incoming LSP message.""" - if "id" in message and "result" in message: - # Response to a request - req_id = message["id"] - if req_id in self._pending_requests: - future = self._pending_requests[req_id] - if not future.done(): - future.set_result(message.get("result")) - - elif "id" in message and "error" in message: - # Error response - req_id = message["id"] - if req_id in self._pending_requests: - future = self._pending_requests[req_id] - if not future.done(): - error = message["error"] - future.set_exception( - Exception(f"ASLS error: {error.get('message', error)}") - ) - - elif message.get("method") == "textDocument/publishDiagnostics": - # Diagnostics notification - params = message.get("params", {}) - uri = params.get("uri", "") - diagnostics = [ - LSPDiagnostic.from_lsp(d) - for d in params.get("diagnostics", []) - ] - self._diagnostics[uri] = diagnostics - logger.debug(f"ASLS: {len(diagnostics)} diagnostics for {uri}") - - elif "method" in message: - # Other notification - logger.debug(f"ASLS notification: {message.get('method')}") diff --git a/xml_pipeline/console/lsp/bridge.py b/xml_pipeline/console/lsp/bridge.py deleted file mode 100644 index 4da9d4a..0000000 --- a/xml_pipeline/console/lsp/bridge.py +++ /dev/null @@ -1,314 +0,0 @@ -""" -Bridge between LSP client and prompt_toolkit. - -Provides: -- LSPCompleter: Async completer for prompt_toolkit using LSP -- DiagnosticsProcessor: Processes diagnostics for inline display -""" - -from __future__ import annotations - -import asyncio -from dataclasses import dataclass -from typing import Optional, Iterable, TYPE_CHECKING - -if TYPE_CHECKING: - from .client import YAMLLSPClient, LSPDiagnostic, LSPCompletion - -try: - from prompt_toolkit.completion import Completer, Completion - from prompt_toolkit.document import Document - PROMPT_TOOLKIT_AVAILABLE = True -except ImportError: - PROMPT_TOOLKIT_AVAILABLE = False - # Stub classes for type checking - class Completer: # type: ignore - pass - class Completion: # type: ignore - pass - class Document: # type: ignore - pass - - -@dataclass -class DiagnosticMark: - """A diagnostic marker for display in the editor.""" - - line: int - column: int - end_column: int - message: str - severity: str # error, warning, info, hint - - @property - def is_error(self) -> bool: - return self.severity == "error" - - @property - def is_warning(self) -> bool: - return self.severity == "warning" - - @property - def style(self) -> str: - """Get prompt_toolkit style for this diagnostic.""" - if self.severity == "error": - return "class:diagnostic.error" - elif self.severity == "warning": - return "class:diagnostic.warning" - elif self.severity == "info": - return "class:diagnostic.info" - else: - return "class:diagnostic.hint" - - -class LSPCompleter(Completer): - """ - prompt_toolkit completer that uses LSP for suggestions. - - Usage: - completer = LSPCompleter(lsp_client, document_uri) - buffer = Buffer(completer=completer) - """ - - def __init__( - self, - client: Optional["YAMLLSPClient"], - uri: str, - fallback_completer: Optional[Completer] = None, - ): - """ - Initialize the LSP completer. - - Args: - client: LSP client (can be None for fallback-only mode) - uri: Document URI for LSP requests - fallback_completer: Fallback when LSP unavailable - """ - self.client = client - self.uri = uri - self.fallback_completer = fallback_completer - self._cache: dict[tuple[int, int], list["LSPCompletion"]] = {} - self._cache_version = 0 - - def invalidate_cache(self) -> None: - """Invalidate the completion cache.""" - self._cache.clear() - self._cache_version += 1 - - def get_completions( - self, - document: Document, - complete_event, - ) -> Iterable[Completion]: - """ - Get completions for the current document position. - - This is called synchronously by prompt_toolkit. - We use a cached result if available, otherwise - return nothing (async completions handled separately). - """ - if not PROMPT_TOOLKIT_AVAILABLE: - return - - # Get current position - line = document.cursor_position_row - col = document.cursor_position_col - - # Check cache - cache_key = (line, col) - if cache_key in self._cache: - completions = self._cache[cache_key] - for item in completions: - yield Completion( - text=item.insert_text or item.label, - start_position=-len(self._get_word_before_cursor(document)), - display=item.label, - display_meta=item.detail or item.kind, - ) - return - - # Fallback to basic completer - if self.fallback_completer: - yield from self.fallback_completer.get_completions( - document, complete_event - ) - - async def get_completions_async( - self, - document: Document, - ) -> list["LSPCompletion"]: - """ - Get completions asynchronously from LSP. - - Call this when Ctrl+Space is pressed. - """ - if self.client is None: - return [] - - line = document.cursor_position_row - col = document.cursor_position_col - - # Request from LSP - completions = await self.client.completion(self.uri, line, col) - - # Cache result - self._cache[(line, col)] = completions - - return completions - - def _get_word_before_cursor(self, document: Document) -> str: - """Get the word being typed before cursor.""" - text = document.text_before_cursor - if not text: - return "" - - # Find word boundary - i = len(text) - 1 - while i >= 0 and (text[i].isalnum() or text[i] in "_-"): - i -= 1 - - return text[i + 1:] - - -class DiagnosticsProcessor: - """ - Processes LSP diagnostics for display in the editor. - - Converts LSP diagnostics into markers that can be - displayed inline in the prompt_toolkit editor. - """ - - def __init__(self, client: Optional["YAMLLSPClient"], uri: str): - self.client = client - self.uri = uri - self._marks: list[DiagnosticMark] = [] - - def get_marks(self) -> list[DiagnosticMark]: - """Get current diagnostic marks.""" - return self._marks - - def get_marks_for_line(self, line: int) -> list[DiagnosticMark]: - """Get diagnostic marks for a specific line.""" - return [m for m in self._marks if m.line == line] - - def has_errors(self) -> bool: - """Check if there are any error-level diagnostics.""" - return any(m.is_error for m in self._marks) - - def has_warnings(self) -> bool: - """Check if there are any warning-level diagnostics.""" - return any(m.is_warning for m in self._marks) - - def get_error_count(self) -> int: - """Get number of errors.""" - return sum(1 for m in self._marks if m.is_error) - - def get_warning_count(self) -> int: - """Get number of warnings.""" - return sum(1 for m in self._marks if m.is_warning) - - async def update(self, content: str, version: int = 1) -> list[DiagnosticMark]: - """ - Update diagnostics by sending content to LSP. - - Returns the new list of diagnostic marks. - """ - if self.client is None: - self._marks = [] - return [] - - diagnostics = await self.client.did_change(self.uri, content, version) - - self._marks = [ - DiagnosticMark( - line=d.line, - column=d.column, - end_column=d.end_column, - message=d.message, - severity=d.severity, - ) - for d in diagnostics - ] - - return self._marks - - def format_status(self) -> str: - """Format diagnostics as status bar text.""" - errors = self.get_error_count() - warnings = self.get_warning_count() - - if errors == 0 and warnings == 0: - return "" - - parts = [] - if errors > 0: - parts.append(f"{errors} error{'s' if errors > 1 else ''}") - if warnings > 0: - parts.append(f"{warnings} warning{'s' if warnings > 1 else ''}") - - return " | ".join(parts) - - def format_messages(self, max_lines: int = 3) -> list[str]: - """Format diagnostic messages for display.""" - messages = [] - - for mark in self._marks[:max_lines]: - prefix = "E" if mark.is_error else "W" - messages.append(f"[{prefix}] Line {mark.line + 1}: {mark.message}") - - remaining = len(self._marks) - max_lines - if remaining > 0: - messages.append(f"... and {remaining} more") - - return messages - - -class HoverPopup: - """ - Manages hover information display. - - Shows documentation when hovering over a field - or pressing F1 on a position. - """ - - def __init__(self, client: Optional["YAMLLSPClient"], uri: str): - self.client = client - self.uri = uri - self._current_hover: Optional[str] = None - self._hover_position: Optional[tuple[int, int]] = None - - async def get_hover(self, line: int, col: int) -> Optional[str]: - """ - Get hover information for a position. - - Returns formatted hover text or None. - """ - if self.client is None: - return None - - hover = await self.client.hover(self.uri, line, col) - - if hover is None: - self._current_hover = None - self._hover_position = None - return None - - self._current_hover = hover.contents - self._hover_position = (line, col) - - return hover.contents - - def clear(self) -> None: - """Clear current hover.""" - self._current_hover = None - self._hover_position = None - - @property - def has_hover(self) -> bool: - """Check if there's an active hover.""" - return self._current_hover is not None - - @property - def text(self) -> str: - """Get current hover text.""" - return self._current_hover or "" diff --git a/xml_pipeline/console/lsp/client.py b/xml_pipeline/console/lsp/client.py deleted file mode 100644 index 2efceb4..0000000 --- a/xml_pipeline/console/lsp/client.py +++ /dev/null @@ -1,538 +0,0 @@ -""" -YAML Language Server Protocol client. - -Wraps communication with yaml-language-server for: -- Autocompletion -- Diagnostics (validation errors) -- Hover information -""" - -from __future__ import annotations - -import asyncio -import json -import subprocess -import shutil -from dataclasses import dataclass, field -from pathlib import Path -from typing import Optional, Any -import logging - -logger = logging.getLogger(__name__) - - -# Check for lsp-client availability -def _check_lsp_client() -> bool: - """Check if lsp-client package is available.""" - try: - import lsp_client # noqa: F401 - return True - except ImportError: - return False - - -def _check_yaml_language_server() -> bool: - """Check if yaml-language-server is installed.""" - return shutil.which("yaml-language-server") is not None - - -def is_lsp_available() -> tuple[bool, str]: - """ - Check if LSP support is available. - - Returns (available, reason) tuple. - """ - if not _check_lsp_client(): - return False, "lsp-client package not installed (pip install lsp-client)" - - if not _check_yaml_language_server(): - return False, "yaml-language-server not found (npm install -g yaml-language-server)" - - return True, "LSP available" - - -@dataclass -class LSPCompletion: - """Normalized completion item from LSP.""" - - label: str - kind: str = "text" # text, keyword, property, value, snippet - detail: str = "" - documentation: str = "" - insert_text: str = "" - sort_text: str = "" - - @classmethod - def from_lsp(cls, item: dict[str, Any]) -> "LSPCompletion": - """Create from LSP CompletionItem.""" - kind_map = { - 1: "text", - 2: "method", - 3: "function", - 5: "field", - 6: "variable", - 9: "module", - 10: "property", - 12: "value", - 14: "keyword", - 15: "snippet", - } - - return cls( - label=item.get("label", ""), - kind=kind_map.get(item.get("kind", 1), "text"), - detail=item.get("detail", ""), - documentation=_extract_documentation(item.get("documentation")), - insert_text=item.get("insertText", item.get("label", "")), - sort_text=item.get("sortText", item.get("label", "")), - ) - - -@dataclass -class LSPDiagnostic: - """Normalized diagnostic from LSP.""" - - line: int - column: int - end_line: int - end_column: int - message: str - severity: str = "error" # error, warning, info, hint - source: str = "yaml-language-server" - - @classmethod - def from_lsp(cls, diag: dict[str, Any]) -> "LSPDiagnostic": - """Create from LSP Diagnostic.""" - severity_map = {1: "error", 2: "warning", 3: "info", 4: "hint"} - - range_data = diag.get("range", {}) - start = range_data.get("start", {}) - end = range_data.get("end", {}) - - return cls( - line=start.get("line", 0), - column=start.get("character", 0), - end_line=end.get("line", 0), - end_column=end.get("character", 0), - message=diag.get("message", ""), - severity=severity_map.get(diag.get("severity", 1), "error"), - source=diag.get("source", "yaml-language-server"), - ) - - -@dataclass -class LSPHover: - """Normalized hover information from LSP.""" - - contents: str - range_start_line: Optional[int] = None - range_start_col: Optional[int] = None - - @classmethod - def from_lsp(cls, hover: dict[str, Any]) -> Optional["LSPHover"]: - """Create from LSP Hover response.""" - if not hover: - return None - - contents = hover.get("contents") - if isinstance(contents, str): - text = contents - elif isinstance(contents, dict): - text = contents.get("value", str(contents)) - elif isinstance(contents, list): - text = "\n".join( - c.get("value", str(c)) if isinstance(c, dict) else str(c) - for c in contents - ) - else: - return None - - range_data = hover.get("range", {}) - start = range_data.get("start", {}) - - return cls( - contents=text, - range_start_line=start.get("line"), - range_start_col=start.get("character"), - ) - - -def _extract_documentation(doc: Any) -> str: - """Extract documentation string from LSP documentation field.""" - if doc is None: - return "" - if isinstance(doc, str): - return doc - if isinstance(doc, dict): - return doc.get("value", "") - return str(doc) - - -class YAMLLSPClient: - """ - Client for communicating with yaml-language-server. - - Uses stdio for communication with the language server process. - """ - - def __init__(self, schema_uri: Optional[str] = None): - """ - Initialize the LSP client. - - Args: - schema_uri: Default schema URI for YAML files - """ - self.schema_uri = schema_uri - self._process: Optional[subprocess.Popen] = None - self._reader_task: Optional[asyncio.Task] = None - self._request_id = 0 - self._pending_requests: dict[int, asyncio.Future] = {} - self._diagnostics: dict[str, list[LSPDiagnostic]] = {} - self._initialized = False - self._lock = asyncio.Lock() - - async def start(self) -> bool: - """ - Start the language server. - - Returns True if started successfully. - """ - available, reason = is_lsp_available() - if not available: - logger.warning(f"LSP not available: {reason}") - return False - - try: - self._process = subprocess.Popen( - ["yaml-language-server", "--stdio"], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - - # Start reader task - self._reader_task = asyncio.create_task(self._read_messages()) - - # Initialize LSP - await self._initialize() - self._initialized = True - return True - - except Exception as e: - logger.error(f"Failed to start yaml-language-server: {e}") - await self.stop() - return False - - async def stop(self) -> None: - """Stop the language server.""" - self._initialized = False - - if self._reader_task: - self._reader_task.cancel() - try: - await self._reader_task - except asyncio.CancelledError: - pass - self._reader_task = None - - if self._process: - self._process.terminate() - try: - self._process.wait(timeout=2) - except subprocess.TimeoutExpired: - self._process.kill() - self._process = None - - # Cancel pending requests - for future in self._pending_requests.values(): - if not future.done(): - future.cancel() - self._pending_requests.clear() - - async def _initialize(self) -> None: - """Send LSP initialize request.""" - result = await self._request( - "initialize", - { - "processId": None, - "rootUri": None, - "capabilities": { - "textDocument": { - "completion": { - "completionItem": { - "snippetSupport": True, - "documentationFormat": ["markdown", "plaintext"], - } - }, - "hover": { - "contentFormat": ["markdown", "plaintext"], - }, - "publishDiagnostics": {}, - } - }, - "initializationOptions": { - "yaml": { - "validate": True, - "hover": True, - "completion": True, - "schemas": {}, - } - }, - }, - ) - logger.debug(f"LSP initialized: {result}") - - # Send initialized notification - await self._notify("initialized", {}) - - async def did_open(self, uri: str, content: str) -> None: - """Notify server that a document was opened.""" - if not self._initialized: - return - - await self._notify( - "textDocument/didOpen", - { - "textDocument": { - "uri": uri, - "languageId": "yaml", - "version": 1, - "text": content, - } - }, - ) - - async def did_change(self, uri: str, content: str, version: int = 1) -> list[LSPDiagnostic]: - """ - Notify server of document change. - - Returns current diagnostics for the document. - """ - if not self._initialized: - return [] - - await self._notify( - "textDocument/didChange", - { - "textDocument": {"uri": uri, "version": version}, - "contentChanges": [{"text": content}], - }, - ) - - # Wait briefly for diagnostics - await asyncio.sleep(0.1) - - return self._diagnostics.get(uri, []) - - async def did_close(self, uri: str) -> None: - """Notify server that a document was closed.""" - if not self._initialized: - return - - await self._notify( - "textDocument/didClose", - {"textDocument": {"uri": uri}}, - ) - - # Clear diagnostics - self._diagnostics.pop(uri, None) - - async def completion( - self, uri: str, line: int, column: int - ) -> list[LSPCompletion]: - """ - Request completions at a position. - - Args: - uri: Document URI - line: 0-indexed line number - column: 0-indexed column number - - Returns list of completion items. - """ - if not self._initialized: - return [] - - try: - result = await self._request( - "textDocument/completion", - { - "textDocument": {"uri": uri}, - "position": {"line": line, "character": column}, - }, - ) - - if result is None: - return [] - - items = result.get("items", []) if isinstance(result, dict) else result - return [LSPCompletion.from_lsp(item) for item in items] - - except Exception as e: - logger.debug(f"Completion request failed: {e}") - return [] - - async def hover(self, uri: str, line: int, column: int) -> Optional[LSPHover]: - """ - Request hover information at a position. - - Args: - uri: Document URI - line: 0-indexed line number - column: 0-indexed column number - """ - if not self._initialized: - return None - - try: - result = await self._request( - "textDocument/hover", - { - "textDocument": {"uri": uri}, - "position": {"line": line, "character": column}, - }, - ) - - return LSPHover.from_lsp(result) if result else None - - except Exception as e: - logger.debug(f"Hover request failed: {e}") - return None - - def get_diagnostics(self, uri: str) -> list[LSPDiagnostic]: - """Get current diagnostics for a document.""" - return self._diagnostics.get(uri, []) - - async def _request(self, method: str, params: dict[str, Any]) -> Any: - """Send a request and wait for response.""" - async with self._lock: - self._request_id += 1 - req_id = self._request_id - - message = { - "jsonrpc": "2.0", - "id": req_id, - "method": method, - "params": params, - } - - future: asyncio.Future = asyncio.Future() - self._pending_requests[req_id] = future - - try: - await self._send_message(message) - return await asyncio.wait_for(future, timeout=5.0) - except asyncio.TimeoutError: - logger.warning(f"LSP request timed out: {method}") - return None - finally: - self._pending_requests.pop(req_id, None) - - async def _notify(self, method: str, params: dict[str, Any]) -> None: - """Send a notification (no response expected).""" - message = { - "jsonrpc": "2.0", - "method": method, - "params": params, - } - await self._send_message(message) - - async def _send_message(self, message: dict[str, Any]) -> None: - """Send a JSON-RPC message to the server.""" - if not self._process or not self._process.stdin: - return - - content = json.dumps(message) - header = f"Content-Length: {len(content)}\r\n\r\n" - - try: - self._process.stdin.write(header.encode()) - self._process.stdin.write(content.encode()) - self._process.stdin.flush() - except (BrokenPipeError, OSError) as e: - logger.error(f"Failed to send LSP message: {e}") - - async def _read_messages(self) -> None: - """Read messages from the server.""" - if not self._process or not self._process.stdout: - return - - loop = asyncio.get_event_loop() - - try: - while True: - # Read header - header = b"" - while b"\r\n\r\n" not in header: - chunk = await loop.run_in_executor( - None, self._process.stdout.read, 1 - ) - if not chunk: - return # EOF - header += chunk - - # Parse content length - content_length = 0 - for line in header.decode().split("\r\n"): - if line.startswith("Content-Length:"): - content_length = int(line.split(":")[1].strip()) - break - - if content_length == 0: - continue - - # Read content - content = await loop.run_in_executor( - None, self._process.stdout.read, content_length - ) - - if not content: - return - - # Parse and handle message - try: - message = json.loads(content.decode()) - await self._handle_message(message) - except json.JSONDecodeError as e: - logger.error(f"Failed to parse LSP message: {e}") - - except asyncio.CancelledError: - pass - except Exception as e: - logger.error(f"LSP reader error: {e}") - - async def _handle_message(self, message: dict[str, Any]) -> None: - """Handle an incoming LSP message.""" - if "id" in message and "result" in message: - # Response to a request - req_id = message["id"] - if req_id in self._pending_requests: - future = self._pending_requests[req_id] - if not future.done(): - future.set_result(message.get("result")) - - elif "id" in message and "error" in message: - # Error response - req_id = message["id"] - if req_id in self._pending_requests: - future = self._pending_requests[req_id] - if not future.done(): - error = message["error"] - future.set_exception( - Exception(f"LSP error: {error.get('message', error)}") - ) - - elif message.get("method") == "textDocument/publishDiagnostics": - # Diagnostics notification - params = message.get("params", {}) - uri = params.get("uri", "") - diagnostics = [ - LSPDiagnostic.from_lsp(d) - for d in params.get("diagnostics", []) - ] - self._diagnostics[uri] = diagnostics - logger.debug(f"Received {len(diagnostics)} diagnostics for {uri}") - - elif "method" in message: - # Other notification - logger.debug(f"LSP notification: {message.get('method')}") diff --git a/xml_pipeline/console/lsp/manager.py b/xml_pipeline/console/lsp/manager.py deleted file mode 100644 index 2882969..0000000 --- a/xml_pipeline/console/lsp/manager.py +++ /dev/null @@ -1,211 +0,0 @@ -""" -LSP Server lifecycle manager. - -Manages language server instances that can be shared across -multiple editor sessions. Supports multiple language servers: -- yaml-language-server (for config files) -- asls (for AssemblyScript listener source) -""" - -from __future__ import annotations - -import asyncio -import logging -from enum import Enum -from typing import Optional, Union - -from .client import YAMLLSPClient, is_lsp_available -from .asls_client import ASLSClient, ASLSConfig, is_asls_available - -logger = logging.getLogger(__name__) - - -class LSPServerType(Enum): - """Supported language server types.""" - YAML = "yaml" - ASSEMBLYSCRIPT = "assemblyscript" - - -# Type alias for any LSP client -LSPClient = Union[YAMLLSPClient, ASLSClient] - - -class LSPServerManager: - """ - Manages the lifecycle of LSP servers. - - Provides singleton client instances that start on first use - and stop when explicitly requested or when the process exits. - - Supports multiple language servers running concurrently. - """ - - def __init__(self): - self._clients: dict[LSPServerType, LSPClient] = {} - self._ref_counts: dict[LSPServerType, int] = {} - self._lock = asyncio.Lock() - - def is_running(self, server_type: LSPServerType = LSPServerType.YAML) -> bool: - """Check if a specific LSP server is running.""" - client = self._clients.get(server_type) - return client is not None and client._initialized - - async def get_client( - self, - server_type: LSPServerType = LSPServerType.YAML, - asls_config: Optional[ASLSConfig] = None, - ) -> Optional[LSPClient]: - """ - Get an LSP client, starting the server if needed. - - Args: - server_type: Which language server to get - asls_config: Configuration for ASLS (only used if server_type is ASSEMBLYSCRIPT) - - Returns None if the requested LSP is not available. - """ - async with self._lock: - # Check if already running - if server_type in self._clients: - client = self._clients[server_type] - if client._initialized: - self._ref_counts[server_type] = self._ref_counts.get(server_type, 0) + 1 - return client - - # Start the appropriate server - if server_type == LSPServerType.YAML: - return await self._start_yaml_server() - elif server_type == LSPServerType.ASSEMBLYSCRIPT: - return await self._start_asls_server(asls_config) - else: - logger.error(f"Unknown LSP server type: {server_type}") - return None - - async def _start_yaml_server(self) -> Optional[YAMLLSPClient]: - """Start the YAML language server.""" - available, reason = is_lsp_available() - if not available: - logger.info(f"YAML LSP not available: {reason}") - return None - - client = YAMLLSPClient() - success = await client.start() - - if success: - self._clients[LSPServerType.YAML] = client - self._ref_counts[LSPServerType.YAML] = 1 - logger.info("yaml-language-server started") - return client - else: - return None - - async def _start_asls_server( - self, config: Optional[ASLSConfig] = None - ) -> Optional[ASLSClient]: - """Start the AssemblyScript language server.""" - available, reason = is_asls_available() - if not available: - logger.info(f"ASLS not available: {reason}") - return None - - client = ASLSClient(config=config) - success = await client.start() - - if success: - self._clients[LSPServerType.ASSEMBLYSCRIPT] = client - self._ref_counts[LSPServerType.ASSEMBLYSCRIPT] = 1 - logger.info("AssemblyScript language server started") - return client - else: - return None - - async def release_client( - self, server_type: LSPServerType = LSPServerType.YAML - ) -> None: - """ - Release a reference to a client. - - Stops the server when the last reference is released. - """ - async with self._lock: - if server_type not in self._ref_counts: - return - - self._ref_counts[server_type] -= 1 - - if self._ref_counts[server_type] <= 0: - client = self._clients.pop(server_type, None) - self._ref_counts.pop(server_type, None) - - if client is not None: - await client.stop() - logger.info(f"{server_type.value} language server stopped") - - async def stop(self, server_type: Optional[LSPServerType] = None) -> None: - """ - Force stop LSP server(s). - - Args: - server_type: Specific server to stop, or None to stop all - """ - async with self._lock: - if server_type is not None: - # Stop specific server - client = self._clients.pop(server_type, None) - self._ref_counts.pop(server_type, None) - if client is not None: - await client.stop() - logger.info(f"{server_type.value} language server stopped (forced)") - else: - # Stop all servers - for st, client in list(self._clients.items()): - await client.stop() - logger.info(f"{st.value} language server stopped (forced)") - self._clients.clear() - self._ref_counts.clear() - - async def stop_all(self) -> None: - """Force stop all LSP servers.""" - await self.stop(None) - - # Convenience methods for YAML (backwards compatible) - - async def get_yaml_client(self) -> Optional[YAMLLSPClient]: - """Get YAML LSP client (convenience method).""" - client = await self.get_client(LSPServerType.YAML) - return client if isinstance(client, YAMLLSPClient) else None - - async def get_asls_client( - self, config: Optional[ASLSConfig] = None - ) -> Optional[ASLSClient]: - """Get AssemblyScript LSP client (convenience method).""" - client = await self.get_client(LSPServerType.ASSEMBLYSCRIPT, asls_config=config) - return client if isinstance(client, ASLSClient) else None - - # Context manager for YAML (backwards compatible) - - async def __aenter__(self) -> Optional[YAMLLSPClient]: - """Context manager entry - get YAML client.""" - return await self.get_yaml_client() - - async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: - """Context manager exit - release YAML client.""" - await self.release_client(LSPServerType.YAML) - - -# Global singleton -_manager: Optional[LSPServerManager] = None - - -def get_lsp_manager() -> LSPServerManager: - """Get the global LSP server manager.""" - global _manager - if _manager is None: - _manager = LSPServerManager() - return _manager - - -async def ensure_lsp_stopped() -> None: - """Ensure all LSP servers are stopped. Call on application shutdown.""" - if _manager is not None: - await _manager.stop_all() diff --git a/xml_pipeline/console/secure_console.py b/xml_pipeline/console/secure_console.py deleted file mode 100644 index 910f46a..0000000 --- a/xml_pipeline/console/secure_console.py +++ /dev/null @@ -1,980 +0,0 @@ -""" -secure_console.py — Password-protected console for privileged operations. - -The console is the sole privileged interface to the organism. Privileged -operations are only accessible via local keyboard input, never over the network. - -Features: -- Password protection with Argon2id hashing -- Protected commands require password re-entry -- Attach/detach model with idle timeout -- Integration with context buffer for /monitor - -Security model: -- Keyboard = Local = Trusted -- No network port for privileged operations -- Password hash stored in ~/.xml-pipeline/console.key (mode 600) -""" - -from __future__ import annotations - -import asyncio -import getpass -import os -import stat -import sys -from datetime import datetime, timezone -from pathlib import Path -from typing import TYPE_CHECKING, Optional, Callable, Awaitable - -import yaml -from argon2 import PasswordHasher -from argon2.exceptions import VerifyMismatchError - -# prompt_toolkit may not work in all terminals (e.g., Git Bash on Windows) -# We provide a fallback to simple input() -try: - from prompt_toolkit import PromptSession - from prompt_toolkit.history import FileHistory - from prompt_toolkit.patch_stdout import patch_stdout - PROMPT_TOOLKIT_AVAILABLE = True -except ImportError: - PROMPT_TOOLKIT_AVAILABLE = False - -if TYPE_CHECKING: - from xml_pipeline.message_bus.stream_pump import StreamPump - - -# ============================================================================ -# Constants -# ============================================================================ - -CONFIG_DIR = Path.home() / ".xml-pipeline" -KEY_FILE = CONFIG_DIR / "console.key" -HISTORY_FILE = CONFIG_DIR / "history" - -# Commands that require password re-entry -PROTECTED_COMMANDS = {"restart", "kill", "pause", "resume"} - -# Idle timeout before auto-detach (seconds, 0 = disabled) -DEFAULT_IDLE_TIMEOUT = 30 * 60 # 30 minutes - - -# ============================================================================ -# ANSI Colors -# ============================================================================ - -class Colors: - RESET = "\033[0m" - BOLD = "\033[1m" - DIM = "\033[2m" - RED = "\033[31m" - GREEN = "\033[32m" - YELLOW = "\033[33m" - BLUE = "\033[34m" - MAGENTA = "\033[35m" - CYAN = "\033[36m" - - -def cprint(text: str, color: str = Colors.RESET): - """Print with ANSI color.""" - try: - print(f"{color}{text}{Colors.RESET}") - except UnicodeEncodeError: - print(text) - - -# ============================================================================ -# Password Management -# ============================================================================ - -class PasswordManager: - """Manages password hashing and verification.""" - - def __init__(self, key_path: Path = KEY_FILE): - self.key_path = key_path - self.hasher = PasswordHasher() - self._hash: Optional[str] = None - - def ensure_config_dir(self): - """Create config directory if needed.""" - self.key_path.parent.mkdir(parents=True, exist_ok=True) - - def has_password(self) -> bool: - """Check if password has been set.""" - return self.key_path.exists() - - def load_hash(self) -> Optional[str]: - """Load password hash from file.""" - if not self.key_path.exists(): - return None - try: - with open(self.key_path) as f: - data = yaml.safe_load(f) - self._hash = data.get("hash") - return self._hash - except Exception: - return None - - def save_hash(self, password: str) -> None: - """Hash password and save to file.""" - self.ensure_config_dir() - - hash_value = self.hasher.hash(password) - data = { - "algorithm": "argon2id", - "hash": hash_value, - "created": datetime.now(timezone.utc).isoformat(), - } - - with open(self.key_path, "w") as f: - yaml.dump(data, f) - - # Set file permissions to 600 (owner read/write only) - if sys.platform != "win32": - os.chmod(self.key_path, stat.S_IRUSR | stat.S_IWUSR) - - self._hash = hash_value - - def verify(self, password: str) -> bool: - """Verify password against stored hash.""" - if self._hash is None: - self.load_hash() - if self._hash is None: - return False - try: - self.hasher.verify(self._hash, password) - return True - except VerifyMismatchError: - return False - - -# ============================================================================ -# Secure Console -# ============================================================================ - -class SecureConsole: - """ - Password-protected console with privileged command support. - - The console can be in one of two states: - - Attached: Full access, can send messages and run commands - - Detached: Limited access, only /commands work, @messages rejected - """ - - def __init__( - self, - pump: StreamPump, - idle_timeout: int = DEFAULT_IDLE_TIMEOUT, - ): - self.pump = pump - self.idle_timeout = idle_timeout - self.password_mgr = PasswordManager() - - # State - self.authenticated = False - self.attached = True # Start attached - self.running = False - - # prompt_toolkit session (may be None if fallback mode) - self.session: Optional[PromptSession] = None - self.use_simple_input = False # Fallback mode flag - - # ------------------------------------------------------------------ - # Startup - # ------------------------------------------------------------------ - - def _init_prompt_session(self) -> None: - """Initialize prompt session (with fallback).""" - if self.session is not None: - return # Already initialized - - self.password_mgr.ensure_config_dir() - if PROMPT_TOOLKIT_AVAILABLE: - try: - self.session = PromptSession( - history=FileHistory(str(HISTORY_FILE)) - ) - except Exception as e: - cprint(f"Note: Using simple input mode ({type(e).__name__})", Colors.DIM) - self.use_simple_input = True - else: - self.use_simple_input = True - - async def authenticate(self) -> bool: - """ - Authenticate user (call before starting pump). - - Returns True if authenticated, False otherwise. - """ - self._init_prompt_session() - - # Ensure password is set up - if not await self._ensure_password(): - return False - - # Authenticate - if not await self._authenticate(): - cprint("Authentication failed.", Colors.RED) - return False - - self.authenticated = True - return True - - async def run_command_loop(self) -> None: - """ - Run the command loop (call after authentication). - - This shows the banner and enters the main input loop. - """ - if not self.authenticated: - cprint("Not authenticated. Call authenticate() first.", Colors.RED) - return - - self.running = True - self._print_banner() - await self._main_loop() - - async def run(self) -> None: - """Main console loop (combines authenticate + run_command_loop).""" - if await self.authenticate(): - await self.run_command_loop() - - async def _ensure_password(self) -> bool: - """Ensure password is set up (first run setup).""" - if self.password_mgr.has_password(): - return True - - cprint("\n" + "=" * 50, Colors.CYAN) - cprint(" First-time setup: Create console password", Colors.CYAN) - cprint("=" * 50 + "\n", Colors.CYAN) - - cprint("This password protects privileged operations.", Colors.DIM) - cprint("It will be required at startup and for protected commands.\n", Colors.DIM) - - # Get password with confirmation - while True: - password = await self._prompt_password("New password: ") - if not password: - cprint("Password cannot be empty.", Colors.RED) - continue - - if len(password) < 4: - cprint("Password must be at least 4 characters.", Colors.RED) - continue - - confirm = await self._prompt_password("Confirm password: ") - if password != confirm: - cprint("Passwords do not match.", Colors.RED) - continue - - break - - self.password_mgr.save_hash(password) - cprint("\nPassword set successfully.\n", Colors.GREEN) - return True - - async def _authenticate(self) -> bool: - """Authenticate user at startup.""" - self.password_mgr.load_hash() - - for attempt in range(3): - password = await self._prompt_password("Password: ") - if self.password_mgr.verify(password): - self.authenticated = True - return True - cprint("Incorrect password.", Colors.RED) - - return False - - async def _prompt_password(self, prompt: str) -> str: - """Prompt for password (hidden input when possible).""" - if self.use_simple_input: - # Simple input mode: use visible input (getpass unreliable in some terminals) - cprint("(password will be visible)", Colors.DIM) - print(prompt, end="", flush=True) - loop = asyncio.get_event_loop() - try: - line = await loop.run_in_executor(None, sys.stdin.readline) - return line.strip() if line else "" - except (EOFError, KeyboardInterrupt): - return "" - else: - # Use prompt_toolkit for password input (hidden) - try: - session = PromptSession() - return await session.prompt_async(prompt, is_password=True) - except (EOFError, KeyboardInterrupt): - return "" - except Exception: - # Fallback if prompt_toolkit fails mid-session - self.use_simple_input = True - return await self._prompt_password(prompt) - - # ------------------------------------------------------------------ - # Main Loop - # ------------------------------------------------------------------ - - async def _main_loop(self) -> None: - """Main input loop.""" - while self.running: - try: - # Determine prompt based on attach state - prompt_str = "> " if self.attached else "# " - - # Read input - line = await self._read_input(prompt_str) - - await self._handle_input(line.strip()) - - except EOFError: - cprint("\nEOF received. Shutting down.", Colors.YELLOW) - break - except KeyboardInterrupt: - continue - - async def _read_input(self, prompt: str) -> str: - """Read a line of input (with fallback for non-TTY terminals).""" - if self.use_simple_input: - # Fallback: simple blocking input - loop = asyncio.get_event_loop() - print(prompt, end="", flush=True) - try: - line = await loop.run_in_executor(None, sys.stdin.readline) - if not line: - raise EOFError() - return line.strip() - except (EOFError, KeyboardInterrupt): - raise - else: - # Use prompt_toolkit with optional timeout - try: - with patch_stdout(): - if self.idle_timeout > 0: - try: - return await asyncio.wait_for( - self.session.prompt_async(prompt), - timeout=self.idle_timeout - ) - except asyncio.TimeoutError: - cprint("\nIdle timeout. Detaching console.", Colors.YELLOW) - self.attached = False - return "" - else: - return await self.session.prompt_async(prompt) - except Exception: - # Fall back to simple input if prompt_toolkit fails - self.use_simple_input = True - return await self._read_input(prompt) - - async def _handle_input(self, line: str) -> None: - """Route input to appropriate handler.""" - if not line: - return - - if line.startswith("/"): - await self._handle_command(line) - elif line.startswith("@"): - await self._handle_message(line) - else: - cprint("Use @listener message or /command", Colors.DIM) - - # ------------------------------------------------------------------ - # Command Handling - # ------------------------------------------------------------------ - - async def _handle_command(self, line: str) -> None: - """Handle /command.""" - parts = line[1:].split(None, 1) - cmd = parts[0].lower() if parts else "" - args = parts[1] if len(parts) > 1 else "" - - # Check if protected command - if cmd in PROTECTED_COMMANDS: - if not await self._verify_password(): - cprint("Password required for this command.", Colors.RED) - return - - # Dispatch to handler - handler = getattr(self, f"_cmd_{cmd}", None) - if handler: - await handler(args) - else: - cprint(f"Unknown command: /{cmd}", Colors.RED) - cprint("Type /help for available commands.", Colors.DIM) - - async def _verify_password(self) -> bool: - """Verify password for protected commands.""" - password = await self._prompt_password("Password: ") - return self.password_mgr.verify(password) - - # ------------------------------------------------------------------ - # Message Handling - # ------------------------------------------------------------------ - - async def _handle_message(self, line: str) -> None: - """Handle @listener message.""" - if not self.attached: - cprint("Console detached. Use /attach first.", Colors.RED) - return - - parts = line[1:].split(None, 1) - if not parts: - cprint("Usage: @listener message", Colors.DIM) - return - - target = parts[0].lower() - message = parts[1] if len(parts) > 1 else "" - - # Check if listener exists - if target not in self.pump.listeners: - cprint(f"Unknown listener: {target}", Colors.RED) - cprint("Use /listeners to see available listeners.", Colors.DIM) - return - - cprint(f"[sending to {target}]", Colors.DIM) - - # Create payload based on target listener - listener = self.pump.listeners[target] - payload = self._create_payload(listener, message) - if payload is None: - cprint(f"Cannot create payload for {target}", Colors.RED) - return - - # Create thread and inject message - import uuid - thread_id = str(uuid.uuid4()) - - envelope = self.pump._wrap_in_envelope( - payload=payload, - from_id="console", - to_id=target, - thread_id=thread_id, - ) - - await self.pump.inject(envelope, thread_id=thread_id, from_id="console") - - def _create_payload(self, listener, message: str): - """Create payload instance for a listener from message text.""" - payload_class = listener.payload_class - - # Try to create payload with common field patterns - # Most payloads have a single text field like 'name', 'message', 'text', etc. - if hasattr(payload_class, '__dataclass_fields__'): - fields = payload_class.__dataclass_fields__ - field_names = list(fields.keys()) - - if len(field_names) == 1: - # Single field - use the message as its value - return payload_class(**{field_names[0]: message}) - elif 'name' in field_names: - return payload_class(name=message) - elif 'message' in field_names: - return payload_class(message=message) - elif 'text' in field_names: - return payload_class(text=message) - - # Fallback: try with no args - try: - return payload_class() - except Exception: - return None - - # ------------------------------------------------------------------ - # Commands: Informational - # ------------------------------------------------------------------ - - async def _cmd_help(self, args: str) -> None: - """Show available commands.""" - cprint("\nCommands:", Colors.CYAN) - cprint(" /help Show this help", Colors.DIM) - cprint(" /status Show organism status", Colors.DIM) - cprint(" /listeners List registered listeners", Colors.DIM) - cprint(" /threads List active threads", Colors.DIM) - cprint(" /buffer Inspect thread's context buffer", Colors.DIM) - cprint(" /monitor Show recent messages from thread", Colors.DIM) - cprint(" /monitor * Show recent messages from all threads", Colors.DIM) - cprint("") - cprint("Configuration:", Colors.CYAN) - cprint(" /config Show current config", Colors.DIM) - cprint(" /config -e Edit organism.yaml", Colors.DIM) - cprint(" /config @name Edit listener config", Colors.DIM) - cprint(" /config --list List listener configs", Colors.DIM) - cprint("") - cprint("Protected (require password):", Colors.YELLOW) - cprint(" /restart Restart the pipeline", Colors.DIM) - cprint(" /kill Terminate a thread", Colors.DIM) - cprint(" /pause Pause message processing", Colors.DIM) - cprint(" /resume Resume message processing", Colors.DIM) - cprint("") - cprint("Session:", Colors.CYAN) - cprint(" /attach Attach console (enable @messages)", Colors.DIM) - cprint(" /detach Detach console (organism keeps running)", Colors.DIM) - cprint(" /passwd Change console password", Colors.DIM) - cprint(" /quit Graceful shutdown", Colors.DIM) - cprint("") - - async def _cmd_status(self, args: str) -> None: - """Show organism status.""" - from xml_pipeline.memory import get_context_buffer - from xml_pipeline.message_bus.thread_registry import get_registry - - buffer = get_context_buffer() - registry = get_registry() - stats = buffer.get_stats() - - cprint(f"\nOrganism: {self.pump.config.name}", Colors.CYAN) - cprint(f"Status: {'attached' if self.attached else 'detached'}", - Colors.GREEN if self.attached else Colors.YELLOW) - cprint(f"Listeners: {len(self.pump.listeners)}", Colors.DIM) - cprint(f"Threads: {stats['thread_count']} active", Colors.DIM) - cprint(f"Buffer: {stats['total_slots']} slots across threads", Colors.DIM) - cprint("") - - async def _cmd_listeners(self, args: str) -> None: - """List registered listeners.""" - cprint("\nRegistered listeners:", Colors.CYAN) - for name, listener in self.pump.listeners.items(): - agent_tag = "[agent] " if listener.is_agent else "" - cprint(f" {name:20} {agent_tag}{listener.description}", Colors.DIM) - cprint("") - - async def _cmd_threads(self, args: str) -> None: - """List active threads.""" - from xml_pipeline.memory import get_context_buffer - - buffer = get_context_buffer() - stats = buffer.get_stats() - - if stats["thread_count"] == 0: - cprint("\nNo active threads.", Colors.DIM) - return - - cprint(f"\nActive threads ({stats['thread_count']}):", Colors.CYAN) - - # Access internal threads dict (not ideal but works for now) - for thread_id, ctx in buffer._threads.items(): - slot_count = len(ctx) - age = datetime.now(timezone.utc) - ctx._created_at - age_str = str(age).split(".")[0] # Remove microseconds - - # Get last sender/receiver - if slot_count > 0: - last = ctx[-1] - flow = f"{last.from_id} -> {last.to_id}" - else: - flow = "(empty)" - - cprint(f" {thread_id[:12]}... slots={slot_count:3} age={age_str} {flow}", Colors.DIM) - cprint("") - - async def _cmd_buffer(self, args: str) -> None: - """Inspect a thread's context buffer.""" - if not args: - cprint("Usage: /buffer ", Colors.DIM) - return - - from xml_pipeline.memory import get_context_buffer - buffer = get_context_buffer() - - # Find thread by prefix - thread_id = None - for tid in buffer._threads.keys(): - if tid.startswith(args): - thread_id = tid - break - - if not thread_id: - cprint(f"Thread not found: {args}", Colors.RED) - return - - ctx = buffer.get_thread(thread_id) - cprint(f"\nThread: {thread_id}", Colors.CYAN) - cprint(f"Slots: {len(ctx)}", Colors.DIM) - cprint("-" * 60, Colors.DIM) - - for slot in ctx: - payload_type = type(slot.payload).__name__ - cprint(f"[{slot.index}] {slot.from_id} -> {slot.to_id}: {payload_type}", Colors.DIM) - # Show first 100 chars of payload repr - payload_repr = repr(slot.payload)[:100] - cprint(f" {payload_repr}", Colors.DIM) - cprint("") - - async def _cmd_monitor(self, args: str) -> None: - """Show recent messages from a thread's context buffer.""" - if not args: - cprint("Usage: /monitor ", Colors.DIM) - cprint(" /monitor * (show all threads)", Colors.DIM) - return - - from xml_pipeline.memory import get_context_buffer - buffer = get_context_buffer() - - # Find thread by prefix (or * for all) - monitor_all = args.strip() == "*" - thread_id = None - - if not monitor_all: - for tid in buffer._threads.keys(): - if tid.startswith(args): - thread_id = tid - break - - if not thread_id: - cprint(f"Thread not found: {args}", Colors.RED) - return - - # Show header - if monitor_all: - cprint("\nAll threads:", Colors.CYAN) - else: - cprint(f"\nThread {thread_id[:12]}...:", Colors.CYAN) - cprint("-" * 60, Colors.DIM) - - # Show messages - if monitor_all: - for tid, ctx in buffer._threads.items(): - if len(ctx) > 0: - cprint(f"\n[{tid[:12]}...] ({len(ctx)} messages)", Colors.YELLOW) - # Show last 5 messages per thread - for slot in ctx.get_slice(-5): - self._print_monitor_slot(tid, slot) - else: - ctx = buffer.get_thread(thread_id) - # Show all messages (up to 20) - slots = ctx.get_slice(-20) - for slot in slots: - self._print_monitor_slot(thread_id, slot) - if len(ctx) > 20: - cprint(f" ... ({len(ctx) - 20} earlier messages)", Colors.DIM) - - cprint("") - - def _print_monitor_slot(self, thread_id: str, slot) -> None: - """Print a single slot in monitor format.""" - payload_type = type(slot.payload).__name__ - tid_short = thread_id[:8] - timestamp = slot.metadata.timestamp.split("T")[1][:8] if "T" in slot.metadata.timestamp else "" - - # Color based on direction - if slot.from_id == "console": - color = Colors.GREEN - elif "response" in slot.to_id.lower() or "console" in slot.to_id.lower(): - color = Colors.CYAN - else: - color = Colors.DIM - - # Format: [time] thread: from -> to: Type - cprint(f"[{timestamp}] {tid_short}: {slot.from_id} -> {slot.to_id}: {payload_type}", color) - - # Show payload content (abbreviated) - payload_str = repr(slot.payload) - if len(payload_str) > 80: - payload_str = payload_str[:77] + "..." - cprint(f" {payload_str}", Colors.DIM) - - async def _cmd_config(self, args: str) -> None: - """ - Edit configuration files. - - /config - Edit organism.yaml - /config @name - Edit listener config (e.g., /config @greeter) - /config --list - List available listener configs - /config --show - Show current config (read-only) - """ - args = args.strip() if args else "" - - if args == "--list": - await self._config_list() - elif args == "--show" or args == "": - await self._config_show() - elif args.startswith("@"): - listener_name = args[1:].strip() - if listener_name: - await self._config_edit_listener(listener_name) - else: - cprint("Usage: /config @listener_name", Colors.RED) - elif args == "--edit" or args == "-e": - await self._config_edit_organism() - else: - cprint(f"Unknown option: {args}", Colors.RED) - cprint("Usage:", Colors.DIM) - cprint(" /config Show current config", Colors.DIM) - cprint(" /config -e Edit organism.yaml", Colors.DIM) - cprint(" /config @name Edit listener config", Colors.DIM) - cprint(" /config --list List listener configs", Colors.DIM) - - async def _config_show(self) -> None: - """Show current configuration (read-only).""" - cprint(f"\nOrganism: {self.pump.config.name}", Colors.CYAN) - cprint(f"Port: {self.pump.config.port}", Colors.DIM) - cprint(f"Thread scheduling: {self.pump.config.thread_scheduling}", Colors.DIM) - cprint(f"Max concurrent pipelines: {self.pump.config.max_concurrent_pipelines}", Colors.DIM) - cprint(f"Max concurrent handlers: {self.pump.config.max_concurrent_handlers}", Colors.DIM) - cprint(f"Max concurrent per agent: {self.pump.config.max_concurrent_per_agent}", Colors.DIM) - cprint("\nUse /config -e to edit organism.yaml", Colors.DIM) - cprint("Use /config @listener to edit a listener config", Colors.DIM) - cprint("") - - async def _config_list(self) -> None: - """List available listener configs.""" - from xml_pipeline.config import get_listener_config_store - - store = get_listener_config_store() - listeners = store.list_listeners() - - cprint("\nListener configurations:", Colors.CYAN) - cprint(f"Directory: {store.listeners_dir}", Colors.DIM) - cprint("") - - if not listeners: - cprint(" No listener configs found.", Colors.DIM) - cprint(" Use /config @name to create one.", Colors.DIM) - else: - for name in sorted(listeners): - config = store.get(name) - agent_tag = "[agent]" if config.agent else "[tool]" if config.tool else "" - cprint(f" @{name:20} {agent_tag} {config.description or ''}", Colors.DIM) - - # Also show registered listeners without config files - unconfigured = [ - name for name in self.pump.listeners.keys() - if name not in listeners - ] - if unconfigured: - cprint("\nRegistered listeners without config files:", Colors.YELLOW) - for name in sorted(unconfigured): - listener = self.pump.listeners[name] - agent_tag = "[agent]" if listener.is_agent else "" - cprint(f" @{name:20} {agent_tag} {listener.description}", Colors.DIM) - - cprint("") - - async def _config_edit_organism(self) -> None: - """Edit organism.yaml in the full-screen editor.""" - from xml_pipeline.console.editor import edit_text_async - from xml_pipeline.config.schema import ensure_schemas - from xml_pipeline.config.split_loader import ( - get_organism_yaml_path, - load_organism_yaml_content, - save_organism_yaml_content, - ) - - # Ensure schemas are written for LSP - try: - ensure_schemas() - except Exception as e: - cprint(f"Warning: Could not write schemas: {e}", Colors.YELLOW) - - # Find organism.yaml - config_path = get_organism_yaml_path() - if config_path is None: - cprint("No organism.yaml found.", Colors.RED) - cprint("Searched in:", Colors.DIM) - cprint(" ~/.xml-pipeline/organism.yaml", Colors.DIM) - cprint(" ./organism.yaml", Colors.DIM) - cprint(" ./config/organism.yaml", Colors.DIM) - return - - # Load content - try: - content = load_organism_yaml_content(config_path) - except Exception as e: - cprint(f"Failed to load config: {e}", Colors.RED) - return - - # Edit - cprint(f"Editing: {config_path}", Colors.CYAN) - cprint("Press Ctrl+S to save, Ctrl+Q to cancel", Colors.DIM) - cprint("") - - edited_text, saved = await edit_text_async( - content, - title=f"organism.yaml ({config_path.name})", - schema_type="organism", - ) - - if saved and edited_text is not None: - try: - save_organism_yaml_content(config_path, edited_text) - cprint("Configuration saved.", Colors.GREEN) - cprint("Note: Restart required for changes to take effect.", Colors.YELLOW) - except yaml.YAMLError as e: - cprint(f"Invalid YAML: {e}", Colors.RED) - except Exception as e: - cprint(f"Failed to save: {e}", Colors.RED) - else: - cprint("Edit cancelled.", Colors.DIM) - - async def _config_edit_listener(self, name: str) -> None: - """Edit a listener config in the full-screen editor.""" - from xml_pipeline.config import get_listener_config_store - from xml_pipeline.console.editor import edit_text_async - from xml_pipeline.config.schema import ensure_schemas - - # Ensure schemas are written for LSP - try: - ensure_schemas() - except Exception as e: - cprint(f"Warning: Could not write schemas: {e}", Colors.YELLOW) - - store = get_listener_config_store() - - # Load or create content - if store.exists(name): - content = store.load_yaml(name) - cprint(f"Editing: {store.path_for(name)}", Colors.CYAN) - else: - # Check if it's a registered listener - if name in self.pump.listeners: - cprint(f"Creating new config for registered listener: {name}", Colors.CYAN) - else: - cprint(f"Creating new config for: {name}", Colors.CYAN) - content = store._default_template(name) - - cprint("Press Ctrl+S to save, Ctrl+Q to cancel", Colors.DIM) - cprint("") - - # Edit - edited_text, saved = await edit_text_async( - content, - title=f"{name}.yaml", - schema_type="listener", - ) - - if saved and edited_text is not None: - try: - path = store.save_yaml(name, edited_text) - cprint(f"Saved: {path}", Colors.GREEN) - cprint("Note: Restart required for changes to take effect.", Colors.YELLOW) - except yaml.YAMLError as e: - cprint(f"Invalid YAML: {e}", Colors.RED) - except Exception as e: - cprint(f"Failed to save: {e}", Colors.RED) - else: - cprint("Edit cancelled.", Colors.DIM) - - # ------------------------------------------------------------------ - # Commands: Protected - # ------------------------------------------------------------------ - - async def _cmd_restart(self, args: str) -> None: - """Restart the pipeline.""" - cprint("Restarting pipeline...", Colors.YELLOW) - await self.pump.shutdown() - - # Re-bootstrap - from xml_pipeline.message_bus.stream_pump import bootstrap - self.pump = await bootstrap() - - # Start pump in background - asyncio.create_task(self.pump.run()) - cprint("Pipeline restarted.", Colors.GREEN) - - async def _cmd_kill(self, args: str) -> None: - """Terminate a thread.""" - if not args: - cprint("Usage: /kill ", Colors.DIM) - return - - from xml_pipeline.memory import get_context_buffer - buffer = get_context_buffer() - - # Find thread by prefix - thread_id = None - for tid in buffer._threads.keys(): - if tid.startswith(args): - thread_id = tid - break - - if not thread_id: - cprint(f"Thread not found: {args}", Colors.RED) - return - - buffer.delete_thread(thread_id) - cprint(f"Thread {thread_id[:12]}... terminated.", Colors.YELLOW) - - async def _cmd_pause(self, args: str) -> None: - """Pause message processing.""" - cprint("Pause not yet implemented.", Colors.YELLOW) - # TODO: Implement pump pause - - async def _cmd_resume(self, args: str) -> None: - """Resume message processing.""" - cprint("Resume not yet implemented.", Colors.YELLOW) - # TODO: Implement pump resume - - # ------------------------------------------------------------------ - # Commands: Session - # ------------------------------------------------------------------ - - async def _cmd_attach(self, args: str) -> None: - """Attach console.""" - if self.attached: - cprint("Already attached.", Colors.DIM) - return - - if not await self._verify_password(): - cprint("Password required to attach.", Colors.RED) - return - - self.attached = True - cprint("Console attached.", Colors.GREEN) - - async def _cmd_detach(self, args: str) -> None: - """Detach console.""" - if not self.attached: - cprint("Already detached.", Colors.DIM) - return - - self.attached = False - cprint("Console detached. Organism continues running.", Colors.YELLOW) - cprint("Use /attach to re-attach.", Colors.DIM) - - async def _cmd_passwd(self, args: str) -> None: - """Change console password.""" - # Verify current password - current = await self._prompt_password("Current password: ") - if not self.password_mgr.verify(current): - cprint("Incorrect password.", Colors.RED) - return - - # Get new password - while True: - new_pass = await self._prompt_password("New password: ") - if not new_pass or len(new_pass) < 4: - cprint("Password must be at least 4 characters.", Colors.RED) - continue - - confirm = await self._prompt_password("Confirm new password: ") - if new_pass != confirm: - cprint("Passwords do not match.", Colors.RED) - continue - - break - - self.password_mgr.save_hash(new_pass) - cprint("Password changed successfully.", Colors.GREEN) - - async def _cmd_quit(self, args: str) -> None: - """Graceful shutdown.""" - cprint("Shutting down...", Colors.YELLOW) - self.running = False - await self.pump.shutdown() - - # ------------------------------------------------------------------ - # UI Helpers - # ------------------------------------------------------------------ - - def _print_banner(self) -> None: - """Print startup banner.""" - print() - cprint("+" + "=" * 44 + "+", Colors.CYAN) - cprint("|" + " " * 8 + "xml-pipeline console v3.0" + " " * 9 + "|", Colors.CYAN) - cprint("+" + "=" * 44 + "+", Colors.CYAN) - print() - cprint(f"Organism '{self.pump.config.name}' ready.", Colors.GREEN) - cprint(f"{len(self.pump.listeners)} listeners registered.", Colors.DIM) - cprint("Type /help for commands.", Colors.DIM) - print() diff --git a/xml_pipeline/console/tui_console.py b/xml_pipeline/console/tui_console.py deleted file mode 100644 index 9bdaf84..0000000 --- a/xml_pipeline/console/tui_console.py +++ /dev/null @@ -1,469 +0,0 @@ -""" -tui_console.py — Split-screen TUI console using prompt_toolkit. - -Features: -- Fixed Command History (Up/Down arrows) -- Robust Scrolling with snap-to-bottom and blank line spacer -- Fully implemented /monitor, /status, /listeners commands -""" - -from __future__ import annotations - -import asyncio -import os -from datetime import datetime -from pathlib import Path -from typing import TYPE_CHECKING, List, Optional - -try: - from prompt_toolkit import Application - from prompt_toolkit.buffer import Buffer - from prompt_toolkit.document import Document - from prompt_toolkit.formatted_text import FormattedText, HTML - from prompt_toolkit.key_binding import KeyBindings - from prompt_toolkit.layout import ( - Layout, - HSplit, - Window, - FormattedTextControl, - BufferControl, - ) - from prompt_toolkit.layout.dimension import Dimension - from prompt_toolkit.layout.margins import ScrollbarMargin - from prompt_toolkit.styles import Style - from prompt_toolkit.history import FileHistory - from prompt_toolkit.patch_stdout import patch_stdout - from prompt_toolkit.output.win32 import NoConsoleScreenBufferError - PROMPT_TOOLKIT_AVAILABLE = True -except ImportError: - PROMPT_TOOLKIT_AVAILABLE = False - NoConsoleScreenBufferError = Exception - -if TYPE_CHECKING: - from xml_pipeline.message_bus.stream_pump import StreamPump - - -# ============================================================================ -# Constants -# ============================================================================ - -CONFIG_DIR = Path.home() / ".xml-pipeline" -HISTORY_FILE = CONFIG_DIR / "history" - -STYLE = Style.from_dict({ - "output": "#ffffff", - "output.system": "#888888 italic", - "output.greeter": "#00ff00", - "output.shouter": "#ffff00", - "output.response": "#00ffff", - "output.error": "#ff0000", - "output.dim": "#666666", - "separator": "#444444", - "separator.text": "#888888", - "input": "#ffffff", - "prompt": "#00ff00 bold", -}) - - -# ============================================================================ -# Output Buffer -# ============================================================================ - -class OutputBuffer: - """Manages scrolling output history using a text Buffer.""" - - def __init__(self, max_lines: int = 1000): - self.max_lines = max_lines - self._lines: List[str] = [] - self.buffer = Buffer(read_only=True, name="output") - self._user_scrolled = False # Track if user manually scrolled - - def append(self, text: str, style: str = "output"): - timestamp = datetime.now().strftime("%H:%M:%S") - self._lines.append(f"[{timestamp}] {text}") - self._update_buffer() - - def append_raw(self, text: str, style: str = "output"): - self._lines.append(text) - self._update_buffer() - - def _update_buffer(self): - """Update buffer content. Auto-scroll only if user hasn't scrolled up.""" - if len(self._lines) > self.max_lines: - self._lines = self._lines[-self.max_lines:] - - text = "\n".join(self._lines) - - # If user scrolled up, preserve their position; otherwise snap to bottom - if self._user_scrolled: - old_pos = self.buffer.cursor_position - self.buffer.set_document( - Document(text=text, cursor_position=min(old_pos, len(text))), - bypass_readonly=True - ) - else: - # Auto-scroll to bottom for new content - self.buffer.set_document( - Document(text=text, cursor_position=len(text)), - bypass_readonly=True - ) - - def is_at_bottom(self) -> bool: - """Check if we should show the spacer (user hasn't scrolled away).""" - return not self._user_scrolled - - def scroll_to_bottom(self): - """Force cursor to the end and mark as 'at bottom'.""" - self.buffer.cursor_position = len(self.buffer.text) - self._user_scrolled = False # Reset flag when explicitly scrolling to bottom - - def mark_scrolled(self): - """Called when user manually scrolls up.""" - self._user_scrolled = True - - def mark_unscrolled(self): - """Called when user scrolls to bottom.""" - self._user_scrolled = False - - def clear(self): - self._lines.clear() - self.buffer.set_document(Document(text=""), bypass_readonly=True) - self._user_scrolled = False - - -# ============================================================================ -# TUI Console -# ============================================================================ - -class TUIConsole: - def __init__(self, pump: StreamPump): - self.pump = pump - self.output = OutputBuffer() - self.running = False - self.attached = True - self.use_simple_mode = False - - CONFIG_DIR.mkdir(parents=True, exist_ok=True) - - try: - if not PROMPT_TOOLKIT_AVAILABLE: - raise ImportError("prompt_toolkit not available") - - # Command history setup - if HISTORY_FILE.exists() and not os.access(HISTORY_FILE, os.W_OK): - os.chmod(HISTORY_FILE, 0o666) - - self.input_buffer = Buffer( - history=FileHistory(str(HISTORY_FILE)), - multiline=False, - accept_handler=self._accept_handler - ) - - self._build_ui() - except (NoConsoleScreenBufferError, ImportError, Exception) as e: - self.use_simple_mode = True - self.app = None - print(f"\033[2mNote: Using simple mode ({type(e).__name__})\033[0m") - - def _accept_handler(self, buffer: Buffer) -> bool: - text = buffer.text.strip() - if text: - asyncio.create_task(self._process_input(text)) - return False - - def _build_ui(self): - kb = KeyBindings() - - @kb.add("c-c") - @kb.add("c-d") - def _(event): - self.running = False - event.app.exit() - - @kb.add("c-l") - def _(event): - self.output.clear() - - @kb.add("up") - def _(event): - self.input_buffer.history_backward() - - @kb.add("down") - def _(event): - self.input_buffer.history_forward() - - @kb.add("pageup") - def _(event): - buf = self.output.buffer - doc = buf.document - new_row = max(0, doc.cursor_position_row - 20) - buf.cursor_position = doc.translate_row_col_to_index(new_row, 0) - self._invalidate() - - @kb.add("pagedown") - def _(event): - buf = self.output.buffer - doc = buf.document - lines = doc.line_count - new_row = doc.cursor_position_row + 20 - - if new_row >= lines - 1: - self.output.scroll_to_bottom() - else: - buf.cursor_position = doc.translate_row_col_to_index(new_row, 0) - self._invalidate() - - @kb.add("c-home") - def _(event): - self.output.buffer.cursor_position = 0 - self._invalidate() - - @kb.add("c-end") - def _(event): - self.output.scroll_to_bottom() - self._invalidate() - - output_control = BufferControl( - buffer=self.output.buffer, - focusable=False, - include_default_input_processors=False, - ) - - self.output_window = Window( - content=output_control, - wrap_lines=True, - right_margins=[ScrollbarMargin(display_arrows=True)], - ) - - def get_spacer_height(): - return 1 if self.output.is_at_bottom() else 0 - - spacer = Window(height=lambda: Dimension.exact(get_spacer_height())) - - def get_separator(): - name = self.pump.config.name - width = 60 - padding = "─" * ((width - len(name) - 4) // 2) - return FormattedText([ - ("class:separator", padding), - ("class:separator.text", f" {name} "), - ("class:separator", padding), - ]) - - separator = Window( - content=FormattedTextControl(text=get_separator), - height=1, - ) - - input_window = Window( - content=BufferControl(buffer=self.input_buffer), - height=1, - ) - - from prompt_toolkit.layout import VSplit - input_row = VSplit([ - Window( - content=FormattedTextControl(text=lambda: FormattedText([("class:prompt", "> ")])), - width=2, - ), - input_window, - ]) - - root = HSplit([ - self.output_window, - spacer, - separator, - input_row, - ]) - - self.layout = Layout(root, focused_element=input_window) - - self.app = Application( - layout=self.layout, - key_bindings=kb, - style=STYLE, - full_screen=True, - mouse_support=True, - ) - - def print(self, text: str, style: str = "output"): - if self.use_simple_mode: - self._print_simple(text, style) - else: - self.output.append(text, style) - self._invalidate() - - def print_raw(self, text: str, style: str = "output"): - if self.use_simple_mode: - self._print_simple(text, style) - else: - self.output.append_raw(text, style) - self._invalidate() - - def print_system(self, text: str): - self.print(text, "output.system") - - def print_error(self, text: str): - self.print(text, "output.error") - - def _invalidate(self): - if self.app: - try: - self.app.invalidate() - except Exception: - pass - - def _print_simple(self, text: str, style: str = "output"): - colors = { - "output.system": "\033[2m", - "output.error": "\033[31m", - "output.dim": "\033[2m", - "output.greeter": "\033[32m", - "output.shouter": "\033[33m", - "output.response": "\033[36m", - } - color = colors.get(style, "") - print(f"{color}{text}\033[0m") - - async def run(self): - self.running = True - if self.use_simple_mode: - await self._run_simple() - return - - self.print_raw(f"xml-pipeline console v3.0", "output.system") - self.print_raw(f"Organism: {self.pump.config.name}", "output.system") - self.print_raw(f"Type /help for commands, @listener message to chat", "output.dim") - self.print_raw("", "output") - - try: - async def refresh_loop(): - while self.running: - await asyncio.sleep(0.1) - if self.app and self.app.is_running: - self.app.invalidate() - - refresh_task = asyncio.create_task(refresh_loop()) - try: - await self.app.run_async() - finally: - refresh_task.cancel() - except Exception as e: - print(f"Console error: {e}") - finally: - self.running = False - - async def _run_simple(self): - print(f"\033[36mxml-pipeline console v3.0 (simple mode)\033[0m") - while self.running: - try: - line = await asyncio.get_event_loop().run_in_executor(None, lambda: input("> ")) - if line: await self._process_input(line.strip()) - except (EOFError, KeyboardInterrupt): break - self.running = False - - async def _process_input(self, line: str): - if not self.use_simple_mode: - self.print_raw(f"> {line}", "output.dim") - if line.startswith("/"): - await self._handle_command(line) - elif line.startswith("@"): - await self._handle_message(line) - else: - self.print("Use @listener message or /command", "output.dim") - - async def _handle_command(self, line: str): - parts = line[1:].split(None, 1) - cmd = parts[0].lower() if parts else "" - args = parts[1] if len(parts) > 1 else "" - - handler = getattr(self, f"_cmd_{cmd}", None) - if handler: - await handler(args) - else: - self.print_error(f"Unknown command: /{cmd}") - - async def _cmd_help(self, args: str): - self.print_raw("Commands:", "output.system") - self.print_raw(" /status, /listeners, /threads, /monitor, /clear, /quit", "output.dim") - - async def _cmd_status(self, args: str): - from xml_pipeline.memory import get_context_buffer - buffer = get_context_buffer() - stats = buffer.get_stats() - self.print_raw(f"Organism: {self.pump.config.name}", "output.system") - self.print_raw(f"Threads: {stats['thread_count']} active, {stats['total_slots']} slots total", "output.dim") - - async def _cmd_listeners(self, args: str): - self.print_raw("Listeners:", "output.system") - for name, l in self.pump.listeners.items(): - tag = "[agent]" if l.is_agent else "[handler]" - self.print_raw(f" {name:15} {tag} {l.description}", "output.dim") - - async def _cmd_threads(self, args: str): - from xml_pipeline.memory import get_context_buffer - buffer = get_context_buffer() - for tid, ctx in buffer._threads.items(): - self.print_raw(f" {tid[:8]}... slots: {len(ctx)}", "output.dim") - - async def _cmd_monitor(self, args: str): - from xml_pipeline.memory import get_context_buffer - buffer = get_context_buffer() - if args == "*": - for tid, ctx in buffer._threads.items(): - self.print_raw(f"--- Thread {tid[:8]} ---", "output.system") - for slot in list(ctx)[-3:]: - self.print_raw(f" {slot.from_id} -> {slot.to_id}: {type(slot.payload).__name__}", "output.dim") - elif args: - matches = [t for t in buffer._threads if t.startswith(args)] - if not matches: - self.print_error(f"No thread matching {args}") - return - ctx = buffer.get_thread(matches[0]) - for slot in ctx: - self.print_raw(f" [{slot.from_id} -> {slot.to_id}] {type(slot.payload).__name__}", "output.dim") - else: - self.print("Usage: /monitor or /monitor *", "output.dim") - - async def _cmd_clear(self, args: str): - self.output.clear() - - async def _cmd_quit(self, args: str): - self.running = False - if self.app: self.app.exit() - - async def _handle_message(self, line: str): - parts = line[1:].split(None, 1) - if not parts: return - target, message = parts[0].lower(), (parts[1] if len(parts) > 1 else "") - if target not in self.pump.listeners: - self.print_error(f"Unknown listener: {target}") - return - - listener = self.pump.listeners[target] - payload = self._create_payload(listener, message) - if payload is None: - self.print_error(f"Cannot create payload for {target}") - return - - import uuid - thread_id = str(uuid.uuid4()) - envelope = self.pump._wrap_in_envelope(payload, "console", target, thread_id) - await self.pump.inject(envelope, thread_id, "console") - - def _create_payload(self, listener, message: str): - payload_class = listener.payload_class - if hasattr(payload_class, '__dataclass_fields__'): - fields = list(payload_class.__dataclass_fields__.keys()) - if len(fields) == 1: return payload_class(**{fields[0]: message}) - if 'message' in fields: return payload_class(message=message) - if 'text' in fields: return payload_class(text=message) - return None - - def on_response(self, from_id: str, payload): - style = "output.response" if from_id == "response-handler" else "output" - text = f"[{from_id}] {getattr(payload, 'message', payload)}" - self.print_raw(text, style) - -def create_tui_console(pump: StreamPump) -> TUIConsole: - return TUIConsole(pump) \ No newline at end of file diff --git a/xml_pipeline/main.py b/xml_pipeline/main.py deleted file mode 100644 index e69de29..0000000 diff --git a/xml_pipeline/server/__init__.py b/xml_pipeline/server/__init__.py deleted file mode 100644 index 55ef269..0000000 --- a/xml_pipeline/server/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -""" -HTTP/WebSocket server for xml-pipeline. - -Provides: -- REST API for auth and management -- WebSocket for console and GUI clients -""" - -from .app import create_app, run_server - -__all__ = ["create_app", "run_server"] diff --git a/xml_pipeline/server/app.py b/xml_pipeline/server/app.py deleted file mode 100644 index cd3a83c..0000000 --- a/xml_pipeline/server/app.py +++ /dev/null @@ -1,266 +0,0 @@ -""" -aiohttp-based HTTP/WebSocket server. - -Provides: -- REST API for authentication -- WebSocket for console/GUI message sending -- Integration with SystemPipeline for message injection -""" - -from __future__ import annotations - -import asyncio -import json -import logging -from typing import TYPE_CHECKING, Optional, Callable - -try: - from aiohttp import web, WSMsgType - AIOHTTP_AVAILABLE = True -except ImportError: - AIOHTTP_AVAILABLE = False - web = None - WSMsgType = None - -from ..auth.users import get_user_store, UserStore -from ..auth.sessions import get_session_manager, SessionManager, Session - -if TYPE_CHECKING: - from ..message_bus.stream_pump import StreamPump - from ..message_bus.system_pipeline import SystemPipeline - -logger = logging.getLogger(__name__) - - -def auth_middleware(): - @web.middleware - async def middleware(request, handler): - if request.path in ("/auth/login", "/health"): - return await handler(request) - - auth_header = request.headers.get("Authorization", "") - if not auth_header.startswith("Bearer "): - return web.json_response({"error": "Missing Authorization"}, status=401) - - token = auth_header[7:] - session = request.app["session_manager"].validate(token) - - if not session: - return web.json_response({"error": "Invalid token"}, status=401) - - request["session"] = session - return await handler(request) - - return middleware - - -async def handle_login(request): - try: - data = await request.json() - except: - return web.json_response({"error": "Invalid JSON"}, status=400) - - username = data.get("username", "") - password = data.get("password", "") - - if not username or not password: - return web.json_response({"error": "Credentials required"}, status=400) - - user = request.app["user_store"].authenticate(username, password) - if not user: - return web.json_response({"error": "Invalid credentials"}, status=401) - - session = request.app["session_manager"].create(user.username, user.role) - return web.json_response(session.to_dict()) - - -async def handle_logout(request): - session = request["session"] - request.app["session_manager"].revoke(session.token) - return web.json_response({"message": "Logged out"}) - - -async def handle_me(request): - session = request["session"] - return web.json_response({ - "username": session.username, - "role": session.role, - "expires_at": session.expires_at.isoformat(), - }) - - -async def handle_health(request): - return web.json_response({"status": "ok"}) - - -async def handle_websocket(request): - session = request["session"] - pump = request.app.get("pump") - system_pipeline = request.app.get("system_pipeline") - - ws = web.WebSocketResponse() - await ws.prepare(request) - - # Track this WebSocket for response delivery - ws_id = id(ws) - request.app["websockets"][ws_id] = { - "ws": ws, - "user": session.username, - "threads": set(), # Thread IDs this client is subscribed to - } - - await ws.send_json({"type": "connected", "username": session.username}) - - try: - async for msg in ws: - if msg.type == WSMsgType.TEXT: - try: - data = json.loads(msg.data) - resp = await handle_ws_msg( - data, session, pump, system_pipeline, - request.app["websockets"][ws_id] - ) - await ws.send_json(resp) - except Exception as e: - logger.exception(f"WebSocket error: {e}") - await ws.send_json({"type": "error", "error": str(e)}) - finally: - # Cleanup on disconnect - del request.app["websockets"][ws_id] - - return ws - - -async def handle_ws_msg(data, session, pump, system_pipeline, ws_state): - """ - Handle WebSocket message. - - Message types: - ping - Keepalive - status - Get server status - listeners - List available listeners - targets - Alias for listeners - send - Send message to pipeline (@target or explicit) - """ - t = data.get("type", "") - - if t == "ping": - return {"type": "pong"} - - elif t == "status": - from ..memory import get_context_buffer - stats = get_context_buffer().get_stats() - return {"type": "status", "threads": stats["thread_count"]} - - elif t == "listeners" or t == "targets": - if not pump: - return {"type": "listeners", "listeners": []} - return {"type": "listeners", "listeners": list(pump.listeners.keys())} - - elif t == "send": - # Send message to pipeline - if not system_pipeline: - return {"type": "error", "error": "Pipeline not available"} - - # Support two formats: - # 1. {"type": "send", "raw": "@greeter Dan"} - # 2. {"type": "send", "target": "greeter", "content": "Dan"} - raw = data.get("raw") - if raw: - # Parse @target message format - try: - thread_id = await system_pipeline.inject_console( - raw=raw, - user=session.username, - ) - except ValueError as e: - return {"type": "error", "error": str(e)} - else: - target = data.get("target") - content = data.get("content", data.get("text", data.get("message", ""))) - - if not target: - return {"type": "error", "error": "Missing target"} - if not content: - return {"type": "error", "error": "Missing content"} - - try: - thread_id = await system_pipeline.inject_raw( - target=target, - content=content, - source="websocket", - user=session.username, - ) - except ValueError as e: - return {"type": "error", "error": str(e)} - - # Track thread for response delivery - ws_state["threads"].add(thread_id) - - return { - "type": "sent", - "thread_id": thread_id, - "target": data.get("target") or raw.split()[0].lstrip("@") if raw else None, - } - - return {"type": "error", "error": f"Unknown message type: {t}"} - - -def create_app(pump=None, system_pipeline=None): - """ - Create the aiohttp application. - - Args: - pump: StreamPump instance (optional) - system_pipeline: SystemPipeline instance (optional, created from pump if not provided) - """ - if not AIOHTTP_AVAILABLE: - raise RuntimeError("aiohttp not installed") - - app = web.Application(middlewares=[auth_middleware()]) - app["user_store"] = get_user_store() - app["session_manager"] = get_session_manager() - app["pump"] = pump - app["websockets"] = {} # Track connected WebSocket clients - - # Create SystemPipeline if pump provided but system_pipeline not - if pump and not system_pipeline: - from ..message_bus.system_pipeline import SystemPipeline - system_pipeline = SystemPipeline(pump) - - app["system_pipeline"] = system_pipeline - - app.router.add_post("/auth/login", handle_login) - app.router.add_post("/auth/logout", handle_logout) - app.router.add_get("/auth/me", handle_me) - app.router.add_get("/health", handle_health) - app.router.add_get("/ws", handle_websocket) - - return app - - -async def run_server(pump=None, host="127.0.0.1", port=8765): - """ - Run the server. - - Args: - pump: StreamPump instance for message handling - host: Bind address - port: Port number - """ - app = create_app(pump) - runner = web.AppRunner(app) - await runner.setup() - - site = web.TCPSite(runner, host, port) - await site.start() - - print(f"Server on http://{host}:{port}") - - try: - while True: - await asyncio.sleep(3600) - except asyncio.CancelledError: - pass - finally: - await runner.cleanup() diff --git a/xml_pipeline/xml_listener.py b/xml_pipeline/xml_listener.py deleted file mode 100644 index 5954061..0000000 --- a/xml_pipeline/xml_listener.py +++ /dev/null @@ -1,83 +0,0 @@ -""" -xml_listener.py — The Sovereign Contract for All Capabilities (v1.3) -""" - -from __future__ import annotations -from typing import Optional, Type, Callable -from pydantic import BaseModel - -class XMLListener: - """ - Base class for all reactive capabilities. - Now supports Autonomous Registration via Pydantic payload classes. - """ - - def __init__( - self, - name: str, - payload_class: Type[BaseModel], - handler: Callable[[dict], bytes], - description: Optional[str] = None - ): - self.agent_name = name - self.payload_class = payload_class - self.handler = handler - self.description = description or payload_class.__doc__ or "No description provided." - - # In v1.3, the root tag is derived from the payload class name - self.root_tag = payload_class.__name__ - self.listens_to = [self.root_tag] - - async def handle( - self, - payload_dict: dict, - thread_id: str, - sender_name: str, - ) -> Optional[bytes]: - """ - React to a pre-validated dictionary payload. - Returns raw response XML bytes. - """ - # 1. Execute the handler logic - # Note: In v1.3, the Bus/Lark handles the XML -> Dict conversion - return await self.handler(payload_dict) - - def generate_xsd(self) -> str: - """ - Autonomous XSD Synthesis. - Inspects the payload_class and generates an XSD string. - """ - # Logic to iterate over self.payload_class.model_fields - # and build the definitions. - pass - - def generate_prompt_fragment(self) -> str: - """ - Prompt Synthesis (The 'Mente'). - Generates the tool usage instructions for other agents. - """ - fragment = [ - f"Capability: {self.agent_name}", - f"Root Tag: <{self.root_tag}>", - f"Description: {self.description}", - "\nParameters:" - ] - - for name, field in self.payload_class.model_fields.items(): - field_type = field.annotation.__name__ - field_desc = field.description or "No description" - fragment.append(f" - {name} ({field_type}): {field_desc}") - - return "\n".join(fragment) - - def make_response_envelope( - self, - payload_bytes: bytes, - thread_id: str, - to: Optional[str] = None - ) -> bytes: - """ - Wraps response bytes in a standard envelope. - """ - # Logic to build the meta block and append the payload_bytes - pass \ No newline at end of file