Three workstreams implemented: W1 (Repo Split): Remove proprietary BloxServer files and docs, update pyproject.toml URLs to public GitHub, clean doc references, add CI workflow (.github/workflows/ci.yml) and CONTRIBUTING.md. W2 (Message Journal): Add DispatchHook protocol for dispatch lifecycle events, SQLite-backed MessageJournal with WAL mode for certified-mail delivery guarantees (PENDING→DISPATCHED→ACKED/FAILED), integrate hooks into StreamPump._dispatch_to_handlers(), add journal REST endpoints, and aiosqlite dependency. W3 (Hot Deployment): Add RestartOrchestrator for graceful restart with queue drain and journal stats collection, SIGHUP signal handler in CLI, POST /organism/restart endpoint, restart-aware app lifespan with journal recovery on boot, and os.execv/subprocess re-exec for Unix/Windows. All 439 tests pass (37 new tests for W2/W3). Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
115 lines
3.2 KiB
Python
115 lines
3.2 KiB
Python
"""
|
|
dispatch_hook.py — Hook protocol for message dispatch lifecycle.
|
|
|
|
Defines a Protocol that consumers implement to observe dispatch events.
|
|
The pump calls hooks at key lifecycle points: intent, dispatched,
|
|
acknowledged, failed, and thread completion.
|
|
|
|
Multiple hooks can be registered. No hook registered = no overhead.
|
|
|
|
Example:
|
|
class MyAuditHook:
|
|
async def on_intent(self, thread_id, from_id, to_id,
|
|
payload_type, payload_bytes) -> str:
|
|
print(f"Intent: {from_id} -> {to_id}")
|
|
return str(uuid.uuid4())
|
|
|
|
async def on_acknowledged(self, entry_id):
|
|
print(f"Acked: {entry_id}")
|
|
|
|
# ... other methods
|
|
|
|
pump.register_dispatch_hook(MyAuditHook())
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Protocol, runtime_checkable
|
|
|
|
|
|
@runtime_checkable
|
|
class DispatchHook(Protocol):
|
|
"""
|
|
Hook into message dispatch lifecycle.
|
|
|
|
Implement this protocol for logging, journaling, auditing, or
|
|
any other dispatch observation. The pump calls these methods
|
|
at the corresponding lifecycle points.
|
|
|
|
Lifecycle:
|
|
1. on_intent — Before handler invocation (message about to be dispatched)
|
|
2. on_dispatched — Handler invocation has started
|
|
3. on_acknowledged — Handler returned successfully
|
|
4. on_failed — Handler raised an exception
|
|
5. on_thread_complete — Thread was pruned or terminated
|
|
"""
|
|
|
|
async def on_intent(
|
|
self,
|
|
thread_id: str,
|
|
from_id: str,
|
|
to_id: str,
|
|
payload_type: str,
|
|
payload_bytes: bytes,
|
|
) -> str:
|
|
"""
|
|
Called before dispatch. Returns entry_id for correlation.
|
|
|
|
Args:
|
|
thread_id: Opaque thread UUID
|
|
from_id: Sender's registered name
|
|
to_id: Target listener name
|
|
payload_type: Payload class name
|
|
payload_bytes: Serialized payload bytes (for replay)
|
|
|
|
Returns:
|
|
A unique entry_id string for correlating subsequent callbacks.
|
|
"""
|
|
...
|
|
|
|
async def on_dispatched(self, entry_id: str) -> None:
|
|
"""Called when handler invocation begins."""
|
|
...
|
|
|
|
async def on_acknowledged(self, entry_id: str) -> None:
|
|
"""Called when handler returns successfully."""
|
|
...
|
|
|
|
async def on_failed(self, entry_id: str, error: str) -> None:
|
|
"""Called when handler raises an exception."""
|
|
...
|
|
|
|
async def on_thread_complete(self, thread_id: str) -> None:
|
|
"""Called when a thread is pruned or terminated."""
|
|
...
|
|
|
|
|
|
class NullDispatchHook:
|
|
"""
|
|
No-op default hook. Zero overhead when no journal is configured.
|
|
|
|
All methods are synchronous no-ops that satisfy the DispatchHook
|
|
protocol but do nothing.
|
|
"""
|
|
|
|
async def on_intent(
|
|
self,
|
|
thread_id: str,
|
|
from_id: str,
|
|
to_id: str,
|
|
payload_type: str,
|
|
payload_bytes: bytes,
|
|
) -> str:
|
|
return ""
|
|
|
|
async def on_dispatched(self, entry_id: str) -> None:
|
|
pass
|
|
|
|
async def on_acknowledged(self, entry_id: str) -> None:
|
|
pass
|
|
|
|
async def on_failed(self, entry_id: str, error: str) -> None:
|
|
pass
|
|
|
|
async def on_thread_complete(self, thread_id: str) -> None:
|
|
pass
|