From 1e2073a81c60c8b952f5a92e3c69648bb947e0a1 Mon Sep 17 00:00:00 2001 From: Donna Date: Mon, 26 Jan 2026 07:23:03 +0000 Subject: [PATCH] Add edge_mappings table for AI-assisted field mapping MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New table stores: - Edge identification (from_node → to_node) - Analysis results (confidence, level, method) - Proposed mapping (AI-generated) - User mapping (overrides) - Confirmation status Indexes: - By flow_id for listing - Unique on (flow_id, from_node, to_node) for upsert This supports the edge analysis API for visual wiring in the canvas. Co-authored-by: Dan --- .../fedaf93bff56_add_edge_mappings_table.py | 49 ++++++++++++++ bloxserver/api/models/__init__.py | 2 + bloxserver/api/models/tables.py | 65 +++++++++++++++++++ 3 files changed, 116 insertions(+) create mode 100644 bloxserver/alembic/versions/fedaf93bff56_add_edge_mappings_table.py diff --git a/bloxserver/alembic/versions/fedaf93bff56_add_edge_mappings_table.py b/bloxserver/alembic/versions/fedaf93bff56_add_edge_mappings_table.py new file mode 100644 index 0000000..b96c1a0 --- /dev/null +++ b/bloxserver/alembic/versions/fedaf93bff56_add_edge_mappings_table.py @@ -0,0 +1,49 @@ +"""add edge_mappings table + +Revision ID: fedaf93bff56 +Revises: 7136cc209524 +Create Date: 2026-01-26 07:22:32.557309 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'fedaf93bff56' +down_revision: Union[str, Sequence[str], None] = '7136cc209524' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + op.create_table('edge_mappings', + sa.Column('id', sa.UUID(), nullable=False), + sa.Column('flow_id', sa.UUID(), nullable=False), + sa.Column('from_node', sa.String(length=100), nullable=False), + sa.Column('to_node', sa.String(length=100), nullable=False), + sa.Column('confidence', sa.Numeric(precision=3, scale=2), nullable=True), + sa.Column('level', sa.Enum('HIGH', 'MEDIUM', 'LOW', name='confidencelevel'), nullable=True), + sa.Column('analysis_method', sa.String(length=20), nullable=True), + sa.Column('proposed_mapping', sa.JSON(), nullable=True), + sa.Column('user_mapping', sa.JSON(), nullable=True), + sa.Column('user_confirmed', sa.Boolean(), nullable=False), + sa.Column('analyzed_at', sa.DateTime(timezone=True), nullable=True), + sa.Column('confirmed_at', sa.DateTime(timezone=True), nullable=True), + sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=False), + sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=False), + sa.ForeignKeyConstraint(['flow_id'], ['flows.id'], ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id') + ) + op.create_index('idx_edge_mappings_edge', 'edge_mappings', ['flow_id', 'from_node', 'to_node'], unique=True) + op.create_index('idx_edge_mappings_flow', 'edge_mappings', ['flow_id'], unique=False) + + +def downgrade() -> None: + """Downgrade schema.""" + op.drop_index('idx_edge_mappings_flow', table_name='edge_mappings') + op.drop_index('idx_edge_mappings_edge', table_name='edge_mappings') + op.drop_table('edge_mappings') diff --git a/bloxserver/api/models/__init__.py b/bloxserver/api/models/__init__.py index f8a248d..3f7cb40 100644 --- a/bloxserver/api/models/__init__.py +++ b/bloxserver/api/models/__init__.py @@ -2,6 +2,7 @@ from bloxserver.api.models.database import Base, get_db, init_db from bloxserver.api.models.tables import ( + EdgeMappingRecord, ExecutionRecord, FlowRecord, TriggerRecord, @@ -20,4 +21,5 @@ __all__ = [ "ExecutionRecord", "UserApiKeyRecord", "UsageRecord", + "EdgeMappingRecord", ] diff --git a/bloxserver/api/models/tables.py b/bloxserver/api/models/tables.py index a3c33f6..c5a09dd 100644 --- a/bloxserver/api/models/tables.py +++ b/bloxserver/api/models/tables.py @@ -379,3 +379,68 @@ class StripeEventRecord(Base): __table_args__ = ( Index("idx_stripe_events_processed", "processed_at"), ) + + +# ============================================================================= +# Edge Mappings (AI-assisted field mapping between nodes) +# ============================================================================= + + +class ConfidenceLevel(str, enum.Enum): + """Confidence level for edge mapping analysis.""" + + HIGH = "high" # Green - auto-mapped, ready to run + MEDIUM = "medium" # Yellow - review suggested + LOW = "low" # Red - manual mapping needed + + +class EdgeMappingRecord(Base): + """ + Mapping between two nodes in a flow. + + Stores both AI-proposed mappings and user overrides. + Used by the sequencer to transform outputs to inputs. + """ + + __tablename__ = "edge_mappings" + + id: Mapped[UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=uuid4 + ) + flow_id: Mapped[UUID] = mapped_column( + UUID(as_uuid=True), ForeignKey("flows.id", ondelete="CASCADE"), nullable=False + ) + + # Edge identification + from_node: Mapped[str] = mapped_column(String(100), nullable=False) + to_node: Mapped[str] = mapped_column(String(100), nullable=False) + + # Analysis results + confidence: Mapped[float | None] = mapped_column(Numeric(3, 2)) + level: Mapped[ConfidenceLevel | None] = mapped_column(Enum(ConfidenceLevel)) + analysis_method: Mapped[str | None] = mapped_column(String(20)) # 'heuristic' or 'llm' + + # The mappings (JSON) + proposed_mapping: Mapped[dict[str, Any] | None] = mapped_column(JSON) + user_mapping: Mapped[dict[str, Any] | None] = mapped_column(JSON) # User overrides + + # Status + user_confirmed: Mapped[bool] = mapped_column(Boolean, default=False) + + # Timestamps + analyzed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + confirmed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now() + ) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now(), onupdate=func.now() + ) + + # Relationships + flow: Mapped[FlowRecord] = relationship("FlowRecord") + + __table_args__ = ( + Index("idx_edge_mappings_flow", "flow_id"), + Index("idx_edge_mappings_edge", "flow_id", "from_node", "to_node", unique=True), + )