xml-pipeline/xml_pipeline/tools/network_policy.py
dullfig 06eeea3dee
Add AgentOS container foundation, security hardening, and management plane

Invert the agent model: the agent IS the computer. The message pump
becomes the kernel, handlers are sandboxed apps, and all access is
mediated by the platform.

Phase 1 — Container foundation:
- Multi-stage Dockerfile (python:3.12-slim, non-root user, /data volume)
- deploy/entrypoint.py with --dry-run config validation
- docker-compose.yml (cap_drop ALL, read_only, no-new-privileges)
- docker-compose.dev.yml overlay for development
- CI Docker build smoke test

Phase 2 — Security hardening:
- xml_pipeline/security/ module with default-deny container mode
- Permission gate: per-listener tool allowlist enforcement
- Network policy: egress control (only declared LLM backend domains)
- Shell tool: disabled in container mode
- File tool: restricted to /data and /config in container mode
- Fetch tool: integrates network egress policy
- Config loader: parses security and network YAML sections

Phase 3 — Management plane:
- Agent app (port 8080): minimal /health, /inject, /ws only
- Management app (port 9090): full API, audit log, dashboard
- SQLite-backed audit log for tool invocations and security events
- Static web dashboard (no framework, WebSocket-driven)
- CLI --split flag for dual-port serving

All 439 existing tests pass with zero regressions.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 21:37:24 -08:00

"""
Network policy — Egress control for container mode.
Default-deny: only declared LLM backend domains and explicitly
allowlisted domains can be reached.
In development mode, no restrictions apply.
"""

from __future__ import annotations

import logging
from typing import Optional
from urllib.parse import urlparse

logger = logging.getLogger(__name__)

# Module state
_egress_enabled: bool = False
_allowed_domains: set[str] = set()

# LLM provider domains that are auto-allowlisted based on configured backends
LLM_PROVIDER_DOMAINS: dict[str, list[str]] = {
    "xai": ["api.x.ai"],
    "anthropic": ["api.anthropic.com"],
    "openai": ["api.openai.com"],
    "ollama": ["localhost", "127.0.0.1"],
}


def enable_egress_control() -> None:
    """Enable egress control (deny by default)."""
    global _egress_enabled
    _egress_enabled = True


def disable_egress_control() -> None:
    """Disable egress control (allow all)."""
    global _egress_enabled
    _egress_enabled = False


def is_egress_controlled() -> bool:
    """Check if egress control is active."""
    return _egress_enabled


def allow_domain(domain: str) -> None:
    """Add a domain to the egress allowlist."""
    _allowed_domains.add(domain.lower())


def allow_domains(domains: list[str]) -> None:
    """Add multiple domains to the egress allowlist."""
    for domain in domains:
        _allowed_domains.add(domain.lower())


def allow_llm_provider(provider: str) -> None:
    """Auto-allowlist domains for an LLM provider.

    Providers not listed in LLM_PROVIDER_DOMAINS are silently ignored.
    """
    domains = LLM_PROVIDER_DOMAINS.get(provider, [])
    for domain in domains:
        _allowed_domains.add(domain.lower())
        logger.debug(f"Auto-allowlisted domain for {provider}: {domain}")


def check_egress(url: str) -> Optional[str]:
    """
    Check if a URL is allowed by the egress policy.

    Returns None if allowed, or an error message if blocked.
    """
    if not _egress_enabled:
        return None

    try:
        parsed = urlparse(url)
        hostname = (parsed.hostname or "").lower()
    except Exception:
        return "Invalid URL"

    if not hostname:
        return "URL must have a host"

    # Check exact domain match
    if hostname in _allowed_domains:
        return None

    # Check parent domains: a subdomain passes if any parent is allowlisted,
    # either as a wildcard entry (*.example.com) or as a plain entry (example.com)
    parts = hostname.split(".")
    for i in range(1, len(parts)):
        parent = ".".join(parts[i:])
        if f"*.{parent}" in _allowed_domains or parent in _allowed_domains:
            return None

    logger.warning(f"Egress blocked: {hostname} not in allowlist")
    return f"Egress blocked: domain '{hostname}' is not in the allowed domains list"


def get_allowed_domains() -> set[str]:
    """Get the current set of allowed domains."""
    return set(_allowed_domains)


def reset() -> None:
    """Reset network policy state (for testing)."""
    global _egress_enabled
    _egress_enabled = False
    _allowed_domains.clear()
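
The matching rules in `check_egress` are easy to misread, so here is a minimal standalone sketch of them. `is_host_allowed` is a hypothetical stand-in written for illustration and is not part of this module; it takes the allowlist as a parameter instead of using module state, and it assumes egress control is already enabled. The example hosts are made up.

```python
from urllib.parse import urlparse


def is_host_allowed(url: str, allowed: set[str]) -> bool:
    """Hypothetical stand-in mirroring the host checks in check_egress()."""
    hostname = (urlparse(url).hostname or "").lower()
    if not hostname:
        return False
    # Exact match against an allowlist entry
    if hostname in allowed:
        return True
    # Walk up the parent domains: wildcard entry or plain entry both match
    parts = hostname.split(".")
    for i in range(1, len(parts)):
        parent = ".".join(parts[i:])
        if f"*.{parent}" in allowed or parent in allowed:
            return True
    return False


# Illustrative allowlist: one plain entry, one wildcard entry
allowed = {"api.anthropic.com", "*.example.com"}

results = {
    url: is_host_allowed(url, allowed)
    for url in (
        "https://api.anthropic.com/v1/messages",  # exact match
        "https://cdn.example.com/asset.js",       # matches *.example.com
        "https://example.com/",                   # bare domain, no plain entry
        "https://eu.api.anthropic.com/",          # subdomain of a plain entry
        "https://evil.test/",                     # not listed
    )
}

for url, ok in results.items():
    print("allow" if ok else "block", url)
```

Two consequences are worth noting. A wildcard entry such as `*.example.com` does not cover the bare `example.com`, and a plain entry such as `api.anthropic.com` also admits every subdomain beneath it. If the second behavior is not wanted, the `parent in allowed` clause is the part to reconsider.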