Major refactor of the message pump architecture: - Replace bus.py with stream_pump.py using aiostream for composable stream processing with natural fan-out via flatmap - Add to_id field to MessageState for explicit routing - Fix routing to use to_id.class format (e.g., "greeter.greeting") - Generate XSD schemas from xmlified payload classes - Fix xmlable imports (absolute -> relative) and parse_element ctx New features: - handlers/hello.py: Sample Greeting/GreetingResponse handler - config/organism.yaml: Sample organism configuration - 41 tests (31 unit + 10 integration) all passing Schema changes: - envelope.xsd: Allow any namespace payloads (##other -> ##any) Dependencies added to pyproject.toml: - aiostream>=0.5 (core dependency) - pyhumps, termcolor (for xmlable) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
45 lines
No EOL
1.6 KiB
Python
45 lines
No EOL
1.6 KiB
Python
"""
|
|
deserialization.py — Convert validated payload_tree into typed dataclass instance.
|
|
|
|
After xsd_validation_step confirms the payload conforms to the contract,
|
|
this step uses our customized xmlable routines to deserialize the lxml Element
|
|
directly in memory — no temporary files needed.
|
|
|
|
Part of AgentServer v2.1 message pump.
|
|
"""
|
|
|
|
from lxml.etree import _Element
|
|
from agentserver.message_bus.message_state import MessageState
|
|
|
|
# Import the customized parse_element from your forked xmlable
|
|
from third_party.xmlable import parse_element # adjust path if needed
|
|
|
|
|
|
async def deserialization_step(state: MessageState) -> MessageState:
|
|
"""
|
|
Deserialize the validated payload_tree into the listener's @xmlify dataclass.
|
|
|
|
Requires:
|
|
- state.payload_tree: validated lxml Element
|
|
- state.metadata["payload_class"]: the target dataclass
|
|
|
|
Uses the custom parse_element routine for direct in-memory deserialization.
|
|
"""
|
|
if state.payload_tree is None:
|
|
state.error = "deserialization_step: no payload_tree (previous step failed)"
|
|
return state
|
|
|
|
payload_class = state.metadata.get("payload_class")
|
|
if payload_class is None:
|
|
state.error = "deserialization_step: no payload_class in metadata (listener misconfigured)"
|
|
return state
|
|
|
|
try:
|
|
# Direct in-memory deserialization — fast and clean
|
|
instance = parse_element(payload_class, state.payload_tree)
|
|
state.payload = instance
|
|
|
|
except Exception as exc: # pylint: disable=broad-except
|
|
state.error = f"deserialization_step failed: {exc}"
|
|
|
|
return state |