xml-pipeline/agentserver/utils/message.py
2025-12-30 23:35:31 -08:00

62 lines
2.3 KiB
Python

import logging
from typing import List, Tuple, Optional
from lxml import etree
logger = logging.getLogger("agentserver.message")
class XmlTamperError(Exception):
"""Raised when XML is fundamentally unparseable or violates security constraints."""
pass
def repair_and_canonicalize(raw_xml: bytes) -> etree.Element:
"""
The 'Immune System' of the Organism.
Parses, repairs, and injects the <huh/> scar tissue into the metadata.
"""
repairs: List[str] = []
# 1. Initial Parse with Recovery
parser = etree.XMLParser(recover=True, remove_blank_text=True)
try:
# If it's totally broken (not even XML-ish), this will still fail
root = etree.fromstring(raw_xml, parser=parser)
except etree.XMLSyntaxError as e:
raise XmlTamperError(f"Fatal XML corruption: {e}")
# 2. Check for parser-level repairs (structural fixes)
for error in parser.error_log:
repairs.append(f"Structural fix: {error.message} at line {error.line}")
# 3. Canonicalize Internal Logic (C14N)
# We strip comments and processing instructions to ensure the 'Skeleton' is clean
# Note: In a real C14N impl, you'd use etree.tostring(root, method="c14n")
# but here we keep it as a tree for the MessageBus.
# 4. Inject <huh/> Scar Tissue
if repairs:
_inject_huh_tag(root, repairs)
return root
def _inject_huh_tag(root: etree.Element, repairs: List[str]):
"""
Finds the <meta> block and inserts a <huh> log of repairs.
"""
# Find or create <meta>
# Note: Using namespaces if defined in your envelope
meta = root.find(".//{https://xml-pipeline.org/ns/envelope/1}meta")
if meta is None:
# If no meta exists, we can't safely log repairs in the standard way
# In a strict system, this might even be a rejection
return
huh = etree.SubElement(meta, "{https://xml-pipeline.org/ns/huh/1}huh")
for r in repairs:
repair_el = etree.SubElement(huh, "{https://xml-pipeline.org/ns/huh/1}repair")
repair_el.text = r
logger.warning(f"Repaired message from {root.tag}: {len(repairs)} issues fixed.")
def to_canonical_bytes(root: etree.Element) -> bytes:
"""Returns the exclusive C14N bytes for cryptographic signing."""
return etree.tostring(root, method="c14n", exclusive=True)