Tools (18 total): - calculate: Safe AST-based math expression evaluator - fetch: Async HTTP with SSRF protection - files: Sandboxed read/write/list/delete - shell: Command execution with blocklist - search: Web search (SerpAPI, Google, Bing) - keyvalue: In-memory key-value store - librarian: exist-db XML database integration - convert: XML↔JSON conversion + XPath extraction Infrastructure: - CLI with run/init/check/version commands - Config loader for organism.yaml - Feature detection for optional dependencies - Optional extras in pyproject.toml LLM: - Fixed llm_connection.py to wrap working router WASM: - Documented WASM listener interface - Stub implementation for future work MCP: - Reddit sentiment MCP server example Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
192 lines
6.3 KiB
Python
192 lines
6.3 KiB
Python
"""
|
|
File tools - sandboxed file system operations.
|
|
|
|
All paths are validated against configured allowed directories.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
from pathlib import Path
|
|
from typing import Optional, List
|
|
|
|
from .base import tool, ToolResult
|
|
|
|
|
|
# Security configuration
|
|
_allowed_paths: List[Path] = []
|
|
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10 MB
|
|
MAX_LISTING_ENTRIES = 1000
|
|
|
|
|
|
def configure_allowed_paths(paths: List[str | Path]) -> None:
|
|
global _allowed_paths
|
|
_allowed_paths = [Path(p).resolve() for p in paths]
|
|
|
|
|
|
def _validate_path(path: str) -> tuple[Optional[str], Optional[Path]]:
|
|
if not _allowed_paths:
|
|
try:
|
|
return None, Path(path).resolve()
|
|
except Exception as e:
|
|
return f"Invalid path: {e}", None
|
|
try:
|
|
resolved = Path(path).resolve()
|
|
except Exception as e:
|
|
return f"Invalid path: {e}", None
|
|
if ".." in str(path):
|
|
return "Path traversal (..) not allowed", None
|
|
for allowed in _allowed_paths:
|
|
try:
|
|
resolved.relative_to(allowed)
|
|
return None, resolved
|
|
except ValueError:
|
|
continue
|
|
return "Path not in allowed directories", None
|
|
|
|
|
|
@tool
|
|
async def read_file(
|
|
path: str,
|
|
encoding: str = "utf-8",
|
|
binary: bool = False,
|
|
offset: int = 0,
|
|
limit: Optional[int] = None,
|
|
) -> ToolResult:
|
|
error, resolved = _validate_path(path)
|
|
if error:
|
|
return ToolResult(success=False, error=error)
|
|
if not resolved.exists():
|
|
return ToolResult(success=False, error=f"File not found: {path}")
|
|
if not resolved.is_file():
|
|
return ToolResult(success=False, error=f"Not a file: {path}")
|
|
try:
|
|
file_size = resolved.stat().st_size
|
|
read_size = min(limit or MAX_FILE_SIZE, MAX_FILE_SIZE)
|
|
if binary:
|
|
with open(resolved, "rb") as f:
|
|
if offset:
|
|
f.seek(offset)
|
|
content = f.read(read_size)
|
|
return ToolResult(success=True, data={
|
|
"content": base64.b64encode(content).decode("ascii"),
|
|
"size": file_size,
|
|
"encoding": "base64",
|
|
})
|
|
else:
|
|
with open(resolved, "r", encoding=encoding) as f:
|
|
if offset:
|
|
f.seek(offset)
|
|
content = f.read(read_size)
|
|
return ToolResult(success=True, data={
|
|
"content": content,
|
|
"size": file_size,
|
|
"encoding": encoding,
|
|
})
|
|
except UnicodeDecodeError:
|
|
return ToolResult(success=False, error=f"Cannot decode as {encoding}. Try binary=true.")
|
|
except Exception as e:
|
|
return ToolResult(success=False, error=f"Read error: {e}")
|
|
|
|
|
|
@tool
|
|
async def write_file(
|
|
path: str,
|
|
content: str,
|
|
mode: str = "overwrite",
|
|
encoding: str = "utf-8",
|
|
binary: bool = False,
|
|
create_dirs: bool = False,
|
|
) -> ToolResult:
|
|
error, resolved = _validate_path(path)
|
|
if error:
|
|
return ToolResult(success=False, error=error)
|
|
if binary:
|
|
try:
|
|
data = base64.b64decode(content)
|
|
except Exception as e:
|
|
return ToolResult(success=False, error=f"Invalid base64: {e}")
|
|
else:
|
|
data = content.encode(encoding)
|
|
if len(data) > MAX_FILE_SIZE:
|
|
return ToolResult(success=False, error=f"Content too large: {len(data)} bytes")
|
|
try:
|
|
if create_dirs:
|
|
resolved.parent.mkdir(parents=True, exist_ok=True)
|
|
if binary:
|
|
write_mode = "ab" if mode == "append" else "wb"
|
|
with open(resolved, write_mode) as f:
|
|
f.write(data)
|
|
else:
|
|
if mode == "append":
|
|
with open(resolved, "a", encoding=encoding) as f:
|
|
f.write(content)
|
|
else:
|
|
resolved.write_text(content, encoding=encoding)
|
|
return ToolResult(success=True, data={"bytes_written": len(data), "path": str(resolved)})
|
|
except Exception as e:
|
|
return ToolResult(success=False, error=f"Write error: {e}")
|
|
|
|
|
|
@tool
|
|
async def list_dir(
|
|
path: str,
|
|
pattern: str = "*",
|
|
recursive: bool = False,
|
|
include_hidden: bool = False,
|
|
) -> ToolResult:
|
|
error, resolved = _validate_path(path)
|
|
if error:
|
|
return ToolResult(success=False, error=error)
|
|
if not resolved.exists():
|
|
return ToolResult(success=False, error=f"Directory not found: {path}")
|
|
if not resolved.is_dir():
|
|
return ToolResult(success=False, error=f"Not a directory: {path}")
|
|
try:
|
|
entries = []
|
|
glob_method = resolved.rglob if recursive else resolved.glob
|
|
for entry in glob_method(pattern):
|
|
if not include_hidden and entry.name.startswith("."):
|
|
continue
|
|
try:
|
|
stat = entry.stat()
|
|
entries.append({
|
|
"name": str(entry.relative_to(resolved)),
|
|
"type": "dir" if entry.is_dir() else "file",
|
|
"size": stat.st_size if entry.is_file() else None,
|
|
"modified": stat.st_mtime,
|
|
})
|
|
except (OSError, PermissionError):
|
|
continue
|
|
if len(entries) >= MAX_LISTING_ENTRIES:
|
|
break
|
|
entries.sort(key=lambda e: e["name"])
|
|
return ToolResult(success=True, data={
|
|
"entries": entries,
|
|
"count": len(entries),
|
|
"truncated": len(entries) >= MAX_LISTING_ENTRIES,
|
|
})
|
|
except Exception as e:
|
|
return ToolResult(success=False, error=f"List error: {e}")
|
|
|
|
|
|
@tool
|
|
async def delete_file(path: str, recursive: bool = False) -> ToolResult:
|
|
error, resolved = _validate_path(path)
|
|
if error:
|
|
return ToolResult(success=False, error=error)
|
|
if not resolved.exists():
|
|
return ToolResult(success=False, error=f"Path not found: {path}")
|
|
try:
|
|
if resolved.is_file():
|
|
resolved.unlink()
|
|
elif resolved.is_dir():
|
|
if not recursive:
|
|
return ToolResult(success=False, error="Cannot delete directory without recursive=true")
|
|
import shutil
|
|
shutil.rmtree(resolved)
|
|
else:
|
|
return ToolResult(success=False, error=f"Unknown file type: {path}")
|
|
return ToolResult(success=True, data={"deleted": True, "path": str(resolved)})
|
|
except Exception as e:
|
|
return ToolResult(success=False, error=f"Delete error: {e}")
|