diff --git a/bloxserver/domain/__init__.py b/bloxserver/domain/__init__.py new file mode 100644 index 0000000..af2e5af --- /dev/null +++ b/bloxserver/domain/__init__.py @@ -0,0 +1,53 @@ +""" +BloxServer Domain Model. + +This module contains the core domain classes that represent flows (teams), +nodes (agents/tools), edges (connections), and triggers. + +The domain model serves as the bridge between: +- Frontend canvas JSON (React Flow) +- Database storage (FlowRecord) +- Execution engine (organism.yaml) +""" + +from bloxserver.domain.nodes import ( + Node, + NodeType, + AgentNode, + ToolNode, + GatewayNode, +) +from bloxserver.domain.edges import Edge, EdgeCondition +from bloxserver.domain.triggers import ( + Trigger, + TriggerType, + TriggerConfig, + create_webhook_trigger, + create_schedule_trigger, + create_manual_trigger, +) +from bloxserver.domain.flow import Flow, FlowSettings, LLMSettings, ValidationError + +__all__ = [ + # Nodes + "Node", + "NodeType", + "AgentNode", + "ToolNode", + "GatewayNode", + # Edges + "Edge", + "EdgeCondition", + # Triggers + "Trigger", + "TriggerType", + "TriggerConfig", + "create_webhook_trigger", + "create_schedule_trigger", + "create_manual_trigger", + # Flow + "Flow", + "FlowSettings", + "LLMSettings", + "ValidationError", +] diff --git a/bloxserver/domain/edges.py b/bloxserver/domain/edges.py new file mode 100644 index 0000000..a914c09 --- /dev/null +++ b/bloxserver/domain/edges.py @@ -0,0 +1,220 @@ +""" +Edge model for connections between nodes. + +Edges define how messages flow between nodes in a flow. +They can optionally have conditions for conditional routing. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import Enum +from typing import Any +from uuid import UUID, uuid4 + + +class EdgeCondition(str, Enum): + """Types of edge conditions.""" + + ALWAYS = "always" # Always route (default) + ON_SUCCESS = "on_success" # Route only if source succeeds + ON_ERROR = "on_error" # Route only if source fails + CONDITIONAL = "conditional" # Custom condition expression + + +@dataclass +class Edge: + """ + Connection between two nodes. + + Edges define the flow of messages between nodes. In organism.yaml terms, + they define the `peers` list for agents and the routing paths. + """ + + id: UUID + source_node_id: UUID + target_node_id: UUID + + # Optional label for UI + label: str | None = None + + # Condition for when this edge is followed + condition: EdgeCondition = EdgeCondition.ALWAYS + + # Custom condition expression (when condition == CONDITIONAL) + # Example: "payload.status == 'approved'" + condition_expression: str | None = None + + # Visual properties for canvas + source_handle: str | None = None # Which output port + target_handle: str | None = None # Which input port + animated: bool = False + edge_type: str = "default" # default, smoothstep, step, straight + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for JSON serialization (React Flow format).""" + return { + "id": str(self.id), + "source": str(self.source_node_id), + "target": str(self.target_node_id), + "label": self.label, + "data": { + "condition": self.condition.value, + "conditionExpression": self.condition_expression, + }, + "sourceHandle": self.source_handle, + "targetHandle": self.target_handle, + "animated": self.animated, + "type": self.edge_type, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> Edge: + """Create edge from dictionary (React Flow format).""" + edge_data = data.get("data", {}) + + # Handle condition + condition_str = edge_data.get("condition", "always") + try: + condition = EdgeCondition(condition_str) + except ValueError: + condition = EdgeCondition.ALWAYS + + return cls( + id=UUID(data["id"]) if isinstance(data.get("id"), str) else data.get("id", uuid4()), + source_node_id=UUID(data["source"]) if isinstance(data.get("source"), str) else data["source"], + target_node_id=UUID(data["target"]) if isinstance(data.get("target"), str) else data["target"], + label=data.get("label"), + condition=condition, + condition_expression=edge_data.get("conditionExpression") or edge_data.get("condition_expression"), + source_handle=data.get("sourceHandle") or data.get("source_handle"), + target_handle=data.get("targetHandle") or data.get("target_handle"), + animated=data.get("animated", False), + edge_type=data.get("type", "default"), + ) + + +def compute_peers( + node_id: UUID, + edges: list[Edge], + node_map: dict[UUID, str], +) -> list[str]: + """ + Compute the peers list for a node based on outgoing edges. + + Args: + node_id: The node to compute peers for. + edges: All edges in the flow. + node_map: Mapping of node IDs to node names. + + Returns: + List of peer names (target nodes this node can send to). + """ + peers: list[str] = [] + + for edge in edges: + if edge.source_node_id == node_id: + target_name = node_map.get(edge.target_node_id) + if target_name and target_name not in peers: + peers.append(target_name) + + return peers + + +def compute_incoming( + node_id: UUID, + edges: list[Edge], + node_map: dict[UUID, str], +) -> list[str]: + """ + Compute the list of nodes that can send to this node. + + Args: + node_id: The target node. + edges: All edges in the flow. + node_map: Mapping of node IDs to node names. + + Returns: + List of source node names. + """ + incoming: list[str] = [] + + for edge in edges: + if edge.target_node_id == node_id: + source_name = node_map.get(edge.source_node_id) + if source_name and source_name not in incoming: + incoming.append(source_name) + + return incoming + + +def find_entry_nodes( + nodes: list[UUID], + edges: list[Edge], +) -> list[UUID]: + """ + Find nodes with no incoming edges (entry points). + + These are typically where external triggers connect. + """ + nodes_with_incoming = {edge.target_node_id for edge in edges} + return [node_id for node_id in nodes if node_id not in nodes_with_incoming] + + +def find_exit_nodes( + nodes: list[UUID], + edges: list[Edge], +) -> list[UUID]: + """ + Find nodes with no outgoing edges (exit points). + + These are typically terminal handlers or response nodes. + """ + nodes_with_outgoing = {edge.source_node_id for edge in edges} + return [node_id for node_id in nodes if node_id not in nodes_with_outgoing] + + +def detect_cycles( + nodes: list[UUID], + edges: list[Edge], +) -> list[list[UUID]]: + """ + Detect cycles in the flow graph. + + Returns a list of cycles (each cycle is a list of node IDs). + Cycles are allowed (agents can self-loop) but should be flagged for review. + """ + cycles: list[list[UUID]] = [] + + # Build adjacency list + adj: dict[UUID, list[UUID]] = {node_id: [] for node_id in nodes} + for edge in edges: + if edge.source_node_id in adj: + adj[edge.source_node_id].append(edge.target_node_id) + + # DFS for cycle detection + visited: set[UUID] = set() + rec_stack: set[UUID] = set() + path: list[UUID] = [] + + def dfs(node: UUID) -> None: + visited.add(node) + rec_stack.add(node) + path.append(node) + + for neighbor in adj.get(node, []): + if neighbor not in visited: + dfs(neighbor) + elif neighbor in rec_stack: + # Found a cycle + cycle_start = path.index(neighbor) + cycles.append(path[cycle_start:] + [neighbor]) + + path.pop() + rec_stack.remove(node) + + for node_id in nodes: + if node_id not in visited: + dfs(node_id) + + return cycles diff --git a/bloxserver/domain/flow.py b/bloxserver/domain/flow.py new file mode 100644 index 0000000..a79fdb6 --- /dev/null +++ b/bloxserver/domain/flow.py @@ -0,0 +1,559 @@ +""" +Flow domain model - the central aggregation point. + +A Flow (also called a Team) is the complete definition of an agent workflow: +- Nodes (agents, tools, gateways) +- Edges (connections between nodes) +- Triggers (how the flow is started) +- Settings (LLM config, rate limits, etc.) + +The Flow class provides: +- to_organism_yaml(): Convert to xml-pipeline organism configuration +- to_canvas_json(): Convert to frontend canvas format +- from_canvas_json(): Parse from frontend canvas format +- validate(): Check for errors before execution +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any +from uuid import UUID, uuid4 + +import yaml + +from bloxserver.domain.nodes import ( + Node, + NodeType, + AgentNode, + ToolNode, + GatewayNode, +) +from bloxserver.domain.edges import Edge, compute_peers, find_entry_nodes, detect_cycles +from bloxserver.domain.triggers import Trigger, TriggerType + + +# ============================================================================= +# Validation +# ============================================================================= + + +@dataclass +class ValidationError: + """A validation error in a flow.""" + + code: str + message: str + node_id: UUID | None = None + edge_id: UUID | None = None + trigger_id: UUID | None = None + severity: str = "error" # error, warning, info + + def to_dict(self) -> dict[str, Any]: + return { + "code": self.code, + "message": self.message, + "nodeId": str(self.node_id) if self.node_id else None, + "edgeId": str(self.edge_id) if self.edge_id else None, + "triggerId": str(self.trigger_id) if self.trigger_id else None, + "severity": self.severity, + } + + +# ============================================================================= +# Flow Settings +# ============================================================================= + + +@dataclass +class LLMSettings: + """LLM configuration for the flow.""" + + # Default model for agents without explicit model + default_model: str = "grok-4.1" + + # Strategy for backend selection + strategy: str = "failover" + + # Rate limits (override org-level limits) + max_tokens_per_minute: int | None = None + max_requests_per_minute: int | None = None + + # Retry settings + retries: int = 3 + retry_base_delay: float = 1.0 + + def to_dict(self) -> dict[str, Any]: + return { + "defaultModel": self.default_model, + "strategy": self.strategy, + "maxTokensPerMinute": self.max_tokens_per_minute, + "maxRequestsPerMinute": self.max_requests_per_minute, + "retries": self.retries, + "retryBaseDelay": self.retry_base_delay, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> LLMSettings: + return cls( + default_model=data.get("defaultModel") or data.get("default_model", "grok-4.1"), + strategy=data.get("strategy", "failover"), + max_tokens_per_minute=data.get("maxTokensPerMinute") or data.get("max_tokens_per_minute"), + max_requests_per_minute=data.get("maxRequestsPerMinute") or data.get("max_requests_per_minute"), + retries=data.get("retries", 3), + retry_base_delay=data.get("retryBaseDelay") or data.get("retry_base_delay", 1.0), + ) + + +@dataclass +class FlowSettings: + """Settings for a flow.""" + + # LLM configuration + llm: LLMSettings = field(default_factory=LLMSettings) + + # Execution settings + timeout_seconds: int = 300 # Max execution time + max_iterations: int = 100 # Max message loops (prevent infinite recursion) + + # Logging + log_level: str = "info" + log_payloads: bool = False # Log full message payloads + + # Thread settings + thread_ttl_seconds: int = 86400 # 24 hours + + def to_dict(self) -> dict[str, Any]: + return { + "llm": self.llm.to_dict(), + "timeoutSeconds": self.timeout_seconds, + "maxIterations": self.max_iterations, + "logLevel": self.log_level, + "logPayloads": self.log_payloads, + "threadTtlSeconds": self.thread_ttl_seconds, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> FlowSettings: + llm_data = data.get("llm", {}) + return cls( + llm=LLMSettings.from_dict(llm_data) if llm_data else LLMSettings(), + timeout_seconds=data.get("timeoutSeconds") or data.get("timeout_seconds", 300), + max_iterations=data.get("maxIterations") or data.get("max_iterations", 100), + log_level=data.get("logLevel") or data.get("log_level", "info"), + log_payloads=data.get("logPayloads") or data.get("log_payloads", False), + thread_ttl_seconds=data.get("threadTtlSeconds") or data.get("thread_ttl_seconds", 86400), + ) + + +# ============================================================================= +# Flow +# ============================================================================= + + +@dataclass +class Flow: + """ + The complete definition of an agent workflow. + + This is the domain model that bridges: + - Frontend canvas JSON + - Database storage + - Organism YAML for execution + """ + + # Identity + id: UUID + name: str + description: str = "" + + # Owner + owner_id: UUID | None = None + + # Components + nodes: list[Node] = field(default_factory=list) + edges: list[Edge] = field(default_factory=list) + triggers: list[Trigger] = field(default_factory=list) + + # Configuration + settings: FlowSettings = field(default_factory=FlowSettings) + + # State + is_active: bool = False + version: int = 1 + + # Metadata + created_at: datetime | None = None + updated_at: datetime | None = None + + # ========================================================================== + # Node Helpers + # ========================================================================== + + def get_node(self, node_id: UUID) -> Node | None: + """Get a node by ID.""" + for node in self.nodes: + if node.id == node_id: + return node + return None + + def get_node_by_name(self, name: str) -> Node | None: + """Get a node by name.""" + for node in self.nodes: + if node.name == name: + return node + return None + + def get_agents(self) -> list[AgentNode]: + """Get all agent nodes.""" + return [n for n in self.nodes if isinstance(n, AgentNode)] + + def get_tools(self) -> list[ToolNode]: + """Get all tool nodes.""" + return [n for n in self.nodes if isinstance(n, ToolNode)] + + def get_gateways(self) -> list[GatewayNode]: + """Get all gateway nodes.""" + return [n for n in self.nodes if isinstance(n, GatewayNode)] + + def _build_node_map(self) -> dict[UUID, str]: + """Build a mapping of node IDs to names.""" + return {node.id: node.name for node in self.nodes} + + # ========================================================================== + # Validation + # ========================================================================== + + def validate(self) -> list[ValidationError]: + """ + Validate the flow for errors. + + Returns a list of validation errors (empty if valid). + """ + errors: list[ValidationError] = [] + node_map = self._build_node_map() + node_ids = set(node_map.keys()) + + # Check for unique node names + names_seen: set[str] = set() + for node in self.nodes: + if node.name in names_seen: + errors.append(ValidationError( + code="duplicate_node_name", + message=f"Duplicate node name: {node.name}", + node_id=node.id, + )) + names_seen.add(node.name) + + # Check for valid node names (no spaces, special chars) + for node in self.nodes: + if not node.name or not node.name.replace("_", "").replace("-", "").isalnum(): + errors.append(ValidationError( + code="invalid_node_name", + message=f"Invalid node name: {node.name}. Use alphanumeric, underscore, or hyphen.", + node_id=node.id, + )) + + # Check agents have prompts + for agent in self.get_agents(): + if not agent.prompt or not agent.prompt.strip(): + errors.append(ValidationError( + code="missing_agent_prompt", + message=f"Agent '{agent.name}' has no prompt", + node_id=agent.id, + )) + + # Check custom tools have handler paths + for tool in self.get_tools(): + from bloxserver.domain.nodes import ToolType, BUILTIN_TOOLS + if tool.tool_type == ToolType.CUSTOM: + if not tool.handler_path or not tool.payload_class_path: + errors.append(ValidationError( + code="missing_custom_tool_paths", + message=f"Custom tool '{tool.name}' requires handler_path and payload_class_path", + node_id=tool.id, + )) + + # Check gateways have required config + for gateway in self.get_gateways(): + if not gateway.remote_url and not gateway.api_endpoint: + errors.append(ValidationError( + code="invalid_gateway_config", + message=f"Gateway '{gateway.name}' requires remote_url or api_endpoint", + node_id=gateway.id, + )) + + # Check edges reference valid nodes + for edge in self.edges: + if edge.source_node_id not in node_ids: + errors.append(ValidationError( + code="invalid_edge_source", + message=f"Edge source node not found: {edge.source_node_id}", + edge_id=edge.id, + )) + if edge.target_node_id not in node_ids: + errors.append(ValidationError( + code="invalid_edge_target", + message=f"Edge target node not found: {edge.target_node_id}", + edge_id=edge.id, + )) + + # Check triggers reference valid nodes + for trigger in self.triggers: + for target_id in trigger.target_node_ids: + if target_id not in node_ids: + errors.append(ValidationError( + code="invalid_trigger_target", + message=f"Trigger '{trigger.name}' targets unknown node: {target_id}", + trigger_id=trigger.id, + )) + + # Check for entry points (nodes reachable from triggers) + entry_nodes = find_entry_nodes(list(node_ids), self.edges) + trigger_targets = set() + for trigger in self.triggers: + trigger_targets.update(trigger.target_node_ids) + + orphan_entries = [nid for nid in entry_nodes if nid not in trigger_targets] + for nid in orphan_entries: + node = self.get_node(nid) + if node: + errors.append(ValidationError( + code="unreachable_entry_node", + message=f"Node '{node.name}' has no incoming edges and no trigger", + node_id=nid, + severity="warning", + )) + + # Check for cycles (warning, not error - agents can self-loop) + cycles = detect_cycles(list(node_ids), self.edges) + for cycle in cycles: + cycle_names = [node_map.get(nid, str(nid)) for nid in cycle] + errors.append(ValidationError( + code="cycle_detected", + message=f"Cycle detected: {' -> '.join(cycle_names)}", + severity="info", + )) + + return errors + + # ========================================================================== + # Serialization: Canvas JSON + # ========================================================================== + + def to_canvas_json(self) -> dict[str, Any]: + """ + Convert to React Flow canvas format. + + Returns a dict with 'nodes', 'edges', 'triggers', 'settings'. + """ + return { + "id": str(self.id), + "name": self.name, + "description": self.description, + "nodes": [node.to_dict() for node in self.nodes], + "edges": [edge.to_dict() for edge in self.edges], + "triggers": [trigger.to_dict() for trigger in self.triggers], + "settings": self.settings.to_dict(), + "isActive": self.is_active, + "version": self.version, + } + + @classmethod + def from_canvas_json( + cls, + data: dict[str, Any], + flow_id: UUID | None = None, + owner_id: UUID | None = None, + ) -> Flow: + """ + Create a Flow from React Flow canvas JSON. + + Args: + data: Canvas JSON with nodes, edges, triggers, settings. + flow_id: Override the flow ID (for updates). + owner_id: Owner user ID. + """ + # Parse nodes + nodes: list[Node] = [] + for node_data in data.get("nodes", []): + nodes.append(Node.from_dict(node_data)) + + # Parse edges + edges: list[Edge] = [] + for edge_data in data.get("edges", []): + edges.append(Edge.from_dict(edge_data)) + + # Parse triggers + triggers: list[Trigger] = [] + for trigger_data in data.get("triggers", []): + triggers.append(Trigger.from_dict(trigger_data)) + + # Parse settings + settings_data = data.get("settings", {}) + settings = FlowSettings.from_dict(settings_data) if settings_data else FlowSettings() + + # Determine flow ID + if flow_id: + fid = flow_id + elif "id" in data: + fid = UUID(data["id"]) if isinstance(data["id"], str) else data["id"] + else: + fid = uuid4() + + return cls( + id=fid, + name=data.get("name", "Untitled Flow"), + description=data.get("description", ""), + owner_id=owner_id, + nodes=nodes, + edges=edges, + triggers=triggers, + settings=settings, + is_active=data.get("isActive") or data.get("is_active", False), + version=data.get("version", 1), + ) + + # ========================================================================== + # Serialization: Organism YAML + # ========================================================================== + + def to_organism_yaml(self, port: int = 0) -> str: + """ + Convert to xml-pipeline organism.yaml format. + + Args: + port: Port for the organism (0 = auto-assign). + + Returns: + YAML string ready for the xml-pipeline runner. + """ + node_map = self._build_node_map() + + # Build listeners list + listeners: list[dict[str, Any]] = [] + gateways: list[dict[str, Any]] = [] + + for node in self.nodes: + if isinstance(node, AgentNode): + peers = compute_peers(node.id, self.edges, node_map) + listeners.append(node.to_listener_config(peers)) + + elif isinstance(node, ToolNode): + listeners.append(node.to_listener_config()) + + elif isinstance(node, GatewayNode): + config = node.to_listener_config() + if node.remote_url: + # Federation gateway goes in gateways section + gateways.append(config) + else: + # REST gateway is a listener + listeners.append(config) + + # Build organism config + organism_config: dict[str, Any] = { + "organism": { + "name": f"flow-{self.id}", + "port": port, + }, + "listeners": listeners, + } + + # Add gateways if any + if gateways: + organism_config["gateways"] = gateways + + # Add LLM config + organism_config["llm"] = { + "strategy": self.settings.llm.strategy, + "retries": self.settings.llm.retries, + "retry_base_delay": self.settings.llm.retry_base_delay, + } + + # Add execution settings as metadata + organism_config["execution"] = { + "timeout_seconds": self.settings.timeout_seconds, + "max_iterations": self.settings.max_iterations, + "log_level": self.settings.log_level, + "log_payloads": self.settings.log_payloads, + } + + # Add trigger metadata for runtime + if self.triggers: + organism_config["triggers"] = [ + { + "id": str(t.id), + "name": t.name, + "type": t.trigger_type.value, + "target_nodes": [node_map.get(nid, str(nid)) for nid in t.target_node_ids], + "enabled": t.enabled, + } + for t in self.triggers + ] + + return yaml.dump( + organism_config, + default_flow_style=False, + sort_keys=False, + allow_unicode=True, + ) + + # ========================================================================== + # Database Serialization + # ========================================================================== + + def to_db_dict(self) -> dict[str, Any]: + """ + Convert to dictionary for database storage. + + This is stored in FlowRecord.canvas_data as JSON. + """ + return { + "nodes": [node.to_dict() for node in self.nodes], + "edges": [edge.to_dict() for edge in self.edges], + "triggers": [trigger.to_dict() for trigger in self.triggers], + "settings": self.settings.to_dict(), + } + + @classmethod + def from_db_record(cls, record: Any) -> Flow: + """ + Create a Flow from a database FlowRecord. + + Args: + record: FlowRecord ORM instance. + """ + canvas_data = record.canvas_data or {} + + # Parse components from canvas_data + nodes: list[Node] = [] + for node_data in canvas_data.get("nodes", []): + nodes.append(Node.from_dict(node_data)) + + edges: list[Edge] = [] + for edge_data in canvas_data.get("edges", []): + edges.append(Edge.from_dict(edge_data)) + + triggers: list[Trigger] = [] + for trigger_data in canvas_data.get("triggers", []): + triggers.append(Trigger.from_dict(trigger_data)) + + settings_data = canvas_data.get("settings", {}) + settings = FlowSettings.from_dict(settings_data) if settings_data else FlowSettings() + + return cls( + id=record.id, + name=record.name, + description=record.description or "", + owner_id=record.owner_id, + nodes=nodes, + edges=edges, + triggers=triggers, + settings=settings, + is_active=record.status.value == "active" if hasattr(record.status, "value") else record.status == "active", + version=record.version, + created_at=record.created_at, + updated_at=record.updated_at, + ) diff --git a/bloxserver/domain/nodes.py b/bloxserver/domain/nodes.py new file mode 100644 index 0000000..669d8d0 --- /dev/null +++ b/bloxserver/domain/nodes.py @@ -0,0 +1,412 @@ +""" +Node types for BloxServer flows. + +Nodes are the building blocks of flows: +- AgentNode: LLM-powered agents with prompts and reasoning +- ToolNode: Built-in tools (calculate, fetch, shell, etc.) +- GatewayNode: External APIs or federated organisms +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import Enum +from typing import Any +from uuid import UUID, uuid4 + + +class NodeType(str, Enum): + """Types of nodes in a flow.""" + + AGENT = "agent" + TOOL = "tool" + GATEWAY = "gateway" + + +# ============================================================================= +# Built-in Tool Types +# ============================================================================= + + +class ToolType(str, Enum): + """Built-in tool types available in BloxServer.""" + + CALCULATE = "calculate" + FETCH = "fetch" + READ_FILE = "read_file" + WRITE_FILE = "write_file" + LIST_DIR = "list_dir" + SHELL = "shell" + WEB_SEARCH = "web_search" + KEY_VALUE = "key_value" + SEND_EMAIL = "send_email" + WEBHOOK = "webhook" + LIBRARIAN = "librarian" + CUSTOM = "custom" + + +# ============================================================================= +# Base Node +# ============================================================================= + + +@dataclass +class Node: + """Base class for all node types.""" + + id: UUID + name: str + description: str + node_type: NodeType + + # Canvas position (for frontend rendering) + position_x: float = 0.0 + position_y: float = 0.0 + + # Metadata for UI + color: str | None = None + icon: str | None = None + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for JSON serialization.""" + return { + "id": str(self.id), + "name": self.name, + "description": self.description, + "nodeType": self.node_type.value, + "position": {"x": self.position_x, "y": self.position_y}, + "color": self.color, + "icon": self.icon, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> Node: + """Create node from dictionary. Dispatches to appropriate subclass.""" + node_type = NodeType(data.get("nodeType", data.get("node_type"))) + + if node_type == NodeType.AGENT: + return AgentNode.from_dict(data) + elif node_type == NodeType.TOOL: + return ToolNode.from_dict(data) + elif node_type == NodeType.GATEWAY: + return GatewayNode.from_dict(data) + else: + raise ValueError(f"Unknown node type: {node_type}") + + +# ============================================================================= +# Agent Node +# ============================================================================= + + +@dataclass +class AgentNode(Node): + """ + LLM-powered agent node. + + Agents have prompts, can reason, and communicate with other nodes. + They are the "brains" of a flow. + """ + + node_type: NodeType = field(default=NodeType.AGENT, init=False) + + # Agent configuration + prompt: str = "" + model: str | None = None # None = use default from org settings + + # Advanced settings + temperature: float = 0.7 + max_tokens: int | None = None + system_prompt_append: str | None = None # Extra instructions + + # Handler paths (auto-generated if not specified) + handler_path: str | None = None + payload_class_path: str | None = None + + def to_dict(self) -> dict[str, Any]: + base = super().to_dict() + base.update({ + "prompt": self.prompt, + "model": self.model, + "temperature": self.temperature, + "maxTokens": self.max_tokens, + "systemPromptAppend": self.system_prompt_append, + "handlerPath": self.handler_path, + "payloadClassPath": self.payload_class_path, + }) + return base + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> AgentNode: + position = data.get("position", {}) + return cls( + id=UUID(data["id"]) if isinstance(data.get("id"), str) else data.get("id", uuid4()), + name=data["name"], + description=data.get("description", ""), + position_x=position.get("x", 0.0), + position_y=position.get("y", 0.0), + color=data.get("color"), + icon=data.get("icon"), + prompt=data.get("prompt", ""), + model=data.get("model"), + temperature=data.get("temperature", 0.7), + max_tokens=data.get("maxTokens") or data.get("max_tokens"), + system_prompt_append=data.get("systemPromptAppend") or data.get("system_prompt_append"), + handler_path=data.get("handlerPath") or data.get("handler_path"), + payload_class_path=data.get("payloadClassPath") or data.get("payload_class_path"), + ) + + def to_listener_config(self, peers: list[str]) -> dict[str, Any]: + """ + Convert to organism.yaml listener configuration. + + Args: + peers: List of peer names derived from outgoing edges. + """ + config: dict[str, Any] = { + "name": self.name, + "description": self.description, + "agent": True, + "prompt": self.prompt, + } + + if peers: + config["peers"] = peers + + if self.handler_path: + config["handler"] = self.handler_path + if self.payload_class_path: + config["payload_class"] = self.payload_class_path + + # LLM settings as metadata (for future use) + if self.model: + config["model"] = self.model + if self.temperature != 0.7: + config["temperature"] = self.temperature + if self.max_tokens: + config["max_tokens"] = self.max_tokens + + return config + + +# ============================================================================= +# Tool Node +# ============================================================================= + + +# Tool handler/payload mappings for built-in tools +BUILTIN_TOOLS: dict[ToolType, dict[str, str]] = { + ToolType.CALCULATE: { + "handler": "xml_pipeline.tools.calculate.handle_calculate", + "payload_class": "xml_pipeline.tools.calculate.CalculatePayload", + }, + ToolType.FETCH: { + "handler": "xml_pipeline.tools.fetch.handle_fetch", + "payload_class": "xml_pipeline.tools.fetch.FetchPayload", + }, + ToolType.READ_FILE: { + "handler": "xml_pipeline.tools.files.handle_read", + "payload_class": "xml_pipeline.tools.files.ReadFilePayload", + }, + ToolType.WRITE_FILE: { + "handler": "xml_pipeline.tools.files.handle_write", + "payload_class": "xml_pipeline.tools.files.WriteFilePayload", + }, + ToolType.LIST_DIR: { + "handler": "xml_pipeline.tools.files.handle_list", + "payload_class": "xml_pipeline.tools.files.ListDirPayload", + }, + ToolType.SHELL: { + "handler": "xml_pipeline.tools.shell.handle_shell", + "payload_class": "xml_pipeline.tools.shell.ShellPayload", + }, + ToolType.WEB_SEARCH: { + "handler": "xml_pipeline.tools.search.handle_search", + "payload_class": "xml_pipeline.tools.search.SearchPayload", + }, + ToolType.KEY_VALUE: { + "handler": "xml_pipeline.tools.keyvalue.handle_keyvalue", + "payload_class": "xml_pipeline.tools.keyvalue.KeyValuePayload", + }, + ToolType.LIBRARIAN: { + "handler": "xml_pipeline.tools.librarian.handle_librarian", + "payload_class": "xml_pipeline.tools.librarian.LibrarianPayload", + }, +} + + +@dataclass +class ToolNode(Node): + """ + Built-in or custom tool node. + + Tools are stateless functions that perform specific operations + like calculations, HTTP requests, file operations, etc. + """ + + node_type: NodeType = field(default=NodeType.TOOL, init=False) + + # Tool configuration + tool_type: ToolType = ToolType.CUSTOM + config: dict[str, Any] = field(default_factory=dict) + + # Custom tool paths (only for CUSTOM type) + handler_path: str | None = None + payload_class_path: str | None = None + + def to_dict(self) -> dict[str, Any]: + base = super().to_dict() + base.update({ + "toolType": self.tool_type.value, + "config": self.config, + "handlerPath": self.handler_path, + "payloadClassPath": self.payload_class_path, + }) + return base + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> ToolNode: + position = data.get("position", {}) + tool_type_str = data.get("toolType") or data.get("tool_type", "custom") + return cls( + id=UUID(data["id"]) if isinstance(data.get("id"), str) else data.get("id", uuid4()), + name=data["name"], + description=data.get("description", ""), + position_x=position.get("x", 0.0), + position_y=position.get("y", 0.0), + color=data.get("color"), + icon=data.get("icon"), + tool_type=ToolType(tool_type_str), + config=data.get("config", {}), + handler_path=data.get("handlerPath") or data.get("handler_path"), + payload_class_path=data.get("payloadClassPath") or data.get("payload_class_path"), + ) + + def to_listener_config(self) -> dict[str, Any]: + """Convert to organism.yaml listener configuration.""" + # Get handler/payload from built-in mapping or custom paths + if self.tool_type in BUILTIN_TOOLS: + tool_info = BUILTIN_TOOLS[self.tool_type] + handler = tool_info["handler"] + payload_class = tool_info["payload_class"] + else: + handler = self.handler_path + payload_class = self.payload_class_path + + if not handler or not payload_class: + raise ValueError( + f"Custom tool '{self.name}' requires handler_path and payload_class_path" + ) + + config: dict[str, Any] = { + "name": self.name, + "description": self.description, + "handler": handler, + "payload_class": payload_class, + } + + # Include tool-specific config if present + if self.config: + config["tool_config"] = self.config + + return config + + +# ============================================================================= +# Gateway Node +# ============================================================================= + + +@dataclass +class GatewayNode(Node): + """ + External API or federated organism gateway. + + Gateways connect flows to: + - External REST APIs + - Federated xml-pipeline organisms + - Third-party services (Slack, Discord, etc.) + """ + + node_type: NodeType = field(default=NodeType.GATEWAY, init=False) + + # Federation (other xml-pipeline organisms) + remote_url: str | None = None + trusted_identity: str | None = None # Path to public key + + # REST API gateway + api_endpoint: str | None = None + api_method: str = "POST" + api_headers: dict[str, str] = field(default_factory=dict) + api_auth_type: str | None = None # bearer, api_key, basic + api_auth_env_var: str | None = None # Env var containing the secret + + # Response mapping + response_mapping: dict[str, str] = field(default_factory=dict) + + def to_dict(self) -> dict[str, Any]: + base = super().to_dict() + base.update({ + "remoteUrl": self.remote_url, + "trustedIdentity": self.trusted_identity, + "apiEndpoint": self.api_endpoint, + "apiMethod": self.api_method, + "apiHeaders": self.api_headers, + "apiAuthType": self.api_auth_type, + "apiAuthEnvVar": self.api_auth_env_var, + "responseMapping": self.response_mapping, + }) + return base + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> GatewayNode: + position = data.get("position", {}) + return cls( + id=UUID(data["id"]) if isinstance(data.get("id"), str) else data.get("id", uuid4()), + name=data["name"], + description=data.get("description", ""), + position_x=position.get("x", 0.0), + position_y=position.get("y", 0.0), + color=data.get("color"), + icon=data.get("icon"), + remote_url=data.get("remoteUrl") or data.get("remote_url"), + trusted_identity=data.get("trustedIdentity") or data.get("trusted_identity"), + api_endpoint=data.get("apiEndpoint") or data.get("api_endpoint"), + api_method=data.get("apiMethod") or data.get("api_method", "POST"), + api_headers=data.get("apiHeaders") or data.get("api_headers", {}), + api_auth_type=data.get("apiAuthType") or data.get("api_auth_type"), + api_auth_env_var=data.get("apiAuthEnvVar") or data.get("api_auth_env_var"), + response_mapping=data.get("responseMapping") or data.get("response_mapping", {}), + ) + + def to_listener_config(self) -> dict[str, Any]: + """Convert to organism.yaml listener/gateway configuration.""" + if self.remote_url: + # Federation gateway + return { + "name": self.name, + "remote_url": self.remote_url, + "trusted_identity": self.trusted_identity, + "description": self.description, + } + elif self.api_endpoint: + # REST API gateway (custom handler) + return { + "name": self.name, + "description": self.description, + "handler": "bloxserver.gateways.rest.handle_rest_gateway", + "payload_class": "bloxserver.gateways.rest.RestGatewayPayload", + "gateway_config": { + "endpoint": self.api_endpoint, + "method": self.api_method, + "headers": self.api_headers, + "auth_type": self.api_auth_type, + "auth_env_var": self.api_auth_env_var, + "response_mapping": self.response_mapping, + }, + } + else: + raise ValueError( + f"Gateway '{self.name}' requires either remote_url (federation) " + "or api_endpoint (REST)" + ) diff --git a/bloxserver/domain/triggers.py b/bloxserver/domain/triggers.py new file mode 100644 index 0000000..053dfbc --- /dev/null +++ b/bloxserver/domain/triggers.py @@ -0,0 +1,317 @@ +""" +Trigger types for starting flow execution. + +Triggers define how a flow is initiated: +- Webhook: External HTTP POST to a unique URL +- Schedule: Cron-based scheduled execution +- Manual: User-initiated from dashboard +- Event: Internal event subscription +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from typing import Any +from uuid import UUID, uuid4 + + +class TriggerType(str, Enum): + """Types of triggers that can start a flow.""" + + WEBHOOK = "webhook" + SCHEDULE = "schedule" + MANUAL = "manual" + EVENT = "event" + + +# ============================================================================= +# Trigger Configs +# ============================================================================= + + +@dataclass +class WebhookConfig: + """Configuration for webhook triggers.""" + + # Auto-generated token for authentication + token: str | None = None + + # Optional: require specific headers + required_headers: dict[str, str] = field(default_factory=dict) + + # Optional: IP allowlist + allowed_ips: list[str] = field(default_factory=list) + + # Payload transformation + payload_template: str | None = None # Jinja2 template for input transformation + + def to_dict(self) -> dict[str, Any]: + return { + "token": self.token, + "requiredHeaders": self.required_headers, + "allowedIps": self.allowed_ips, + "payloadTemplate": self.payload_template, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> WebhookConfig: + return cls( + token=data.get("token"), + required_headers=data.get("requiredHeaders") or data.get("required_headers", {}), + allowed_ips=data.get("allowedIps") or data.get("allowed_ips", []), + payload_template=data.get("payloadTemplate") or data.get("payload_template"), + ) + + +@dataclass +class ScheduleConfig: + """Configuration for scheduled triggers.""" + + # Cron expression (e.g., "0 9 * * *" for 9 AM daily) + cron: str + + # Timezone (e.g., "America/New_York") + timezone: str = "UTC" + + # Optional: specific dates to skip + skip_dates: list[str] = field(default_factory=list) + + # Optional: payload to inject on trigger + default_payload: dict[str, Any] = field(default_factory=dict) + + # Execution window (skip if missed by more than N minutes) + grace_period_minutes: int = 5 + + def to_dict(self) -> dict[str, Any]: + return { + "cron": self.cron, + "timezone": self.timezone, + "skipDates": self.skip_dates, + "defaultPayload": self.default_payload, + "gracePeriodMinutes": self.grace_period_minutes, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> ScheduleConfig: + return cls( + cron=data["cron"], + timezone=data.get("timezone", "UTC"), + skip_dates=data.get("skipDates") or data.get("skip_dates", []), + default_payload=data.get("defaultPayload") or data.get("default_payload", {}), + grace_period_minutes=data.get("gracePeriodMinutes") or data.get("grace_period_minutes", 5), + ) + + +@dataclass +class ManualConfig: + """Configuration for manual triggers.""" + + # Form fields to show in dashboard + input_fields: list[dict[str, Any]] = field(default_factory=list) + + # Default values + default_payload: dict[str, Any] = field(default_factory=dict) + + # Confirmation required before execution + require_confirmation: bool = False + confirmation_message: str | None = None + + def to_dict(self) -> dict[str, Any]: + return { + "inputFields": self.input_fields, + "defaultPayload": self.default_payload, + "requireConfirmation": self.require_confirmation, + "confirmationMessage": self.confirmation_message, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> ManualConfig: + return cls( + input_fields=data.get("inputFields") or data.get("input_fields", []), + default_payload=data.get("defaultPayload") or data.get("default_payload", {}), + require_confirmation=data.get("requireConfirmation") or data.get("require_confirmation", False), + confirmation_message=data.get("confirmationMessage") or data.get("confirmation_message"), + ) + + +@dataclass +class EventConfig: + """Configuration for internal event triggers.""" + + # Event name to subscribe to + event_name: str + + # Optional: filter expression + filter_expression: str | None = None + + # Source flow ID (if triggered by another flow) + source_flow_id: UUID | None = None + + def to_dict(self) -> dict[str, Any]: + return { + "eventName": self.event_name, + "filterExpression": self.filter_expression, + "sourceFlowId": str(self.source_flow_id) if self.source_flow_id else None, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> EventConfig: + source_flow = data.get("sourceFlowId") or data.get("source_flow_id") + return cls( + event_name=data["eventName"] if "eventName" in data else data["event_name"], + filter_expression=data.get("filterExpression") or data.get("filter_expression"), + source_flow_id=UUID(source_flow) if source_flow else None, + ) + + +# Union type for trigger configs +TriggerConfig = WebhookConfig | ScheduleConfig | ManualConfig | EventConfig + + +# ============================================================================= +# Trigger +# ============================================================================= + + +@dataclass +class Trigger: + """ + A trigger that initiates flow execution. + + Triggers are connected to entry nodes in the flow. + When fired, they inject a message into the entry node(s). + """ + + id: UUID + name: str + trigger_type: TriggerType + config: TriggerConfig + + # Which node(s) to send the initial message to + target_node_ids: list[UUID] = field(default_factory=list) + + # Enabled state + enabled: bool = True + + # Metadata + created_at: datetime | None = None + last_triggered_at: datetime | None = None + + def to_dict(self) -> dict[str, Any]: + return { + "id": str(self.id), + "name": self.name, + "triggerType": self.trigger_type.value, + "config": self.config.to_dict(), + "targetNodeIds": [str(nid) for nid in self.target_node_ids], + "enabled": self.enabled, + "createdAt": self.created_at.isoformat() if self.created_at else None, + "lastTriggeredAt": self.last_triggered_at.isoformat() if self.last_triggered_at else None, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> Trigger: + trigger_type = TriggerType(data.get("triggerType") or data.get("trigger_type")) + config_data = data.get("config", {}) + + # Parse config based on trigger type + config: TriggerConfig + if trigger_type == TriggerType.WEBHOOK: + config = WebhookConfig.from_dict(config_data) + elif trigger_type == TriggerType.SCHEDULE: + config = ScheduleConfig.from_dict(config_data) + elif trigger_type == TriggerType.MANUAL: + config = ManualConfig.from_dict(config_data) + elif trigger_type == TriggerType.EVENT: + config = EventConfig.from_dict(config_data) + else: + raise ValueError(f"Unknown trigger type: {trigger_type}") + + # Parse target node IDs + target_ids_raw = data.get("targetNodeIds") or data.get("target_node_ids", []) + target_node_ids = [ + UUID(nid) if isinstance(nid, str) else nid + for nid in target_ids_raw + ] + + # Parse timestamps + created_at = None + if created := data.get("createdAt") or data.get("created_at"): + created_at = datetime.fromisoformat(created) if isinstance(created, str) else created + + last_triggered = None + if triggered := data.get("lastTriggeredAt") or data.get("last_triggered_at"): + last_triggered = datetime.fromisoformat(triggered) if isinstance(triggered, str) else triggered + + return cls( + id=UUID(data["id"]) if isinstance(data.get("id"), str) else data.get("id", uuid4()), + name=data["name"], + trigger_type=trigger_type, + config=config, + target_node_ids=target_node_ids, + enabled=data.get("enabled", True), + created_at=created_at, + last_triggered_at=last_triggered, + ) + + def get_webhook_url(self, base_url: str, flow_id: UUID) -> str | None: + """Get the webhook URL if this is a webhook trigger.""" + if self.trigger_type != TriggerType.WEBHOOK: + return None + + if not isinstance(self.config, WebhookConfig): + return None + + token = self.config.token or "" + return f"{base_url}/webhooks/{flow_id}/{self.id}?token={token}" + + +def create_webhook_trigger( + name: str, + target_node_ids: list[UUID], + token: str | None = None, +) -> Trigger: + """Factory function to create a webhook trigger.""" + return Trigger( + id=uuid4(), + name=name, + trigger_type=TriggerType.WEBHOOK, + config=WebhookConfig(token=token), + target_node_ids=target_node_ids, + enabled=True, + ) + + +def create_schedule_trigger( + name: str, + cron: str, + target_node_ids: list[UUID], + timezone: str = "UTC", +) -> Trigger: + """Factory function to create a scheduled trigger.""" + return Trigger( + id=uuid4(), + name=name, + trigger_type=TriggerType.SCHEDULE, + config=ScheduleConfig(cron=cron, timezone=timezone), + target_node_ids=target_node_ids, + enabled=True, + ) + + +def create_manual_trigger( + name: str, + target_node_ids: list[UUID], + input_fields: list[dict[str, Any]] | None = None, +) -> Trigger: + """Factory function to create a manual trigger.""" + return Trigger( + id=uuid4(), + name=name, + trigger_type=TriggerType.MANUAL, + config=ManualConfig(input_fields=input_fields or []), + target_node_ids=target_node_ids, + enabled=True, + )