"""
journal.py — SQLite-backed certified-mail journal.

Implements the DispatchHook protocol with SQLite persistence.
Provides write-ahead logging for message dispatch, enabling crash
recovery via replay of unacknowledged entries.

Lifecycle:
    1. on_intent          → INSERT journal entry [PENDING]
    2. on_dispatched      → UPDATE status [DISPATCHED]
    3. on_acknowledged    → UPDATE status [ACKED]
    4. on_failed          → UPDATE status + error [FAILED]
    5. on_thread_complete → compact acked entries
    6. startup            → scan DISPATCHED → re-inject [recovery]

Example:
    from xml_pipeline.message_bus.journal import MessageJournal

    journal = MessageJournal(db_path="~/.xml-pipeline/journal.db")
    await journal.initialize()
    pump.register_dispatch_hook(journal)

    # On restart, replay unacknowledged entries
    entries = await journal.get_unacknowledged()
    for entry in entries:
        await pump.inject(
            entry["payload_bytes"], entry["thread_id"], entry["from_id"]
        )
"""

from __future__ import annotations

import logging
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Dict, List, Optional

from xml_pipeline.message_bus.journal_store import JournalStore

logger = logging.getLogger(__name__)


class JournalEntryStatus(Enum):
    """Status of a journal entry through its lifecycle."""

    PENDING = "pending"
    DISPATCHED = "dispatched"
    ACKNOWLEDGED = "acked"
    FAILED = "failed"


@dataclass(frozen=True)
class JournalEntry:
    """Immutable snapshot of a journal entry."""

    id: str
    thread_id: str
    from_id: str
    to_id: str
    payload_type: str
    payload_bytes: bytes
    status: JournalEntryStatus
    # Timestamps are strings; MessageJournal writes them as ISO-8601 UTC.
    created_at: str
    dispatched_at: Optional[str] = None
    acked_at: Optional[str] = None
    retry_count: int = 0
    error: Optional[str] = None


class MessageJournal:
    """
    SQLite-backed certified-mail journal.

    Implements DispatchHook to track message dispatch lifecycle.
    Provides crash recovery via replay of unacknowledged entries.
    """

    def __init__(
        self,
        db_path: Optional[str] = None,
        *,
        compact_after_hours: int = 24,
        retry_after_seconds: float = 30.0,
        max_retries: int = 3,
    ) -> None:
        """
        Create a journal backed by a JournalStore.

        Args:
            db_path: Passed straight through to JournalStore.
            compact_after_hours: Default age used by compact_old().
            retry_after_seconds: Default delay used by get_unacknowledged().
            max_retries: Forwarded to the store by get_unacknowledged().
        """
        self._store = JournalStore(db_path)
        self._compact_after_hours = compact_after_hours
        self._retry_after_seconds = retry_after_seconds
        self._max_retries = max_retries

    async def initialize(self) -> None:
        """Initialize the underlying store."""
        await self._store.initialize()

    # ------------------------------------------------------------------
    # DispatchHook implementation
    # ------------------------------------------------------------------

    async def on_intent(
        self,
        thread_id: str,
        from_id: str,
        to_id: str,
        payload_type: str,
        payload_bytes: bytes,
    ) -> str:
        """
        Record intent to dispatch.

        Inserts a new entry with status PENDING and a freshly generated
        UUID4 identifier.

        Returns:
            The entry_id, for correlation with the later on_* callbacks.
        """
        entry_id = str(uuid.uuid4())
        now = datetime.now(timezone.utc).isoformat()
        await self._store.insert(
            entry_id=entry_id,
            thread_id=thread_id,
            from_id=from_id,
            to_id=to_id,
            payload_type=payload_type,
            payload_bytes=payload_bytes,
            status=JournalEntryStatus.PENDING.value,
            created_at=now,
        )
        logger.debug(f"Journal: intent {entry_id[:8]}... {from_id}->{to_id}")
        return entry_id

    async def on_dispatched(self, entry_id: str) -> None:
        """
        Mark entry as dispatched (handler invocation started).

        A falsy entry_id is ignored and nothing is written.
        """
        if not entry_id:
            return
        now = datetime.now(timezone.utc).isoformat()
        await self._store.update_status(
            entry_id,
            JournalEntryStatus.DISPATCHED.value,
            timestamp_field="dispatched_at",
            timestamp_value=now,
        )
        logger.debug(f"Journal: dispatched {entry_id[:8]}...")

    async def on_acknowledged(self, entry_id: str) -> None:
        """
        Mark entry as acknowledged (handler returned successfully).

        A falsy entry_id is ignored and nothing is written.
        """
        if not entry_id:
            return
        now = datetime.now(timezone.utc).isoformat()
        await self._store.update_status(
            entry_id,
            JournalEntryStatus.ACKNOWLEDGED.value,
            timestamp_field="acked_at",
            timestamp_value=now,
        )
        logger.debug(f"Journal: acked {entry_id[:8]}...")

    async def on_failed(self, entry_id: str, error: str) -> None:
        """
        Mark entry as failed with error details.

        A falsy entry_id is ignored and nothing is written.
        """
        if not entry_id:
            return
        now = datetime.now(timezone.utc).isoformat()
        # NOTE(review): unlike on_dispatched/on_acknowledged, no
        # timestamp_field is passed here — confirm how
        # JournalStore.update_status treats a bare timestamp_value.
        await self._store.update_status(
            entry_id,
            JournalEntryStatus.FAILED.value,
            error=error,
            timestamp_value=now,
        )
        logger.warning(f"Journal: failed {entry_id[:8]}... — {error}")

    async def on_thread_complete(self, thread_id: str) -> None:
        """Compact acknowledged entries for the completed thread."""
        count = await self._store.compact_thread(thread_id)
        if count:
            logger.debug(
                f"Journal: compacted {count} entries for thread {thread_id[:8]}..."
            )

    # ------------------------------------------------------------------
    # Journal-specific methods (not part of DispatchHook protocol)
    # ------------------------------------------------------------------

    async def get_unacknowledged(
        self,
        older_than_seconds: Optional[float] = None,
    ) -> List[Dict[str, Any]]:
        """
        Get entries that need replay (pending or dispatched but never acked).

        Used for crash recovery on startup.

        Args:
            older_than_seconds: Override default retry delay. Only None
                selects the configured default; an explicit 0 is forwarded
                to the store as 0.

        Returns:
            List of entry dicts with payload_bytes for re-injection
        """
        # Compare against None rather than using `or`: 0 is falsy and would
        # otherwise be silently replaced by the default delay.
        if older_than_seconds is None:
            delay = self._retry_after_seconds
        else:
            delay = older_than_seconds
        return await self._store.get_unacknowledged(
            older_than_seconds=delay,
            max_retries=self._max_retries,
        )

    async def compact_old(self, max_age_hours: Optional[int] = None) -> int:
        """
        Remove old acknowledged entries.

        Args:
            max_age_hours: Override default compaction age. Only None
                selects the configured default; an explicit 0 yields a
                cutoff of "now".

        Returns:
            Number of entries removed
        """
        # Same None check as get_unacknowledged: 0 is a legitimate override.
        if max_age_hours is None:
            hours = self._compact_after_hours
        else:
            hours = max_age_hours
        cutoff = (
            datetime.now(timezone.utc) - timedelta(hours=hours)
        ).isoformat()
        return await self._store.compact_old(cutoff)

    async def get_stats(self) -> Dict[str, int]:
        """Get entry counts by status."""
        return await self._store.get_stats()