xml-pipeline/xml_pipeline/tools/shell.py
dullfig 06eeea3dee
Some checks failed
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.13) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / typecheck (push) Has been cancelled
CI / docker (push) Has been cancelled
Add AgentOS container foundation, security hardening, and management plane
Invert the agent model: the agent IS the computer. The message pump
becomes the kernel, handlers are sandboxed apps, and all access is
mediated by the platform.

Phase 1 — Container foundation:
- Multi-stage Dockerfile (python:3.12-slim, non-root user, /data volume)
- deploy/entrypoint.py with --dry-run config validation
- docker-compose.yml (cap_drop ALL, read_only, no-new-privileges)
- docker-compose.dev.yml overlay for development
- CI Docker build smoke test

Phase 2 — Security hardening:
- xml_pipeline/security/ module with default-deny container mode
- Permission gate: per-listener tool allowlist enforcement
- Network policy: egress control (only declared LLM backend domains)
- Shell tool: disabled in container mode
- File tool: restricted to /data and /config in container mode
- Fetch tool: integrates network egress policy
- Config loader: parses security and network YAML sections

Phase 3 — Management plane:
- Agent app (port 8080): minimal /health, /inject, /ws only
- Management app (port 9090): full API, audit log, dashboard
- SQLite-backed audit log for tool invocations and security events
- Static web dashboard (no framework, WebSocket-driven)
- CLI --split flag for dual-port serving

All 439 existing tests pass with zero regressions.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 21:37:24 -08:00

186 lines
5.6 KiB
Python

"""
Shell tool - sandboxed command execution.
Provides controlled command execution with security restrictions.
In container mode, shell is disabled entirely.
"""
from __future__ import annotations
import asyncio
import shlex
from typing import Optional, List
from .base import tool, ToolResult
# Container mode flag — when True, all shell commands are rejected.
# Module-level state: written by set_container_mode(), reported by
# is_container_mode(), and checked first thing in run_command().
_container_mode: bool = False
def set_container_mode(enabled: bool) -> None:
    """
    Switch container mode on or off.

    While container mode is on, run_command() refuses every command,
    so the shell tool is effectively disabled.

    Args:
        enabled: True to disable the shell tool, False to re-enable it.
    """
    global _container_mode
    _container_mode = enabled
def is_container_mode() -> bool:
    """
    Report whether container mode is active.

    Returns:
        True when the shell tool is disabled (container mode on),
        False otherwise.
    """
    return _container_mode
# Security configuration
# Both lists hold bare command names. _validate_command() matches them against
# the lowercased basename of the first token of the command line.
ALLOWED_COMMANDS: List[str] = []  # Empty = check blocklist only
BLOCKED_COMMANDS: List[str] = [
    # Destructive commands
    "rm", "rmdir", "del", "erase", "format", "mkfs", "dd",
    # System modification
    "shutdown", "reboot", "init", "systemctl",
    # Network tools that could be abused
    "nc", "netcat", "ncat",
    # Privilege escalation
    "sudo", "su", "doas", "runas",
    # Shell escapes
    "bash", "sh", "zsh", "fish", "cmd", "powershell", "pwsh",
]
# Timeouts are in seconds. run_command() uses DEFAULT_TIMEOUT when the caller
# passes none and clamps any requested value into the range 1..MAX_TIMEOUT.
DEFAULT_TIMEOUT = 30
MAX_TIMEOUT = 300
# Byte cap applied to stdout and to stderr separately before decoding.
MAX_OUTPUT_SIZE = 1024 * 1024  # 1 MB
def configure_allowed_commands(commands: List[str]) -> None:
    """
    Replace the allowlist of commands (empty = blocklist mode).

    The names are copied and lowercased before being stored:
    - copied, so that later mutation of the caller's list cannot silently
      change the security policy;
    - lowercased, because command validation compares against the
      lowercased executable name, so a mixed-case entry would never match.

    Args:
        commands: Bare command names to allow. An empty list turns the
            allowlist off so only the blocklist is consulted.
    """
    global ALLOWED_COMMANDS
    ALLOWED_COMMANDS = [name.lower() for name in commands]
def configure_blocked_commands(commands: List[str]) -> None:
    """
    Replace the blocklist of commands.

    Note that this *replaces* the current blocklist, including the built-in
    defaults; it does not append to it. Callers that want to extend the
    defaults must pass the combined list.

    The names are copied and lowercased before being stored:
    - copied, so that later mutation of the caller's list cannot silently
      change the security policy;
    - lowercased, because command validation compares against the
      lowercased executable name, so a mixed-case entry would never block.

    Args:
        commands: Bare command names to block.
    """
    global BLOCKED_COMMANDS
    BLOCKED_COMMANDS = [name.lower() for name in commands]
def _validate_command(command: str) -> Optional[str]:
    """
    Validate a command line against the allow/block lists.

    The executable name is the first shlex token, lowercased, with any
    directory prefix removed. Checks run in this order: allowlist (only when
    ALLOWED_COMMANDS is non-empty), blocklist, then a substring scan of the
    raw command text for shell operators.

    Args:
        command: Raw command line as supplied by the caller.

    Returns:
        An error message if the command is rejected, or None if it passed
        every check.
    """
    try:
        parts = shlex.split(command)
    except ValueError as e:
        # shlex signals unbalanced quotes / dangling escapes with ValueError.
        return f"Invalid command syntax: {e}"

    if not parts:
        return "Empty command"

    # Reduce to the lowercased basename so "/bin/RM" is checked as "rm".
    executable = parts[0].lower().split("/")[-1].split("\\")[-1]
    if not executable:
        # e.g. '""' or a path ending in a separator: nothing to execute.
        return "Empty command"

    # Check allowlist first (if configured). Entries are compared
    # case-insensitively because the executable was lowercased above.
    if ALLOWED_COMMANDS:
        allowed = {name.lower() for name in ALLOWED_COMMANDS}
        if executable not in allowed:
            return f"Command '{executable}' not in allowlist"

    # For the blocklist, also test the name with a Windows executable suffix
    # removed; otherwise "cmd.exe" or "powershell.exe" would slip past the
    # "cmd" / "powershell" entries.
    candidates = {executable}
    for suffix in (".exe", ".com", ".bat", ".cmd"):
        if executable.endswith(suffix) and len(executable) > len(suffix):
            candidates.add(executable[: -len(suffix)])
            break

    blocked = {name.lower() for name in BLOCKED_COMMANDS}
    if candidates & blocked:
        return f"Command '{executable}' is blocked for security"

    # Reject shell operators anywhere in the raw text (even inside quotes).
    # Order matters for the reported operator: "||" is listed before "|".
    for op in (";", "&&", "||", "|", "`", "$(", "${"):
        if op in command:
            return f"Shell operator '{op}' not allowed"

    return None
@tool
async def run_command(
    command: str,
    timeout: int = DEFAULT_TIMEOUT,
    cwd: Optional[str] = None,
    env: Optional[dict] = None,
) -> ToolResult:
    """
    Execute a shell command (sandboxed).

    Args:
        command: Command to execute
        timeout: Timeout in seconds (default: 30, max: 300)
        cwd: Working directory (optional)
        env: Environment variables to add (optional)

    Returns:
        exit_code: Process exit code
        stdout: Standard output
        stderr: Standard error
        timed_out: True if command was killed due to timeout
        truncated: True if stdout or stderr exceeded the output size limit

    Security:
        - Dangerous commands are blocked
        - Shell operators (;, &&, |, etc.) are blocked
        - Timeout enforced
        - Output size limited to 1 MB
    """
    # Function-scope import: `os` is not among this module's top-level imports.
    import os

    # Container mode: shell disabled entirely
    if _container_mode:
        return ToolResult(
            success=False,
            error="Shell access is disabled in container mode.",
        )

    # Validate command
    if error := _validate_command(command):
        return ToolResult(success=False, error=error)

    # Clamp timeout into 1..MAX_TIMEOUT
    timeout = min(max(1, timeout), MAX_TIMEOUT)

    # Parse into argv (no shell). Done outside the try so `args` is always
    # bound when the handlers below use it; validation has already parsed
    # this same string successfully.
    args = shlex.split(command)

    # A non-None env passed to the subprocess API *replaces* the child's whole
    # environment. The contract here is "variables to add", so overlay the
    # caller's values on the current environment (keeps PATH etc.).
    child_env = None if env is None else {**os.environ, **env}

    try:
        proc = await asyncio.create_subprocess_exec(
            *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=cwd,
            env=child_env,
        )

        timed_out = False
        try:
            stdout, stderr = await asyncio.wait_for(
                proc.communicate(),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            try:
                proc.kill()
            except ProcessLookupError:
                # The process exited between the timeout firing and the kill;
                # still report the call as timed out rather than as an error.
                pass
            await proc.wait()
            timed_out = True
            # Partial output is discarded on timeout.
            stdout = b""
            stderr = b"Command timed out"

        # Truncate at the byte level first, then decode; a multi-byte sequence
        # cut at the boundary is handled by errors="replace".
        stdout_str = stdout[:MAX_OUTPUT_SIZE].decode("utf-8", errors="replace")
        stderr_str = stderr[:MAX_OUTPUT_SIZE].decode("utf-8", errors="replace")
        truncated = len(stdout) > MAX_OUTPUT_SIZE or len(stderr) > MAX_OUTPUT_SIZE

        return ToolResult(
            success=proc.returncode == 0 and not timed_out,
            data={
                "exit_code": proc.returncode,
                "stdout": stdout_str,
                "stderr": stderr_str,
                "timed_out": timed_out,
                "truncated": truncated,
            },
        )
    except FileNotFoundError:
        return ToolResult(success=False, error=f"Command not found: {args[0]}")
    except PermissionError:
        return ToolResult(success=False, error=f"Permission denied: {args[0]}")
    except Exception as e:
        # Tool boundary: report any other failure as a result, not a raise.
        return ToolResult(success=False, error=f"Execution error: {e}")