""" Permission gate — Per-listener tool allowlist enforcement. In container mode, handlers get NO tools unless explicitly declared in their listener config via `allowed_tools`. In development mode, all tools are available to all handlers (current behavior). """ from __future__ import annotations import logging from typing import Optional from xml_pipeline.tools.base import ToolResult logger = logging.getLogger(__name__) # Module state _gate_enabled: bool = False _listener_allowlists: dict[str, set[str]] = {} def enable_permission_gate() -> None: """Enable permission gate (tools require explicit allowlist).""" global _gate_enabled _gate_enabled = True def disable_permission_gate() -> None: """Disable permission gate (all tools available).""" global _gate_enabled _gate_enabled = False def is_gate_enabled() -> bool: """Check if permission gate is active.""" return _gate_enabled def register_listener_tools(listener_name: str, allowed_tools: list[str]) -> None: """ Register the tool allowlist for a listener. Args: listener_name: The listener's registered name allowed_tools: List of tool names this listener may invoke """ _listener_allowlists[listener_name] = set(allowed_tools) if allowed_tools: logger.debug(f"Listener '{listener_name}' tools: {allowed_tools}") def check_permission(listener_name: str, tool_name: str) -> Optional[ToolResult]: """ Check if a listener is allowed to invoke a tool. Returns None if allowed, or a ToolResult error if denied. """ if not _gate_enabled: return None allowed = _listener_allowlists.get(listener_name) if allowed is None: # No allowlist registered — deny by default in container mode logger.warning( f"Permission denied: listener '{listener_name}' has no tool allowlist, " f"attempted to use '{tool_name}'" ) return ToolResult( success=False, error="Tool access denied. No tools are configured for this listener.", ) if tool_name not in allowed: logger.warning( f"Permission denied: listener '{listener_name}' " f"not allowed to use tool '{tool_name}'" ) return ToolResult( success=False, error=f"Tool '{tool_name}' is not in the allowed tools for this listener.", ) return None def reset() -> None: """Reset permission gate state (for testing).""" global _gate_enabled _gate_enabled = False _listener_allowlists.clear()