xml-pipeline/xml_pipeline/tools/permission_gate.py
dullfig 06eeea3dee
Add AgentOS container foundation, security hardening, and management plane

Invert the agent model: the agent IS the computer. The message pump
becomes the kernel, handlers are sandboxed apps, and all access is
mediated by the platform.

Phase 1 — Container foundation:
- Multi-stage Dockerfile (python:3.12-slim, non-root user, /data volume)
- deploy/entrypoint.py with --dry-run config validation
- docker-compose.yml (cap_drop ALL, read_only, no-new-privileges)
- docker-compose.dev.yml overlay for development
- CI Docker build smoke test

Phase 2 — Security hardening:
- xml_pipeline/security/ module with default-deny container mode
- Permission gate: per-listener tool allowlist enforcement
- Network policy: egress control (only declared LLM backend domains)
- Shell tool: disabled in container mode
- File tool: restricted to /data and /config in container mode
- Fetch tool: integrates network egress policy
- Config loader: parses security and network YAML sections

Phase 3 — Management plane:
- Agent app (port 8080): minimal /health, /inject, /ws only
- Management app (port 9090): full API, audit log, dashboard
- SQLite-backed audit log for tool invocations and security events
- Static web dashboard (no framework, WebSocket-driven)
- CLI --split flag for dual-port serving

All 439 existing tests pass with zero regressions.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 21:37:24 -08:00

"""
Permission gate: per-listener tool allowlist enforcement.

In container mode, handlers get NO tools unless explicitly declared
in their listener config via `allowed_tools`.

In development mode, all tools are available to all handlers (current behavior).
"""

from __future__ import annotations

import logging
from typing import Optional

from xml_pipeline.tools.base import ToolResult

logger = logging.getLogger(__name__)

# Module state
_gate_enabled: bool = False
_listener_allowlists: dict[str, set[str]] = {}


def enable_permission_gate() -> None:
    """Enable permission gate (tools require explicit allowlist)."""
    global _gate_enabled
    _gate_enabled = True


def disable_permission_gate() -> None:
    """Disable permission gate (all tools available)."""
    global _gate_enabled
    _gate_enabled = False


def is_gate_enabled() -> bool:
    """Check if permission gate is active."""
    return _gate_enabled


def register_listener_tools(listener_name: str, allowed_tools: list[str]) -> None:
    """
    Register the tool allowlist for a listener.

    Args:
        listener_name: The listener's registered name
        allowed_tools: List of tool names this listener may invoke
    """
    _listener_allowlists[listener_name] = set(allowed_tools)
    if allowed_tools:
        logger.debug(f"Listener '{listener_name}' tools: {allowed_tools}")


def check_permission(listener_name: str, tool_name: str) -> Optional[ToolResult]:
    """
    Check if a listener is allowed to invoke a tool.

    Returns None if allowed, or a ToolResult error if denied.
    """
    if not _gate_enabled:
        return None

    allowed = _listener_allowlists.get(listener_name)
    if allowed is None:
        # No allowlist registered: deny by default in container mode
        logger.warning(
            f"Permission denied: listener '{listener_name}' has no tool allowlist, "
            f"attempted to use '{tool_name}'"
        )
        return ToolResult(
            success=False,
            error="Tool access denied. No tools are configured for this listener.",
        )

    if tool_name not in allowed:
        logger.warning(
            f"Permission denied: listener '{listener_name}' "
            f"not allowed to use tool '{tool_name}'"
        )
        return ToolResult(
            success=False,
            error=f"Tool '{tool_name}' is not in the allowed tools for this listener.",
        )

    return None


def reset() -> None:
    """Reset permission gate state (for testing)."""
    global _gate_enabled
    _gate_enabled = False
    _listener_allowlists.clear()
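
The gate's three outcomes (gate off, tool on the allowlist, tool denied) can be sketched in isolation. This is a self-contained illustration, not the module itself: `ToolResult` here is a hypothetical stand-in for `xml_pipeline.tools.base.ToolResult` (assumed to carry `success` and `error` fields, as the calls above suggest), the `Gate` class is an illustrative restatement of the module-level state, and the listener and tool names (`researcher`, `fetch`, `shell`) are made up for the example.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional


@dataclass
class ToolResult:
    """Hypothetical stand-in for xml_pipeline.tools.base.ToolResult."""

    success: bool
    error: Optional[str] = None


class Gate:
    """Illustrative class version of the module-level permission gate."""

    def __init__(self) -> None:
        self.enabled = False
        self.allowlists: dict[str, set[str]] = {}

    def register(self, listener: str, tools: list[str]) -> None:
        # Store the allowlist as a set for O(1) membership checks.
        self.allowlists[listener] = set(tools)

    def check(self, listener: str, tool: str) -> Optional[ToolResult]:
        if not self.enabled:
            return None  # gate disabled: every tool is allowed
        allowed = self.allowlists.get(listener)
        if allowed is None:
            # Unregistered listener: deny by default.
            return ToolResult(
                success=False,
                error="Tool access denied. No tools are configured for this listener.",
            )
        if tool not in allowed:
            return ToolResult(
                success=False,
                error=f"Tool '{tool}' is not in the allowed tools for this listener.",
            )
        return None  # explicitly allowed


gate = Gate()
gate.register("researcher", ["fetch"])

dev_result = gate.check("researcher", "shell")        # gate off: allowed
gate.enabled = True
allowed_result = gate.check("researcher", "fetch")    # on the allowlist
denied_result = gate.check("researcher", "shell")     # not on the allowlist
unknown_result = gate.check("unregistered", "fetch")  # no allowlist at all
```

Returning `None` for "allowed" lets a caller write `if (err := gate.check(...)) is not None: return err` and pass the denial straight back as the tool's result. Note that a listener registered with an empty list is still denied everything, but through the "not in the allowed tools" branch, not the "no tools are configured" branch.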