Add tool stubs for native agent tools

Stub implementations for:
- base.py: Tool, ToolResult, @tool decorator, registry
- calculate.py: Math expressions (simpleeval)
- fetch.py: HTTP requests (aiohttp)
- files.py: read_file, write_file, list_dir
- shell.py: run_command (sandboxed)
- search.py: web_search
- keyvalue.py: key_value_get/set/delete (in-memory stub)
- librarian.py: exist-db integration (store, get, query, search)

All stubs return "Not implemented" - ready for real implementation.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
dullfig 2026-01-12 22:40:58 -08:00
parent 7950be66f3
commit 3764c30628
9 changed files with 692 additions and 0 deletions

View file

@ -0,0 +1,38 @@
"""
Native tools for agents.
Tools are sandboxed, permission-controlled functions that agents can invoke
to interact with the outside world.
"""
from .base import Tool, ToolResult, tool, get_tool_registry
from .calculate import calculate
from .fetch import fetch_url
from .files import read_file, write_file, list_dir
from .shell import run_command
from .search import web_search
from .keyvalue import key_value_get, key_value_set, key_value_delete
from .librarian import librarian_store, librarian_get, librarian_query, librarian_search
__all__ = [
# Base
"Tool",
"ToolResult",
"tool",
"get_tool_registry",
# Tools
"calculate",
"fetch_url",
"read_file",
"write_file",
"list_dir",
"run_command",
"web_search",
"key_value_get",
"key_value_set",
"key_value_delete",
"librarian_store",
"librarian_get",
"librarian_query",
"librarian_search",
]

108
agentserver/tools/base.py Normal file
View file

@ -0,0 +1,108 @@
"""
Base classes and registry for tools.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
from functools import wraps
import inspect
@dataclass
class ToolResult:
"""Result from a tool invocation."""
success: bool
data: Any = None
error: Optional[str] = None
@dataclass
class Tool:
"""Tool metadata and implementation."""
name: str
description: str
func: Callable
parameters: Dict[str, Any] = field(default_factory=dict)
async def invoke(self, **kwargs) -> ToolResult:
"""Invoke the tool with given parameters."""
try:
result = await self.func(**kwargs)
if isinstance(result, ToolResult):
return result
return ToolResult(success=True, data=result)
except Exception as e:
return ToolResult(success=False, error=str(e))
class ToolRegistry:
"""Registry of available tools."""
def __init__(self):
self._tools: Dict[str, Tool] = {}
def register(self, tool: Tool):
"""Register a tool."""
self._tools[tool.name] = tool
def get(self, name: str) -> Optional[Tool]:
"""Get a tool by name."""
return self._tools.get(name)
def list(self) -> List[str]:
"""List all registered tool names."""
return list(self._tools.keys())
def all(self) -> Dict[str, Tool]:
"""Get all tools."""
return dict(self._tools)
# Global registry
_registry: Optional[ToolRegistry] = None
def get_tool_registry() -> ToolRegistry:
"""Get the global tool registry."""
global _registry
if _registry is None:
_registry = ToolRegistry()
return _registry
def tool(func: Callable) -> Callable:
"""Decorator to register a function as a tool."""
# Extract metadata from function
name = func.__name__
description = func.__doc__ or ""
# Extract parameters from signature
sig = inspect.signature(func)
parameters = {}
for param_name, param in sig.parameters.items():
param_info = {"name": param_name}
if param.annotation != inspect.Parameter.empty:
param_info["type"] = param.annotation.__name__
if param.default != inspect.Parameter.empty:
param_info["default"] = param.default
parameters[param_name] = param_info
# Create tool
t = Tool(
name=name,
description=description.strip(),
func=func,
parameters=parameters,
)
# Register
get_tool_registry().register(t)
@wraps(func)
async def wrapper(**kwargs):
return await t.invoke(**kwargs)
wrapper._tool = t
return wrapper

View file

@ -0,0 +1,60 @@
"""
Calculate tool - evaluate mathematical expressions.
Uses simpleeval for safe expression evaluation with Python syntax.
"""
from .base import tool, ToolResult
# TODO: pip install simpleeval
# from simpleeval import simple_eval
# import math
MATH_FUNCTIONS = {
# "abs": abs,
# "round": round,
# "min": min,
# "max": max,
# "sqrt": math.sqrt,
# "sin": math.sin,
# "cos": math.cos,
# "tan": math.tan,
# "log": math.log,
# "log10": math.log10,
}
MATH_CONSTANTS = {
# "pi": math.pi,
# "e": math.e,
}
@tool
async def calculate(expression: str) -> ToolResult:
"""
Evaluate a mathematical expression using Python syntax.
Supported:
- Basic ops: + - * / // % **
- Comparisons: < > <= >= == !=
- Functions: abs, round, min, max, sqrt, sin, cos, tan, log, log10
- Constants: pi, e
- Parentheses for grouping
Examples:
- "2 + 2" 4
- "(10 + 5) * 3" 45
- "sqrt(16) + pi" 7.141592...
"""
# TODO: Implement with simpleeval
# try:
# result = simple_eval(
# expression,
# functions=MATH_FUNCTIONS,
# names=MATH_CONSTANTS,
# )
# return ToolResult(success=True, data=result)
# except Exception as e:
# return ToolResult(success=False, error=str(e))
return ToolResult(success=False, error="Not implemented - install simpleeval")

View file

@ -0,0 +1,48 @@
"""
Fetch tool - HTTP requests.
Uses aiohttp for async HTTP operations.
"""
from typing import Optional, Dict
from .base import tool, ToolResult
@tool
async def fetch_url(
url: str,
method: str = "GET",
headers: Optional[Dict[str, str]] = None,
body: Optional[str] = None,
timeout: int = 30,
) -> ToolResult:
"""
Fetch content from a URL.
Args:
url: The URL to fetch
method: HTTP method (GET, POST, PUT, DELETE)
headers: Optional HTTP headers
body: Optional request body for POST/PUT
timeout: Request timeout in seconds
Returns:
status_code, headers, body
Security:
- URL allowlist/blocklist configurable
- Timeout enforced
- Response size limit
- No file:// or internal IPs by default
"""
# TODO: Implement with aiohttp
# import aiohttp
# async with aiohttp.ClientSession() as session:
# async with session.request(method, url, headers=headers, data=body, timeout=timeout) as resp:
# return ToolResult(success=True, data={
# "status_code": resp.status,
# "headers": dict(resp.headers),
# "body": await resp.text(),
# })
return ToolResult(success=False, error="Not implemented")

135
agentserver/tools/files.py Normal file
View file

@ -0,0 +1,135 @@
"""
File tools - sandboxed file system operations.
"""
from typing import Optional, List
from pathlib import Path
from .base import tool, ToolResult
# TODO: Configure allowed paths
ALLOWED_PATHS: List[Path] = []
def _validate_path(path: str) -> Optional[str]:
"""Validate path is within allowed directories."""
# TODO: Implement chroot validation
# resolved = Path(path).resolve()
# for allowed in ALLOWED_PATHS:
# if resolved.is_relative_to(allowed):
# return None
# return f"Path {path} not in allowed directories"
return None # Stub: allow all for now
@tool
async def read_file(
path: str,
encoding: str = "utf-8",
binary: bool = False,
) -> ToolResult:
"""
Read contents of a file.
Args:
path: Path to file
encoding: Text encoding (default: utf-8)
binary: Return base64 if true (default: false)
Security:
- Chroot to allowed directories
- No path traversal (..)
- Size limit enforced
"""
if error := _validate_path(path):
return ToolResult(success=False, error=error)
# TODO: Implement
# try:
# p = Path(path)
# if binary:
# import base64
# content = base64.b64encode(p.read_bytes()).decode()
# else:
# content = p.read_text(encoding=encoding)
# return ToolResult(success=True, data=content)
# except Exception as e:
# return ToolResult(success=False, error=str(e))
return ToolResult(success=False, error="Not implemented")
@tool
async def write_file(
path: str,
content: str,
mode: str = "overwrite",
encoding: str = "utf-8",
) -> ToolResult:
"""
Write content to a file.
Args:
path: Path to file
content: Content to write
mode: "overwrite" or "append" (default: overwrite)
encoding: Text encoding (default: utf-8)
Security:
- Chroot to allowed directories
- No path traversal
- Max file size enforced
"""
if error := _validate_path(path):
return ToolResult(success=False, error=error)
# TODO: Implement
# try:
# p = Path(path)
# if mode == "append":
# with open(p, "a", encoding=encoding) as f:
# f.write(content)
# else:
# p.write_text(content, encoding=encoding)
# return ToolResult(success=True, data={"bytes_written": len(content.encode(encoding))})
# except Exception as e:
# return ToolResult(success=False, error=str(e))
return ToolResult(success=False, error="Not implemented")
@tool
async def list_dir(
path: str,
pattern: str = "*",
) -> ToolResult:
"""
List directory contents.
Args:
path: Directory path
pattern: Glob pattern filter (default: *)
Returns:
Array of {name, type, size, modified}
"""
if error := _validate_path(path):
return ToolResult(success=False, error=error)
# TODO: Implement
# try:
# p = Path(path)
# entries = []
# for entry in p.glob(pattern):
# stat = entry.stat()
# entries.append({
# "name": entry.name,
# "type": "dir" if entry.is_dir() else "file",
# "size": stat.st_size,
# "modified": stat.st_mtime,
# })
# return ToolResult(success=True, data=entries)
# except Exception as e:
# return ToolResult(success=False, error=str(e))
return ToolResult(success=False, error="Not implemented")

View file

@ -0,0 +1,78 @@
"""
Key-value store tool - persistent agent state.
"""
from typing import Any, Optional
from .base import tool, ToolResult
# TODO: Implement backend (Redis, SQLite, or in-memory)
_store: dict = {} # Temporary in-memory store
@tool
async def key_value_get(
key: str,
namespace: Optional[str] = None,
) -> ToolResult:
"""
Get a value from the key-value store.
Args:
key: Key to retrieve
namespace: Namespace for isolation (default: agent name)
Returns:
Stored value, or null if not found
"""
# TODO: Implement with Redis/SQLite
ns_key = f"{namespace or 'default'}:{key}"
value = _store.get(ns_key)
return ToolResult(success=True, data=value)
@tool
async def key_value_set(
key: str,
value: Any,
namespace: Optional[str] = None,
ttl: Optional[int] = None,
) -> ToolResult:
"""
Set a value in the key-value store.
Args:
key: Key to store
value: Value to store (JSON-serializable)
namespace: Namespace for isolation (default: agent name)
ttl: Time-to-live in seconds (optional)
Returns:
success (bool)
"""
# TODO: Implement with Redis/SQLite, handle TTL
ns_key = f"{namespace or 'default'}:{key}"
_store[ns_key] = value
return ToolResult(success=True, data=True)
@tool
async def key_value_delete(
key: str,
namespace: Optional[str] = None,
) -> ToolResult:
"""
Delete a key from the key-value store.
Args:
key: Key to delete
namespace: Namespace for isolation
Returns:
deleted (bool)
"""
# TODO: Implement with Redis/SQLite
ns_key = f"{namespace or 'default'}:{key}"
deleted = ns_key in _store
_store.pop(ns_key, None)
return ToolResult(success=True, data=deleted)

View file

@ -0,0 +1,128 @@
"""
Librarian tools - exist-db XML database integration.
Provides XQuery-based document storage and retrieval.
"""
from typing import Optional, Dict, List
from .base import tool, ToolResult
# TODO: Configure exist-db connection
EXISTDB_URL = "http://localhost:8080/exist/rest"
EXISTDB_USER = "admin"
EXISTDB_PASS = "" # Configure via env
@tool
async def librarian_store(
collection: str,
document_name: str,
content: str,
) -> ToolResult:
"""
Store an XML document in exist-db.
Args:
collection: Target collection path (e.g., "/db/agents/greeter")
document_name: Document filename (e.g., "conversation-001.xml")
content: XML content
Returns:
path: Full path to stored document
"""
# TODO: Implement with exist-db REST API
# import aiohttp
# url = f"{EXISTDB_URL}{collection}/{document_name}"
# async with aiohttp.ClientSession() as session:
# async with session.put(
# url,
# data=content,
# headers={"Content-Type": "application/xml"},
# auth=aiohttp.BasicAuth(EXISTDB_USER, EXISTDB_PASS),
# ) as resp:
# if resp.status in (200, 201):
# return ToolResult(success=True, data={"path": f"{collection}/{document_name}"})
# return ToolResult(success=False, error=await resp.text())
return ToolResult(success=False, error="Not implemented - configure exist-db")
@tool
async def librarian_get(
path: str,
) -> ToolResult:
"""
Retrieve a document by path.
Args:
path: Full document path (e.g., "/db/agents/greeter/conversation-001.xml")
Returns:
content: XML content
"""
# TODO: Implement with exist-db REST API
# import aiohttp
# url = f"{EXISTDB_URL}{path}"
# async with aiohttp.ClientSession() as session:
# async with session.get(
# url,
# auth=aiohttp.BasicAuth(EXISTDB_USER, EXISTDB_PASS),
# ) as resp:
# if resp.status == 200:
# return ToolResult(success=True, data=await resp.text())
# return ToolResult(success=False, error=f"Not found: {path}")
return ToolResult(success=False, error="Not implemented - configure exist-db")
@tool
async def librarian_query(
query: str,
collection: Optional[str] = None,
variables: Optional[Dict[str, str]] = None,
) -> ToolResult:
"""
Execute an XQuery against exist-db.
Args:
query: XQuery expression
collection: Limit to collection (optional)
variables: External variables to bind (optional)
Returns:
results: Array of matching XML fragments
Examples:
- '//message[@from="greeter"]'
- 'for $m in //message where $m/@timestamp > $since return $m'
"""
# TODO: Implement with exist-db REST API
# The exist-db REST API accepts XQuery via POST to /exist/rest/db
# with _query parameter or as request body
return ToolResult(success=False, error="Not implemented - configure exist-db")
@tool
async def librarian_search(
query: str,
collection: Optional[str] = None,
num_results: int = 10,
) -> ToolResult:
"""
Full-text search across documents.
Args:
query: Search terms
collection: Limit to collection (optional)
num_results: Max results (default: 10)
Returns:
results: Array of {path, score, snippet}
"""
# TODO: Implement with exist-db full-text search
# exist-db supports Lucene-based full-text indexing
# Query using ft:query() function in XQuery
return ToolResult(success=False, error="Not implemented - configure exist-db")

View file

@ -0,0 +1,36 @@
"""
Search tool - web search.
"""
from .base import tool, ToolResult
@tool
async def web_search(
query: str,
num_results: int = 5,
) -> ToolResult:
"""
Search the web.
Args:
query: Search query
num_results: Number of results (default: 5, max: 20)
Returns:
Array of {title, url, snippet}
Implementation options:
- SerpAPI
- Google Custom Search
- Bing Search API
- DuckDuckGo (scraping)
"""
# TODO: Implement with search provider
# Options:
# 1. SerpAPI (paid, reliable)
# 2. Google Custom Search API (limited free tier)
# 3. Bing Search API (Azure)
# 4. DuckDuckGo scraping (free, fragile)
return ToolResult(success=False, error="Not implemented - configure search provider")

View file

@ -0,0 +1,61 @@
"""
Shell tool - sandboxed command execution.
"""
from typing import Optional
from .base import tool, ToolResult
# TODO: Configure command restrictions
ALLOWED_COMMANDS: list = [] # Empty = allow all (dangerous!)
BLOCKED_COMMANDS: list = ["rm", "del", "format", "mkfs", "dd"]
@tool
async def run_command(
command: str,
timeout: int = 30,
cwd: Optional[str] = None,
) -> ToolResult:
"""
Execute a shell command (sandboxed).
Args:
command: Command to execute
timeout: Timeout in seconds (default: 30)
cwd: Working directory
Returns:
exit_code, stdout, stderr
Security:
- Command allowlist (or blocklist dangerous commands)
- No shell expansion by default
- Resource limits (CPU, memory)
- Chroot to safe directory
- Timeout enforced
"""
# TODO: Implement with asyncio.subprocess
# import asyncio
# try:
# proc = await asyncio.create_subprocess_shell(
# command,
# stdout=asyncio.subprocess.PIPE,
# stderr=asyncio.subprocess.PIPE,
# cwd=cwd,
# )
# try:
# stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
# except asyncio.TimeoutError:
# proc.kill()
# return ToolResult(success=False, error=f"Command timed out after {timeout}s")
#
# return ToolResult(success=True, data={
# "exit_code": proc.returncode,
# "stdout": stdout.decode(),
# "stderr": stderr.decode(),
# })
# except Exception as e:
# return ToolResult(success=False, error=str(e))
return ToolResult(success=False, error="Not implemented")