Some checks failed
Invert the agent model: the agent IS the computer. The message pump becomes the kernel, handlers are sandboxed apps, and all access is mediated by the platform. Phase 1 — Container foundation: - Multi-stage Dockerfile (python:3.12-slim, non-root user, /data volume) - deploy/entrypoint.py with --dry-run config validation - docker-compose.yml (cap_drop ALL, read_only, no-new-privileges) - docker-compose.dev.yml overlay for development - CI Docker build smoke test Phase 2 — Security hardening: - xml_pipeline/security/ module with default-deny container mode - Permission gate: per-listener tool allowlist enforcement - Network policy: egress control (only declared LLM backend domains) - Shell tool: disabled in container mode - File tool: restricted to /data and /config in container mode - Fetch tool: integrates network egress policy - Config loader: parses security and network YAML sections Phase 3 — Management plane: - Agent app (port 8080): minimal /health, /inject, /ws only - Management app (port 9090): full API, audit log, dashboard - SQLite-backed audit log for tool invocations and security events - Static web dashboard (no framework, WebSocket-driven) - CLI --split flag for dual-port serving All 439 existing tests pass with zero regressions. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
92 lines
2.6 KiB
Python
92 lines
2.6 KiB
Python
"""
|
|
Permission gate — Per-listener tool allowlist enforcement.
|
|
|
|
In container mode, handlers get NO tools unless explicitly declared
|
|
in their listener config via `allowed_tools`.
|
|
|
|
In development mode, all tools are available to all handlers (current behavior).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import Optional
|
|
|
|
from xml_pipeline.tools.base import ToolResult
|
|
|
|
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Module state (process-wide; mutated only through the functions below)
# ---------------------------------------------------------------------------

# Maps a listener's registered name to the set of tool names it may invoke.
# A listener with no entry here is denied every tool while the gate is on.
_listener_allowlists: dict[str, set[str]] = {}

# Master switch. False means no enforcement: every permission check passes.
_gate_enabled: bool = False
|
|
|
|
|
|
def enable_permission_gate() -> None:
    """
    Switch the permission gate on.

    Sets the module-level ``_gate_enabled`` flag to True. While the flag is
    set, ``check_permission`` only lets a listener use tools that appear in
    its registered allowlist.
    """
    global _gate_enabled
    _gate_enabled = True
|
|
|
|
|
|
def disable_permission_gate() -> None:
    """
    Switch the permission gate off.

    Sets the module-level ``_gate_enabled`` flag to False. While the flag is
    clear, ``check_permission`` allows every tool for every listener.
    Registered allowlists are left untouched.
    """
    global _gate_enabled
    _gate_enabled = False
|
|
|
|
|
|
def is_gate_enabled() -> bool:
    """
    Report whether the permission gate is currently on.

    Returns:
        The current value of the module-level ``_gate_enabled`` flag.
    """
    return _gate_enabled
|
|
|
|
|
|
def register_listener_tools(listener_name: str, allowed_tools: list[str]) -> None:
    """
    Register the tool allowlist for a listener.

    Any allowlist previously stored for ``listener_name`` is replaced.

    Args:
        listener_name: The listener's registered name
        allowed_tools: Tool names this listener may invoke. Duplicates are
            collapsed. ``None`` is accepted and treated as an empty
            allowlist; a bare string is treated as a single tool name
            rather than being split into characters.
    """
    if allowed_tools is None:
        # set(None) would raise TypeError. An empty allowlist keeps the
        # listener at "no tools", which matches the default-deny intent.
        tool_names = []
    elif isinstance(allowed_tools, str):
        # set("shell") would yield {"s", "h", "e", "l"} and silently register
        # the wrong names, so wrap a lone string as one tool name.
        tool_names = [allowed_tools]
    else:
        tool_names = list(allowed_tools)

    _listener_allowlists[listener_name] = set(tool_names)
    if tool_names:
        logger.debug(f"Listener '{listener_name}' tools: {tool_names}")
|
|
|
|
|
|
def check_permission(listener_name: str, tool_name: str) -> Optional[ToolResult]:
    """
    Decide whether a listener may invoke a tool.

    Args:
        listener_name: Name the listener was registered under
        tool_name: Name of the tool being requested

    Returns:
        None when the call is allowed: either the gate is off, or
        ``tool_name`` is in the listener's allowlist. Otherwise a failed
        ToolResult whose ``error`` explains the denial. Every denial is also
        logged at WARNING level.
    """
    # Gate off: nothing is enforced.
    if not _gate_enabled:
        return None

    permitted = _listener_allowlists.get(listener_name)

    if permitted is None:
        # Listener never registered an allowlist: default-deny.
        log_line = (
            f"Permission denied: listener '{listener_name}' has no tool allowlist, "
            f"attempted to use '{tool_name}'"
        )
        denial = "Tool access denied. No tools are configured for this listener."
    elif tool_name not in permitted:
        # Listener is registered, but this tool is not on its list.
        log_line = (
            f"Permission denied: listener '{listener_name}' "
            f"not allowed to use tool '{tool_name}'"
        )
        denial = f"Tool '{tool_name}' is not in the allowed tools for this listener."
    else:
        return None

    logger.warning(log_line)
    return ToolResult(success=False, error=denial)
|
|
|
|
|
|
def reset() -> None:
    """
    Restore the permission gate to its initial state (for testing).

    Empties every registered allowlist and switches the gate off.
    """
    global _gate_enabled
    # Clear in place rather than rebinding, so the same dict object stays
    # bound to the module-level name.
    _listener_allowlists.clear()
    _gate_enabled = False
|