xml-pipeline/xml_pipeline/tools/fetch.py
dullfig 06eeea3dee
Some checks failed
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.13) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / typecheck (push) Has been cancelled
CI / docker (push) Has been cancelled
Add AgentOS container foundation, security hardening, and management plane
Invert the agent model: the agent IS the computer. The message pump
becomes the kernel, handlers are sandboxed apps, and all access is
mediated by the platform.

Phase 1 — Container foundation:
- Multi-stage Dockerfile (python:3.12-slim, non-root user, /data volume)
- deploy/entrypoint.py with --dry-run config validation
- docker-compose.yml (cap_drop ALL, read_only, no-new-privileges)
- docker-compose.dev.yml overlay for development
- CI Docker build smoke test

Phase 2 — Security hardening:
- xml_pipeline/security/ module with default-deny container mode
- Permission gate: per-listener tool allowlist enforcement
- Network policy: egress control (only declared LLM backend domains)
- Shell tool: disabled in container mode
- File tool: restricted to /data and /config in container mode
- Fetch tool: integrates network egress policy
- Config loader: parses security and network YAML sections

Phase 3 — Management plane:
- Agent app (port 8080): minimal /health, /inject, /ws only
- Management app (port 9090): full API, audit log, dashboard
- SQLite-backed audit log for tool invocations and security events
- Static web dashboard (no framework, WebSocket-driven)
- CLI --split flag for dual-port serving

All 439 existing tests pass with zero regressions.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 21:37:24 -08:00

196 lines
6.2 KiB
Python

"""
Fetch tool - HTTP requests with security controls.
Uses aiohttp for async HTTP operations.
In container mode, egress is restricted to allowlisted domains only.
"""
from __future__ import annotations
import ipaddress
import socket
from typing import Optional, Dict
from urllib.parse import urlparse
from .base import tool, ToolResult
# Try to import aiohttp - optional dependency
try:
import aiohttp
AIOHTTP_AVAILABLE = True
except ImportError:
AIOHTTP_AVAILABLE = False
# --- Security configuration ---
# Upper bound on an accepted response body: 10 MiB.
MAX_RESPONSE_SIZE = 10 * 1024 ** 2
# Request timeout, in seconds, used when the caller does not supply one.
DEFAULT_TIMEOUT = 30
# URL schemes the fetch tool is willing to request.
ALLOWED_SCHEMES = {"https", "http"}
# Hostnames that are refused outright by URL validation.
BLOCKED_HOSTS = {
    # Loopback / unspecified addresses
    "localhost",
    "127.0.0.1",
    "::1",
    "0.0.0.0",
    # Cloud metadata endpoints
    "169.254.169.254",  # AWS/Azure/GCP metadata
    "metadata.google.internal",  # GCP metadata
}

# Container mode flag: while True, egress is additionally restricted via the
# network policy.
_container_mode: bool = False
def set_container_mode(enabled: bool) -> None:
    """Turn container mode on or off.

    While container mode is on, egress is restricted to allowlisted domains.

    Args:
        enabled: New value for the module-level container-mode flag.
    """
    global _container_mode

    _container_mode = enabled
def is_container_mode() -> bool:
    """Report whether the fetch tool is currently in container mode."""
    current_mode = _container_mode
    return current_mode
def _is_private_ip(hostname: str) -> bool:
"""Check if hostname resolves to a private/internal IP."""
try:
# Try to parse as IP address first
try:
ip = ipaddress.ip_address(hostname)
return ip.is_private or ip.is_loopback or ip.is_link_local
except ValueError:
pass
# Resolve hostname to IP
ip_str = socket.gethostbyname(hostname)
ip = ipaddress.ip_address(ip_str)
return ip.is_private or ip.is_loopback or ip.is_link_local
except (socket.gaierror, socket.herror):
# Can't resolve - block by default for security
return True
def _validate_url(url: str, allow_internal: bool = False) -> Optional[str]:
"""Validate URL for security. Returns error message or None if OK."""
try:
parsed = urlparse(url)
except Exception:
return "Invalid URL format"
if parsed.scheme not in ALLOWED_SCHEMES:
return f"Scheme '{parsed.scheme}' not allowed. Use http or https."
if not parsed.netloc:
return "URL must have a host"
hostname = parsed.hostname or ""
if hostname in BLOCKED_HOSTS:
return f"Host '{hostname}' is blocked"
if not allow_internal and _is_private_ip(hostname):
return f"Access to internal/private IPs is not allowed"
return None
async def _check_fetch_target(url: str, allow_internal: bool) -> Optional[str]:
    """Return an error message if *url* must not be requested, else None.

    Used for the initial URL and for every redirect hop so that a redirect
    cannot lead somewhere the original URL would not have been allowed to go.
    """
    import asyncio

    # _validate_url can perform a blocking DNS lookup; run it in a worker
    # thread so the event loop is not stalled while it resolves.
    if error := await asyncio.to_thread(_validate_url, url, allow_internal):
        return error

    # Container mode: enforce network egress policy
    if _container_mode:
        from xml_pipeline.tools.network_policy import check_egress

        if egress_error := check_egress(url):
            return egress_error

    return None


async def _read_capped(resp: "aiohttp.ClientResponse", limit: int) -> Optional[bytes]:
    """Read the whole response body, or return None once it exceeds *limit*.

    A single ``StreamReader.read(n)`` call returns only what is currently
    buffered, so the body is accumulated chunk by chunk until EOF.
    """
    pieces = []
    received = 0
    async for chunk in resp.content.iter_chunked(64 * 1024):
        received += len(chunk)
        if received > limit:
            return None
        pieces.append(chunk)
    return b"".join(pieces)


@tool
async def fetch_url(
    url: str,
    method: str = "GET",
    headers: Optional[Dict[str, str]] = None,
    body: Optional[str] = None,
    timeout: int = DEFAULT_TIMEOUT,
    allow_internal: bool = False,
) -> ToolResult:
    """
    Fetch content from a URL.

    Args:
        url: The URL to fetch
        method: HTTP method (GET, POST, PUT, DELETE, PATCH, HEAD, OPTIONS)
        headers: Optional HTTP headers
        body: Optional request body for POST/PUT/PATCH
        timeout: Request timeout in seconds (default: 30, max: 300)
        allow_internal: Allow internal/private IPs (default: false)

    Returns:
        status_code, headers, body, url (final URL after redirects).
        Bodies that are not valid UTF-8 are returned base64-encoded.

    Security:
        - Only http/https schemes allowed
        - No access to localhost, metadata endpoints, or private IPs by default
        - Redirects are followed manually (max 10) and every hop is
          re-validated, including the container-mode egress policy
        - Credential headers are not forwarded to a different origin
        - Response size limited to 10 MB
        - Timeout enforced across the whole request, redirects included
    """
    import asyncio
    import base64
    from urllib.parse import urljoin

    if not AIOHTTP_AVAILABLE:
        return ToolResult(
            success=False,
            error="aiohttp not installed. Install with: pip install xml-pipeline[server]"
        )

    # Validate URL (and, in container mode, the egress policy)
    if error := await _check_fetch_target(url, allow_internal):
        return ToolResult(success=False, error=error)

    # Validate method
    method = method.upper()
    allowed_methods = {"GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"}
    if method not in allowed_methods:
        return ToolResult(success=False, error=f"Method '{method}' not allowed")

    # Clamp timeout
    timeout = min(max(1, timeout), 300)

    max_redirects = 10
    redirect_statuses = {301, 302, 303, 307, 308}
    credential_headers = {"authorization", "cookie", "proxy-authorization"}

    current_url = url
    current_method = method
    current_body = body
    current_headers = dict(headers) if headers else None

    try:
        # One deadline shared by all hops, so following redirects cannot
        # stretch the total time beyond the requested timeout.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout

        client_timeout = aiohttp.ClientTimeout(total=timeout)
        async with aiohttp.ClientSession(timeout=client_timeout) as session:
            for _hop in range(max_redirects + 1):
                remaining = deadline - loop.time()
                if remaining <= 0:
                    raise asyncio.TimeoutError

                async with session.request(
                    current_method,
                    current_url,
                    headers=current_headers,
                    data=current_body,
                    # Redirects are handled below so each target is checked
                    # before any request is sent to it.
                    allow_redirects=False,
                    timeout=aiohttp.ClientTimeout(total=remaining),
                ) as resp:
                    location = resp.headers.get("Location")
                    if resp.status in redirect_statuses and location:
                        next_url = urljoin(str(resp.url), location)
                        if error := await _check_fetch_target(next_url, allow_internal):
                            return ToolResult(
                                success=False,
                                error=f"Redirect blocked: {error}"
                            )

                        dropped = set()
                        # Same method rewriting aiohttp applies when it
                        # follows redirects itself: 303 (and 301/302 after a
                        # POST) continue as a body-less GET.
                        if (resp.status == 303 and current_method != "HEAD") or (
                            resp.status in (301, 302) and current_method == "POST"
                        ):
                            current_method = "GET"
                            current_body = None
                            dropped.add("content-length")

                        # Never hand caller-supplied credentials to another origin.
                        before, after = urlparse(current_url), urlparse(next_url)
                        if (before.scheme, before.netloc) != (after.scheme, after.netloc):
                            dropped |= credential_headers

                        if current_headers and dropped:
                            current_headers = {
                                name: value
                                for name, value in current_headers.items()
                                if name.lower() not in dropped
                            }

                        current_url = next_url
                        continue

                    # Check declared size before reading. A HEAD response
                    # carries no body, so its Content-Length is not a cost.
                    declared = resp.headers.get("Content-Length")
                    if (
                        current_method != "HEAD"
                        and declared is not None
                        and declared.strip().isdigit()
                        and int(declared) > MAX_RESPONSE_SIZE
                    ):
                        return ToolResult(
                            success=False,
                            error=f"Response too large: {declared} bytes (max: {MAX_RESPONSE_SIZE})"
                        )

                    # Read the full response, enforcing the size limit
                    body_bytes = await _read_capped(resp, MAX_RESPONSE_SIZE)
                    if body_bytes is None:
                        return ToolResult(
                            success=False,
                            error=f"Response exceeded {MAX_RESPONSE_SIZE} bytes"
                        )

                    # Try to decode as text
                    try:
                        body_text = body_bytes.decode("utf-8")
                    except UnicodeDecodeError:
                        # Return base64 for binary content
                        body_text = base64.b64encode(body_bytes).decode("ascii")

                    return ToolResult(success=True, data={
                        "status_code": resp.status,
                        "headers": dict(resp.headers),
                        "body": body_text,
                        "url": str(resp.url),  # Final URL after redirects
                    })

            return ToolResult(
                success=False,
                error=f"Too many redirects (max: {max_redirects})"
            )
    except asyncio.TimeoutError:
        # Handled before ClientError: aiohttp's timeout errors derive from
        # both, and should be reported as timeouts.
        return ToolResult(success=False, error=f"Request timed out after {timeout}s")
    except aiohttp.ClientError as e:
        return ToolResult(success=False, error=f"HTTP error: {e}")
    except Exception as e:
        # Tool boundary: report unexpected failures as a result, not a crash.
        return ToolResult(success=False, error=f"Fetch error: {e}")