xml-pipeline/xml_pipeline/message_bus/journal.py
dullfig d97c24b1dd
Some checks failed
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.13) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / typecheck (push) Has been cancelled
Add message journal, graceful restart, and clean repo for public release
Three workstreams implemented:

W1 (Repo Split): Remove proprietary BloxServer files and docs, update
pyproject.toml URLs to public GitHub, clean doc references, add CI
workflow (.github/workflows/ci.yml) and CONTRIBUTING.md.

W2 (Message Journal): Add DispatchHook protocol for dispatch lifecycle
events, SQLite-backed MessageJournal with WAL mode for certified-mail
delivery guarantees (PENDING→DISPATCHED→ACKED/FAILED), integrate hooks
into StreamPump._dispatch_to_handlers(), add journal REST endpoints,
and aiosqlite dependency.

W3 (Hot Deployment): Add RestartOrchestrator for graceful restart with
queue drain and journal stats collection, SIGHUP signal handler in CLI,
POST /organism/restart endpoint, restart-aware app lifespan with journal
recovery on boot, and os.execv/subprocess re-exec for Unix/Windows.

All 439 tests pass (37 new tests for W2/W3).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-28 22:27:38 -08:00

217 lines
6.8 KiB
Python

"""
journal.py — SQLite-backed certified-mail journal.
Implements the DispatchHook protocol with SQLite persistence.
Provides write-ahead logging for message dispatch, enabling
crash recovery via replay of unacknowledged entries.
Lifecycle:
1. on_intent → INSERT journal entry [PENDING]
2. on_dispatched → UPDATE status [DISPATCHED]
3. on_acknowledged → UPDATE status [ACKED]
4. on_failed → UPDATE status + error [FAILED]
5. on_thread_complete → compact acked entries
6. startup → scan DISPATCHED → re-inject [recovery]
Example:
from xml_pipeline.message_bus.journal import MessageJournal
journal = MessageJournal(db_path="~/.xml-pipeline/journal.db")
await journal.initialize()
pump.register_dispatch_hook(journal)
# On restart, replay unacknowledged entries
entries = await journal.get_unacknowledged()
for entry in entries:
await pump.inject(entry["payload_bytes"], entry["thread_id"], entry["from_id"])
"""
from __future__ import annotations

import logging
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Dict, List, Optional

from xml_pipeline.message_bus.journal_store import JournalStore
logger = logging.getLogger(__name__)
class JournalEntryStatus(Enum):
    """Lifecycle states a journal entry moves through.

    Happy path: PENDING -> DISPATCHED -> ACKNOWLEDGED.
    A dispatch error moves the entry to FAILED instead.
    """

    PENDING = "pending"        # intent recorded, handler not yet invoked
    DISPATCHED = "dispatched"  # handler invocation started
    ACKNOWLEDGED = "acked"     # handler returned successfully
    FAILED = "failed"          # dispatch failed; error text recorded
@dataclass(frozen=True)
class JournalEntry:
    """Read-only snapshot of a single journal record.

    Timestamp fields hold ISO-8601 strings; ``dispatched_at`` and
    ``acked_at`` remain ``None`` until the entry reaches the
    corresponding lifecycle stage.
    """

    id: str                          # unique correlation id for this entry
    thread_id: str                   # conversation thread the message belongs to
    from_id: str                     # sender identifier
    to_id: str                       # recipient identifier
    payload_type: str                # message payload type tag
    payload_bytes: bytes             # raw payload, kept for replay
    status: JournalEntryStatus       # current lifecycle state
    created_at: str                  # when intent was recorded
    dispatched_at: Optional[str] = None  # when handler invocation started
    acked_at: Optional[str] = None       # when handler completed successfully
    retry_count: int = 0                 # replay attempts so far
    error: Optional[str] = None          # failure details, if any
class MessageJournal:
    """
    SQLite-backed certified-mail journal.

    Implements the DispatchHook lifecycle callbacks (on_intent,
    on_dispatched, on_acknowledged, on_failed, on_thread_complete) with
    persistent storage, and exposes replay/compaction helpers so
    unacknowledged entries can be re-injected after a crash.
    """

    def __init__(
        self,
        db_path: Optional[str] = None,
        *,
        compact_after_hours: int = 24,
        retry_after_seconds: float = 30.0,
        max_retries: int = 3,
    ) -> None:
        """
        Args:
            db_path: SQLite database path (store-defined default when None).
            compact_after_hours: Default age after which acknowledged
                entries are removed by ``compact_old()``.
            retry_after_seconds: Default minimum age before an unacked
                entry is offered for replay by ``get_unacknowledged()``.
            max_retries: Replay cap forwarded to the store when scanning
                for unacknowledged entries.
        """
        self._store = JournalStore(db_path)
        self._compact_after_hours = compact_after_hours
        self._retry_after_seconds = retry_after_seconds
        self._max_retries = max_retries

    async def initialize(self) -> None:
        """Initialize the underlying store; call once before first use."""
        await self._store.initialize()

    # ------------------------------------------------------------------
    # DispatchHook implementation
    # ------------------------------------------------------------------

    async def on_intent(
        self,
        thread_id: str,
        from_id: str,
        to_id: str,
        payload_type: str,
        payload_bytes: bytes,
    ) -> str:
        """Record intent to dispatch. Returns entry_id for correlation."""
        entry_id = str(uuid.uuid4())
        now = datetime.now(timezone.utc).isoformat()
        await self._store.insert(
            entry_id=entry_id,
            thread_id=thread_id,
            from_id=from_id,
            to_id=to_id,
            payload_type=payload_type,
            payload_bytes=payload_bytes,
            status=JournalEntryStatus.PENDING.value,
            created_at=now,
        )
        # Lazy %-args avoid string formatting when DEBUG logging is off.
        logger.debug("Journal: intent %s... %s->%s", entry_id[:8], from_id, to_id)
        return entry_id

    async def on_dispatched(self, entry_id: str) -> None:
        """Mark entry as dispatched (handler invocation started)."""
        if not entry_id:
            # No correlation id — nothing to journal.
            return
        now = datetime.now(timezone.utc).isoformat()
        await self._store.update_status(
            entry_id,
            JournalEntryStatus.DISPATCHED.value,
            timestamp_field="dispatched_at",
            timestamp_value=now,
        )
        logger.debug("Journal: dispatched %s...", entry_id[:8])

    async def on_acknowledged(self, entry_id: str) -> None:
        """Mark entry as acknowledged (handler returned successfully)."""
        if not entry_id:
            return
        now = datetime.now(timezone.utc).isoformat()
        await self._store.update_status(
            entry_id,
            JournalEntryStatus.ACKNOWLEDGED.value,
            timestamp_field="acked_at",
            timestamp_value=now,
        )
        logger.debug("Journal: acked %s...", entry_id[:8])

    async def on_failed(self, entry_id: str, error: str) -> None:
        """Mark entry as failed with error details."""
        if not entry_id:
            return
        now = datetime.now(timezone.utc).isoformat()
        # NOTE(review): unlike on_dispatched/on_acknowledged, this passes
        # timestamp_value without a timestamp_field (JournalEntry has no
        # failed_at column) — confirm JournalStore.update_status handles a
        # bare timestamp_value as intended.
        await self._store.update_status(
            entry_id,
            JournalEntryStatus.FAILED.value,
            error=error,
            timestamp_value=now,
        )
        logger.warning("Journal: failed %s... — %s", entry_id[:8], error)

    async def on_thread_complete(self, thread_id: str) -> None:
        """Compact acknowledged entries for the completed thread."""
        count = await self._store.compact_thread(thread_id)
        if count:
            logger.debug(
                "Journal: compacted %d entries for thread %s...",
                count,
                thread_id[:8],
            )

    # ------------------------------------------------------------------
    # Journal-specific methods (not part of DispatchHook protocol)
    # ------------------------------------------------------------------

    async def get_unacknowledged(
        self,
        older_than_seconds: Optional[float] = None,
    ) -> List[Dict[str, Any]]:
        """
        Get entries that need replay (pending or dispatched but never acked).

        Used for crash recovery on startup.

        Args:
            older_than_seconds: Override the default retry delay. An
                explicit ``0.0`` means "replay everything immediately".

        Returns:
            List of entry dicts with payload_bytes for re-injection.
        """
        # "is None" (not "or") so an explicit 0.0 override is honored
        # instead of silently falling back to the default delay.
        delay = (
            self._retry_after_seconds
            if older_than_seconds is None
            else older_than_seconds
        )
        return await self._store.get_unacknowledged(
            older_than_seconds=delay,
            max_retries=self._max_retries,
        )

    async def compact_old(self, max_age_hours: Optional[int] = None) -> int:
        """
        Remove old acknowledged entries.

        Args:
            max_age_hours: Override the default compaction age. An explicit
                ``0`` compacts everything acknowledged up to now.

        Returns:
            Number of entries removed.
        """
        # "is None" so an explicit 0 override is honored (see above).
        hours = (
            self._compact_after_hours if max_age_hours is None else max_age_hours
        )
        cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
        return await self._store.compact_old(cutoff)

    async def get_stats(self) -> Dict[str, int]:
        """Get entry counts by status."""
        return await self._store.get_stats()