diff --git a/pyproject.toml b/pyproject.toml
index 48ee1bc..7fc5ff4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -9,7 +9,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "xml-pipeline"
-version = "0.3.1"
+version = "0.4.0"
description = "Schema-driven XML message bus for multi-agent systems"
readme = "README.md"
requires-python = ">=3.11"
diff --git a/xml_pipeline/__init__.py b/xml_pipeline/__init__.py
index 286c785..f00b2e4 100644
--- a/xml_pipeline/__init__.py
+++ b/xml_pipeline/__init__.py
@@ -2,4 +2,4 @@
xml-pipeline: Tamper-proof nervous system for multi-agent organisms.
"""
-__version__ = "0.3.1"
+__version__ = "0.4.0"
diff --git a/xml_pipeline/agentserver.py b/xml_pipeline/agentserver.py
deleted file mode 100644
index 2de3779..0000000
--- a/xml_pipeline/agentserver.py
+++ /dev/null
@@ -1,295 +0,0 @@
-# agent_server.py
-"""
-AgentServer — The Living Organism Host
-December 25, 2025
-
-Preliminary but runnable implementation.
-
-This is the body: one process, one secure WebSocket endpoint,
-hosting many concurrent AgentService organs sharing a single
-tamper-proof MessageBus from xml-pipeline.
-
-Features in this version:
-- Mandatory WSS (TLS)
-- First-message TOTP authentication
-- Per-user capability control via config/users.yaml
-- Personalized on connect
-- Ed25519 identity generation helper
-- Boot-time agent registration
-- Hooks for future signed privileged commands
-
-XML wins.
-"""
-
-import asyncio
-import os
-import ssl
-import time
-from typing import Optional, Dict, Any
-
-import pyotp
-import yaml
-from cryptography.hazmat.primitives.asymmetric import ed25519
-from cryptography.hazmat.primitives import serialization
-from websockets.server import serve, WebSocketServerProtocol
-
-from xml_pipeline import MessageBus
-from xml_pipeline.service import AgentService
-from xml_pipeline.message import repair_and_canonicalize, XmlTamperError
-
-
-class AgentServer:
- """
- The body of the organism.
- One instance = one living, multi-personality swarm.
- """
-
- # Default identity location — can be overridden if needed
- IDENTITY_DIR = os.path.expanduser("~/.agent_server")
- PRIVATE_KEY_PATH = os.path.join(IDENTITY_DIR, "identity.ed25519")
- PUBLIC_KEY_PATH = os.path.join(IDENTITY_DIR, "identity.ed25519.pub")
-
- def __init__(
- self,
- host: str = "0.0.0.0",
- port: int = 8765,
- ssl_context: Optional[ssl.SSLContext] = None,
- users_config_path: str = "config/users.yaml",
- identity_pubkey_path: Optional[str] = None,
- ):
- self.host = host
- self.port = port
- self.ssl_context = ssl_context # None = ws:// (dev only), set for wss://
- self.bus = MessageBus()
-
- # Load per-user TOTP secrets + allowed root tags
- self.users_config: Dict[str, Dict[str, Any]] = self._load_users_config(users_config_path)
-
- # Load organism public key for future privileged command verification
- self.pubkey: Optional[bytes] = None
- pubkey_path = identity_pubkey_path or self.PUBLIC_KEY_PATH
- if os.path.exists(pubkey_path):
- self.pubkey = self._load_pubkey(pubkey_path)
-
- # Built-in platform listeners will be added here in future versions
-
- @staticmethod
- def _load_users_config(path: str) -> Dict[str, Dict[str, Any]]:
- """Load users.yaml → {user_id: {totp_secret: ..., allowed_root_tags: [...]}}"""
- if not os.path.exists(path):
- raise FileNotFoundError(f"Users config not found: {path}")
- with open(path) as f:
- data = yaml.safe_load(f) or {}
- return data.get("users", {})
-
- @staticmethod
- def _load_pubkey(path: str) -> bytes:
- """Load raw Ed25519 public key bytes"""
- with open(path, "rb") as f:
- content = f.read().strip()
- # Accept either raw bytes or ssh-ed25519 format
- if content.startswith(b"ssh-ed25519 "):
- import base64
- return base64.b64decode(content.split()[1])
- return content
-
- def register_agent(
- self,
- agent_class: type[AgentService],
- *,
- system_prompt: str,
- max_concurrent: int = 10,
- session_timeout: float = 1800.0,
- version: str = "1.0",
- public: bool = True,
- ) -> None:
- """Register a permanent agent at boot time."""
- # Wrapper to store public flag for catalog building
- self.bus.register_agent(
- agent_class=agent_class,
- system_prompt=system_prompt,
- max_concurrent=max_concurrent,
- session_timeout=session_timeout,
- version=version,
- metadata={"public": public},
- )
-
- async def _handle_client(self, websocket: WebSocketServerProtocol):
- """Per-connection handler: authenticate → send catalog → pump messages"""
- context = {
- "authenticated": False,
- "user": None,
- "allowed_tags": set(),
- "bad_message_count": 0,
- "last_bad_time": 0.0,
- }
-
- try:
- # 1. Authentication — first message must be
- first_raw = await asyncio.wait_for(websocket.recv(), timeout=15.0)
- auth_msg = repair_and_canonicalize(first_raw)
-
- if auth_msg.getroot().tag != "authenticate":
- await websocket.close(code=1008, reason="First message must be ")
- return
-
- totp_code = auth_msg.getroot().get("totp")
- if not totp_code:
- await websocket.close(code=1008, reason="Missing TOTP code")
- return
-
- user_id = self._authenticate_totp(totp_code)
- if not user_id:
- await websocket.close(code=1008, reason="Invalid TOTP")
- return
-
- user_cfg = self.users_config[user_id]
- allowed_tags = set(user_cfg.get("allowed_root_tags", []))
- if "*" in allowed_tags:
- # Wildcard = all current + future tags
- allowed_tags = None # Special sentinel
-
- context.update({
- "authenticated": True,
- "user": user_id,
- "allowed_tags": allowed_tags,
- })
-
- # 2. Send personalized catalog
- catalog_xml = self.bus.build_catalog_for_user(allowed_tags)
- await websocket.send(catalog_xml)
-
- # 3. Message pump
- async def inbound():
- async for raw in websocket:
- try:
- yield repair_and_canonicalize(raw)
- except XmlTamperError:
- await websocket.close(code=1008, reason="Invalid/tampered XML")
- raise
-
- async def outbound(message: bytes):
- await websocket.send(message)
-
- await self.bus.run(
- inbound=inbound(),
- outbound=outbound,
- context=context, # For ACL checks in listeners
- )
-
- except asyncio.TimeoutError:
- await websocket.close(code=1008, reason="Authentication timeout")
- except Exception as e:
- print(f"Client error ({websocket.remote_address}): {e}")
-
- def _authenticate_totp(self, code: str) -> Optional[str]:
- """Validate TOTP and return user identifier if successful"""
- for user_id, cfg in self.users_config.items():
- totp = pyotp.TOTP(cfg["totp_secret"])
- if totp.verify(code, valid_window=1): # 30s tolerance
- return user_id
- return None
-
- async def start(self):
- """Start the organism — runs forever"""
- scheme = "wss" if self.ssl_context else "ws"
- print(f"AgentServer starting on {scheme}://{self.host}:{self.port}")
- print("Organism awakening...")
-
- async with serve(self._handle_client, self.host, self.port, ssl=self.ssl_context):
- await asyncio.Future() # Run forever
-
- @classmethod
- def generate_identity(cls, force: bool = False) -> None:
- """
- Generate the organism's permanent Ed25519 identity.
- Run once on first deployment.
- """
- os.makedirs(cls.IDENTITY_DIR, exist_ok=True)
-
- if os.path.exists(cls.PRIVATE_KEY_PATH) and not force:
- print("Identity already exists:")
- print(f" Private key: {cls.PRIVATE_KEY_PATH}")
- print(f" Public key : {cls.PUBLIC_KEY_PATH}")
- print("Use --force to regenerate (will overwrite!).")
- return
-
- print("Generating organism Ed25519 identity...")
-
- private_key = ed25519.Ed25519PrivateKey.generate()
- public_key = private_key.public_key()
-
- # Private key — PEM PKCS8 unencrypted (rely on file permissions)
- private_pem = private_key.private_bytes(
- encoding=serialization.Encoding.PEM,
- format=serialization.PrivateFormat.PKCS8,
- encryption_algorithm=serialization.NoEncryption(),
- )
-
- # Public key — raw bytes + ssh-ed25519 format for readability
- public_raw = public_key.public_bytes(
- encoding=serialization.Encoding.Raw,
- format=serialization.PublicFormat.Raw,
- )
- public_ssh = f"ssh-ed25519 {public_raw.hex()} organism@{os.uname().nodename}"
-
- # Write with secure permissions
- with open(cls.PRIVATE_KEY_PATH, "wb") as f:
- os.fchmod(f.fileno(), 0o600)
- f.write(private_pem)
-
- with open(cls.PUBLIC_KEY_PATH, "w") as f:
- f.write(public_ssh + "\n")
-
- print("Organism identity created!")
- print(f"Private key (KEEP SAFE): {cls.PRIVATE_KEY_PATH}")
- print(f"Public key : {cls.PUBLIC_KEY_PATH}")
- print("\nBackup the private key offline. Lose it → lose structural control forever.")
-
-
-# ————————————————————————
-# Example CLI entrypoint
-# ————————————————————————
-if __name__ == "__main__":
- import argparse
-
- parser = argparse.ArgumentParser(description="AgentServer — the living organism")
- subparsers = parser.add_subparsers(dest="command", required=True)
-
- # Run the server
- run_p = subparsers.add_parser("run", help="Start the organism")
- run_p.add_argument("--host", default="0.0.0.0")
- run_p.add_argument("--port", type=int, default=8765)
- run_p.add_argument("--cert", help="Path to TLS fullchain.pem")
- run_p.add_argument("--key", help="Path to TLS privkey.pem")
- run_p.add_argument("--users-config", default="config/users.yaml")
-
- # Generate identity
- gen_p = subparsers.add_parser("generate-identity", help="Create cryptographic identity")
- gen_p.add_argument("--force", action="store_true")
-
- args = parser.parse_args()
-
- if args.command == "generate-identity":
- AgentServer.generate_identity(force=args.force)
-
- else: # run
- ssl_ctx = None
- if args.cert and args.key:
- ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
- ssl_ctx.load_cert_chain(args.cert, args.key)
-
- server = AgentServer(
- host=args.host,
- port=args.port,
- ssl_context=ssl_ctx,
- users_config_path=args.users_config,
- )
-
- # Example boot-time listeners (uncomment and customise)
- # from your_agents import CodingAgent, ResearchAgent, GrokAgent
- # server.register_agent(CodingAgent, system_prompt="You are an elite Python engineer...", max_concurrent=20)
- # server.register_agent(ResearchAgent, system_prompt="You are a thorough researcher...", max_concurrent=10)
- # server.register_agent(GrokAgent, system_prompt="You are Grok, built by xAI...", max_concurrent=15)
-
- asyncio.run(server.start())
diff --git a/xml_pipeline/auth/__init__.py b/xml_pipeline/auth/__init__.py
deleted file mode 100644
index 90e34f2..0000000
--- a/xml_pipeline/auth/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-"""
-Authentication and authorization for xml-pipeline.
-
-Provides:
-- UserStore: User management with Argon2id password hashing
-- SessionManager: Token-based session management
-"""
-
-from .users import User, UserStore, get_user_store
-from .sessions import Session, SessionManager, get_session_manager
-
-__all__ = [
- "User",
- "UserStore",
- "get_user_store",
- "Session",
- "SessionManager",
- "get_session_manager",
-]
diff --git a/xml_pipeline/auth/sessions.py b/xml_pipeline/auth/sessions.py
deleted file mode 100644
index 994abd8..0000000
--- a/xml_pipeline/auth/sessions.py
+++ /dev/null
@@ -1,197 +0,0 @@
-"""
-Session management with token-based authentication.
-
-Tokens are random hex strings stored in memory with expiry.
-"""
-
-from __future__ import annotations
-
-import secrets
-import threading
-from dataclasses import dataclass
-from datetime import datetime, timedelta, timezone
-from typing import Optional
-
-
-# Default session lifetime
-DEFAULT_SESSION_LIFETIME = timedelta(hours=8)
-
-# Token length in bytes (32 bytes = 64 hex chars)
-TOKEN_BYTES = 32
-
-
-@dataclass
-class Session:
- """An authenticated session."""
- token: str
- username: str
- role: str
- created_at: datetime
- expires_at: datetime
- last_activity: datetime
-
- def is_expired(self) -> bool:
- """Check if session has expired."""
- return datetime.now(timezone.utc) > self.expires_at
-
- def touch(self) -> None:
- """Update last activity time."""
- self.last_activity = datetime.now(timezone.utc)
-
- def to_dict(self) -> dict:
- """Convert to dict for API responses."""
- return {
- "token": self.token,
- "username": self.username,
- "role": self.role,
- "expires_at": self.expires_at.isoformat(),
- }
-
-
-class SessionManager:
- """
- Manages authenticated sessions.
-
- Thread-safe for concurrent access.
-
- Usage:
- manager = SessionManager()
-
- # Create session after successful login
- session = manager.create("admin", "admin")
-
- # Validate token on subsequent requests
- session = manager.validate(token)
- if session:
- print(f"Welcome back {session.username}")
-
- # Logout
- manager.revoke(token)
- """
-
- def __init__(self, lifetime: timedelta = DEFAULT_SESSION_LIFETIME):
- self.lifetime = lifetime
- self._sessions: dict[str, Session] = {}
- self._lock = threading.Lock()
-
- def create(
- self,
- username: str,
- role: str,
- lifetime: Optional[timedelta] = None,
- ) -> Session:
- """
- Create a new session.
-
- Args:
- username: Authenticated username
- role: User's role
- lifetime: Optional custom lifetime
-
- Returns:
- New Session with token
- """
- token = secrets.token_hex(TOKEN_BYTES)
- now = datetime.now(timezone.utc)
- expires = now + (lifetime or self.lifetime)
-
- session = Session(
- token=token,
- username=username,
- role=role,
- created_at=now,
- expires_at=expires,
- last_activity=now,
- )
-
- with self._lock:
- self._sessions[token] = session
- self._cleanup_expired()
-
- return session
-
- def validate(self, token: str) -> Optional[Session]:
- """
- Validate a session token.
-
- Args:
- token: Session token from client
-
- Returns:
- Session if valid, None if invalid/expired
- """
- with self._lock:
- session = self._sessions.get(token)
- if not session:
- return None
-
- if session.is_expired():
- del self._sessions[token]
- return None
-
- session.touch()
- return session
-
- def revoke(self, token: str) -> bool:
- """
- Revoke a session (logout).
-
- Returns:
- True if session was revoked, False if not found
- """
- with self._lock:
- if token in self._sessions:
- del self._sessions[token]
- return True
- return False
-
- def revoke_user(self, username: str) -> int:
- """
- Revoke all sessions for a user.
-
- Returns:
- Number of sessions revoked
- """
- with self._lock:
- to_revoke = [
- token for token, session in self._sessions.items()
- if session.username == username
- ]
- for token in to_revoke:
- del self._sessions[token]
- return len(to_revoke)
-
- def get_user_sessions(self, username: str) -> list[Session]:
- """Get all active sessions for a user."""
- with self._lock:
- return [
- s for s in self._sessions.values()
- if s.username == username and not s.is_expired()
- ]
-
- def _cleanup_expired(self) -> None:
- """Remove expired sessions. Must hold lock."""
- expired = [
- token for token, session in self._sessions.items()
- if session.is_expired()
- ]
- for token in expired:
- del self._sessions[token]
-
- def active_count(self) -> int:
- """Count active sessions."""
- with self._lock:
- self._cleanup_expired()
- return len(self._sessions)
-
-
-# Global instance
-_manager: Optional[SessionManager] = None
-
-
-def get_session_manager() -> SessionManager:
- """Get the global session manager."""
- global _manager
- if _manager is None:
- _manager = SessionManager()
- return _manager
diff --git a/xml_pipeline/auth/totp.py b/xml_pipeline/auth/totp.py
deleted file mode 100644
index e69de29..0000000
diff --git a/xml_pipeline/auth/users.py b/xml_pipeline/auth/users.py
deleted file mode 100644
index bdbd9a0..0000000
--- a/xml_pipeline/auth/users.py
+++ /dev/null
@@ -1,227 +0,0 @@
-"""
-User store with Argon2id password hashing.
-
-Users are stored in ~/.xml-pipeline/users.yaml with hashed passwords.
-"""
-
-from __future__ import annotations
-
-import os
-import stat
-import sys
-from dataclasses import dataclass, field
-from datetime import datetime, timezone
-from pathlib import Path
-from typing import Optional
-
-import yaml
-from argon2 import PasswordHasher
-from argon2.exceptions import VerifyMismatchError
-
-
-CONFIG_DIR = Path.home() / ".xml-pipeline"
-USERS_FILE = CONFIG_DIR / "users.yaml"
-
-
-@dataclass
-class User:
- """A user account."""
- username: str
- password_hash: str
- role: str = "operator" # admin, operator, viewer
- created_at: str = ""
- last_login: Optional[str] = None
-
- def to_dict(self) -> dict:
- return {
- "username": self.username,
- "password_hash": self.password_hash,
- "role": self.role,
- "created_at": self.created_at,
- "last_login": self.last_login,
- }
-
- @classmethod
- def from_dict(cls, data: dict) -> User:
- return cls(
- username=data["username"],
- password_hash=data["password_hash"],
- role=data.get("role", "operator"),
- created_at=data.get("created_at", ""),
- last_login=data.get("last_login"),
- )
-
-
-class UserStore:
- """
- Manages user accounts with secure password storage.
-
- Usage:
- store = UserStore()
- store.create_user("admin", "secretpass", role="admin")
-
- user = store.authenticate("admin", "secretpass")
- if user:
- print(f"Welcome {user.username}!")
- """
-
- def __init__(self, users_file: Path = USERS_FILE):
- self.users_file = users_file
- self.hasher = PasswordHasher()
- self._users: dict[str, User] = {}
- self._load()
-
- def _ensure_dir(self) -> None:
- """Create config directory if needed."""
- self.users_file.parent.mkdir(parents=True, exist_ok=True)
-
- def _load(self) -> None:
- """Load users from file."""
- if not self.users_file.exists():
- return
- try:
- with open(self.users_file) as f:
- data = yaml.safe_load(f) or {}
- for username, user_data in data.get("users", {}).items():
- user_data["username"] = username
- self._users[username] = User.from_dict(user_data)
- except Exception:
- pass
-
- def _save(self) -> None:
- """Save users to file."""
- self._ensure_dir()
-
- data = {
- "users": {
- username: {
- "password_hash": user.password_hash,
- "role": user.role,
- "created_at": user.created_at,
- "last_login": user.last_login,
- }
- for username, user in self._users.items()
- }
- }
-
- with open(self.users_file, "w") as f:
- yaml.dump(data, f, default_flow_style=False)
-
- # Set file permissions to 600
- if sys.platform != "win32":
- os.chmod(self.users_file, stat.S_IRUSR | stat.S_IWUSR)
-
- def has_users(self) -> bool:
- """Check if any users exist."""
- return len(self._users) > 0
-
- def get_user(self, username: str) -> Optional[User]:
- """Get user by username."""
- return self._users.get(username)
-
- def list_users(self) -> list[str]:
- """List all usernames."""
- return list(self._users.keys())
-
- def create_user(
- self,
- username: str,
- password: str,
- role: str = "operator",
- ) -> User:
- """
- Create a new user.
-
- Args:
- username: Unique username
- password: Plain text password (will be hashed)
- role: User role (admin, operator, viewer)
-
- Returns:
- The created User
-
- Raises:
- ValueError: If username already exists
- """
- if username in self._users:
- raise ValueError(f"User already exists: {username}")
-
- if len(password) < 4:
- raise ValueError("Password must be at least 4 characters")
-
- user = User(
- username=username,
- password_hash=self.hasher.hash(password),
- role=role,
- created_at=datetime.now(timezone.utc).isoformat(),
- )
-
- self._users[username] = user
- self._save()
- return user
-
- def authenticate(self, username: str, password: str) -> Optional[User]:
- """
- Authenticate user with password.
-
- Returns:
- User if authentication successful, None otherwise
- """
- user = self._users.get(username)
- if not user:
- return None
-
- try:
- self.hasher.verify(user.password_hash, password)
-
- # Update last login
- user.last_login = datetime.now(timezone.utc).isoformat()
- self._save()
-
- return user
- except VerifyMismatchError:
- return None
-
- def change_password(self, username: str, new_password: str) -> bool:
- """Change user's password."""
- user = self._users.get(username)
- if not user:
- return False
-
- if len(new_password) < 4:
- raise ValueError("Password must be at least 4 characters")
-
- user.password_hash = self.hasher.hash(new_password)
- self._save()
- return True
-
- def delete_user(self, username: str) -> bool:
- """Delete a user."""
- if username not in self._users:
- return False
-
- del self._users[username]
- self._save()
- return True
-
- def set_role(self, username: str, role: str) -> bool:
- """Change user's role."""
- user = self._users.get(username)
- if not user:
- return False
-
- user.role = role
- self._save()
- return True
-
-
-# Global instance
-_store: Optional[UserStore] = None
-
-
-def get_user_store() -> UserStore:
- """Get the global user store."""
- global _store
- if _store is None:
- _store = UserStore()
- return _store
diff --git a/xml_pipeline/config/features.py b/xml_pipeline/config/features.py
index 55211a7..0ed6cd6 100644
--- a/xml_pipeline/config/features.py
+++ b/xml_pipeline/config/features.py
@@ -16,14 +16,13 @@ def _check_import(module: str) -> bool:
# Feature registry: feature_name -> (check_function, description)
+# Note: auth, server, lsp moved to Nextra (proprietary)
FEATURES: dict[str, tuple[Callable[[], bool], str]] = {
"anthropic": (lambda: _check_import("anthropic"), "Anthropic Claude SDK"),
"openai": (lambda: _check_import("openai"), "OpenAI SDK"),
"redis": (lambda: _check_import("redis"), "Redis for distributed keyvalue"),
"search": (lambda: _check_import("duckduckgo_search"), "DuckDuckGo search"),
- "auth": (lambda: _check_import("pyotp") and _check_import("argon2"), "TOTP auth"),
- "server": (lambda: _check_import("websockets"), "WebSocket server"),
- "lsp": (lambda: _check_import("lsp_client"), "LSP client for config editor"),
+ "console": (lambda: _check_import("prompt_toolkit"), "Interactive console example"),
}
@@ -80,15 +79,7 @@ def check_features(config) -> FeatureCheck:
# This would need more sophisticated detection based on tool config
pass
- # Check if auth is needed (multi-tenant mode)
- if getattr(config, "auth", None):
- if not result.available.get("auth"):
- result.missing["auth"] = "Config has auth enabled"
-
- # Check if websocket server is needed
- if getattr(config, "server", None):
- if not result.available.get("server"):
- result.missing["server"] = "Config has server enabled"
+ # Note: auth/server config sections are read but implemented in Nextra
return result
diff --git a/xml_pipeline/console/__init__.py b/xml_pipeline/console/__init__.py
deleted file mode 100644
index 2f275ad..0000000
--- a/xml_pipeline/console/__init__.py
+++ /dev/null
@@ -1,12 +0,0 @@
-"""
-console — Console interfaces for xml-pipeline.
-
-Provides:
-- SecureConsole: Local keyboard-only console (no network)
-- ConsoleClient: Network client connecting to server with auth
-"""
-
-from xml_pipeline.console.secure_console import SecureConsole, PasswordManager
-from xml_pipeline.console.client import ConsoleClient
-
-__all__ = ["SecureConsole", "PasswordManager", "ConsoleClient"]
diff --git a/xml_pipeline/console/client.py b/xml_pipeline/console/client.py
deleted file mode 100644
index 65cbfc0..0000000
--- a/xml_pipeline/console/client.py
+++ /dev/null
@@ -1,367 +0,0 @@
-"""
-Console client that connects to the agent server.
-
-Provides SSH-style login with username/password authentication.
-"""
-
-from __future__ import annotations
-
-import asyncio
-import getpass
-import json
-import sys
-from pathlib import Path
-from typing import Optional
-
-try:
- import aiohttp
- AIOHTTP_AVAILABLE = True
-except ImportError:
- AIOHTTP_AVAILABLE = False
-
-try:
- from prompt_toolkit import PromptSession
- from prompt_toolkit.history import InMemoryHistory
- from prompt_toolkit.styles import Style
- PROMPT_TOOLKIT_AVAILABLE = True
-except ImportError:
- PROMPT_TOOLKIT_AVAILABLE = False
-
-from ..config import get_agent_config_store, CONFIG_DIR
-
-
-DEFAULT_HOST = "127.0.0.1"
-DEFAULT_PORT = 8765
-MAX_LOGIN_ATTEMPTS = 3
-
-# Default organism config path
-DEFAULT_ORGANISM_CONFIG = Path("config/organism.yaml")
-
-
-class ConsoleClient:
- """
- Text-based console client for the agent server.
-
- Usage:
- client = ConsoleClient()
- asyncio.run(client.run())
- """
-
- def __init__(self, host: str = DEFAULT_HOST, port: int = DEFAULT_PORT):
- self.host = host
- self.port = port
- self.base_url = f"http://{host}:{port}"
- self.ws_url = f"ws://{host}:{port}/ws"
- self.token: Optional[str] = None
- self.username: Optional[str] = None
- self.session: Optional[aiohttp.ClientSession] = None
- self.ws: Optional[aiohttp.ClientWebSocketResponse] = None
- self.running = False
-
- async def login(self) -> bool:
- """
- Perform SSH-style login.
-
- Returns:
- True if login successful, False otherwise
- """
- print(f"Connecting to {self.host}:{self.port}...")
-
- for attempt in range(1, MAX_LOGIN_ATTEMPTS + 1):
- try:
- username = input("Username: ")
- password = getpass.getpass("Password: ")
- except (EOFError, KeyboardInterrupt):
- print("\nLogin cancelled.")
- return False
-
- if not username or not password:
- print("Username and password required.")
- continue
-
- try:
- async with aiohttp.ClientSession() as session:
- async with session.post(
- f"{self.base_url}/auth/login",
- json={"username": username, "password": password},
- ) as resp:
- data = await resp.json()
-
- if resp.status == 200:
- self.token = data["token"]
- self.username = username
- print(f"Welcome, {username}!")
- return True
- else:
- error = data.get("error", "Authentication failed")
- remaining = MAX_LOGIN_ATTEMPTS - attempt
- if remaining > 0:
- print(f"{error}. {remaining} attempt(s) remaining.")
- else:
- print(f"{error}. No attempts remaining.")
- except aiohttp.ClientError as e:
- print(f"Connection error: {e}")
- return False
-
- return False
-
- async def connect_ws(self) -> bool:
- """Connect to WebSocket after authentication."""
- if not self.token:
- return False
-
- try:
- self.session = aiohttp.ClientSession(
- headers={"Authorization": f"Bearer {self.token}"}
- )
- self.ws = await self.session.ws_connect(self.ws_url)
-
- # Wait for connected message
- msg = await self.ws.receive_json()
- if msg.get("type") == "connected":
- return True
- return False
- except Exception as e:
- print(f"WebSocket connection failed: {e}")
- return False
-
- async def send_command(self, cmd: str) -> Optional[dict]:
- """Send a command via WebSocket and get response."""
- if not self.ws:
- return None
-
- await self.ws.send_json(cmd)
- return await self.ws.receive_json()
-
- def print_help(self):
- """Print available commands."""
- print("""
-Available commands:
- /help - Show this help
- /status - Show server status
- /listeners - List available targets
- /targets - Alias for /listeners
- /configure - Edit organism.yaml (swarm wiring)
- /configure @agent - Edit agent config (prompt, model, etc.)
- /quit - Disconnect and exit
-
-Send messages:
- @target message - Send message to a target listener
- Example: @greeter Hello there!
-""")
-
- async def handle_command(self, line: str) -> bool:
- """
- Handle a command line.
-
- Returns:
- False if should quit, True otherwise
- """
- line = line.strip()
- if not line:
- return True
-
- if line == "/help":
- self.print_help()
- elif line == "/quit" or line == "/exit":
- return False
- elif line == "/status":
- resp = await self.send_command({"type": "status"})
- if resp:
- threads = resp.get("threads", 0)
- print(f"Active threads: {threads}")
- elif line == "/listeners" or line == "/targets":
- resp = await self.send_command({"type": "listeners"})
- if resp:
- listeners = resp.get("listeners", [])
- if listeners:
- print("Available targets:")
- for name in listeners:
- print(f" - {name}")
- else:
- print("No targets available (pipeline not running)")
- elif line == "/configure":
- # Edit organism.yaml
- await self._configure_organism()
- elif line.startswith("/configure @"):
- # Edit agent config: /configure @agent
- agent_name = line[12:].strip()
- if agent_name:
- await self._configure_agent(agent_name)
- else:
- print("Usage: /configure @agent_name")
- elif line.startswith("/configure "):
- # Also support /configure agent without @
- agent_name = line[11:].strip()
- if agent_name:
- await self._configure_agent(agent_name)
- else:
- print("Usage: /configure @agent_name")
- elif line.startswith("/"):
- print(f"Unknown command: {line}")
- elif line.startswith("@"):
- # Send message to target: @target message
- resp = await self.send_command({"type": "send", "raw": line})
- if resp:
- if resp.get("type") == "sent":
- thread_id = resp.get("thread_id", "")[:8]
- target = resp.get("target", "unknown")
- print(f"Sent to {target} (thread: {thread_id}...)")
- elif resp.get("type") == "error":
- print(f"Error: {resp.get('error')}")
- else:
- print("Use @target message to send. Example: @greeter Hello!")
- print("Type /listeners to see available targets.")
-
- return True
-
- async def _configure_organism(self):
- """Open organism.yaml in editor."""
- from .editor import edit_text, PROMPT_TOOLKIT_AVAILABLE as EDITOR_AVAILABLE
-
- # Find organism.yaml
- organism_path = DEFAULT_ORGANISM_CONFIG
- if not organism_path.exists():
- # Try absolute path in config dir
- organism_path = CONFIG_DIR / "organism.yaml"
-
- if not organism_path.exists():
- print(f"organism.yaml not found at {organism_path}")
- print("Create one or specify path in configuration.")
- return
-
- # Load content
- content = organism_path.read_text()
-
- if EDITOR_AVAILABLE:
- # Use built-in editor
- edited, saved = await asyncio.get_event_loop().run_in_executor(
- None,
- lambda: edit_text(content, title=f"organism.yaml")
- )
-
- if saved and edited is not None:
- organism_path.write_text(edited)
- print(f"Saved {organism_path}")
- print("Note: Restart server to apply changes.")
- else:
- print("Cancelled.")
- else:
- print(f"Edit manually: {organism_path}")
-
- async def _configure_agent(self, agent_name: str):
- """Open agent config in editor."""
- from .editor import edit_text, PROMPT_TOOLKIT_AVAILABLE as EDITOR_AVAILABLE
-
- store = get_agent_config_store()
-
- # Load or create config content
- content = store.load_yaml(agent_name)
- config_path = store.path_for(agent_name)
-
- if EDITOR_AVAILABLE:
- # Use built-in editor
- edited, saved = await asyncio.get_event_loop().run_in_executor(
- None,
- lambda: edit_text(content, title=f"Agent: {agent_name}")
- )
-
- if saved and edited is not None:
- try:
- store.save_yaml(agent_name, edited)
- print(f"Saved {config_path}")
- except Exception as e:
- print(f"Error saving: {e}")
- else:
- print("Cancelled.")
- else:
- # Fallback: show path
- if not config_path.exists():
- # Create default
- store.save_yaml(agent_name, content)
- print(f"Edit manually: {config_path}")
-
- async def run(self):
- """Main client loop."""
- if not AIOHTTP_AVAILABLE:
- print("Error: aiohttp not installed")
- sys.exit(1)
-
- # Login
- if not await self.login():
- print("Authentication failed.")
- sys.exit(1)
-
- # Connect WebSocket
- if not await self.connect_ws():
- print("Failed to connect to server.")
- sys.exit(1)
-
- print("Connected. Type /help for commands, /quit to exit.")
-
- self.running = True
-
- try:
- if PROMPT_TOOLKIT_AVAILABLE:
- await self._run_prompt_toolkit()
- else:
- await self._run_simple()
- finally:
- await self.cleanup()
-
- async def _run_prompt_toolkit(self):
- """Run with prompt_toolkit for better UX."""
- style = Style.from_dict({
- "prompt": "ansicyan bold",
- })
-
- session = PromptSession(
- history=InMemoryHistory(),
- style=style,
- )
-
- while self.running:
- try:
- line = await asyncio.get_event_loop().run_in_executor(
- None,
- lambda: session.prompt(f"{self.username}> ")
- )
- if not await self.handle_command(line):
- break
- except (EOFError, KeyboardInterrupt):
- break
-
- async def _run_simple(self):
- """Run with simple input (fallback)."""
- while self.running:
- try:
- line = input(f"{self.username}> ")
- if not await self.handle_command(line):
- break
- except (EOFError, KeyboardInterrupt):
- break
-
- async def cleanup(self):
- """Clean up connections."""
- if self.ws:
- await self.ws.close()
- if self.session:
- await self.session.close()
- print("Disconnected.")
-
-
-def main():
- """Entry point."""
- import argparse
-
- parser = argparse.ArgumentParser(description="XML Pipeline Console")
- parser.add_argument("--host", default=DEFAULT_HOST, help="Server host")
- parser.add_argument("--port", type=int, default=DEFAULT_PORT, help="Server port")
- args = parser.parse_args()
-
- client = ConsoleClient(host=args.host, port=args.port)
- asyncio.run(client.run())
-
-
-if __name__ == "__main__":
- main()
diff --git a/xml_pipeline/console/console_registry.py b/xml_pipeline/console/console_registry.py
deleted file mode 100644
index 6136f05..0000000
--- a/xml_pipeline/console/console_registry.py
+++ /dev/null
@@ -1,19 +0,0 @@
-"""
-console_registry.py — Global console reference for handlers.
-
-This module provides a central place to register and retrieve
-the active console instance, avoiding Python module import issues.
-"""
-
-_console = None
-
-
-def set_console(console):
- """Set the active console instance."""
- global _console
- _console = console
-
-
-def get_console():
- """Get the active console instance (or None)."""
- return _console
diff --git a/xml_pipeline/console/editor.py b/xml_pipeline/console/editor.py
deleted file mode 100644
index f87395b..0000000
--- a/xml_pipeline/console/editor.py
+++ /dev/null
@@ -1,752 +0,0 @@
-"""
-Full-screen text editor using prompt_toolkit.
-
-Provides a vim-like editing experience for configuration files.
-Supports optional LSP integration for YAML files.
-"""
-
-from __future__ import annotations
-
-import asyncio
-import logging
-from pathlib import Path
-from typing import Optional, Tuple, TYPE_CHECKING
-
-if TYPE_CHECKING:
- from xml_pipeline.console.lsp import YAMLLSPClient, ASLSClient
- from typing import Union
- LSPClientType = Union[YAMLLSPClient, ASLSClient]
-
-logger = logging.getLogger(__name__)
-
-try:
- from prompt_toolkit import Application
- from prompt_toolkit.buffer import Buffer
- from prompt_toolkit.layout import Layout, HSplit, VSplit
- from prompt_toolkit.layout.containers import Window, ConditionalContainer, Float, FloatContainer
- from prompt_toolkit.layout.controls import BufferControl, FormattedTextControl
- from prompt_toolkit.layout.menus import CompletionsMenu
- from prompt_toolkit.key_binding import KeyBindings
- from prompt_toolkit.filters import Condition
- from prompt_toolkit.styles import Style
- from prompt_toolkit.lexers import PygmentsLexer
- from prompt_toolkit.completion import Completer, Completion
- from prompt_toolkit.document import Document
- PROMPT_TOOLKIT_AVAILABLE = True
-except ImportError:
- PROMPT_TOOLKIT_AVAILABLE = False
-
-try:
- from pygments.lexers.data import YamlLexer
- from pygments.lexers.javascript import TypeScriptLexer
- PYGMENTS_AVAILABLE = True
-except ImportError:
- PYGMENTS_AVAILABLE = False
-
-
-# Supported syntax types and their lexers
-SYNTAX_LEXERS = {
- "yaml": "YamlLexer",
- "typescript": "TypeScriptLexer",
- "assemblyscript": "TypeScriptLexer", # AS uses TS syntax
- "ts": "TypeScriptLexer",
- "as": "TypeScriptLexer",
-}
-
-
-def get_lexer_for_syntax(syntax: str) -> Optional[object]:
- """
- Get a Pygments lexer for the given syntax type.
-
- Args:
- syntax: Syntax name ("yaml", "typescript", "ts", "as", "assemblyscript")
-
- Returns:
- PygmentsLexer instance or None
- """
- if not PYGMENTS_AVAILABLE:
- return None
-
- syntax_lower = syntax.lower()
-
- if syntax_lower in ("yaml", "yml"):
- return PygmentsLexer(YamlLexer)
- elif syntax_lower in ("typescript", "ts", "assemblyscript", "as"):
- return PygmentsLexer(TypeScriptLexer)
- else:
- return None
-
-
-def detect_syntax_from_path(path: str | Path) -> str:
- """
- Detect syntax type from file extension.
-
- Returns:
- Syntax name for use with get_lexer_for_syntax()
- """
- ext = Path(path).suffix.lower()
-
- extension_map = {
- ".yaml": "yaml",
- ".yml": "yaml",
- ".ts": "typescript",
- ".as": "assemblyscript",
- }
-
- return extension_map.get(ext, "text")
-
-
-def edit_text(
- initial_text: str,
- title: str = "Editor",
- syntax: str = "yaml",
-) -> Tuple[Optional[str], bool]:
- """
- Open full-screen editor for text.
-
- Args:
- initial_text: Text to edit
- title: Title shown in header
- syntax: Syntax highlighting ("yaml", "typescript", "ts", "as", "text")
-
- Returns:
- (edited_text, saved) - edited_text is None if cancelled
- """
- if not PROMPT_TOOLKIT_AVAILABLE:
- print("Error: prompt_toolkit not installed")
- return None, False
-
- # State
- result = {"text": None, "saved": False}
-
- # Create buffer with initial text
- buffer = Buffer(
- multiline=True,
- name="editor",
- )
- buffer.text = initial_text
-
- # Key bindings
- kb = KeyBindings()
-
- @kb.add("c-s") # Ctrl+S to save
- def save(event):
- result["text"] = buffer.text
- result["saved"] = True
- event.app.exit()
-
- @kb.add("c-q") # Ctrl+Q to quit without saving
- def quit_nosave(event):
- result["text"] = None
- result["saved"] = False
- event.app.exit()
-
- @kb.add("escape") # Escape to quit
- def escape(event):
- result["text"] = None
- result["saved"] = False
- event.app.exit()
-
- # Syntax highlighting
- lexer = get_lexer_for_syntax(syntax)
-
- # Layout
- header = Window(
- height=1,
- content=FormattedTextControl(
- lambda: [
- ("class:header", f" {title} "),
- ("class:header.key", " Ctrl+S"),
- ("class:header", "=Save "),
- ("class:header.key", " Ctrl+Q"),
- ("class:header", "=Quit "),
- ]
- ),
- style="class:header",
- )
-
- editor_window = Window(
- content=BufferControl(
- buffer=buffer,
- lexer=lexer,
- ),
- )
-
- # Status bar showing cursor position
- def get_status():
- row = buffer.document.cursor_position_row + 1
- col = buffer.document.cursor_position_col + 1
- lines = len(buffer.text.split("\n"))
- return [
- ("class:status", f" Line {row}/{lines}, Col {col} "),
- ]
-
- status_bar = Window(
- height=1,
- content=FormattedTextControl(get_status),
- style="class:status",
- )
-
- layout = Layout(
- HSplit([
- header,
- editor_window,
- status_bar,
- ])
- )
-
- # Styles
- style = Style.from_dict({
- "header": "bg:#005f87 #ffffff",
- "header.key": "bg:#005f87 #ffff00 bold",
- "status": "bg:#444444 #ffffff",
- })
-
- # Create and run application
- app = Application(
- layout=layout,
- key_bindings=kb,
- style=style,
- full_screen=True,
- mouse_support=True,
- )
-
- app.run()
-
- return result["text"], result["saved"]
-
-
-def edit_file(filepath: str, title: Optional[str] = None) -> bool:
- """
- Edit a file in the full-screen editor.
-
- Args:
- filepath: Path to file
- title: Optional title (defaults to filename)
-
- Returns:
- True if saved, False if cancelled
- """
- from pathlib import Path
-
- path = Path(filepath)
- title = title or path.name
-
- # Load existing content or empty
- if path.exists():
- initial_text = path.read_text()
- else:
- initial_text = ""
-
- # Edit
- edited_text, saved = edit_text(initial_text, title=title, syntax="yaml")
-
- # Save if requested
- if saved and edited_text is not None:
- path.parent.mkdir(parents=True, exist_ok=True)
- path.write_text(edited_text)
- return True
-
- return False
-
-
-# Fallback: use system editor via subprocess
-def edit_with_system_editor(filepath: str) -> bool:
- """
- Edit file using system's default editor ($EDITOR or fallback).
-
- Returns True if file was modified.
- """
- import os
- import subprocess
- from pathlib import Path
-
- path = Path(filepath)
-
- # Get editor from environment
- editor = os.environ.get("EDITOR", os.environ.get("VISUAL", ""))
-
- if not editor:
- # Fallback based on platform
- import platform
- if platform.system() == "Windows":
- editor = "notepad"
- else:
- editor = "nano" # Most likely available
-
- # Get modification time before edit
- mtime_before = path.stat().st_mtime if path.exists() else None
-
- # Open editor
- try:
- subprocess.run([editor, str(path)], check=True)
- except subprocess.CalledProcessError:
- return False
- except FileNotFoundError:
- print(f"Editor not found: {editor}")
- return False
-
- # Check if modified
- if path.exists():
- mtime_after = path.stat().st_mtime
- return mtime_before is None or mtime_after > mtime_before
-
- return False
-
-
-# =============================================================================
-# LSP-Enhanced Editor
-# =============================================================================
-
-
-class LSPEditor:
- """
- Full-screen editor with optional LSP integration.
-
- Provides:
- - Syntax highlighting (YAML, TypeScript/AssemblyScript via Pygments)
- - Autocompletion (LSP when available)
- - Inline diagnostics (LSP when available)
- - Hover documentation on F1 (LSP when available)
-
- Usage:
- # YAML config editing
- editor = LSPEditor(schema_type="listener")
- edited_text, saved = await editor.edit(initial_text, title="greeter.yaml")
-
- # AssemblyScript listener editing
- editor = LSPEditor(syntax="assemblyscript")
- edited_text, saved = await editor.edit(source_code, title="handler.ts")
- """
-
- def __init__(
- self,
- schema_type: Optional[str] = None,
- schema_uri: Optional[str] = None,
- syntax: str = "yaml",
- ):
- """
- Initialize the LSP editor.
-
- Args:
- schema_type: Schema type ("organism" or "listener") for YAML modeline
- schema_uri: Explicit schema URI to use
- syntax: Syntax highlighting ("yaml", "typescript", "assemblyscript", "ts", "as")
- """
- self.schema_type = schema_type
- self.schema_uri = schema_uri
- self.syntax = syntax
- self._lsp_client: Optional["LSPClientType"] = None
- self._lsp_type: Optional[str] = None # "yaml" or "assemblyscript"
- self._diagnostics_text = ""
- self._hover_text = ""
- self._show_hover = False
- self._document_version = 0
-
- async def edit(
- self,
- initial_text: str,
- title: str = "Editor",
- document_uri: Optional[str] = None,
- ) -> Tuple[Optional[str], bool]:
- """
- Open the editor with LSP support.
-
- Args:
- initial_text: Initial content to edit
- title: Title shown in header
- document_uri: URI for LSP (auto-generated if not provided)
-
- Returns:
- (edited_text, saved) - edited_text is None if cancelled
- """
- if not PROMPT_TOOLKIT_AVAILABLE:
- print("Error: prompt_toolkit not installed")
- return None, False
-
- # Determine LSP type based on syntax
- syntax_lower = self.syntax.lower()
- if syntax_lower in ("yaml", "yml"):
- self._lsp_type = "yaml"
- elif syntax_lower in ("typescript", "ts", "assemblyscript", "as"):
- self._lsp_type = "assemblyscript"
- else:
- self._lsp_type = None
-
- # Try to get appropriate LSP client
- try:
- from xml_pipeline.console.lsp import get_lsp_manager, LSPServerType
- manager = get_lsp_manager()
-
- if self._lsp_type == "yaml":
- self._lsp_client = await manager.get_yaml_client()
- elif self._lsp_type == "assemblyscript":
- self._lsp_client = await manager.get_asls_client()
- else:
- self._lsp_client = None
-
- except Exception as e:
- logger.debug(f"LSP not available: {e}")
- self._lsp_client = None
-
- # Generate document URI with appropriate extension
- if document_uri is None:
- ext = ".yaml" if self._lsp_type == "yaml" else ".ts"
- document_uri = f"file:///temp/{title.replace(' ', '_')}{ext}"
-
- # Ensure schema modeline is present (YAML only)
- if self._lsp_type == "yaml":
- initial_text = self._ensure_modeline(initial_text)
-
- # Open document in LSP
- if self._lsp_client:
- await self._lsp_client.did_open(document_uri, initial_text)
-
- try:
- result = await self._run_editor(initial_text, title, document_uri)
- finally:
- # Close document in LSP
- if self._lsp_client:
- await self._lsp_client.did_close(document_uri)
- try:
- from xml_pipeline.console.lsp import get_lsp_manager, LSPServerType
- manager = get_lsp_manager()
- if self._lsp_type == "yaml":
- await manager.release_client(LSPServerType.YAML)
- elif self._lsp_type == "assemblyscript":
- await manager.release_client(LSPServerType.ASSEMBLYSCRIPT)
- except Exception:
- pass
-
- return result
-
- def _ensure_modeline(self, text: str) -> str:
- """Ensure YAML has schema modeline if schema_type is set."""
- if self.schema_type is None:
- return text
-
- modeline = f"# yaml-language-server: $schema=~/.xml-pipeline/schemas/{self.schema_type}.schema.json"
-
- # Check if modeline already exists
- lines = text.split("\n")
- for line in lines[:3]: # Check first 3 lines
- if "yaml-language-server" in line and "$schema" in line:
- return text
-
- # Add modeline at the top
- return modeline + "\n" + text
-
- async def _run_editor(
- self,
- initial_text: str,
- title: str,
- uri: str,
- ) -> Tuple[Optional[str], bool]:
- """Run the editor application."""
- result = {"text": None, "saved": False}
-
- # Create buffer
- buffer = Buffer(multiline=True, name="editor")
- buffer.text = initial_text
-
- # Key bindings
- kb = KeyBindings()
-
- @kb.add("c-s") # Ctrl+S to save
- def save(event):
- result["text"] = buffer.text
- result["saved"] = True
- event.app.exit()
-
- @kb.add("c-q") # Ctrl+Q to quit without saving
- def quit_nosave(event):
- result["text"] = None
- result["saved"] = False
- event.app.exit()
-
- @kb.add("escape") # Escape to quit
- def escape(event):
- result["text"] = None
- result["saved"] = False
- event.app.exit()
-
- @kb.add("c-space") # Ctrl+Space for completion
- async def trigger_completion(event):
- if self._lsp_client:
- doc = buffer.document
- line = doc.cursor_position_row
- col = doc.cursor_position_col
- completions = await self._lsp_client.completion(uri, line, col)
- if completions:
- # Show first completion as hint
- self._diagnostics_text = f"Completions: {', '.join(c.label for c in completions[:5])}"
- event.app.invalidate()
-
- @kb.add("f1") # F1 for hover
- async def show_hover(event):
- if self._lsp_client:
- doc = buffer.document
- line = doc.cursor_position_row
- col = doc.cursor_position_col
- hover = await self._lsp_client.hover(uri, line, col)
- if hover:
- self._hover_text = hover.contents[:200] # Truncate
- self._show_hover = True
- else:
- self._hover_text = ""
- self._show_hover = False
- event.app.invalidate()
-
- @kb.add("escape", filter=Condition(lambda: self._show_hover))
- def hide_hover(event):
- self._show_hover = False
- event.app.invalidate()
-
- @kb.add("c-p") # Ctrl+P for signature help (ASLS only)
- async def show_signature_help(event):
- # Only available for ASLS
- if self._lsp_client and self._lsp_type == "assemblyscript":
- doc = buffer.document
- line = doc.cursor_position_row
- col = doc.cursor_position_col
- try:
- sig_help = await self._lsp_client.signature_help(uri, line, col)
- if sig_help and sig_help.get("signatures"):
- sig = sig_help["signatures"][0]
- label = sig.get("label", "")
- self._hover_text = f"Signature: {label}"
- self._show_hover = True
- else:
- self._hover_text = ""
- self._show_hover = False
- except Exception:
- pass
- event.app.invalidate()
-
- # Syntax highlighting
- lexer = get_lexer_for_syntax(self.syntax)
-
- # Header
- def get_header():
- if self._lsp_client:
- if self._lsp_type == "yaml":
- lsp_status = " [YAML LSP]"
- elif self._lsp_type == "assemblyscript":
- lsp_status = " [ASLS]"
- else:
- lsp_status = " [LSP]"
- else:
- lsp_status = ""
-
- parts = [
- ("class:header", f" {title}{lsp_status} "),
- ("class:header.key", " Ctrl+S"),
- ("class:header", "=Save "),
- ("class:header.key", " Ctrl+Q"),
- ("class:header", "=Quit "),
- ("class:header.key", " F1"),
- ("class:header", "=Hover "),
- ]
-
- # Add Ctrl+P hint for AssemblyScript
- if self._lsp_type == "assemblyscript" and self._lsp_client:
- parts.extend([
- ("class:header.key", " Ctrl+P"),
- ("class:header", "=Sig "),
- ])
-
- return parts
-
- header = Window(
- height=1,
- content=FormattedTextControl(get_header),
- style="class:header",
- )
-
- # Editor window
- editor_window = Window(
- content=BufferControl(
- buffer=buffer,
- lexer=lexer,
- ),
- )
-
- # Status bar
- def get_status():
- row = buffer.document.cursor_position_row + 1
- col = buffer.document.cursor_position_col + 1
- lines = len(buffer.text.split("\n"))
-
- parts = [("class:status", f" Line {row}/{lines}, Col {col} ")]
-
- if self._diagnostics_text:
- parts.append(("class:status.diag", f" | {self._diagnostics_text}"))
-
- return parts
-
- status_bar = Window(
- height=1,
- content=FormattedTextControl(get_status),
- style="class:status",
- )
-
- # Hover popup (shown conditionally)
- def get_hover_text():
- if self._show_hover and self._hover_text:
- return [("class:hover", self._hover_text)]
- return []
-
- hover_window = ConditionalContainer(
- Window(
- height=3,
- content=FormattedTextControl(get_hover_text),
- style="class:hover",
- ),
- filter=Condition(lambda: self._show_hover and bool(self._hover_text)),
- )
-
- # Layout
- layout = Layout(
- HSplit([
- header,
- editor_window,
- hover_window,
- status_bar,
- ])
- )
-
- # Styles
- style = Style.from_dict({
- "header": "bg:#005f87 #ffffff",
- "header.key": "bg:#005f87 #ffff00 bold",
- "status": "bg:#444444 #ffffff",
- "status.diag": "bg:#444444 #ff8800",
- "hover": "bg:#333333 #ffffff italic",
- "diagnostic.error": "bg:#5f0000 #ffffff",
- "diagnostic.warning": "bg:#5f5f00 #ffffff",
- })
-
- # Set up diagnostics callback
- async def on_text_changed(buff):
- if self._lsp_client:
- self._document_version += 1
- diagnostics = await self._lsp_client.did_change(
- uri, buff.text, self._document_version
- )
- if diagnostics:
- errors = sum(1 for d in diagnostics if d.severity == "error")
- warnings = sum(1 for d in diagnostics if d.severity == "warning")
- parts = []
- if errors:
- parts.append(f"{errors} error{'s' if errors > 1 else ''}")
- if warnings:
- parts.append(f"{warnings} warning{'s' if warnings > 1 else ''}")
- self._diagnostics_text = " | ".join(parts) if parts else ""
- else:
- self._diagnostics_text = ""
-
- buffer.on_text_changed += lambda buff: asyncio.create_task(on_text_changed(buff))
-
- # Create and run application
- app: Application = Application(
- layout=layout,
- key_bindings=kb,
- style=style,
- full_screen=True,
- mouse_support=True,
- )
-
- await app.run_async()
-
- return result["text"], result["saved"]
-
-
-async def edit_text_async(
- initial_text: str,
- title: str = "Editor",
- schema_type: Optional[str] = None,
- syntax: str = "yaml",
-) -> Tuple[Optional[str], bool]:
- """
- Async wrapper for LSP-enabled text editing.
-
- Args:
- initial_text: Text to edit
- title: Title shown in header
- schema_type: "organism" or "listener" for YAML schema modeline
- syntax: Syntax highlighting ("yaml", "typescript", "assemblyscript", "ts", "as")
-
- Returns:
- (edited_text, saved) - edited_text is None if cancelled
- """
- editor = LSPEditor(schema_type=schema_type, syntax=syntax)
- return await editor.edit(initial_text, title=title)
-
-
-async def edit_file_async(
- filepath: str,
- title: Optional[str] = None,
- schema_type: Optional[str] = None,
- syntax: Optional[str] = None,
-) -> bool:
- """
- Edit a file with LSP support.
-
- Args:
- filepath: Path to file
- title: Optional title (defaults to filename)
- schema_type: "organism" or "listener" for YAML schema modeline
- syntax: Syntax highlighting (auto-detected from extension if not specified)
-
- Returns:
- True if saved, False if cancelled
- """
- path = Path(filepath)
- title = title or path.name
-
- # Auto-detect syntax from extension if not specified
- if syntax is None:
- syntax = detect_syntax_from_path(path)
-
- # Load existing content or empty
- if path.exists():
- initial_text = path.read_text()
- else:
- initial_text = ""
-
- # Edit
- edited_text, saved = await edit_text_async(
- initial_text,
- title=title,
- schema_type=schema_type,
- syntax=syntax,
- )
-
- # Save if requested
- if saved and edited_text is not None:
- path.parent.mkdir(parents=True, exist_ok=True)
- path.write_text(edited_text)
- return True
-
- return False
-
-
-async def edit_assemblyscript_source(
- filepath: str,
- title: Optional[str] = None,
-) -> bool:
- """
- Edit an AssemblyScript listener source file with ASLS support.
-
- Args:
- filepath: Path to .ts or .as file
- title: Optional title (defaults to filename)
-
- Returns:
- True if saved, False if cancelled
- """
- return await edit_file_async(
- filepath,
- title=title,
- syntax="assemblyscript",
- )
diff --git a/xml_pipeline/console/lsp/__init__.py b/xml_pipeline/console/lsp/__init__.py
deleted file mode 100644
index 2ff9a70..0000000
--- a/xml_pipeline/console/lsp/__init__.py
+++ /dev/null
@@ -1,63 +0,0 @@
-"""
-LSP (Language Server Protocol) integration for the editor.
-
-Provides:
-- YAMLLSPClient: Wrapper for yaml-language-server communication
-- ASLSClient: Wrapper for AssemblyScript language server communication
-- LSPServerManager: Server lifecycle management
-- LSPBridge: Integration with prompt_toolkit editor
-
-Supported Language Servers:
-- yaml-language-server: npm install -g yaml-language-server
-- asls (AssemblyScript): npm install -g assemblyscript-lsp
-"""
-
-from __future__ import annotations
-
-from .client import (
- YAMLLSPClient,
- LSPCompletion,
- LSPDiagnostic,
- LSPHover,
- is_lsp_available,
-)
-from .asls_client import (
- ASLSClient,
- ASLSConfig,
- is_asls_available,
- is_assemblyscript_file,
- ASSEMBLYSCRIPT_EXTENSIONS,
-)
-from .manager import (
- LSPServerManager,
- LSPServerType,
- get_lsp_manager,
- ensure_lsp_stopped,
-)
-from .bridge import (
- LSPCompleter,
- DiagnosticsProcessor,
-)
-
-__all__ = [
- # YAML Client
- "YAMLLSPClient",
- "LSPCompletion",
- "LSPDiagnostic",
- "LSPHover",
- "is_lsp_available",
- # AssemblyScript Client
- "ASLSClient",
- "ASLSConfig",
- "is_asls_available",
- "is_assemblyscript_file",
- "ASSEMBLYSCRIPT_EXTENSIONS",
- # Manager
- "LSPServerManager",
- "LSPServerType",
- "get_lsp_manager",
- "ensure_lsp_stopped",
- # Bridge
- "LSPCompleter",
- "DiagnosticsProcessor",
-]
diff --git a/xml_pipeline/console/lsp/asls_client.py b/xml_pipeline/console/lsp/asls_client.py
deleted file mode 100644
index 30ebde8..0000000
--- a/xml_pipeline/console/lsp/asls_client.py
+++ /dev/null
@@ -1,527 +0,0 @@
-"""
-AssemblyScript Language Server Protocol client.
-
-Wraps communication with asls (AssemblyScript Language Server) for:
-- Autocompletion for AgentServer SDK types
-- Type checking and diagnostics
-- Hover documentation
-
-Install: npm install -g assemblyscript-lsp
-
-Used for editing WASM listener source files written in AssemblyScript.
-"""
-
-from __future__ import annotations
-
-import asyncio
-import shutil
-import logging
-from dataclasses import dataclass
-from pathlib import Path
-from typing import Optional, Any
-
-from .client import (
- LSPCompletion,
- LSPDiagnostic,
- LSPHover,
-)
-
-logger = logging.getLogger(__name__)
-
-
-def _check_asls() -> bool:
- """Check if asls (AssemblyScript Language Server) is installed."""
- return shutil.which("asls") is not None
-
-
-def is_asls_available() -> tuple[bool, str]:
- """
- Check if AssemblyScript LSP support is available.
-
- Returns (available, reason) tuple.
- """
- if not _check_asls():
- return False, "asls not found (npm install -g assemblyscript-lsp)"
-
- return True, "AssemblyScript LSP available"
-
-
-# File extensions handled by ASLS
-ASSEMBLYSCRIPT_EXTENSIONS = {".ts", ".as"}
-
-
-def is_assemblyscript_file(path: str | Path) -> bool:
- """Check if a file should use the AssemblyScript LSP."""
- return Path(path).suffix.lower() in ASSEMBLYSCRIPT_EXTENSIONS
-
-
-@dataclass
-class ASLSConfig:
- """
- Configuration for the AssemblyScript Language Server.
-
- These settings are passed during initialization.
- """
-
- # Path to asconfig.json (AssemblyScript project config)
- asconfig_path: Optional[str] = None
-
- # Path to AgentServer SDK type definitions
- sdk_types_path: Optional[str] = None
-
- # Enable strict null checks
- strict_null_checks: bool = True
-
- # Enable additional diagnostics
- verbose_diagnostics: bool = False
-
-
-class ASLSClient:
- """
- Client for communicating with the AssemblyScript Language Server.
-
- Uses stdio for communication with the language server process.
-
- Usage:
- client = ASLSClient()
- if await client.start():
- await client.did_open(uri, content)
- completions = await client.completion(uri, line, col)
- await client.stop()
- """
-
- def __init__(self, config: Optional[ASLSConfig] = None):
- """
- Initialize the ASLS client.
-
- Args:
- config: Optional ASLS configuration
- """
- self.config = config or ASLSConfig()
- self._process: Optional[asyncio.subprocess.Process] = None
- self._reader_task: Optional[asyncio.Task] = None
- self._request_id = 0
- self._pending_requests: dict[int, asyncio.Future] = {}
- self._diagnostics: dict[str, list[LSPDiagnostic]] = {}
- self._initialized = False
- self._lock = asyncio.Lock()
-
- async def start(self) -> bool:
- """
- Start the AssemblyScript language server.
-
- Returns True if started successfully.
- """
- available, reason = is_asls_available()
- if not available:
- logger.warning(f"ASLS not available: {reason}")
- return False
-
- try:
- self._process = await asyncio.create_subprocess_exec(
- "asls", "--stdio",
- stdin=asyncio.subprocess.PIPE,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
-
- # Start reader task
- self._reader_task = asyncio.create_task(self._read_messages())
-
- # Initialize LSP
- await self._initialize()
- self._initialized = True
- logger.info("AssemblyScript language server started")
- return True
-
- except Exception as e:
- logger.error(f"Failed to start asls: {e}")
- await self.stop()
- return False
-
- async def stop(self) -> None:
- """Stop the language server."""
- self._initialized = False
-
- if self._reader_task:
- self._reader_task.cancel()
- try:
- await self._reader_task
- except asyncio.CancelledError:
- pass
- self._reader_task = None
-
- if self._process:
- self._process.terminate()
- try:
- await asyncio.wait_for(self._process.wait(), timeout=2)
- except asyncio.TimeoutError:
- self._process.kill()
- self._process = None
-
- # Cancel pending requests
- for future in self._pending_requests.values():
- if not future.done():
- future.cancel()
- self._pending_requests.clear()
-
- async def _initialize(self) -> None:
- """Send LSP initialize request."""
- init_options: dict[str, Any] = {}
-
- if self.config.asconfig_path:
- init_options["asconfigPath"] = self.config.asconfig_path
-
- if self.config.sdk_types_path:
- init_options["sdkTypesPath"] = self.config.sdk_types_path
-
- result = await self._request(
- "initialize",
- {
- "processId": None,
- "rootUri": None,
- "capabilities": {
- "textDocument": {
- "completion": {
- "completionItem": {
- "snippetSupport": True,
- "documentationFormat": ["markdown", "plaintext"],
- }
- },
- "hover": {
- "contentFormat": ["markdown", "plaintext"],
- },
- "publishDiagnostics": {
- "relatedInformation": True,
- },
- "signatureHelp": {
- "signatureInformation": {
- "documentationFormat": ["markdown", "plaintext"],
- }
- },
- }
- },
- "initializationOptions": init_options,
- },
- )
- logger.debug(f"ASLS initialized: {result}")
-
- # Send initialized notification
- await self._notify("initialized", {})
-
- async def did_open(self, uri: str, content: str) -> None:
- """Notify server that a document was opened."""
- if not self._initialized:
- return
-
- # Determine language ID based on extension
- language_id = "assemblyscript"
- if uri.endswith(".ts"):
- language_id = "typescript" # ASLS may prefer this
-
- await self._notify(
- "textDocument/didOpen",
- {
- "textDocument": {
- "uri": uri,
- "languageId": language_id,
- "version": 1,
- "text": content,
- }
- },
- )
-
- async def did_change(
- self, uri: str, content: str, version: int = 1
- ) -> list[LSPDiagnostic]:
- """
- Notify server of document change.
-
- Returns current diagnostics for the document.
- """
- if not self._initialized:
- return []
-
- await self._notify(
- "textDocument/didChange",
- {
- "textDocument": {"uri": uri, "version": version},
- "contentChanges": [{"text": content}],
- },
- )
-
- # Wait briefly for diagnostics
- await asyncio.sleep(0.2) # ASLS may need more time than YAML
-
- return self._diagnostics.get(uri, [])
-
- async def did_close(self, uri: str) -> None:
- """Notify server that a document was closed."""
- if not self._initialized:
- return
-
- await self._notify(
- "textDocument/didClose",
- {"textDocument": {"uri": uri}},
- )
-
- # Clear diagnostics
- self._diagnostics.pop(uri, None)
-
- async def completion(
- self, uri: str, line: int, column: int
- ) -> list[LSPCompletion]:
- """
- Request completions at a position.
-
- Args:
- uri: Document URI
- line: 0-indexed line number
- column: 0-indexed column number
-
- Returns list of completion items.
- """
- if not self._initialized:
- return []
-
- try:
- result = await self._request(
- "textDocument/completion",
- {
- "textDocument": {"uri": uri},
- "position": {"line": line, "character": column},
- },
- )
-
- if result is None:
- return []
-
- items = result.get("items", []) if isinstance(result, dict) else result
- return [LSPCompletion.from_lsp(item) for item in items]
-
- except Exception as e:
- logger.debug(f"ASLS completion request failed: {e}")
- return []
-
- async def hover(self, uri: str, line: int, column: int) -> Optional[LSPHover]:
- """
- Request hover information at a position.
-
- Args:
- uri: Document URI
- line: 0-indexed line number
- column: 0-indexed column number
- """
- if not self._initialized:
- return None
-
- try:
- result = await self._request(
- "textDocument/hover",
- {
- "textDocument": {"uri": uri},
- "position": {"line": line, "character": column},
- },
- )
-
- return LSPHover.from_lsp(result) if result else None
-
- except Exception as e:
- logger.debug(f"ASLS hover request failed: {e}")
- return None
-
- async def signature_help(
- self, uri: str, line: int, column: int
- ) -> Optional[dict[str, Any]]:
- """
- Request signature help at a position.
-
- Useful when typing function arguments.
- """
- if not self._initialized:
- return None
-
- try:
- result = await self._request(
- "textDocument/signatureHelp",
- {
- "textDocument": {"uri": uri},
- "position": {"line": line, "character": column},
- },
- )
- return result
-
- except Exception as e:
- logger.debug(f"ASLS signature help request failed: {e}")
- return None
-
- async def go_to_definition(
- self, uri: str, line: int, column: int
- ) -> Optional[list[dict[str, Any]]]:
- """
- Request go-to-definition at a position.
-
- Returns list of location objects.
- """
- if not self._initialized:
- return None
-
- try:
- result = await self._request(
- "textDocument/definition",
- {
- "textDocument": {"uri": uri},
- "position": {"line": line, "character": column},
- },
- )
-
- if result is None:
- return None
-
- # Normalize to list
- if isinstance(result, dict):
- return [result]
- return result
-
- except Exception as e:
- logger.debug(f"ASLS go-to-definition failed: {e}")
- return None
-
- def get_diagnostics(self, uri: str) -> list[LSPDiagnostic]:
- """Get current diagnostics for a document."""
- return self._diagnostics.get(uri, [])
-
- # -------------------------------------------------------------------------
- # LSP Protocol Implementation (shared pattern with YAMLLSPClient)
- # -------------------------------------------------------------------------
-
- async def _request(self, method: str, params: dict[str, Any]) -> Any:
- """Send a request and wait for response."""
- async with self._lock:
- self._request_id += 1
- req_id = self._request_id
-
- message = {
- "jsonrpc": "2.0",
- "id": req_id,
- "method": method,
- "params": params,
- }
-
- future: asyncio.Future = asyncio.Future()
- self._pending_requests[req_id] = future
-
- try:
- await self._send_message(message)
- return await asyncio.wait_for(future, timeout=10.0) # Longer timeout for ASLS
- except asyncio.TimeoutError:
- logger.warning(f"ASLS request timed out: {method}")
- return None
- finally:
- self._pending_requests.pop(req_id, None)
-
- async def _notify(self, method: str, params: dict[str, Any]) -> None:
- """Send a notification (no response expected)."""
- message = {
- "jsonrpc": "2.0",
- "method": method,
- "params": params,
- }
- await self._send_message(message)
-
- async def _send_message(self, message: dict[str, Any]) -> None:
- """Send a JSON-RPC message to the server."""
- if not self._process or not self._process.stdin:
- return
-
- import json
- content = json.dumps(message)
- header = f"Content-Length: {len(content)}\r\n\r\n"
-
- try:
- self._process.stdin.write(header.encode())
- self._process.stdin.write(content.encode())
- await self._process.stdin.drain()
- except (BrokenPipeError, OSError, ConnectionResetError) as e:
- logger.error(f"Failed to send ASLS message: {e}")
-
- async def _read_messages(self) -> None:
- """Read messages from the server."""
- if not self._process or not self._process.stdout:
- return
-
- import json
-
- try:
- while True:
- # Read header
- header = b""
- while b"\r\n\r\n" not in header:
- chunk = await self._process.stdout.read(1)
- if not chunk:
- return # EOF
- header += chunk
-
- # Parse content length
- content_length = 0
- for line in header.decode().split("\r\n"):
- if line.startswith("Content-Length:"):
- content_length = int(line.split(":")[1].strip())
- break
-
- if content_length == 0:
- continue
-
- # Read content
- content = await self._process.stdout.read(content_length)
-
- if not content:
- return
-
- # Parse and handle message
- try:
- message = json.loads(content.decode())
- await self._handle_message(message)
- except json.JSONDecodeError as e:
- logger.error(f"Failed to parse ASLS message: {e}")
-
- except asyncio.CancelledError:
- pass
- except Exception as e:
- logger.error(f"ASLS reader error: {e}")
-
- async def _handle_message(self, message: dict[str, Any]) -> None:
- """Handle an incoming LSP message."""
- if "id" in message and "result" in message:
- # Response to a request
- req_id = message["id"]
- if req_id in self._pending_requests:
- future = self._pending_requests[req_id]
- if not future.done():
- future.set_result(message.get("result"))
-
- elif "id" in message and "error" in message:
- # Error response
- req_id = message["id"]
- if req_id in self._pending_requests:
- future = self._pending_requests[req_id]
- if not future.done():
- error = message["error"]
- future.set_exception(
- Exception(f"ASLS error: {error.get('message', error)}")
- )
-
- elif message.get("method") == "textDocument/publishDiagnostics":
- # Diagnostics notification
- params = message.get("params", {})
- uri = params.get("uri", "")
- diagnostics = [
- LSPDiagnostic.from_lsp(d)
- for d in params.get("diagnostics", [])
- ]
- self._diagnostics[uri] = diagnostics
- logger.debug(f"ASLS: {len(diagnostics)} diagnostics for {uri}")
-
- elif "method" in message:
- # Other notification
- logger.debug(f"ASLS notification: {message.get('method')}")
diff --git a/xml_pipeline/console/lsp/bridge.py b/xml_pipeline/console/lsp/bridge.py
deleted file mode 100644
index 4da9d4a..0000000
--- a/xml_pipeline/console/lsp/bridge.py
+++ /dev/null
@@ -1,314 +0,0 @@
-"""
-Bridge between LSP client and prompt_toolkit.
-
-Provides:
-- LSPCompleter: Async completer for prompt_toolkit using LSP
-- DiagnosticsProcessor: Processes diagnostics for inline display
-"""
-
-from __future__ import annotations
-
-import asyncio
-from dataclasses import dataclass
-from typing import Optional, Iterable, TYPE_CHECKING
-
-if TYPE_CHECKING:
- from .client import YAMLLSPClient, LSPDiagnostic, LSPCompletion
-
-try:
- from prompt_toolkit.completion import Completer, Completion
- from prompt_toolkit.document import Document
- PROMPT_TOOLKIT_AVAILABLE = True
-except ImportError:
- PROMPT_TOOLKIT_AVAILABLE = False
- # Stub classes for type checking
- class Completer: # type: ignore
- pass
- class Completion: # type: ignore
- pass
- class Document: # type: ignore
- pass
-
-
-@dataclass
-class DiagnosticMark:
- """A diagnostic marker for display in the editor."""
-
- line: int
- column: int
- end_column: int
- message: str
- severity: str # error, warning, info, hint
-
- @property
- def is_error(self) -> bool:
- return self.severity == "error"
-
- @property
- def is_warning(self) -> bool:
- return self.severity == "warning"
-
- @property
- def style(self) -> str:
- """Get prompt_toolkit style for this diagnostic."""
- if self.severity == "error":
- return "class:diagnostic.error"
- elif self.severity == "warning":
- return "class:diagnostic.warning"
- elif self.severity == "info":
- return "class:diagnostic.info"
- else:
- return "class:diagnostic.hint"
-
-
-class LSPCompleter(Completer):
- """
- prompt_toolkit completer that uses LSP for suggestions.
-
- Usage:
- completer = LSPCompleter(lsp_client, document_uri)
- buffer = Buffer(completer=completer)
- """
-
- def __init__(
- self,
- client: Optional["YAMLLSPClient"],
- uri: str,
- fallback_completer: Optional[Completer] = None,
- ):
- """
- Initialize the LSP completer.
-
- Args:
- client: LSP client (can be None for fallback-only mode)
- uri: Document URI for LSP requests
- fallback_completer: Fallback when LSP unavailable
- """
- self.client = client
- self.uri = uri
- self.fallback_completer = fallback_completer
- self._cache: dict[tuple[int, int], list["LSPCompletion"]] = {}
- self._cache_version = 0
-
- def invalidate_cache(self) -> None:
- """Invalidate the completion cache."""
- self._cache.clear()
- self._cache_version += 1
-
- def get_completions(
- self,
- document: Document,
- complete_event,
- ) -> Iterable[Completion]:
- """
- Get completions for the current document position.
-
- This is called synchronously by prompt_toolkit.
- We use a cached result if available, otherwise
- return nothing (async completions handled separately).
- """
- if not PROMPT_TOOLKIT_AVAILABLE:
- return
-
- # Get current position
- line = document.cursor_position_row
- col = document.cursor_position_col
-
- # Check cache
- cache_key = (line, col)
- if cache_key in self._cache:
- completions = self._cache[cache_key]
- for item in completions:
- yield Completion(
- text=item.insert_text or item.label,
- start_position=-len(self._get_word_before_cursor(document)),
- display=item.label,
- display_meta=item.detail or item.kind,
- )
- return
-
- # Fallback to basic completer
- if self.fallback_completer:
- yield from self.fallback_completer.get_completions(
- document, complete_event
- )
-
- async def get_completions_async(
- self,
- document: Document,
- ) -> list["LSPCompletion"]:
- """
- Get completions asynchronously from LSP.
-
- Call this when Ctrl+Space is pressed.
- """
- if self.client is None:
- return []
-
- line = document.cursor_position_row
- col = document.cursor_position_col
-
- # Request from LSP
- completions = await self.client.completion(self.uri, line, col)
-
- # Cache result
- self._cache[(line, col)] = completions
-
- return completions
-
- def _get_word_before_cursor(self, document: Document) -> str:
- """Get the word being typed before cursor."""
- text = document.text_before_cursor
- if not text:
- return ""
-
- # Find word boundary
- i = len(text) - 1
- while i >= 0 and (text[i].isalnum() or text[i] in "_-"):
- i -= 1
-
- return text[i + 1:]
-
-
-class DiagnosticsProcessor:
- """
- Processes LSP diagnostics for display in the editor.
-
- Converts LSP diagnostics into markers that can be
- displayed inline in the prompt_toolkit editor.
- """
-
- def __init__(self, client: Optional["YAMLLSPClient"], uri: str):
- self.client = client
- self.uri = uri
- self._marks: list[DiagnosticMark] = []
-
- def get_marks(self) -> list[DiagnosticMark]:
- """Get current diagnostic marks."""
- return self._marks
-
- def get_marks_for_line(self, line: int) -> list[DiagnosticMark]:
- """Get diagnostic marks for a specific line."""
- return [m for m in self._marks if m.line == line]
-
- def has_errors(self) -> bool:
- """Check if there are any error-level diagnostics."""
- return any(m.is_error for m in self._marks)
-
- def has_warnings(self) -> bool:
- """Check if there are any warning-level diagnostics."""
- return any(m.is_warning for m in self._marks)
-
- def get_error_count(self) -> int:
- """Get number of errors."""
- return sum(1 for m in self._marks if m.is_error)
-
- def get_warning_count(self) -> int:
- """Get number of warnings."""
- return sum(1 for m in self._marks if m.is_warning)
-
- async def update(self, content: str, version: int = 1) -> list[DiagnosticMark]:
- """
- Update diagnostics by sending content to LSP.
-
- Returns the new list of diagnostic marks.
- """
- if self.client is None:
- self._marks = []
- return []
-
- diagnostics = await self.client.did_change(self.uri, content, version)
-
- self._marks = [
- DiagnosticMark(
- line=d.line,
- column=d.column,
- end_column=d.end_column,
- message=d.message,
- severity=d.severity,
- )
- for d in diagnostics
- ]
-
- return self._marks
-
- def format_status(self) -> str:
- """Format diagnostics as status bar text."""
- errors = self.get_error_count()
- warnings = self.get_warning_count()
-
- if errors == 0 and warnings == 0:
- return ""
-
- parts = []
- if errors > 0:
- parts.append(f"{errors} error{'s' if errors > 1 else ''}")
- if warnings > 0:
- parts.append(f"{warnings} warning{'s' if warnings > 1 else ''}")
-
- return " | ".join(parts)
-
- def format_messages(self, max_lines: int = 3) -> list[str]:
- """Format diagnostic messages for display."""
- messages = []
-
- for mark in self._marks[:max_lines]:
- prefix = "E" if mark.is_error else "W"
- messages.append(f"[{prefix}] Line {mark.line + 1}: {mark.message}")
-
- remaining = len(self._marks) - max_lines
- if remaining > 0:
- messages.append(f"... and {remaining} more")
-
- return messages
-
-
-class HoverPopup:
- """
- Manages hover information display.
-
- Shows documentation when hovering over a field
- or pressing F1 on a position.
- """
-
- def __init__(self, client: Optional["YAMLLSPClient"], uri: str):
- self.client = client
- self.uri = uri
- self._current_hover: Optional[str] = None
- self._hover_position: Optional[tuple[int, int]] = None
-
- async def get_hover(self, line: int, col: int) -> Optional[str]:
- """
- Get hover information for a position.
-
- Returns formatted hover text or None.
- """
- if self.client is None:
- return None
-
- hover = await self.client.hover(self.uri, line, col)
-
- if hover is None:
- self._current_hover = None
- self._hover_position = None
- return None
-
- self._current_hover = hover.contents
- self._hover_position = (line, col)
-
- return hover.contents
-
- def clear(self) -> None:
- """Clear current hover."""
- self._current_hover = None
- self._hover_position = None
-
- @property
- def has_hover(self) -> bool:
- """Check if there's an active hover."""
- return self._current_hover is not None
-
- @property
- def text(self) -> str:
- """Get current hover text."""
- return self._current_hover or ""
diff --git a/xml_pipeline/console/lsp/client.py b/xml_pipeline/console/lsp/client.py
deleted file mode 100644
index 2efceb4..0000000
--- a/xml_pipeline/console/lsp/client.py
+++ /dev/null
@@ -1,538 +0,0 @@
-"""
-YAML Language Server Protocol client.
-
-Wraps communication with yaml-language-server for:
-- Autocompletion
-- Diagnostics (validation errors)
-- Hover information
-"""
-
-from __future__ import annotations
-
-import asyncio
-import json
-import subprocess
-import shutil
-from dataclasses import dataclass, field
-from pathlib import Path
-from typing import Optional, Any
-import logging
-
-logger = logging.getLogger(__name__)
-
-
-# Check for lsp-client availability
-def _check_lsp_client() -> bool:
- """Check if lsp-client package is available."""
- try:
- import lsp_client # noqa: F401
- return True
- except ImportError:
- return False
-
-
-def _check_yaml_language_server() -> bool:
- """Check if yaml-language-server is installed."""
- return shutil.which("yaml-language-server") is not None
-
-
-def is_lsp_available() -> tuple[bool, str]:
- """
- Check if LSP support is available.
-
- Returns (available, reason) tuple.
- """
- if not _check_lsp_client():
- return False, "lsp-client package not installed (pip install lsp-client)"
-
- if not _check_yaml_language_server():
- return False, "yaml-language-server not found (npm install -g yaml-language-server)"
-
- return True, "LSP available"
-
-
-@dataclass
-class LSPCompletion:
- """Normalized completion item from LSP."""
-
- label: str
- kind: str = "text" # text, keyword, property, value, snippet
- detail: str = ""
- documentation: str = ""
- insert_text: str = ""
- sort_text: str = ""
-
- @classmethod
- def from_lsp(cls, item: dict[str, Any]) -> "LSPCompletion":
- """Create from LSP CompletionItem."""
- kind_map = {
- 1: "text",
- 2: "method",
- 3: "function",
- 5: "field",
- 6: "variable",
- 9: "module",
- 10: "property",
- 12: "value",
- 14: "keyword",
- 15: "snippet",
- }
-
- return cls(
- label=item.get("label", ""),
- kind=kind_map.get(item.get("kind", 1), "text"),
- detail=item.get("detail", ""),
- documentation=_extract_documentation(item.get("documentation")),
- insert_text=item.get("insertText", item.get("label", "")),
- sort_text=item.get("sortText", item.get("label", "")),
- )
-
-
-@dataclass
-class LSPDiagnostic:
- """Normalized diagnostic from LSP."""
-
- line: int
- column: int
- end_line: int
- end_column: int
- message: str
- severity: str = "error" # error, warning, info, hint
- source: str = "yaml-language-server"
-
- @classmethod
- def from_lsp(cls, diag: dict[str, Any]) -> "LSPDiagnostic":
- """Create from LSP Diagnostic."""
- severity_map = {1: "error", 2: "warning", 3: "info", 4: "hint"}
-
- range_data = diag.get("range", {})
- start = range_data.get("start", {})
- end = range_data.get("end", {})
-
- return cls(
- line=start.get("line", 0),
- column=start.get("character", 0),
- end_line=end.get("line", 0),
- end_column=end.get("character", 0),
- message=diag.get("message", ""),
- severity=severity_map.get(diag.get("severity", 1), "error"),
- source=diag.get("source", "yaml-language-server"),
- )
-
-
-@dataclass
-class LSPHover:
- """Normalized hover information from LSP."""
-
- contents: str
- range_start_line: Optional[int] = None
- range_start_col: Optional[int] = None
-
- @classmethod
- def from_lsp(cls, hover: dict[str, Any]) -> Optional["LSPHover"]:
- """Create from LSP Hover response."""
- if not hover:
- return None
-
- contents = hover.get("contents")
- if isinstance(contents, str):
- text = contents
- elif isinstance(contents, dict):
- text = contents.get("value", str(contents))
- elif isinstance(contents, list):
- text = "\n".join(
- c.get("value", str(c)) if isinstance(c, dict) else str(c)
- for c in contents
- )
- else:
- return None
-
- range_data = hover.get("range", {})
- start = range_data.get("start", {})
-
- return cls(
- contents=text,
- range_start_line=start.get("line"),
- range_start_col=start.get("character"),
- )
-
-
-def _extract_documentation(doc: Any) -> str:
- """Extract documentation string from LSP documentation field."""
- if doc is None:
- return ""
- if isinstance(doc, str):
- return doc
- if isinstance(doc, dict):
- return doc.get("value", "")
- return str(doc)
-
-
-class YAMLLSPClient:
- """
- Client for communicating with yaml-language-server.
-
- Uses stdio for communication with the language server process.
- """
-
- def __init__(self, schema_uri: Optional[str] = None):
- """
- Initialize the LSP client.
-
- Args:
- schema_uri: Default schema URI for YAML files
- """
- self.schema_uri = schema_uri
- self._process: Optional[subprocess.Popen] = None
- self._reader_task: Optional[asyncio.Task] = None
- self._request_id = 0
- self._pending_requests: dict[int, asyncio.Future] = {}
- self._diagnostics: dict[str, list[LSPDiagnostic]] = {}
- self._initialized = False
- self._lock = asyncio.Lock()
-
- async def start(self) -> bool:
- """
- Start the language server.
-
- Returns True if started successfully.
- """
- available, reason = is_lsp_available()
- if not available:
- logger.warning(f"LSP not available: {reason}")
- return False
-
- try:
- self._process = subprocess.Popen(
- ["yaml-language-server", "--stdio"],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- )
-
- # Start reader task
- self._reader_task = asyncio.create_task(self._read_messages())
-
- # Initialize LSP
- await self._initialize()
- self._initialized = True
- return True
-
- except Exception as e:
- logger.error(f"Failed to start yaml-language-server: {e}")
- await self.stop()
- return False
-
- async def stop(self) -> None:
- """Stop the language server."""
- self._initialized = False
-
- if self._reader_task:
- self._reader_task.cancel()
- try:
- await self._reader_task
- except asyncio.CancelledError:
- pass
- self._reader_task = None
-
- if self._process:
- self._process.terminate()
- try:
- self._process.wait(timeout=2)
- except subprocess.TimeoutExpired:
- self._process.kill()
- self._process = None
-
- # Cancel pending requests
- for future in self._pending_requests.values():
- if not future.done():
- future.cancel()
- self._pending_requests.clear()
-
- async def _initialize(self) -> None:
- """Send LSP initialize request."""
- result = await self._request(
- "initialize",
- {
- "processId": None,
- "rootUri": None,
- "capabilities": {
- "textDocument": {
- "completion": {
- "completionItem": {
- "snippetSupport": True,
- "documentationFormat": ["markdown", "plaintext"],
- }
- },
- "hover": {
- "contentFormat": ["markdown", "plaintext"],
- },
- "publishDiagnostics": {},
- }
- },
- "initializationOptions": {
- "yaml": {
- "validate": True,
- "hover": True,
- "completion": True,
- "schemas": {},
- }
- },
- },
- )
- logger.debug(f"LSP initialized: {result}")
-
- # Send initialized notification
- await self._notify("initialized", {})
-
- async def did_open(self, uri: str, content: str) -> None:
- """Notify server that a document was opened."""
- if not self._initialized:
- return
-
- await self._notify(
- "textDocument/didOpen",
- {
- "textDocument": {
- "uri": uri,
- "languageId": "yaml",
- "version": 1,
- "text": content,
- }
- },
- )
-
- async def did_change(self, uri: str, content: str, version: int = 1) -> list[LSPDiagnostic]:
- """
- Notify server of document change.
-
- Returns current diagnostics for the document.
- """
- if not self._initialized:
- return []
-
- await self._notify(
- "textDocument/didChange",
- {
- "textDocument": {"uri": uri, "version": version},
- "contentChanges": [{"text": content}],
- },
- )
-
- # Wait briefly for diagnostics
- await asyncio.sleep(0.1)
-
- return self._diagnostics.get(uri, [])
-
- async def did_close(self, uri: str) -> None:
- """Notify server that a document was closed."""
- if not self._initialized:
- return
-
- await self._notify(
- "textDocument/didClose",
- {"textDocument": {"uri": uri}},
- )
-
- # Clear diagnostics
- self._diagnostics.pop(uri, None)
-
- async def completion(
- self, uri: str, line: int, column: int
- ) -> list[LSPCompletion]:
- """
- Request completions at a position.
-
- Args:
- uri: Document URI
- line: 0-indexed line number
- column: 0-indexed column number
-
- Returns list of completion items.
- """
- if not self._initialized:
- return []
-
- try:
- result = await self._request(
- "textDocument/completion",
- {
- "textDocument": {"uri": uri},
- "position": {"line": line, "character": column},
- },
- )
-
- if result is None:
- return []
-
- items = result.get("items", []) if isinstance(result, dict) else result
- return [LSPCompletion.from_lsp(item) for item in items]
-
- except Exception as e:
- logger.debug(f"Completion request failed: {e}")
- return []
-
- async def hover(self, uri: str, line: int, column: int) -> Optional[LSPHover]:
- """
- Request hover information at a position.
-
- Args:
- uri: Document URI
- line: 0-indexed line number
- column: 0-indexed column number
- """
- if not self._initialized:
- return None
-
- try:
- result = await self._request(
- "textDocument/hover",
- {
- "textDocument": {"uri": uri},
- "position": {"line": line, "character": column},
- },
- )
-
- return LSPHover.from_lsp(result) if result else None
-
- except Exception as e:
- logger.debug(f"Hover request failed: {e}")
- return None
-
- def get_diagnostics(self, uri: str) -> list[LSPDiagnostic]:
- """Get current diagnostics for a document."""
- return self._diagnostics.get(uri, [])
-
- async def _request(self, method: str, params: dict[str, Any]) -> Any:
- """Send a request and wait for response."""
- async with self._lock:
- self._request_id += 1
- req_id = self._request_id
-
- message = {
- "jsonrpc": "2.0",
- "id": req_id,
- "method": method,
- "params": params,
- }
-
- future: asyncio.Future = asyncio.Future()
- self._pending_requests[req_id] = future
-
- try:
- await self._send_message(message)
- return await asyncio.wait_for(future, timeout=5.0)
- except asyncio.TimeoutError:
- logger.warning(f"LSP request timed out: {method}")
- return None
- finally:
- self._pending_requests.pop(req_id, None)
-
- async def _notify(self, method: str, params: dict[str, Any]) -> None:
- """Send a notification (no response expected)."""
- message = {
- "jsonrpc": "2.0",
- "method": method,
- "params": params,
- }
- await self._send_message(message)
-
- async def _send_message(self, message: dict[str, Any]) -> None:
- """Send a JSON-RPC message to the server."""
- if not self._process or not self._process.stdin:
- return
-
- content = json.dumps(message)
- header = f"Content-Length: {len(content)}\r\n\r\n"
-
- try:
- self._process.stdin.write(header.encode())
- self._process.stdin.write(content.encode())
- self._process.stdin.flush()
- except (BrokenPipeError, OSError) as e:
- logger.error(f"Failed to send LSP message: {e}")
-
- async def _read_messages(self) -> None:
- """Read messages from the server."""
- if not self._process or not self._process.stdout:
- return
-
- loop = asyncio.get_event_loop()
-
- try:
- while True:
- # Read header
- header = b""
- while b"\r\n\r\n" not in header:
- chunk = await loop.run_in_executor(
- None, self._process.stdout.read, 1
- )
- if not chunk:
- return # EOF
- header += chunk
-
- # Parse content length
- content_length = 0
- for line in header.decode().split("\r\n"):
- if line.startswith("Content-Length:"):
- content_length = int(line.split(":")[1].strip())
- break
-
- if content_length == 0:
- continue
-
- # Read content
- content = await loop.run_in_executor(
- None, self._process.stdout.read, content_length
- )
-
- if not content:
- return
-
- # Parse and handle message
- try:
- message = json.loads(content.decode())
- await self._handle_message(message)
- except json.JSONDecodeError as e:
- logger.error(f"Failed to parse LSP message: {e}")
-
- except asyncio.CancelledError:
- pass
- except Exception as e:
- logger.error(f"LSP reader error: {e}")
-
- async def _handle_message(self, message: dict[str, Any]) -> None:
- """Handle an incoming LSP message."""
- if "id" in message and "result" in message:
- # Response to a request
- req_id = message["id"]
- if req_id in self._pending_requests:
- future = self._pending_requests[req_id]
- if not future.done():
- future.set_result(message.get("result"))
-
- elif "id" in message and "error" in message:
- # Error response
- req_id = message["id"]
- if req_id in self._pending_requests:
- future = self._pending_requests[req_id]
- if not future.done():
- error = message["error"]
- future.set_exception(
- Exception(f"LSP error: {error.get('message', error)}")
- )
-
- elif message.get("method") == "textDocument/publishDiagnostics":
- # Diagnostics notification
- params = message.get("params", {})
- uri = params.get("uri", "")
- diagnostics = [
- LSPDiagnostic.from_lsp(d)
- for d in params.get("diagnostics", [])
- ]
- self._diagnostics[uri] = diagnostics
- logger.debug(f"Received {len(diagnostics)} diagnostics for {uri}")
-
- elif "method" in message:
- # Other notification
- logger.debug(f"LSP notification: {message.get('method')}")
diff --git a/xml_pipeline/console/lsp/manager.py b/xml_pipeline/console/lsp/manager.py
deleted file mode 100644
index 2882969..0000000
--- a/xml_pipeline/console/lsp/manager.py
+++ /dev/null
@@ -1,211 +0,0 @@
-"""
-LSP Server lifecycle manager.
-
-Manages language server instances that can be shared across
-multiple editor sessions. Supports multiple language servers:
-- yaml-language-server (for config files)
-- asls (for AssemblyScript listener source)
-"""
-
-from __future__ import annotations
-
-import asyncio
-import logging
-from enum import Enum
-from typing import Optional, Union
-
-from .client import YAMLLSPClient, is_lsp_available
-from .asls_client import ASLSClient, ASLSConfig, is_asls_available
-
-logger = logging.getLogger(__name__)
-
-
-class LSPServerType(Enum):
- """Supported language server types."""
- YAML = "yaml"
- ASSEMBLYSCRIPT = "assemblyscript"
-
-
-# Type alias for any LSP client
-LSPClient = Union[YAMLLSPClient, ASLSClient]
-
-
-class LSPServerManager:
- """
- Manages the lifecycle of LSP servers.
-
- Provides singleton client instances that start on first use
- and stop when explicitly requested or when the process exits.
-
- Supports multiple language servers running concurrently.
- """
-
- def __init__(self):
- self._clients: dict[LSPServerType, LSPClient] = {}
- self._ref_counts: dict[LSPServerType, int] = {}
- self._lock = asyncio.Lock()
-
- def is_running(self, server_type: LSPServerType = LSPServerType.YAML) -> bool:
- """Check if a specific LSP server is running."""
- client = self._clients.get(server_type)
- return client is not None and client._initialized
-
- async def get_client(
- self,
- server_type: LSPServerType = LSPServerType.YAML,
- asls_config: Optional[ASLSConfig] = None,
- ) -> Optional[LSPClient]:
- """
- Get an LSP client, starting the server if needed.
-
- Args:
- server_type: Which language server to get
- asls_config: Configuration for ASLS (only used if server_type is ASSEMBLYSCRIPT)
-
- Returns None if the requested LSP is not available.
- """
- async with self._lock:
- # Check if already running
- if server_type in self._clients:
- client = self._clients[server_type]
- if client._initialized:
- self._ref_counts[server_type] = self._ref_counts.get(server_type, 0) + 1
- return client
-
- # Start the appropriate server
- if server_type == LSPServerType.YAML:
- return await self._start_yaml_server()
- elif server_type == LSPServerType.ASSEMBLYSCRIPT:
- return await self._start_asls_server(asls_config)
- else:
- logger.error(f"Unknown LSP server type: {server_type}")
- return None
-
- async def _start_yaml_server(self) -> Optional[YAMLLSPClient]:
- """Start the YAML language server."""
- available, reason = is_lsp_available()
- if not available:
- logger.info(f"YAML LSP not available: {reason}")
- return None
-
- client = YAMLLSPClient()
- success = await client.start()
-
- if success:
- self._clients[LSPServerType.YAML] = client
- self._ref_counts[LSPServerType.YAML] = 1
- logger.info("yaml-language-server started")
- return client
- else:
- return None
-
- async def _start_asls_server(
- self, config: Optional[ASLSConfig] = None
- ) -> Optional[ASLSClient]:
- """Start the AssemblyScript language server."""
- available, reason = is_asls_available()
- if not available:
- logger.info(f"ASLS not available: {reason}")
- return None
-
- client = ASLSClient(config=config)
- success = await client.start()
-
- if success:
- self._clients[LSPServerType.ASSEMBLYSCRIPT] = client
- self._ref_counts[LSPServerType.ASSEMBLYSCRIPT] = 1
- logger.info("AssemblyScript language server started")
- return client
- else:
- return None
-
- async def release_client(
- self, server_type: LSPServerType = LSPServerType.YAML
- ) -> None:
- """
- Release a reference to a client.
-
- Stops the server when the last reference is released.
- """
- async with self._lock:
- if server_type not in self._ref_counts:
- return
-
- self._ref_counts[server_type] -= 1
-
- if self._ref_counts[server_type] <= 0:
- client = self._clients.pop(server_type, None)
- self._ref_counts.pop(server_type, None)
-
- if client is not None:
- await client.stop()
- logger.info(f"{server_type.value} language server stopped")
-
- async def stop(self, server_type: Optional[LSPServerType] = None) -> None:
- """
- Force stop LSP server(s).
-
- Args:
- server_type: Specific server to stop, or None to stop all
- """
- async with self._lock:
- if server_type is not None:
- # Stop specific server
- client = self._clients.pop(server_type, None)
- self._ref_counts.pop(server_type, None)
- if client is not None:
- await client.stop()
- logger.info(f"{server_type.value} language server stopped (forced)")
- else:
- # Stop all servers
- for st, client in list(self._clients.items()):
- await client.stop()
- logger.info(f"{st.value} language server stopped (forced)")
- self._clients.clear()
- self._ref_counts.clear()
-
- async def stop_all(self) -> None:
- """Force stop all LSP servers."""
- await self.stop(None)
-
- # Convenience methods for YAML (backwards compatible)
-
- async def get_yaml_client(self) -> Optional[YAMLLSPClient]:
- """Get YAML LSP client (convenience method)."""
- client = await self.get_client(LSPServerType.YAML)
- return client if isinstance(client, YAMLLSPClient) else None
-
- async def get_asls_client(
- self, config: Optional[ASLSConfig] = None
- ) -> Optional[ASLSClient]:
- """Get AssemblyScript LSP client (convenience method)."""
- client = await self.get_client(LSPServerType.ASSEMBLYSCRIPT, asls_config=config)
- return client if isinstance(client, ASLSClient) else None
-
- # Context manager for YAML (backwards compatible)
-
- async def __aenter__(self) -> Optional[YAMLLSPClient]:
- """Context manager entry - get YAML client."""
- return await self.get_yaml_client()
-
- async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
- """Context manager exit - release YAML client."""
- await self.release_client(LSPServerType.YAML)
-
-
-# Global singleton
-_manager: Optional[LSPServerManager] = None
-
-
-def get_lsp_manager() -> LSPServerManager:
- """Get the global LSP server manager."""
- global _manager
- if _manager is None:
- _manager = LSPServerManager()
- return _manager
-
-
-async def ensure_lsp_stopped() -> None:
- """Ensure all LSP servers are stopped. Call on application shutdown."""
- if _manager is not None:
- await _manager.stop_all()
diff --git a/xml_pipeline/console/secure_console.py b/xml_pipeline/console/secure_console.py
deleted file mode 100644
index 910f46a..0000000
--- a/xml_pipeline/console/secure_console.py
+++ /dev/null
@@ -1,980 +0,0 @@
-"""
-secure_console.py — Password-protected console for privileged operations.
-
-The console is the sole privileged interface to the organism. Privileged
-operations are only accessible via local keyboard input, never over the network.
-
-Features:
-- Password protection with Argon2id hashing
-- Protected commands require password re-entry
-- Attach/detach model with idle timeout
-- Integration with context buffer for /monitor
-
-Security model:
-- Keyboard = Local = Trusted
-- No network port for privileged operations
-- Password hash stored in ~/.xml-pipeline/console.key (mode 600)
-"""
-
-from __future__ import annotations
-
-import asyncio
-import getpass
-import os
-import stat
-import sys
-from datetime import datetime, timezone
-from pathlib import Path
-from typing import TYPE_CHECKING, Optional, Callable, Awaitable
-
-import yaml
-from argon2 import PasswordHasher
-from argon2.exceptions import VerifyMismatchError
-
-# prompt_toolkit may not work in all terminals (e.g., Git Bash on Windows)
-# We provide a fallback to simple input()
-try:
- from prompt_toolkit import PromptSession
- from prompt_toolkit.history import FileHistory
- from prompt_toolkit.patch_stdout import patch_stdout
- PROMPT_TOOLKIT_AVAILABLE = True
-except ImportError:
- PROMPT_TOOLKIT_AVAILABLE = False
-
-if TYPE_CHECKING:
- from xml_pipeline.message_bus.stream_pump import StreamPump
-
-
-# ============================================================================
-# Constants
-# ============================================================================
-
-CONFIG_DIR = Path.home() / ".xml-pipeline"
-KEY_FILE = CONFIG_DIR / "console.key"
-HISTORY_FILE = CONFIG_DIR / "history"
-
-# Commands that require password re-entry
-PROTECTED_COMMANDS = {"restart", "kill", "pause", "resume"}
-
-# Idle timeout before auto-detach (seconds, 0 = disabled)
-DEFAULT_IDLE_TIMEOUT = 30 * 60 # 30 minutes
-
-
-# ============================================================================
-# ANSI Colors
-# ============================================================================
-
-class Colors:
- RESET = "\033[0m"
- BOLD = "\033[1m"
- DIM = "\033[2m"
- RED = "\033[31m"
- GREEN = "\033[32m"
- YELLOW = "\033[33m"
- BLUE = "\033[34m"
- MAGENTA = "\033[35m"
- CYAN = "\033[36m"
-
-
-def cprint(text: str, color: str = Colors.RESET):
- """Print with ANSI color."""
- try:
- print(f"{color}{text}{Colors.RESET}")
- except UnicodeEncodeError:
- print(text)
-
-
-# ============================================================================
-# Password Management
-# ============================================================================
-
-class PasswordManager:
- """Manages password hashing and verification."""
-
- def __init__(self, key_path: Path = KEY_FILE):
- self.key_path = key_path
- self.hasher = PasswordHasher()
- self._hash: Optional[str] = None
-
- def ensure_config_dir(self):
- """Create config directory if needed."""
- self.key_path.parent.mkdir(parents=True, exist_ok=True)
-
- def has_password(self) -> bool:
- """Check if password has been set."""
- return self.key_path.exists()
-
- def load_hash(self) -> Optional[str]:
- """Load password hash from file."""
- if not self.key_path.exists():
- return None
- try:
- with open(self.key_path) as f:
- data = yaml.safe_load(f)
- self._hash = data.get("hash")
- return self._hash
- except Exception:
- return None
-
- def save_hash(self, password: str) -> None:
- """Hash password and save to file."""
- self.ensure_config_dir()
-
- hash_value = self.hasher.hash(password)
- data = {
- "algorithm": "argon2id",
- "hash": hash_value,
- "created": datetime.now(timezone.utc).isoformat(),
- }
-
- with open(self.key_path, "w") as f:
- yaml.dump(data, f)
-
- # Set file permissions to 600 (owner read/write only)
- if sys.platform != "win32":
- os.chmod(self.key_path, stat.S_IRUSR | stat.S_IWUSR)
-
- self._hash = hash_value
-
- def verify(self, password: str) -> bool:
- """Verify password against stored hash."""
- if self._hash is None:
- self.load_hash()
- if self._hash is None:
- return False
- try:
- self.hasher.verify(self._hash, password)
- return True
- except VerifyMismatchError:
- return False
-
-
-# ============================================================================
-# Secure Console
-# ============================================================================
-
-class SecureConsole:
- """
- Password-protected console with privileged command support.
-
- The console can be in one of two states:
- - Attached: Full access, can send messages and run commands
- - Detached: Limited access, only /commands work, @messages rejected
- """
-
- def __init__(
- self,
- pump: StreamPump,
- idle_timeout: int = DEFAULT_IDLE_TIMEOUT,
- ):
- self.pump = pump
- self.idle_timeout = idle_timeout
- self.password_mgr = PasswordManager()
-
- # State
- self.authenticated = False
- self.attached = True # Start attached
- self.running = False
-
- # prompt_toolkit session (may be None if fallback mode)
- self.session: Optional[PromptSession] = None
- self.use_simple_input = False # Fallback mode flag
-
- # ------------------------------------------------------------------
- # Startup
- # ------------------------------------------------------------------
-
- def _init_prompt_session(self) -> None:
- """Initialize prompt session (with fallback)."""
- if self.session is not None:
- return # Already initialized
-
- self.password_mgr.ensure_config_dir()
- if PROMPT_TOOLKIT_AVAILABLE:
- try:
- self.session = PromptSession(
- history=FileHistory(str(HISTORY_FILE))
- )
- except Exception as e:
- cprint(f"Note: Using simple input mode ({type(e).__name__})", Colors.DIM)
- self.use_simple_input = True
- else:
- self.use_simple_input = True
-
- async def authenticate(self) -> bool:
- """
- Authenticate user (call before starting pump).
-
- Returns True if authenticated, False otherwise.
- """
- self._init_prompt_session()
-
- # Ensure password is set up
- if not await self._ensure_password():
- return False
-
- # Authenticate
- if not await self._authenticate():
- cprint("Authentication failed.", Colors.RED)
- return False
-
- self.authenticated = True
- return True
-
- async def run_command_loop(self) -> None:
- """
- Run the command loop (call after authentication).
-
- This shows the banner and enters the main input loop.
- """
- if not self.authenticated:
- cprint("Not authenticated. Call authenticate() first.", Colors.RED)
- return
-
- self.running = True
- self._print_banner()
- await self._main_loop()
-
- async def run(self) -> None:
- """Main console loop (combines authenticate + run_command_loop)."""
- if await self.authenticate():
- await self.run_command_loop()
-
- async def _ensure_password(self) -> bool:
- """Ensure password is set up (first run setup)."""
- if self.password_mgr.has_password():
- return True
-
- cprint("\n" + "=" * 50, Colors.CYAN)
- cprint(" First-time setup: Create console password", Colors.CYAN)
- cprint("=" * 50 + "\n", Colors.CYAN)
-
- cprint("This password protects privileged operations.", Colors.DIM)
- cprint("It will be required at startup and for protected commands.\n", Colors.DIM)
-
- # Get password with confirmation
- while True:
- password = await self._prompt_password("New password: ")
- if not password:
- cprint("Password cannot be empty.", Colors.RED)
- continue
-
- if len(password) < 4:
- cprint("Password must be at least 4 characters.", Colors.RED)
- continue
-
- confirm = await self._prompt_password("Confirm password: ")
- if password != confirm:
- cprint("Passwords do not match.", Colors.RED)
- continue
-
- break
-
- self.password_mgr.save_hash(password)
- cprint("\nPassword set successfully.\n", Colors.GREEN)
- return True
-
- async def _authenticate(self) -> bool:
- """Authenticate user at startup."""
- self.password_mgr.load_hash()
-
- for attempt in range(3):
- password = await self._prompt_password("Password: ")
- if self.password_mgr.verify(password):
- self.authenticated = True
- return True
- cprint("Incorrect password.", Colors.RED)
-
- return False
-
- async def _prompt_password(self, prompt: str) -> str:
- """Prompt for password (hidden input when possible)."""
- if self.use_simple_input:
- # Simple input mode: use visible input (getpass unreliable in some terminals)
- cprint("(password will be visible)", Colors.DIM)
- print(prompt, end="", flush=True)
- loop = asyncio.get_event_loop()
- try:
- line = await loop.run_in_executor(None, sys.stdin.readline)
- return line.strip() if line else ""
- except (EOFError, KeyboardInterrupt):
- return ""
- else:
- # Use prompt_toolkit for password input (hidden)
- try:
- session = PromptSession()
- return await session.prompt_async(prompt, is_password=True)
- except (EOFError, KeyboardInterrupt):
- return ""
- except Exception:
- # Fallback if prompt_toolkit fails mid-session
- self.use_simple_input = True
- return await self._prompt_password(prompt)
-
- # ------------------------------------------------------------------
- # Main Loop
- # ------------------------------------------------------------------
-
- async def _main_loop(self) -> None:
- """Main input loop."""
- while self.running:
- try:
- # Determine prompt based on attach state
- prompt_str = "> " if self.attached else "# "
-
- # Read input
- line = await self._read_input(prompt_str)
-
- await self._handle_input(line.strip())
-
- except EOFError:
- cprint("\nEOF received. Shutting down.", Colors.YELLOW)
- break
- except KeyboardInterrupt:
- continue
-
- async def _read_input(self, prompt: str) -> str:
- """Read a line of input (with fallback for non-TTY terminals)."""
- if self.use_simple_input:
- # Fallback: simple blocking input
- loop = asyncio.get_event_loop()
- print(prompt, end="", flush=True)
- try:
- line = await loop.run_in_executor(None, sys.stdin.readline)
- if not line:
- raise EOFError()
- return line.strip()
- except (EOFError, KeyboardInterrupt):
- raise
- else:
- # Use prompt_toolkit with optional timeout
- try:
- with patch_stdout():
- if self.idle_timeout > 0:
- try:
- return await asyncio.wait_for(
- self.session.prompt_async(prompt),
- timeout=self.idle_timeout
- )
- except asyncio.TimeoutError:
- cprint("\nIdle timeout. Detaching console.", Colors.YELLOW)
- self.attached = False
- return ""
- else:
- return await self.session.prompt_async(prompt)
- except Exception:
- # Fall back to simple input if prompt_toolkit fails
- self.use_simple_input = True
- return await self._read_input(prompt)
-
- async def _handle_input(self, line: str) -> None:
- """Route input to appropriate handler."""
- if not line:
- return
-
- if line.startswith("/"):
- await self._handle_command(line)
- elif line.startswith("@"):
- await self._handle_message(line)
- else:
- cprint("Use @listener message or /command", Colors.DIM)
-
- # ------------------------------------------------------------------
- # Command Handling
- # ------------------------------------------------------------------
-
- async def _handle_command(self, line: str) -> None:
- """Handle /command."""
- parts = line[1:].split(None, 1)
- cmd = parts[0].lower() if parts else ""
- args = parts[1] if len(parts) > 1 else ""
-
- # Check if protected command
- if cmd in PROTECTED_COMMANDS:
- if not await self._verify_password():
- cprint("Password required for this command.", Colors.RED)
- return
-
- # Dispatch to handler
- handler = getattr(self, f"_cmd_{cmd}", None)
- if handler:
- await handler(args)
- else:
- cprint(f"Unknown command: /{cmd}", Colors.RED)
- cprint("Type /help for available commands.", Colors.DIM)
-
- async def _verify_password(self) -> bool:
- """Verify password for protected commands."""
- password = await self._prompt_password("Password: ")
- return self.password_mgr.verify(password)
-
- # ------------------------------------------------------------------
- # Message Handling
- # ------------------------------------------------------------------
-
- async def _handle_message(self, line: str) -> None:
- """Handle @listener message."""
- if not self.attached:
- cprint("Console detached. Use /attach first.", Colors.RED)
- return
-
- parts = line[1:].split(None, 1)
- if not parts:
- cprint("Usage: @listener message", Colors.DIM)
- return
-
- target = parts[0].lower()
- message = parts[1] if len(parts) > 1 else ""
-
- # Check if listener exists
- if target not in self.pump.listeners:
- cprint(f"Unknown listener: {target}", Colors.RED)
- cprint("Use /listeners to see available listeners.", Colors.DIM)
- return
-
- cprint(f"[sending to {target}]", Colors.DIM)
-
- # Create payload based on target listener
- listener = self.pump.listeners[target]
- payload = self._create_payload(listener, message)
- if payload is None:
- cprint(f"Cannot create payload for {target}", Colors.RED)
- return
-
- # Create thread and inject message
- import uuid
- thread_id = str(uuid.uuid4())
-
- envelope = self.pump._wrap_in_envelope(
- payload=payload,
- from_id="console",
- to_id=target,
- thread_id=thread_id,
- )
-
- await self.pump.inject(envelope, thread_id=thread_id, from_id="console")
-
- def _create_payload(self, listener, message: str):
- """Create payload instance for a listener from message text."""
- payload_class = listener.payload_class
-
- # Try to create payload with common field patterns
- # Most payloads have a single text field like 'name', 'message', 'text', etc.
- if hasattr(payload_class, '__dataclass_fields__'):
- fields = payload_class.__dataclass_fields__
- field_names = list(fields.keys())
-
- if len(field_names) == 1:
- # Single field - use the message as its value
- return payload_class(**{field_names[0]: message})
- elif 'name' in field_names:
- return payload_class(name=message)
- elif 'message' in field_names:
- return payload_class(message=message)
- elif 'text' in field_names:
- return payload_class(text=message)
-
- # Fallback: try with no args
- try:
- return payload_class()
- except Exception:
- return None
-
- # ------------------------------------------------------------------
- # Commands: Informational
- # ------------------------------------------------------------------
-
- async def _cmd_help(self, args: str) -> None:
- """Show available commands."""
- cprint("\nCommands:", Colors.CYAN)
- cprint(" /help Show this help", Colors.DIM)
- cprint(" /status Show organism status", Colors.DIM)
- cprint(" /listeners List registered listeners", Colors.DIM)
- cprint(" /threads List active threads", Colors.DIM)
- cprint(" /buffer Inspect thread's context buffer", Colors.DIM)
- cprint(" /monitor Show recent messages from thread", Colors.DIM)
- cprint(" /monitor * Show recent messages from all threads", Colors.DIM)
- cprint("")
- cprint("Configuration:", Colors.CYAN)
- cprint(" /config Show current config", Colors.DIM)
- cprint(" /config -e Edit organism.yaml", Colors.DIM)
- cprint(" /config @name Edit listener config", Colors.DIM)
- cprint(" /config --list List listener configs", Colors.DIM)
- cprint("")
- cprint("Protected (require password):", Colors.YELLOW)
- cprint(" /restart Restart the pipeline", Colors.DIM)
- cprint(" /kill Terminate a thread", Colors.DIM)
- cprint(" /pause Pause message processing", Colors.DIM)
- cprint(" /resume Resume message processing", Colors.DIM)
- cprint("")
- cprint("Session:", Colors.CYAN)
- cprint(" /attach Attach console (enable @messages)", Colors.DIM)
- cprint(" /detach Detach console (organism keeps running)", Colors.DIM)
- cprint(" /passwd Change console password", Colors.DIM)
- cprint(" /quit Graceful shutdown", Colors.DIM)
- cprint("")
-
- async def _cmd_status(self, args: str) -> None:
- """Show organism status."""
- from xml_pipeline.memory import get_context_buffer
- from xml_pipeline.message_bus.thread_registry import get_registry
-
- buffer = get_context_buffer()
- registry = get_registry()
- stats = buffer.get_stats()
-
- cprint(f"\nOrganism: {self.pump.config.name}", Colors.CYAN)
- cprint(f"Status: {'attached' if self.attached else 'detached'}",
- Colors.GREEN if self.attached else Colors.YELLOW)
- cprint(f"Listeners: {len(self.pump.listeners)}", Colors.DIM)
- cprint(f"Threads: {stats['thread_count']} active", Colors.DIM)
- cprint(f"Buffer: {stats['total_slots']} slots across threads", Colors.DIM)
- cprint("")
-
- async def _cmd_listeners(self, args: str) -> None:
- """List registered listeners."""
- cprint("\nRegistered listeners:", Colors.CYAN)
- for name, listener in self.pump.listeners.items():
- agent_tag = "[agent] " if listener.is_agent else ""
- cprint(f" {name:20} {agent_tag}{listener.description}", Colors.DIM)
- cprint("")
-
- async def _cmd_threads(self, args: str) -> None:
- """List active threads."""
- from xml_pipeline.memory import get_context_buffer
-
- buffer = get_context_buffer()
- stats = buffer.get_stats()
-
- if stats["thread_count"] == 0:
- cprint("\nNo active threads.", Colors.DIM)
- return
-
- cprint(f"\nActive threads ({stats['thread_count']}):", Colors.CYAN)
-
- # Access internal threads dict (not ideal but works for now)
- for thread_id, ctx in buffer._threads.items():
- slot_count = len(ctx)
- age = datetime.now(timezone.utc) - ctx._created_at
- age_str = str(age).split(".")[0] # Remove microseconds
-
- # Get last sender/receiver
- if slot_count > 0:
- last = ctx[-1]
- flow = f"{last.from_id} -> {last.to_id}"
- else:
- flow = "(empty)"
-
- cprint(f" {thread_id[:12]}... slots={slot_count:3} age={age_str} {flow}", Colors.DIM)
- cprint("")
-
- async def _cmd_buffer(self, args: str) -> None:
- """Inspect a thread's context buffer."""
- if not args:
- cprint("Usage: /buffer ", Colors.DIM)
- return
-
- from xml_pipeline.memory import get_context_buffer
- buffer = get_context_buffer()
-
- # Find thread by prefix
- thread_id = None
- for tid in buffer._threads.keys():
- if tid.startswith(args):
- thread_id = tid
- break
-
- if not thread_id:
- cprint(f"Thread not found: {args}", Colors.RED)
- return
-
- ctx = buffer.get_thread(thread_id)
- cprint(f"\nThread: {thread_id}", Colors.CYAN)
- cprint(f"Slots: {len(ctx)}", Colors.DIM)
- cprint("-" * 60, Colors.DIM)
-
- for slot in ctx:
- payload_type = type(slot.payload).__name__
- cprint(f"[{slot.index}] {slot.from_id} -> {slot.to_id}: {payload_type}", Colors.DIM)
- # Show first 100 chars of payload repr
- payload_repr = repr(slot.payload)[:100]
- cprint(f" {payload_repr}", Colors.DIM)
- cprint("")
-
- async def _cmd_monitor(self, args: str) -> None:
- """Show recent messages from a thread's context buffer."""
- if not args:
- cprint("Usage: /monitor ", Colors.DIM)
- cprint(" /monitor * (show all threads)", Colors.DIM)
- return
-
- from xml_pipeline.memory import get_context_buffer
- buffer = get_context_buffer()
-
- # Find thread by prefix (or * for all)
- monitor_all = args.strip() == "*"
- thread_id = None
-
- if not monitor_all:
- for tid in buffer._threads.keys():
- if tid.startswith(args):
- thread_id = tid
- break
-
- if not thread_id:
- cprint(f"Thread not found: {args}", Colors.RED)
- return
-
- # Show header
- if monitor_all:
- cprint("\nAll threads:", Colors.CYAN)
- else:
- cprint(f"\nThread {thread_id[:12]}...:", Colors.CYAN)
- cprint("-" * 60, Colors.DIM)
-
- # Show messages
- if monitor_all:
- for tid, ctx in buffer._threads.items():
- if len(ctx) > 0:
- cprint(f"\n[{tid[:12]}...] ({len(ctx)} messages)", Colors.YELLOW)
- # Show last 5 messages per thread
- for slot in ctx.get_slice(-5):
- self._print_monitor_slot(tid, slot)
- else:
- ctx = buffer.get_thread(thread_id)
- # Show all messages (up to 20)
- slots = ctx.get_slice(-20)
- for slot in slots:
- self._print_monitor_slot(thread_id, slot)
- if len(ctx) > 20:
- cprint(f" ... ({len(ctx) - 20} earlier messages)", Colors.DIM)
-
- cprint("")
-
- def _print_monitor_slot(self, thread_id: str, slot) -> None:
- """Print a single slot in monitor format."""
- payload_type = type(slot.payload).__name__
- tid_short = thread_id[:8]
- timestamp = slot.metadata.timestamp.split("T")[1][:8] if "T" in slot.metadata.timestamp else ""
-
- # Color based on direction
- if slot.from_id == "console":
- color = Colors.GREEN
- elif "response" in slot.to_id.lower() or "console" in slot.to_id.lower():
- color = Colors.CYAN
- else:
- color = Colors.DIM
-
- # Format: [time] thread: from -> to: Type
- cprint(f"[{timestamp}] {tid_short}: {slot.from_id} -> {slot.to_id}: {payload_type}", color)
-
- # Show payload content (abbreviated)
- payload_str = repr(slot.payload)
- if len(payload_str) > 80:
- payload_str = payload_str[:77] + "..."
- cprint(f" {payload_str}", Colors.DIM)
-
- async def _cmd_config(self, args: str) -> None:
- """
- Edit configuration files.
-
- /config - Edit organism.yaml
- /config @name - Edit listener config (e.g., /config @greeter)
- /config --list - List available listener configs
- /config --show - Show current config (read-only)
- """
- args = args.strip() if args else ""
-
- if args == "--list":
- await self._config_list()
- elif args == "--show" or args == "":
- await self._config_show()
- elif args.startswith("@"):
- listener_name = args[1:].strip()
- if listener_name:
- await self._config_edit_listener(listener_name)
- else:
- cprint("Usage: /config @listener_name", Colors.RED)
- elif args == "--edit" or args == "-e":
- await self._config_edit_organism()
- else:
- cprint(f"Unknown option: {args}", Colors.RED)
- cprint("Usage:", Colors.DIM)
- cprint(" /config Show current config", Colors.DIM)
- cprint(" /config -e Edit organism.yaml", Colors.DIM)
- cprint(" /config @name Edit listener config", Colors.DIM)
- cprint(" /config --list List listener configs", Colors.DIM)
-
- async def _config_show(self) -> None:
- """Show current configuration (read-only)."""
- cprint(f"\nOrganism: {self.pump.config.name}", Colors.CYAN)
- cprint(f"Port: {self.pump.config.port}", Colors.DIM)
- cprint(f"Thread scheduling: {self.pump.config.thread_scheduling}", Colors.DIM)
- cprint(f"Max concurrent pipelines: {self.pump.config.max_concurrent_pipelines}", Colors.DIM)
- cprint(f"Max concurrent handlers: {self.pump.config.max_concurrent_handlers}", Colors.DIM)
- cprint(f"Max concurrent per agent: {self.pump.config.max_concurrent_per_agent}", Colors.DIM)
- cprint("\nUse /config -e to edit organism.yaml", Colors.DIM)
- cprint("Use /config @listener to edit a listener config", Colors.DIM)
- cprint("")
-
- async def _config_list(self) -> None:
- """List available listener configs."""
- from xml_pipeline.config import get_listener_config_store
-
- store = get_listener_config_store()
- listeners = store.list_listeners()
-
- cprint("\nListener configurations:", Colors.CYAN)
- cprint(f"Directory: {store.listeners_dir}", Colors.DIM)
- cprint("")
-
- if not listeners:
- cprint(" No listener configs found.", Colors.DIM)
- cprint(" Use /config @name to create one.", Colors.DIM)
- else:
- for name in sorted(listeners):
- config = store.get(name)
- agent_tag = "[agent]" if config.agent else "[tool]" if config.tool else ""
- cprint(f" @{name:20} {agent_tag} {config.description or ''}", Colors.DIM)
-
- # Also show registered listeners without config files
- unconfigured = [
- name for name in self.pump.listeners.keys()
- if name not in listeners
- ]
- if unconfigured:
- cprint("\nRegistered listeners without config files:", Colors.YELLOW)
- for name in sorted(unconfigured):
- listener = self.pump.listeners[name]
- agent_tag = "[agent]" if listener.is_agent else ""
- cprint(f" @{name:20} {agent_tag} {listener.description}", Colors.DIM)
-
- cprint("")
-
- async def _config_edit_organism(self) -> None:
- """Edit organism.yaml in the full-screen editor."""
- from xml_pipeline.console.editor import edit_text_async
- from xml_pipeline.config.schema import ensure_schemas
- from xml_pipeline.config.split_loader import (
- get_organism_yaml_path,
- load_organism_yaml_content,
- save_organism_yaml_content,
- )
-
- # Ensure schemas are written for LSP
- try:
- ensure_schemas()
- except Exception as e:
- cprint(f"Warning: Could not write schemas: {e}", Colors.YELLOW)
-
- # Find organism.yaml
- config_path = get_organism_yaml_path()
- if config_path is None:
- cprint("No organism.yaml found.", Colors.RED)
- cprint("Searched in:", Colors.DIM)
- cprint(" ~/.xml-pipeline/organism.yaml", Colors.DIM)
- cprint(" ./organism.yaml", Colors.DIM)
- cprint(" ./config/organism.yaml", Colors.DIM)
- return
-
- # Load content
- try:
- content = load_organism_yaml_content(config_path)
- except Exception as e:
- cprint(f"Failed to load config: {e}", Colors.RED)
- return
-
- # Edit
- cprint(f"Editing: {config_path}", Colors.CYAN)
- cprint("Press Ctrl+S to save, Ctrl+Q to cancel", Colors.DIM)
- cprint("")
-
- edited_text, saved = await edit_text_async(
- content,
- title=f"organism.yaml ({config_path.name})",
- schema_type="organism",
- )
-
- if saved and edited_text is not None:
- try:
- save_organism_yaml_content(config_path, edited_text)
- cprint("Configuration saved.", Colors.GREEN)
- cprint("Note: Restart required for changes to take effect.", Colors.YELLOW)
- except yaml.YAMLError as e:
- cprint(f"Invalid YAML: {e}", Colors.RED)
- except Exception as e:
- cprint(f"Failed to save: {e}", Colors.RED)
- else:
- cprint("Edit cancelled.", Colors.DIM)
-
- async def _config_edit_listener(self, name: str) -> None:
- """Edit a listener config in the full-screen editor."""
- from xml_pipeline.config import get_listener_config_store
- from xml_pipeline.console.editor import edit_text_async
- from xml_pipeline.config.schema import ensure_schemas
-
- # Ensure schemas are written for LSP
- try:
- ensure_schemas()
- except Exception as e:
- cprint(f"Warning: Could not write schemas: {e}", Colors.YELLOW)
-
- store = get_listener_config_store()
-
- # Load or create content
- if store.exists(name):
- content = store.load_yaml(name)
- cprint(f"Editing: {store.path_for(name)}", Colors.CYAN)
- else:
- # Check if it's a registered listener
- if name in self.pump.listeners:
- cprint(f"Creating new config for registered listener: {name}", Colors.CYAN)
- else:
- cprint(f"Creating new config for: {name}", Colors.CYAN)
- content = store._default_template(name)
-
- cprint("Press Ctrl+S to save, Ctrl+Q to cancel", Colors.DIM)
- cprint("")
-
- # Edit
- edited_text, saved = await edit_text_async(
- content,
- title=f"{name}.yaml",
- schema_type="listener",
- )
-
- if saved and edited_text is not None:
- try:
- path = store.save_yaml(name, edited_text)
- cprint(f"Saved: {path}", Colors.GREEN)
- cprint("Note: Restart required for changes to take effect.", Colors.YELLOW)
- except yaml.YAMLError as e:
- cprint(f"Invalid YAML: {e}", Colors.RED)
- except Exception as e:
- cprint(f"Failed to save: {e}", Colors.RED)
- else:
- cprint("Edit cancelled.", Colors.DIM)
-
- # ------------------------------------------------------------------
- # Commands: Protected
- # ------------------------------------------------------------------
-
- async def _cmd_restart(self, args: str) -> None:
- """Restart the pipeline."""
- cprint("Restarting pipeline...", Colors.YELLOW)
- await self.pump.shutdown()
-
- # Re-bootstrap
- from xml_pipeline.message_bus.stream_pump import bootstrap
- self.pump = await bootstrap()
-
- # Start pump in background
- asyncio.create_task(self.pump.run())
- cprint("Pipeline restarted.", Colors.GREEN)
-
- async def _cmd_kill(self, args: str) -> None:
- """Terminate a thread."""
- if not args:
- cprint("Usage: /kill ", Colors.DIM)
- return
-
- from xml_pipeline.memory import get_context_buffer
- buffer = get_context_buffer()
-
- # Find thread by prefix
- thread_id = None
- for tid in buffer._threads.keys():
- if tid.startswith(args):
- thread_id = tid
- break
-
- if not thread_id:
- cprint(f"Thread not found: {args}", Colors.RED)
- return
-
- buffer.delete_thread(thread_id)
- cprint(f"Thread {thread_id[:12]}... terminated.", Colors.YELLOW)
-
- async def _cmd_pause(self, args: str) -> None:
- """Pause message processing."""
- cprint("Pause not yet implemented.", Colors.YELLOW)
- # TODO: Implement pump pause
-
- async def _cmd_resume(self, args: str) -> None:
- """Resume message processing."""
- cprint("Resume not yet implemented.", Colors.YELLOW)
- # TODO: Implement pump resume
-
- # ------------------------------------------------------------------
- # Commands: Session
- # ------------------------------------------------------------------
-
- async def _cmd_attach(self, args: str) -> None:
- """Attach console."""
- if self.attached:
- cprint("Already attached.", Colors.DIM)
- return
-
- if not await self._verify_password():
- cprint("Password required to attach.", Colors.RED)
- return
-
- self.attached = True
- cprint("Console attached.", Colors.GREEN)
-
- async def _cmd_detach(self, args: str) -> None:
- """Detach console."""
- if not self.attached:
- cprint("Already detached.", Colors.DIM)
- return
-
- self.attached = False
- cprint("Console detached. Organism continues running.", Colors.YELLOW)
- cprint("Use /attach to re-attach.", Colors.DIM)
-
- async def _cmd_passwd(self, args: str) -> None:
- """Change console password."""
- # Verify current password
- current = await self._prompt_password("Current password: ")
- if not self.password_mgr.verify(current):
- cprint("Incorrect password.", Colors.RED)
- return
-
- # Get new password
- while True:
- new_pass = await self._prompt_password("New password: ")
- if not new_pass or len(new_pass) < 4:
- cprint("Password must be at least 4 characters.", Colors.RED)
- continue
-
- confirm = await self._prompt_password("Confirm new password: ")
- if new_pass != confirm:
- cprint("Passwords do not match.", Colors.RED)
- continue
-
- break
-
- self.password_mgr.save_hash(new_pass)
- cprint("Password changed successfully.", Colors.GREEN)
-
- async def _cmd_quit(self, args: str) -> None:
- """Graceful shutdown."""
- cprint("Shutting down...", Colors.YELLOW)
- self.running = False
- await self.pump.shutdown()
-
- # ------------------------------------------------------------------
- # UI Helpers
- # ------------------------------------------------------------------
-
- def _print_banner(self) -> None:
- """Print startup banner."""
- print()
- cprint("+" + "=" * 44 + "+", Colors.CYAN)
- cprint("|" + " " * 8 + "xml-pipeline console v3.0" + " " * 9 + "|", Colors.CYAN)
- cprint("+" + "=" * 44 + "+", Colors.CYAN)
- print()
- cprint(f"Organism '{self.pump.config.name}' ready.", Colors.GREEN)
- cprint(f"{len(self.pump.listeners)} listeners registered.", Colors.DIM)
- cprint("Type /help for commands.", Colors.DIM)
- print()
diff --git a/xml_pipeline/console/tui_console.py b/xml_pipeline/console/tui_console.py
deleted file mode 100644
index 9bdaf84..0000000
--- a/xml_pipeline/console/tui_console.py
+++ /dev/null
@@ -1,469 +0,0 @@
-"""
-tui_console.py — Split-screen TUI console using prompt_toolkit.
-
-Features:
-- Fixed Command History (Up/Down arrows)
-- Robust Scrolling with snap-to-bottom and blank line spacer
-- Fully implemented /monitor, /status, /listeners commands
-"""
-
-from __future__ import annotations
-
-import asyncio
-import os
-from datetime import datetime
-from pathlib import Path
-from typing import TYPE_CHECKING, List, Optional
-
-try:
- from prompt_toolkit import Application
- from prompt_toolkit.buffer import Buffer
- from prompt_toolkit.document import Document
- from prompt_toolkit.formatted_text import FormattedText, HTML
- from prompt_toolkit.key_binding import KeyBindings
- from prompt_toolkit.layout import (
- Layout,
- HSplit,
- Window,
- FormattedTextControl,
- BufferControl,
- )
- from prompt_toolkit.layout.dimension import Dimension
- from prompt_toolkit.layout.margins import ScrollbarMargin
- from prompt_toolkit.styles import Style
- from prompt_toolkit.history import FileHistory
- from prompt_toolkit.patch_stdout import patch_stdout
- from prompt_toolkit.output.win32 import NoConsoleScreenBufferError
- PROMPT_TOOLKIT_AVAILABLE = True
-except ImportError:
- PROMPT_TOOLKIT_AVAILABLE = False
- NoConsoleScreenBufferError = Exception
-
-if TYPE_CHECKING:
- from xml_pipeline.message_bus.stream_pump import StreamPump
-
-
-# ============================================================================
-# Constants
-# ============================================================================
-
-CONFIG_DIR = Path.home() / ".xml-pipeline"
-HISTORY_FILE = CONFIG_DIR / "history"
-
-STYLE = Style.from_dict({
- "output": "#ffffff",
- "output.system": "#888888 italic",
- "output.greeter": "#00ff00",
- "output.shouter": "#ffff00",
- "output.response": "#00ffff",
- "output.error": "#ff0000",
- "output.dim": "#666666",
- "separator": "#444444",
- "separator.text": "#888888",
- "input": "#ffffff",
- "prompt": "#00ff00 bold",
-})
-
-
-# ============================================================================
-# Output Buffer
-# ============================================================================
-
-class OutputBuffer:
- """Manages scrolling output history using a text Buffer."""
-
- def __init__(self, max_lines: int = 1000):
- self.max_lines = max_lines
- self._lines: List[str] = []
- self.buffer = Buffer(read_only=True, name="output")
- self._user_scrolled = False # Track if user manually scrolled
-
- def append(self, text: str, style: str = "output"):
- timestamp = datetime.now().strftime("%H:%M:%S")
- self._lines.append(f"[{timestamp}] {text}")
- self._update_buffer()
-
- def append_raw(self, text: str, style: str = "output"):
- self._lines.append(text)
- self._update_buffer()
-
- def _update_buffer(self):
- """Update buffer content. Auto-scroll only if user hasn't scrolled up."""
- if len(self._lines) > self.max_lines:
- self._lines = self._lines[-self.max_lines:]
-
- text = "\n".join(self._lines)
-
- # If user scrolled up, preserve their position; otherwise snap to bottom
- if self._user_scrolled:
- old_pos = self.buffer.cursor_position
- self.buffer.set_document(
- Document(text=text, cursor_position=min(old_pos, len(text))),
- bypass_readonly=True
- )
- else:
- # Auto-scroll to bottom for new content
- self.buffer.set_document(
- Document(text=text, cursor_position=len(text)),
- bypass_readonly=True
- )
-
- def is_at_bottom(self) -> bool:
- """Check if we should show the spacer (user hasn't scrolled away)."""
- return not self._user_scrolled
-
- def scroll_to_bottom(self):
- """Force cursor to the end and mark as 'at bottom'."""
- self.buffer.cursor_position = len(self.buffer.text)
- self._user_scrolled = False # Reset flag when explicitly scrolling to bottom
-
- def mark_scrolled(self):
- """Called when user manually scrolls up."""
- self._user_scrolled = True
-
- def mark_unscrolled(self):
- """Called when user scrolls to bottom."""
- self._user_scrolled = False
-
- def clear(self):
- self._lines.clear()
- self.buffer.set_document(Document(text=""), bypass_readonly=True)
- self._user_scrolled = False
-
-
-# ============================================================================
-# TUI Console
-# ============================================================================
-
-class TUIConsole:
- def __init__(self, pump: StreamPump):
- self.pump = pump
- self.output = OutputBuffer()
- self.running = False
- self.attached = True
- self.use_simple_mode = False
-
- CONFIG_DIR.mkdir(parents=True, exist_ok=True)
-
- try:
- if not PROMPT_TOOLKIT_AVAILABLE:
- raise ImportError("prompt_toolkit not available")
-
- # Command history setup
- if HISTORY_FILE.exists() and not os.access(HISTORY_FILE, os.W_OK):
- os.chmod(HISTORY_FILE, 0o666)
-
- self.input_buffer = Buffer(
- history=FileHistory(str(HISTORY_FILE)),
- multiline=False,
- accept_handler=self._accept_handler
- )
-
- self._build_ui()
- except (NoConsoleScreenBufferError, ImportError, Exception) as e:
- self.use_simple_mode = True
- self.app = None
- print(f"\033[2mNote: Using simple mode ({type(e).__name__})\033[0m")
-
- def _accept_handler(self, buffer: Buffer) -> bool:
- text = buffer.text.strip()
- if text:
- asyncio.create_task(self._process_input(text))
- return False
-
- def _build_ui(self):
- kb = KeyBindings()
-
- @kb.add("c-c")
- @kb.add("c-d")
- def _(event):
- self.running = False
- event.app.exit()
-
- @kb.add("c-l")
- def _(event):
- self.output.clear()
-
- @kb.add("up")
- def _(event):
- self.input_buffer.history_backward()
-
- @kb.add("down")
- def _(event):
- self.input_buffer.history_forward()
-
- @kb.add("pageup")
- def _(event):
- buf = self.output.buffer
- doc = buf.document
- new_row = max(0, doc.cursor_position_row - 20)
- buf.cursor_position = doc.translate_row_col_to_index(new_row, 0)
- self._invalidate()
-
- @kb.add("pagedown")
- def _(event):
- buf = self.output.buffer
- doc = buf.document
- lines = doc.line_count
- new_row = doc.cursor_position_row + 20
-
- if new_row >= lines - 1:
- self.output.scroll_to_bottom()
- else:
- buf.cursor_position = doc.translate_row_col_to_index(new_row, 0)
- self._invalidate()
-
- @kb.add("c-home")
- def _(event):
- self.output.buffer.cursor_position = 0
- self._invalidate()
-
- @kb.add("c-end")
- def _(event):
- self.output.scroll_to_bottom()
- self._invalidate()
-
- output_control = BufferControl(
- buffer=self.output.buffer,
- focusable=False,
- include_default_input_processors=False,
- )
-
- self.output_window = Window(
- content=output_control,
- wrap_lines=True,
- right_margins=[ScrollbarMargin(display_arrows=True)],
- )
-
- def get_spacer_height():
- return 1 if self.output.is_at_bottom() else 0
-
- spacer = Window(height=lambda: Dimension.exact(get_spacer_height()))
-
- def get_separator():
- name = self.pump.config.name
- width = 60
- padding = "─" * ((width - len(name) - 4) // 2)
- return FormattedText([
- ("class:separator", padding),
- ("class:separator.text", f" {name} "),
- ("class:separator", padding),
- ])
-
- separator = Window(
- content=FormattedTextControl(text=get_separator),
- height=1,
- )
-
- input_window = Window(
- content=BufferControl(buffer=self.input_buffer),
- height=1,
- )
-
- from prompt_toolkit.layout import VSplit
- input_row = VSplit([
- Window(
- content=FormattedTextControl(text=lambda: FormattedText([("class:prompt", "> ")])),
- width=2,
- ),
- input_window,
- ])
-
- root = HSplit([
- self.output_window,
- spacer,
- separator,
- input_row,
- ])
-
- self.layout = Layout(root, focused_element=input_window)
-
- self.app = Application(
- layout=self.layout,
- key_bindings=kb,
- style=STYLE,
- full_screen=True,
- mouse_support=True,
- )
-
- def print(self, text: str, style: str = "output"):
- if self.use_simple_mode:
- self._print_simple(text, style)
- else:
- self.output.append(text, style)
- self._invalidate()
-
- def print_raw(self, text: str, style: str = "output"):
- if self.use_simple_mode:
- self._print_simple(text, style)
- else:
- self.output.append_raw(text, style)
- self._invalidate()
-
- def print_system(self, text: str):
- self.print(text, "output.system")
-
- def print_error(self, text: str):
- self.print(text, "output.error")
-
- def _invalidate(self):
- if self.app:
- try:
- self.app.invalidate()
- except Exception:
- pass
-
- def _print_simple(self, text: str, style: str = "output"):
- colors = {
- "output.system": "\033[2m",
- "output.error": "\033[31m",
- "output.dim": "\033[2m",
- "output.greeter": "\033[32m",
- "output.shouter": "\033[33m",
- "output.response": "\033[36m",
- }
- color = colors.get(style, "")
- print(f"{color}{text}\033[0m")
-
- async def run(self):
- self.running = True
- if self.use_simple_mode:
- await self._run_simple()
- return
-
- self.print_raw(f"xml-pipeline console v3.0", "output.system")
- self.print_raw(f"Organism: {self.pump.config.name}", "output.system")
- self.print_raw(f"Type /help for commands, @listener message to chat", "output.dim")
- self.print_raw("", "output")
-
- try:
- async def refresh_loop():
- while self.running:
- await asyncio.sleep(0.1)
- if self.app and self.app.is_running:
- self.app.invalidate()
-
- refresh_task = asyncio.create_task(refresh_loop())
- try:
- await self.app.run_async()
- finally:
- refresh_task.cancel()
- except Exception as e:
- print(f"Console error: {e}")
- finally:
- self.running = False
-
- async def _run_simple(self):
- print(f"\033[36mxml-pipeline console v3.0 (simple mode)\033[0m")
- while self.running:
- try:
- line = await asyncio.get_event_loop().run_in_executor(None, lambda: input("> "))
- if line: await self._process_input(line.strip())
- except (EOFError, KeyboardInterrupt): break
- self.running = False
-
- async def _process_input(self, line: str):
- if not self.use_simple_mode:
- self.print_raw(f"> {line}", "output.dim")
- if line.startswith("/"):
- await self._handle_command(line)
- elif line.startswith("@"):
- await self._handle_message(line)
- else:
- self.print("Use @listener message or /command", "output.dim")
-
- async def _handle_command(self, line: str):
- parts = line[1:].split(None, 1)
- cmd = parts[0].lower() if parts else ""
- args = parts[1] if len(parts) > 1 else ""
-
- handler = getattr(self, f"_cmd_{cmd}", None)
- if handler:
- await handler(args)
- else:
- self.print_error(f"Unknown command: /{cmd}")
-
- async def _cmd_help(self, args: str):
- self.print_raw("Commands:", "output.system")
- self.print_raw(" /status, /listeners, /threads, /monitor, /clear, /quit", "output.dim")
-
- async def _cmd_status(self, args: str):
- from xml_pipeline.memory import get_context_buffer
- buffer = get_context_buffer()
- stats = buffer.get_stats()
- self.print_raw(f"Organism: {self.pump.config.name}", "output.system")
- self.print_raw(f"Threads: {stats['thread_count']} active, {stats['total_slots']} slots total", "output.dim")
-
- async def _cmd_listeners(self, args: str):
- self.print_raw("Listeners:", "output.system")
- for name, l in self.pump.listeners.items():
- tag = "[agent]" if l.is_agent else "[handler]"
- self.print_raw(f" {name:15} {tag} {l.description}", "output.dim")
-
- async def _cmd_threads(self, args: str):
- from xml_pipeline.memory import get_context_buffer
- buffer = get_context_buffer()
- for tid, ctx in buffer._threads.items():
- self.print_raw(f" {tid[:8]}... slots: {len(ctx)}", "output.dim")
-
- async def _cmd_monitor(self, args: str):
- from xml_pipeline.memory import get_context_buffer
- buffer = get_context_buffer()
- if args == "*":
- for tid, ctx in buffer._threads.items():
- self.print_raw(f"--- Thread {tid[:8]} ---", "output.system")
- for slot in list(ctx)[-3:]:
- self.print_raw(f" {slot.from_id} -> {slot.to_id}: {type(slot.payload).__name__}", "output.dim")
- elif args:
- matches = [t for t in buffer._threads if t.startswith(args)]
- if not matches:
- self.print_error(f"No thread matching {args}")
- return
- ctx = buffer.get_thread(matches[0])
- for slot in ctx:
- self.print_raw(f" [{slot.from_id} -> {slot.to_id}] {type(slot.payload).__name__}", "output.dim")
- else:
- self.print("Usage: /monitor or /monitor *", "output.dim")
-
- async def _cmd_clear(self, args: str):
- self.output.clear()
-
- async def _cmd_quit(self, args: str):
- self.running = False
- if self.app: self.app.exit()
-
- async def _handle_message(self, line: str):
- parts = line[1:].split(None, 1)
- if not parts: return
- target, message = parts[0].lower(), (parts[1] if len(parts) > 1 else "")
- if target not in self.pump.listeners:
- self.print_error(f"Unknown listener: {target}")
- return
-
- listener = self.pump.listeners[target]
- payload = self._create_payload(listener, message)
- if payload is None:
- self.print_error(f"Cannot create payload for {target}")
- return
-
- import uuid
- thread_id = str(uuid.uuid4())
- envelope = self.pump._wrap_in_envelope(payload, "console", target, thread_id)
- await self.pump.inject(envelope, thread_id, "console")
-
- def _create_payload(self, listener, message: str):
- payload_class = listener.payload_class
- if hasattr(payload_class, '__dataclass_fields__'):
- fields = list(payload_class.__dataclass_fields__.keys())
- if len(fields) == 1: return payload_class(**{fields[0]: message})
- if 'message' in fields: return payload_class(message=message)
- if 'text' in fields: return payload_class(text=message)
- return None
-
- def on_response(self, from_id: str, payload):
- style = "output.response" if from_id == "response-handler" else "output"
- text = f"[{from_id}] {getattr(payload, 'message', payload)}"
- self.print_raw(text, style)
-
-def create_tui_console(pump: StreamPump) -> TUIConsole:
- return TUIConsole(pump)
\ No newline at end of file
diff --git a/xml_pipeline/main.py b/xml_pipeline/main.py
deleted file mode 100644
index e69de29..0000000
diff --git a/xml_pipeline/server/__init__.py b/xml_pipeline/server/__init__.py
deleted file mode 100644
index 55ef269..0000000
--- a/xml_pipeline/server/__init__.py
+++ /dev/null
@@ -1,11 +0,0 @@
-"""
-HTTP/WebSocket server for xml-pipeline.
-
-Provides:
-- REST API for auth and management
-- WebSocket for console and GUI clients
-"""
-
-from .app import create_app, run_server
-
-__all__ = ["create_app", "run_server"]
diff --git a/xml_pipeline/server/app.py b/xml_pipeline/server/app.py
deleted file mode 100644
index cd3a83c..0000000
--- a/xml_pipeline/server/app.py
+++ /dev/null
@@ -1,266 +0,0 @@
-"""
-aiohttp-based HTTP/WebSocket server.
-
-Provides:
-- REST API for authentication
-- WebSocket for console/GUI message sending
-- Integration with SystemPipeline for message injection
-"""
-
-from __future__ import annotations
-
-import asyncio
-import json
-import logging
-from typing import TYPE_CHECKING, Optional, Callable
-
-try:
- from aiohttp import web, WSMsgType
- AIOHTTP_AVAILABLE = True
-except ImportError:
- AIOHTTP_AVAILABLE = False
- web = None
- WSMsgType = None
-
-from ..auth.users import get_user_store, UserStore
-from ..auth.sessions import get_session_manager, SessionManager, Session
-
-if TYPE_CHECKING:
- from ..message_bus.stream_pump import StreamPump
- from ..message_bus.system_pipeline import SystemPipeline
-
-logger = logging.getLogger(__name__)
-
-
-def auth_middleware():
- @web.middleware
- async def middleware(request, handler):
- if request.path in ("/auth/login", "/health"):
- return await handler(request)
-
- auth_header = request.headers.get("Authorization", "")
- if not auth_header.startswith("Bearer "):
- return web.json_response({"error": "Missing Authorization"}, status=401)
-
- token = auth_header[7:]
- session = request.app["session_manager"].validate(token)
-
- if not session:
- return web.json_response({"error": "Invalid token"}, status=401)
-
- request["session"] = session
- return await handler(request)
-
- return middleware
-
-
-async def handle_login(request):
- try:
- data = await request.json()
- except:
- return web.json_response({"error": "Invalid JSON"}, status=400)
-
- username = data.get("username", "")
- password = data.get("password", "")
-
- if not username or not password:
- return web.json_response({"error": "Credentials required"}, status=400)
-
- user = request.app["user_store"].authenticate(username, password)
- if not user:
- return web.json_response({"error": "Invalid credentials"}, status=401)
-
- session = request.app["session_manager"].create(user.username, user.role)
- return web.json_response(session.to_dict())
-
-
-async def handle_logout(request):
- session = request["session"]
- request.app["session_manager"].revoke(session.token)
- return web.json_response({"message": "Logged out"})
-
-
-async def handle_me(request):
- session = request["session"]
- return web.json_response({
- "username": session.username,
- "role": session.role,
- "expires_at": session.expires_at.isoformat(),
- })
-
-
-async def handle_health(request):
- return web.json_response({"status": "ok"})
-
-
-async def handle_websocket(request):
- session = request["session"]
- pump = request.app.get("pump")
- system_pipeline = request.app.get("system_pipeline")
-
- ws = web.WebSocketResponse()
- await ws.prepare(request)
-
- # Track this WebSocket for response delivery
- ws_id = id(ws)
- request.app["websockets"][ws_id] = {
- "ws": ws,
- "user": session.username,
- "threads": set(), # Thread IDs this client is subscribed to
- }
-
- await ws.send_json({"type": "connected", "username": session.username})
-
- try:
- async for msg in ws:
- if msg.type == WSMsgType.TEXT:
- try:
- data = json.loads(msg.data)
- resp = await handle_ws_msg(
- data, session, pump, system_pipeline,
- request.app["websockets"][ws_id]
- )
- await ws.send_json(resp)
- except Exception as e:
- logger.exception(f"WebSocket error: {e}")
- await ws.send_json({"type": "error", "error": str(e)})
- finally:
- # Cleanup on disconnect
- del request.app["websockets"][ws_id]
-
- return ws
-
-
-async def handle_ws_msg(data, session, pump, system_pipeline, ws_state):
- """
- Handle WebSocket message.
-
- Message types:
- ping - Keepalive
- status - Get server status
- listeners - List available listeners
- targets - Alias for listeners
- send - Send message to pipeline (@target or explicit)
- """
- t = data.get("type", "")
-
- if t == "ping":
- return {"type": "pong"}
-
- elif t == "status":
- from ..memory import get_context_buffer
- stats = get_context_buffer().get_stats()
- return {"type": "status", "threads": stats["thread_count"]}
-
- elif t == "listeners" or t == "targets":
- if not pump:
- return {"type": "listeners", "listeners": []}
- return {"type": "listeners", "listeners": list(pump.listeners.keys())}
-
- elif t == "send":
- # Send message to pipeline
- if not system_pipeline:
- return {"type": "error", "error": "Pipeline not available"}
-
- # Support two formats:
- # 1. {"type": "send", "raw": "@greeter Dan"}
- # 2. {"type": "send", "target": "greeter", "content": "Dan"}
- raw = data.get("raw")
- if raw:
- # Parse @target message format
- try:
- thread_id = await system_pipeline.inject_console(
- raw=raw,
- user=session.username,
- )
- except ValueError as e:
- return {"type": "error", "error": str(e)}
- else:
- target = data.get("target")
- content = data.get("content", data.get("text", data.get("message", "")))
-
- if not target:
- return {"type": "error", "error": "Missing target"}
- if not content:
- return {"type": "error", "error": "Missing content"}
-
- try:
- thread_id = await system_pipeline.inject_raw(
- target=target,
- content=content,
- source="websocket",
- user=session.username,
- )
- except ValueError as e:
- return {"type": "error", "error": str(e)}
-
- # Track thread for response delivery
- ws_state["threads"].add(thread_id)
-
- return {
- "type": "sent",
- "thread_id": thread_id,
- "target": data.get("target") or raw.split()[0].lstrip("@") if raw else None,
- }
-
- return {"type": "error", "error": f"Unknown message type: {t}"}
-
-
-def create_app(pump=None, system_pipeline=None):
- """
- Create the aiohttp application.
-
- Args:
- pump: StreamPump instance (optional)
- system_pipeline: SystemPipeline instance (optional, created from pump if not provided)
- """
- if not AIOHTTP_AVAILABLE:
- raise RuntimeError("aiohttp not installed")
-
- app = web.Application(middlewares=[auth_middleware()])
- app["user_store"] = get_user_store()
- app["session_manager"] = get_session_manager()
- app["pump"] = pump
- app["websockets"] = {} # Track connected WebSocket clients
-
- # Create SystemPipeline if pump provided but system_pipeline not
- if pump and not system_pipeline:
- from ..message_bus.system_pipeline import SystemPipeline
- system_pipeline = SystemPipeline(pump)
-
- app["system_pipeline"] = system_pipeline
-
- app.router.add_post("/auth/login", handle_login)
- app.router.add_post("/auth/logout", handle_logout)
- app.router.add_get("/auth/me", handle_me)
- app.router.add_get("/health", handle_health)
- app.router.add_get("/ws", handle_websocket)
-
- return app
-
-
-async def run_server(pump=None, host="127.0.0.1", port=8765):
- """
- Run the server.
-
- Args:
- pump: StreamPump instance for message handling
- host: Bind address
- port: Port number
- """
- app = create_app(pump)
- runner = web.AppRunner(app)
- await runner.setup()
-
- site = web.TCPSite(runner, host, port)
- await site.start()
-
- print(f"Server on http://{host}:{port}")
-
- try:
- while True:
- await asyncio.sleep(3600)
- except asyncio.CancelledError:
- pass
- finally:
- await runner.cleanup()
diff --git a/xml_pipeline/xml_listener.py b/xml_pipeline/xml_listener.py
deleted file mode 100644
index 5954061..0000000
--- a/xml_pipeline/xml_listener.py
+++ /dev/null
@@ -1,83 +0,0 @@
-"""
-xml_listener.py — The Sovereign Contract for All Capabilities (v1.3)
-"""
-
-from __future__ import annotations
-from typing import Optional, Type, Callable
-from pydantic import BaseModel
-
-class XMLListener:
- """
- Base class for all reactive capabilities.
- Now supports Autonomous Registration via Pydantic payload classes.
- """
-
- def __init__(
- self,
- name: str,
- payload_class: Type[BaseModel],
- handler: Callable[[dict], bytes],
- description: Optional[str] = None
- ):
- self.agent_name = name
- self.payload_class = payload_class
- self.handler = handler
- self.description = description or payload_class.__doc__ or "No description provided."
-
- # In v1.3, the root tag is derived from the payload class name
- self.root_tag = payload_class.__name__
- self.listens_to = [self.root_tag]
-
- async def handle(
- self,
- payload_dict: dict,
- thread_id: str,
- sender_name: str,
- ) -> Optional[bytes]:
- """
- React to a pre-validated dictionary payload.
- Returns raw response XML bytes.
- """
- # 1. Execute the handler logic
- # Note: In v1.3, the Bus/Lark handles the XML -> Dict conversion
- return await self.handler(payload_dict)
-
- def generate_xsd(self) -> str:
- """
- Autonomous XSD Synthesis.
- Inspects the payload_class and generates an XSD string.
- """
- # Logic to iterate over self.payload_class.model_fields
- # and build the definitions.
- pass
-
- def generate_prompt_fragment(self) -> str:
- """
- Prompt Synthesis (The 'Mente').
- Generates the tool usage instructions for other agents.
- """
- fragment = [
- f"Capability: {self.agent_name}",
- f"Root Tag: <{self.root_tag}>",
- f"Description: {self.description}",
- "\nParameters:"
- ]
-
- for name, field in self.payload_class.model_fields.items():
- field_type = field.annotation.__name__
- field_desc = field.description or "No description"
- fragment.append(f" - {name} ({field_type}): {field_desc}")
-
- return "\n".join(fragment)
-
- def make_response_envelope(
- self,
- payload_bytes: bytes,
- thread_id: str,
- to: Optional[str] = None
- ) -> bytes:
- """
- Wraps response bytes in a standard envelope.
- """
- # Logic to build the meta block and append the payload_bytes
- pass
\ No newline at end of file