From 809862af3500a5787c0d13f270f5ea4eceb4a7f2 Mon Sep 17 00:00:00 2001 From: dullfig Date: Tue, 27 Jan 2026 20:00:35 -0800 Subject: [PATCH] Add Ed25519 envelope signing infrastructure Implement cryptographic signing for message envelopes using Ed25519: - Identity module: Generate, load, save Ed25519 keypairs - Signing module: Sign/verify envelopes using Exclusive C14N - Envelope utilities: Build envelopes with optional signing - CLI keygen command: xml-pipeline keygen [-o path] - Pump integration: Auto-sign when identity configured Signature is embedded in block using namespace https://xml-pipeline.org/ns/sig/v1, fitting existing xs:any in envelope.xsd. Usage: xml-pipeline keygen -o config/identity.key # organism.yaml organism: identity: "config/identity.key" Co-Authored-By: Claude Opus 4.5 --- tests/test_crypto.py | 431 ++++++++++++++++++++++++ xml_pipeline/cli.py | 46 +++ xml_pipeline/crypto/__init__.py | 46 +++ xml_pipeline/crypto/identity.py | 265 +++++++++++++++ xml_pipeline/crypto/signing.py | 264 +++++++++++++++ xml_pipeline/message_bus/envelope.py | 162 +++++++++ xml_pipeline/message_bus/stream_pump.py | 33 +- 7 files changed, 1243 insertions(+), 4 deletions(-) create mode 100644 tests/test_crypto.py create mode 100644 xml_pipeline/crypto/__init__.py create mode 100644 xml_pipeline/crypto/identity.py create mode 100644 xml_pipeline/crypto/signing.py diff --git a/tests/test_crypto.py b/tests/test_crypto.py new file mode 100644 index 0000000..c58b3ef --- /dev/null +++ b/tests/test_crypto.py @@ -0,0 +1,431 @@ +""" +test_crypto.py — Tests for Ed25519 identity and envelope signing. + +Tests: +1. Key generation and loading +2. Envelope signing +3. Signature verification +4. Tamper detection +5. Edge cases +""" + +import tempfile +from pathlib import Path + +import pytest +from lxml import etree + +from xml_pipeline.crypto.identity import ( + Identity, + IdentityError, + generate_identity, + load_public_key, + verify_with_public_key, +) +from xml_pipeline.crypto.signing import ( + SIGNATURE_NAMESPACE, + SignatureError, + extract_signature, + is_signed, + sign_envelope, + strip_signature, + verify_envelope, + verify_envelope_with_identity, +) +from xml_pipeline.message_bus.envelope import ( + build_envelope, + envelope_to_bytes, + extract_meta, + extract_payload, +) + + +class TestIdentityGeneration: + """Test identity key generation.""" + + def test_generate_creates_valid_keypair(self): + """generate_identity() should create a valid Ed25519 keypair.""" + identity = generate_identity() + + assert identity.private_key is not None + assert identity.public_key is not None + assert identity.path == "" + + def test_generated_key_can_sign_and_verify(self): + """Generated identity should be able to sign and verify.""" + identity = generate_identity() + data = b"test message" + + signature = identity.sign(data) + + assert len(signature) == 64 # Ed25519 signatures are 64 bytes + assert identity.verify(signature, data) is True + + def test_signature_fails_for_wrong_data(self): + """Verification should fail for modified data.""" + identity = generate_identity() + data = b"original message" + signature = identity.sign(data) + + assert identity.verify(signature, b"modified message") is False + + def test_sign_base64_produces_valid_encoding(self): + """sign_base64() should produce valid base64.""" + import base64 + + identity = generate_identity() + data = b"test" + + sig_b64 = identity.sign_base64(data) + + # Should decode without error + decoded = base64.b64decode(sig_b64) + assert len(decoded) == 64 + + +class TestIdentityPersistence: + """Test saving and loading identity keys.""" + + def test_save_and_load_roundtrip(self): + """Identity should survive save/load cycle.""" + identity = generate_identity() + data = b"test data to sign" + original_sig = identity.sign(data) + + with tempfile.TemporaryDirectory() as tmpdir: + private_path = Path(tmpdir) / "test.key" + public_path = Path(tmpdir) / "test.pub" + + identity.save(private_path, public_path) + + # Files should exist + assert private_path.exists() + assert public_path.exists() + + # Load and verify + loaded = Identity.load(private_path) + loaded_sig = loaded.sign(data) + + # Signatures should match + assert original_sig == loaded_sig + + def test_load_nonexistent_raises_error(self): + """Loading non-existent key should raise IdentityError.""" + with pytest.raises(IdentityError, match="not found"): + Identity.load("/nonexistent/path/key.pem") + + def test_load_invalid_key_raises_error(self): + """Loading invalid key data should raise IdentityError.""" + with tempfile.NamedTemporaryFile(mode="wb", suffix=".pem", delete=False) as f: + f.write(b"not a valid key") + f.flush() + + with pytest.raises(IdentityError, match="Failed to load"): + Identity.load(f.name) + + +class TestPublicKeyLoading: + """Test loading public keys for verification.""" + + def test_load_public_key(self): + """load_public_key() should load a valid public key.""" + identity = generate_identity() + + with tempfile.TemporaryDirectory() as tmpdir: + private_path = Path(tmpdir) / "test.key" + public_path = Path(tmpdir) / "test.pub" + identity.save(private_path, public_path) + + # Load public key + pubkey = load_public_key(public_path) + + # Should verify signatures + data = b"test" + sig = identity.sign(data) + assert verify_with_public_key(pubkey, sig, data) is True + + def test_load_public_key_nonexistent(self): + """Loading non-existent public key should raise error.""" + with pytest.raises(IdentityError, match="not found"): + load_public_key("/nonexistent/pubkey.pub") + + +class TestEnvelopeSigning: + """Test envelope signing with Ed25519.""" + + @pytest.fixture + def sample_envelope(self): + """Create a sample unsigned envelope.""" + ns = "https://xml-pipeline.org/ns/envelope/v1" + nsmap = {None: ns} + + message = etree.Element(f"{{{ns}}}message", nsmap=nsmap) + meta = etree.SubElement(message, f"{{{ns}}}meta") + + from_elem = etree.SubElement(meta, f"{{{ns}}}from") + from_elem.text = "greeter" + + thread_elem = etree.SubElement(meta, f"{{{ns}}}thread") + thread_elem.text = "test-thread-uuid" + + # Add a payload + payload = etree.SubElement(message, "Greeting") + name_elem = etree.SubElement(payload, "name") + name_elem.text = "Alice" + + return message + + @pytest.fixture + def identity(self): + """Create a test identity.""" + return generate_identity() + + def test_sign_envelope_adds_signature(self, sample_envelope, identity): + """sign_envelope() should add a signature element.""" + signed = sign_envelope(sample_envelope, identity) + + sig = extract_signature(signed) + assert sig is not None + assert len(sig) > 0 + + def test_sign_envelope_creates_copy_by_default(self, sample_envelope, identity): + """sign_envelope() should not modify original by default.""" + original_bytes = etree.tostring(sample_envelope) + + sign_envelope(sample_envelope, identity) + + # Original should be unchanged + assert etree.tostring(sample_envelope) == original_bytes + + def test_sign_envelope_in_place(self, sample_envelope, identity): + """sign_envelope(in_place=True) should modify original.""" + sign_envelope(sample_envelope, identity, in_place=True) + + sig = extract_signature(sample_envelope) + assert sig is not None + + def test_signed_envelope_verifies(self, sample_envelope, identity): + """Signed envelope should verify with same identity.""" + signed = sign_envelope(sample_envelope, identity) + + assert verify_envelope_with_identity(signed, identity) is True + + def test_signed_envelope_verifies_with_public_key(self, sample_envelope, identity): + """Signed envelope should verify with public key.""" + signed = sign_envelope(sample_envelope, identity) + + assert verify_envelope(signed, identity.public_key) is True + + def test_tampered_envelope_fails_verification(self, sample_envelope, identity): + """Modifying signed envelope should fail verification.""" + signed = sign_envelope(sample_envelope, identity) + + # Tamper with the payload + ns = "https://xml-pipeline.org/ns/envelope/v1" + meta = signed.find(f"{{{ns}}}meta") + from_elem = meta.find(f"{{{ns}}}from") + from_elem.text = "evil-agent" + + assert verify_envelope(signed, identity.public_key) is False + + def test_wrong_key_fails_verification(self, sample_envelope, identity): + """Verification with wrong key should fail.""" + signed = sign_envelope(sample_envelope, identity) + + other_identity = generate_identity() + assert verify_envelope(signed, other_identity.public_key) is False + + def test_unsigned_envelope_fails_verification(self, sample_envelope, identity): + """Unsigned envelope should fail verification.""" + assert verify_envelope(sample_envelope, identity.public_key) is False + + +class TestSignatureHelpers: + """Test signature utility functions.""" + + @pytest.fixture + def signed_envelope(self): + """Create a signed envelope.""" + identity = generate_identity() + envelope = build_envelope( + payload=b"data", + from_id="sender", + thread_id="thread-123", + ) + return sign_envelope(envelope, identity) + + def test_is_signed_true_for_signed(self, signed_envelope): + """is_signed() should return True for signed envelope.""" + assert is_signed(signed_envelope) is True + + def test_is_signed_false_for_unsigned(self): + """is_signed() should return False for unsigned envelope.""" + envelope = build_envelope( + payload=b"data", + from_id="sender", + thread_id="thread-123", + ) + assert is_signed(envelope) is False + + def test_extract_signature_returns_base64(self, signed_envelope): + """extract_signature() should return base64 string.""" + import base64 + + sig = extract_signature(signed_envelope) + + assert sig is not None + decoded = base64.b64decode(sig) + assert len(decoded) == 64 + + def test_strip_signature_removes_sig(self, signed_envelope): + """strip_signature() should remove signature element.""" + stripped = strip_signature(signed_envelope) + + assert is_signed(stripped) is False + + def test_strip_signature_preserves_original(self, signed_envelope): + """strip_signature() should not modify original by default.""" + strip_signature(signed_envelope) + + assert is_signed(signed_envelope) is True + + +class TestEnvelopeBuilder: + """Test envelope construction utilities.""" + + def test_build_envelope_creates_valid_structure(self): + """build_envelope() should create valid envelope structure.""" + envelope = build_envelope( + payload=b"Alice", + from_id="greeter", + thread_id="thread-abc", + to_id="shouter", + ) + + meta = extract_meta(envelope) + assert meta["from_id"] == "greeter" + assert meta["to_id"] == "shouter" + assert meta["thread_id"] == "thread-abc" + + payload = extract_payload(envelope) + assert payload is not None + assert payload.tag == "Greeting" + + def test_build_envelope_without_to(self): + """build_envelope() should work without to_id.""" + envelope = build_envelope( + payload=b"", + from_id="sender", + thread_id="thread-123", + ) + + meta = extract_meta(envelope) + assert meta["from_id"] == "sender" + assert meta["to_id"] is None + + def test_build_envelope_with_signing(self): + """build_envelope() should sign when identity provided.""" + identity = generate_identity() + + envelope = build_envelope( + payload=b"", + from_id="sender", + thread_id="thread-123", + identity=identity, + ) + + assert is_signed(envelope) is True + assert verify_envelope(envelope, identity.public_key) is True + + def test_envelope_to_bytes_produces_xml(self): + """envelope_to_bytes() should produce valid XML bytes.""" + envelope = build_envelope( + payload=b"", + from_id="sender", + thread_id="thread-123", + ) + + xml_bytes = envelope_to_bytes(envelope) + + assert xml_bytes.startswith(b"", + from_id="sender", + thread_id="thread-123", + ) + + # Sign with first identity + signed1 = sign_envelope(envelope, identity1) + sig1 = extract_signature(signed1) + + # Re-sign with second identity + signed2 = sign_envelope(signed1, identity2) + sig2 = extract_signature(signed2) + + # Signatures should be different + assert sig1 != sig2 + + # Should verify with second identity only + assert verify_envelope(signed2, identity1.public_key) is False + assert verify_envelope(signed2, identity2.public_key) is True + + def test_sign_envelope_missing_meta_raises(self): + """Signing envelope without meta should raise error.""" + identity = generate_identity() + + # Create envelope without meta + envelope = etree.Element("message") + payload = etree.SubElement(envelope, "Test") + payload.text = "data" + + with pytest.raises(SignatureError, match="missing "): + sign_envelope(envelope, identity) + + def test_empty_payload_signs_correctly(self): + """Empty payload should still sign correctly.""" + identity = generate_identity() + + envelope = build_envelope( + payload=b"", + from_id="sender", + thread_id="thread-123", + ) + + signed = sign_envelope(envelope, identity) + assert verify_envelope(signed, identity.public_key) is True + + +class TestPublicKeyFormats: + """Test public key export formats.""" + + def test_get_public_key_pem(self): + """get_public_key_pem() should return valid PEM.""" + identity = generate_identity() + + pem = identity.get_public_key_pem() + + assert pem.startswith(b"-----BEGIN PUBLIC KEY-----") + assert pem.endswith(b"-----END PUBLIC KEY-----\n") + + def test_get_public_key_base64(self): + """get_public_key_base64() should return compact base64.""" + import base64 + + identity = generate_identity() + + b64 = identity.get_public_key_base64() + + # Should decode to 32 bytes (Ed25519 public key size) + decoded = base64.b64decode(b64) + assert len(decoded) == 32 diff --git a/xml_pipeline/cli.py b/xml_pipeline/cli.py index db0a436..53497c0 100644 --- a/xml_pipeline/cli.py +++ b/xml_pipeline/cli.py @@ -6,6 +6,7 @@ Usage: xml-pipeline init [name] Create new organism config xml-pipeline check [config.yaml] Validate config without running xml-pipeline version Show version info + xml-pipeline keygen [-o path] Generate Ed25519 identity keypair """ import argparse @@ -100,6 +101,41 @@ def cmd_version(args: argparse.Namespace) -> int: return 0 +def cmd_keygen(args: argparse.Namespace) -> int: + """Generate Ed25519 identity keypair.""" + from xml_pipeline.crypto import generate_identity + + output = Path(args.output) + + # Prevent overwrite unless forced + if output.exists() and not args.force: + print(f"Error: {output} already exists. Use --force to overwrite.", file=sys.stderr) + return 1 + + public_path = output.with_suffix(".pub") + if public_path.exists() and not args.force: + print(f"Error: {public_path} already exists. Use --force to overwrite.", file=sys.stderr) + return 1 + + try: + identity = generate_identity() + identity.save(output, public_path) + + print(f"Generated Ed25519 identity keypair:") + print(f" Private key: {output}") + print(f" Public key: {public_path}") + print() + print(f"Add to organism.yaml:") + print(f" organism:") + print(f" identity: \"{output}\"") + print() + print("IMPORTANT: Keep the private key secure. Never commit it to version control.") + return 0 + except Exception as e: + print(f"Error generating keys: {e}", file=sys.stderr) + return 1 + + def main() -> int: """CLI entry point.""" parser = argparse.ArgumentParser( @@ -129,6 +165,16 @@ def main() -> int: version_parser = subparsers.add_parser("version", help="Show version info") version_parser.set_defaults(func=cmd_version) + # keygen + keygen_parser = subparsers.add_parser("keygen", help="Generate Ed25519 identity keypair") + keygen_parser.add_argument( + "-o", "--output", + default="identity.key", + help="Output path for private key (default: identity.key)", + ) + keygen_parser.add_argument("-f", "--force", action="store_true", help="Overwrite existing") + keygen_parser.set_defaults(func=cmd_keygen) + args = parser.parse_args() return args.func(args) diff --git a/xml_pipeline/crypto/__init__.py b/xml_pipeline/crypto/__init__.py new file mode 100644 index 0000000..4c13bc1 --- /dev/null +++ b/xml_pipeline/crypto/__init__.py @@ -0,0 +1,46 @@ +""" +crypto — Ed25519 identity keys for signing and verification. + +This module provides: +- Identity key generation and loading +- Envelope signing using Exclusive C14N +- Signature verification for incoming messages +- Federation peer authentication + +Usage: + from xml_pipeline.crypto import Identity, sign_envelope, verify_envelope + + # Load organism identity + identity = Identity.load("config/identity/private.ed25519") + + # Sign an envelope + signed_envelope = sign_envelope(envelope_tree, identity) + + # Verify with peer's public key + is_valid = verify_envelope(envelope_tree, peer_public_key) +""" + +from xml_pipeline.crypto.identity import ( + Identity, + generate_identity, + load_public_key, +) + +from xml_pipeline.crypto.signing import ( + sign_envelope, + verify_envelope, + extract_signature, + SIGNATURE_NAMESPACE, +) + +__all__ = [ + # Identity + "Identity", + "generate_identity", + "load_public_key", + # Signing + "sign_envelope", + "verify_envelope", + "extract_signature", + "SIGNATURE_NAMESPACE", +] diff --git a/xml_pipeline/crypto/identity.py b/xml_pipeline/crypto/identity.py new file mode 100644 index 0000000..d81b56c --- /dev/null +++ b/xml_pipeline/crypto/identity.py @@ -0,0 +1,265 @@ +""" +identity.py — Ed25519 key management for organism identity. + +Each organism has a unique Ed25519 keypair: +- Private key: Stored securely, used for signing outgoing messages +- Public key: Shared with federation peers for verification + +Key files use PEM format for compatibility. +""" + +from __future__ import annotations + +import base64 +from dataclasses import dataclass +from pathlib import Path +from typing import TYPE_CHECKING + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric.ed25519 import ( + Ed25519PrivateKey, + Ed25519PublicKey, +) + +if TYPE_CHECKING: + pass + + +class IdentityError(Exception): + """Raised when identity key operations fail.""" + + pass + + +@dataclass +class Identity: + """ + Organism identity backed by Ed25519 keypair. + + The identity is used for: + - Signing outgoing message envelopes + - Verifying incoming messages from known peers + - Federation peer authentication + """ + + private_key: Ed25519PrivateKey + public_key: Ed25519PublicKey + path: str = "" # Where the key was loaded from (for logging) + + @classmethod + def load(cls, path: str | Path) -> Identity: + """ + Load identity from a PEM-encoded private key file. + + Args: + path: Path to the private key file (e.g., "config/identity/private.ed25519") + + Returns: + Identity instance with loaded keypair + + Raises: + IdentityError: If the key file doesn't exist or is invalid + """ + key_path = Path(path) + + if not key_path.exists(): + raise IdentityError(f"Identity key not found: {path}") + + try: + pem_data = key_path.read_bytes() + private_key = serialization.load_pem_private_key(pem_data, password=None) + + if not isinstance(private_key, Ed25519PrivateKey): + raise IdentityError(f"Key is not Ed25519: {path}") + + public_key = private_key.public_key() + + return cls( + private_key=private_key, + public_key=public_key, + path=str(path), + ) + except Exception as e: + if isinstance(e, IdentityError): + raise + raise IdentityError(f"Failed to load identity key: {e}") from e + + @classmethod + def generate(cls) -> Identity: + """ + Generate a new Ed25519 identity keypair. + + Returns: + Identity instance with freshly generated keypair + """ + private_key = Ed25519PrivateKey.generate() + public_key = private_key.public_key() + + return cls( + private_key=private_key, + public_key=public_key, + path="", + ) + + def save(self, private_path: str | Path, public_path: str | Path | None = None) -> None: + """ + Save the identity keypair to PEM files. + + Args: + private_path: Where to save the private key + public_path: Where to save the public key (optional, derived if not given) + """ + private_path = Path(private_path) + + # Ensure parent directory exists + private_path.parent.mkdir(parents=True, exist_ok=True) + + # Save private key + private_pem = self.private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + private_path.write_bytes(private_pem) + + # Save public key + if public_path is None: + public_path = private_path.with_suffix(".pub") + else: + public_path = Path(public_path) + + public_path.parent.mkdir(parents=True, exist_ok=True) + + public_pem = self.public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + public_path.write_bytes(public_pem) + + self.path = str(private_path) + + def sign(self, data: bytes) -> bytes: + """ + Sign data with the private key. + + Args: + data: Raw bytes to sign + + Returns: + 64-byte Ed25519 signature + """ + return self.private_key.sign(data) + + def sign_base64(self, data: bytes) -> str: + """ + Sign data and return base64-encoded signature. + + Args: + data: Raw bytes to sign + + Returns: + Base64-encoded signature string + """ + signature = self.sign(data) + return base64.b64encode(signature).decode("ascii") + + def verify(self, signature: bytes, data: bytes) -> bool: + """ + Verify a signature against data using our public key. + + Args: + signature: 64-byte Ed25519 signature + data: Original signed data + + Returns: + True if signature is valid, False otherwise + """ + try: + self.public_key.verify(signature, data) + return True + except Exception: + return False + + def get_public_key_pem(self) -> bytes: + """Get the public key in PEM format for sharing.""" + return self.public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + + def get_public_key_base64(self) -> str: + """Get the raw public key bytes as base64 (compact format).""" + raw_bytes = self.public_key.public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ) + return base64.b64encode(raw_bytes).decode("ascii") + + +def generate_identity() -> Identity: + """ + Generate a new Ed25519 identity. + + Convenience function wrapping Identity.generate(). + + Returns: + New Identity instance + """ + return Identity.generate() + + +def load_public_key(path: str | Path) -> Ed25519PublicKey: + """ + Load a public key from a PEM file. + + Used for loading federation peer public keys. + + Args: + path: Path to public key file + + Returns: + Ed25519 public key + + Raises: + IdentityError: If file doesn't exist or key is invalid + """ + key_path = Path(path) + + if not key_path.exists(): + raise IdentityError(f"Public key not found: {path}") + + try: + pem_data = key_path.read_bytes() + public_key = serialization.load_pem_public_key(pem_data) + + if not isinstance(public_key, Ed25519PublicKey): + raise IdentityError(f"Key is not Ed25519: {path}") + + return public_key + except Exception as e: + if isinstance(e, IdentityError): + raise + raise IdentityError(f"Failed to load public key: {e}") from e + + +def verify_with_public_key( + public_key: Ed25519PublicKey, + signature: bytes, + data: bytes, +) -> bool: + """ + Verify a signature using a public key. + + Args: + public_key: Ed25519 public key + signature: 64-byte signature + data: Original signed data + + Returns: + True if valid, False otherwise + """ + try: + public_key.verify(signature, data) + return True + except Exception: + return False diff --git a/xml_pipeline/crypto/signing.py b/xml_pipeline/crypto/signing.py new file mode 100644 index 0000000..eb46cb1 --- /dev/null +++ b/xml_pipeline/crypto/signing.py @@ -0,0 +1,264 @@ +""" +signing.py — Envelope signing and verification using Exclusive C14N + Ed25519. + +Signing Process: +1. Remove any existing signature element from meta +2. Canonicalize the envelope using Exclusive C14N +3. Sign the canonical bytes with Ed25519 +4. Insert signature element into meta block + +Verification Process: +1. Extract and remove signature element from meta +2. Canonicalize the envelope (without signature) +3. Verify signature against canonical bytes + +The signature element uses namespace: https://xml-pipeline.org/ns/sig/v1 +This fits within the xs:any in envelope.xsd meta block. +""" + +from __future__ import annotations + +import base64 +import copy +from typing import TYPE_CHECKING + +from lxml import etree + +if TYPE_CHECKING: + from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey + + from xml_pipeline.crypto.identity import Identity + +# Namespace for signature element +SIGNATURE_NAMESPACE = "https://xml-pipeline.org/ns/sig/v1" +SIGNATURE_TAG = f"{{{SIGNATURE_NAMESPACE}}}sig" + +# Envelope namespace +ENVELOPE_NAMESPACE = "https://xml-pipeline.org/ns/envelope/v1" +META_TAG = f"{{{ENVELOPE_NAMESPACE}}}meta" + + +class SignatureError(Exception): + """Raised when signing or verification fails.""" + + pass + + +def _find_meta(envelope: etree._Element) -> etree._Element | None: + """Find the meta element in an envelope.""" + # Try with namespace first + meta = envelope.find(f"{{{ENVELOPE_NAMESPACE}}}meta") + if meta is not None: + return meta + + # Try without namespace (for flexibility) + meta = envelope.find("meta") + return meta + + +def _remove_signature(envelope: etree._Element) -> etree._Element | None: + """ + Remove signature element from envelope's meta block. + + Returns the removed signature element, or None if not found. + Modifies envelope in place. + """ + meta = _find_meta(envelope) + if meta is None: + return None + + # Look for signature element + sig = meta.find(f"{{{SIGNATURE_NAMESPACE}}}sig") + if sig is not None: + meta.remove(sig) + return sig + + # Try without namespace + sig = meta.find("sig") + if sig is not None: + meta.remove(sig) + return sig + + return None + + +def _canonicalize(tree: etree._Element) -> bytes: + """ + Canonicalize an XML element using Exclusive C14N. + + This produces deterministic output suitable for signing. + """ + return etree.tostring(tree, method="c14n2", exclusive=True) + + +def sign_envelope( + envelope: etree._Element, + identity: Identity, + *, + in_place: bool = False, +) -> etree._Element: + """ + Sign an envelope using the organism's identity. + + The signature is inserted as a element in the meta block, + using namespace https://xml-pipeline.org/ns/sig/v1. + + Args: + envelope: The message envelope to sign + identity: Organism identity with private key + in_place: If True, modify envelope directly; if False, work on a copy + + Returns: + Signed envelope (same object if in_place=True, copy otherwise) + + Raises: + SignatureError: If envelope structure is invalid + """ + if not in_place: + envelope = copy.deepcopy(envelope) + + # Find meta block + meta = _find_meta(envelope) + if meta is None: + raise SignatureError("Envelope missing block") + + # Remove any existing signature (for re-signing) + _remove_signature(envelope) + + # Canonicalize the envelope (without signature) + canonical_bytes = _canonicalize(envelope) + + # Sign the canonical bytes + signature_b64 = identity.sign_base64(canonical_bytes) + + # Create signature element + sig_element = etree.SubElement( + meta, + SIGNATURE_TAG, + nsmap={"sig": SIGNATURE_NAMESPACE}, + ) + sig_element.text = signature_b64 + + return envelope + + +def extract_signature(envelope: etree._Element) -> str | None: + """ + Extract the base64 signature from an envelope without modifying it. + + Args: + envelope: The message envelope + + Returns: + Base64-encoded signature string, or None if not signed + """ + meta = _find_meta(envelope) + if meta is None: + return None + + # Look for signature element + sig = meta.find(f"{{{SIGNATURE_NAMESPACE}}}sig") + if sig is not None and sig.text: + return sig.text.strip() + + # Try without namespace + sig = meta.find("sig") + if sig is not None and sig.text: + return sig.text.strip() + + return None + + +def verify_envelope( + envelope: etree._Element, + public_key: Ed25519PublicKey, +) -> bool: + """ + Verify an envelope's signature using a public key. + + Args: + envelope: The signed message envelope + public_key: Ed25519 public key of the signer + + Returns: + True if signature is valid, False otherwise + + Note: + Returns False for unsigned envelopes (no signature present). + To check if envelope is signed, use extract_signature() first. + """ + # Work on a copy to avoid modifying the original + envelope_copy = copy.deepcopy(envelope) + + # Extract and remove signature + sig_element = _remove_signature(envelope_copy) + if sig_element is None or not sig_element.text: + return False + + try: + signature_b64 = sig_element.text.strip() + signature = base64.b64decode(signature_b64) + except Exception: + return False + + # Canonicalize the envelope (without signature) + canonical_bytes = _canonicalize(envelope_copy) + + # Verify signature + try: + public_key.verify(signature, canonical_bytes) + return True + except Exception: + return False + + +def verify_envelope_with_identity( + envelope: etree._Element, + identity: Identity, +) -> bool: + """ + Verify an envelope using an Identity's public key. + + Convenience wrapper around verify_envelope for local verification. + + Args: + envelope: The signed message envelope + identity: Identity containing public key + + Returns: + True if signature is valid, False otherwise + """ + return verify_envelope(envelope, identity.public_key) + + +def is_signed(envelope: etree._Element) -> bool: + """ + Check if an envelope has a signature. + + Args: + envelope: Message envelope to check + + Returns: + True if envelope contains a signature element + """ + return extract_signature(envelope) is not None + + +def strip_signature(envelope: etree._Element, *, in_place: bool = False) -> etree._Element: + """ + Remove signature from an envelope. + + Useful for processing after verification. + + Args: + envelope: The signed envelope + in_place: If True, modify directly; if False, work on copy + + Returns: + Envelope without signature + """ + if not in_place: + envelope = copy.deepcopy(envelope) + + _remove_signature(envelope) + return envelope diff --git a/xml_pipeline/message_bus/envelope.py b/xml_pipeline/message_bus/envelope.py index e69de29..da62567 100644 --- a/xml_pipeline/message_bus/envelope.py +++ b/xml_pipeline/message_bus/envelope.py @@ -0,0 +1,162 @@ +""" +envelope.py — Envelope construction and manipulation utilities. + +The universal envelope format: + + + sender + receiver + uuid + base64... + + ... + + +This module provides utilities for: +- Building envelopes from payloads +- Extracting metadata from envelopes +- Signing and verification integration +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from lxml import etree + +if TYPE_CHECKING: + from xml_pipeline.crypto.identity import Identity + +# Namespaces +ENVELOPE_NAMESPACE = "https://xml-pipeline.org/ns/envelope/v1" +ENVELOPE_NSMAP = {None: ENVELOPE_NAMESPACE} + + +def build_envelope( + payload: bytes | etree._Element, + from_id: str, + thread_id: str, + to_id: str | None = None, + identity: Identity | None = None, +) -> etree._Element: + """ + Build a message envelope around a payload. + + Args: + payload: XML payload as bytes or element + from_id: Sender identifier (listener name) + thread_id: Thread UUID + to_id: Optional target listener name + identity: Optional identity for signing + + Returns: + Complete message envelope element + """ + # Create message root + message = etree.Element( + f"{{{ENVELOPE_NAMESPACE}}}message", + nsmap=ENVELOPE_NSMAP, + ) + + # Create meta block + meta = etree.SubElement(message, f"{{{ENVELOPE_NAMESPACE}}}meta") + + # Add required fields + from_elem = etree.SubElement(meta, f"{{{ENVELOPE_NAMESPACE}}}from") + from_elem.text = from_id + + if to_id: + to_elem = etree.SubElement(meta, f"{{{ENVELOPE_NAMESPACE}}}to") + to_elem.text = to_id + + thread_elem = etree.SubElement(meta, f"{{{ENVELOPE_NAMESPACE}}}thread") + thread_elem.text = thread_id + + # Add payload + if isinstance(payload, bytes): + # Parse payload bytes + parser = etree.XMLParser(recover=True) + payload_elem = etree.fromstring(payload, parser=parser) + else: + payload_elem = payload + + message.append(payload_elem) + + # Sign if identity provided + if identity is not None: + from xml_pipeline.crypto.signing import sign_envelope + + message = sign_envelope(message, identity, in_place=True) + + return message + + +def envelope_to_bytes(envelope: etree._Element) -> bytes: + """ + Serialize an envelope to bytes. + + Uses UTF-8 encoding with XML declaration. + """ + return etree.tostring(envelope, encoding="utf-8", xml_declaration=True) + + +def extract_meta(envelope: etree._Element) -> dict[str, str | None]: + """ + Extract metadata from an envelope. + + Returns: + Dict with keys: from_id, to_id, thread_id + """ + result: dict[str, str | None] = { + "from_id": None, + "to_id": None, + "thread_id": None, + } + + # Find meta element + meta = envelope.find(f"{{{ENVELOPE_NAMESPACE}}}meta") + if meta is None: + meta = envelope.find("meta") + + if meta is None: + return result + + # Extract fields + from_elem = meta.find(f"{{{ENVELOPE_NAMESPACE}}}from") + if from_elem is None: + from_elem = meta.find("from") + if from_elem is not None and from_elem.text: + result["from_id"] = from_elem.text.strip() + + to_elem = meta.find(f"{{{ENVELOPE_NAMESPACE}}}to") + if to_elem is None: + to_elem = meta.find("to") + if to_elem is not None and to_elem.text: + result["to_id"] = to_elem.text.strip() + + thread_elem = meta.find(f"{{{ENVELOPE_NAMESPACE}}}thread") + if thread_elem is None: + thread_elem = meta.find("thread") + if thread_elem is not None and thread_elem.text: + result["thread_id"] = thread_elem.text.strip() + + return result + + +def extract_payload(envelope: etree._Element) -> etree._Element | None: + """ + Extract the payload element from an envelope. + + The payload is the first non-meta child of the message. + + Returns: + Payload element, or None if not found + """ + for child in envelope: + # Skip meta element + tag = child.tag + if isinstance(tag, str): + if tag.endswith("}meta") or tag == "meta": + continue + return child + return None diff --git a/xml_pipeline/message_bus/stream_pump.py b/xml_pipeline/message_bus/stream_pump.py index 6f5a089..1e7adef 100644 --- a/xml_pipeline/message_bus/stream_pump.py +++ b/xml_pipeline/message_bus/stream_pump.py @@ -26,7 +26,10 @@ import logging from concurrent.futures import ProcessPoolExecutor from dataclasses import dataclass, field from pathlib import Path -from typing import AsyncIterable, Callable, List, Dict, Any, Optional +from typing import AsyncIterable, Callable, List, Dict, Any, Optional, TYPE_CHECKING + +if TYPE_CHECKING: + from xml_pipeline.crypto.identity import Identity import yaml from lxml import etree @@ -210,6 +213,16 @@ class StreamPump: self.routing_table: Dict[str, List[Listener]] = {} self.listeners: Dict[str, Listener] = {} + # Identity for envelope signing (optional) + self.identity: Optional["Identity"] = None + if config.identity_path: + try: + from xml_pipeline.crypto import Identity + self.identity = Identity.load(config.identity_path) + pump_logger.info(f"Identity loaded: {config.identity_path}") + except Exception as e: + pump_logger.warning(f"Failed to load identity: {e}") + # Generic listeners (accept any payload type) # Used for ephemeral orchestration handlers (sequences, buffers) self._generic_listeners: Dict[str, Listener] = {} @@ -660,7 +673,7 @@ class StreamPump: ) def _wrap_in_envelope(self, payload: Any, from_id: str, to_id: str, thread_id: str) -> bytes: - """Wrap a dataclass payload in a message envelope.""" + """Wrap a dataclass payload in a message envelope, optionally signed.""" # Serialize payload to XML if hasattr(payload, 'to_xml'): # SystemError and similar have manual to_xml() @@ -680,7 +693,7 @@ class StreamPump: idx = payload_str.index('>') payload_str = payload_str[:idx] + ' xmlns=""' + payload_str[idx:] - envelope = f""" + envelope_str = f""" {from_id} {to_id} @@ -688,7 +701,19 @@ class StreamPump: {payload_str} """ - return envelope.encode('utf-8') + + # Sign if identity is configured + if self.identity is not None: + try: + from xml_pipeline.crypto.signing import sign_envelope + envelope_tree = etree.fromstring(envelope_str.encode('utf-8')) + signed_tree = sign_envelope(envelope_tree, self.identity, in_place=True) + return etree.tostring(signed_tree, encoding='utf-8', xml_declaration=True) + except Exception as e: + pump_logger.warning(f"Failed to sign envelope: {e}") + # Fall through to unsigned + + return envelope_str.encode('utf-8') async def _dispatch_to_process_pool( self,