"""Provenance payload tests — timestamp binding and version prefix. These tests verify the canonical provenance signing payload format: - Version prefix "muse-provenance-v2\n" - committed_at timestamp included as the last field - Signature detects timestamp mutation """ from __future__ import annotations import pytest from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat from muse.core.types import decode_pubkey, decode_sig, encode_sig, sig_algo, long_id, split_sig from muse.core.provenance import ( _PROV_VERSION_PREFIX, provenance_payload, sign_commit_ed25519, sign_commit_record, verify_commit_ed25519, ) def _gen_key() -> Ed25519PrivateKey: return Ed25519PrivateKey.generate() def _pub_bytes(key: Ed25519PrivateKey) -> bytes: return key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) # --------------------------------------------------------------------------- # Payload format # --------------------------------------------------------------------------- class TestProvenancePayloadV2: def test_payload_version_prefix(self) -> None: """Payload is prefixed with 'muse-provenance-v2\\n'.""" import hashlib commit_id = "a" * 64 # Reconstruct the raw payload and check the prefix. # provenance_payload returns the SHA-256 hex of the raw bytes, so we # can verify by computing manually. fields = [commit_id, "", "", "", "", "", ""] raw = (_PROV_VERSION_PREFIX + "\x00".join(fields)).encode() expected = hashlib.sha256(raw).hexdigest() actual = provenance_payload(commit_id) assert actual == expected def test_payload_includes_committed_at(self) -> None: """committed_at appears as the last field in the canonical payload.""" import hashlib commit_id = "b" * 64 ts = "2026-04-08T12:00:00" fields = [commit_id, "", "", "", "", "", ts] raw = (_PROV_VERSION_PREFIX + "\x00".join(fields)).encode() expected = hashlib.sha256(raw).hexdigest() actual = provenance_payload(commit_id, committed_at=ts) assert actual == expected def test_different_committed_at_different_payload(self) -> None: """Two different committed_at values → different payload hashes.""" commit_id = "c" * 64 p1 = provenance_payload(commit_id, committed_at="2026-01-01T00:00:00") p2 = provenance_payload(commit_id, committed_at="2026-01-02T00:00:00") assert p1 != p2 def test_empty_committed_at_still_works(self) -> None: """Empty committed_at is valid (backward compat for unsigned commits).""" commit_id = "d" * 64 # Should not raise. payload = provenance_payload(commit_id, committed_at="") assert isinstance(payload, str) assert len(payload) == 64 def test_payload_is_64_hex_chars(self) -> None: """Return value is always a 64-character hex string.""" payload = provenance_payload("e" * 64, committed_at="2026-04-08T00:00:00") assert len(payload) == 64 assert all(c in "0123456789abcdef" for c in payload) class TestTimestampSignatureBinding: def test_signature_detects_timestamp_mutation(self) -> None: """Sign with timestamp T1, verify with T2 → fails.""" key = _gen_key() commit_id = "f" * 64 ts1 = "2026-04-08T12:00:00" ts2 = "2026-04-09T12:00:00" # Sign with ts1. payload_t1 = provenance_payload(commit_id, committed_at=ts1) sig = sign_commit_ed25519(payload_t1, key) # Verify with ts2 → must fail. payload_t2 = provenance_payload(commit_id, committed_at=ts2) pub = _pub_bytes(key) assert not verify_commit_ed25519(payload_t2, sig, pub) def test_signature_validates_with_same_timestamp(self) -> None: """Sign and verify with the same timestamp → succeeds.""" key = _gen_key() commit_id = "g" * 64 ts = "2026-04-08T12:00:00" payload = provenance_payload(commit_id, committed_at=ts) sig = sign_commit_ed25519(payload, key) assert verify_commit_ed25519(payload, sig, _pub_bytes(key)) def test_round_trip_sign_verify_all_fields(self) -> None: """sign_commit_record + verify_commit_ed25519 with all fields → passes.""" key = _gen_key() commit_id = "h" * 64 agent_id = "test-agent" model_id = "claude-sonnet-4-6" toolchain_id = "agentception/v1" prompt_hash = "ab" * 32 ts = "2026-04-08T15:30:00+00:00" result = sign_commit_record( commit_id, agent_id, key, model_id=model_id, toolchain_id=toolchain_id, prompt_hash=prompt_hash, committed_at=ts, ) assert result is not None sig, pub_b64, _ = result _, pub_bytes = decode_pubkey(pub_b64) payload = provenance_payload( commit_id, agent_id=agent_id, model_id=model_id, toolchain_id=toolchain_id, prompt_hash=prompt_hash, committed_at=ts, ) assert verify_commit_ed25519(payload, sig, pub_bytes) def test_sign_commit_record_without_committed_at_still_verifiable(self) -> None: """sign_commit_record with empty committed_at → verifiable with empty committed_at.""" key = _gen_key() commit_id = "i" * 64 result = sign_commit_record(commit_id, "agent", key) assert result is not None sig, pub_b64, _ = result _, pub_bytes = decode_pubkey(pub_b64) # Verify with the same default empty committed_at. payload = provenance_payload(commit_id, agent_id="agent") assert verify_commit_ed25519(payload, sig, pub_bytes) # --------------------------------------------------------------------------- # TDD: algorithm-prefixed signature values # --------------------------------------------------------------------------- class TestAlgorithmPrefix: """sign_commit_ed25519 must embed the algorithm in the stored value. Rationale: same self-describing philosophy as sha256:-prefixed object IDs. When the signature value carries its algorithm, verifiers dispatch on the prefix instead of a separate integer field (signature_format: int is gone). """ def test_sign_returns_ed25519_prefix(self) -> None: """sign_commit_ed25519 returns 'ed25519:', not bare base64url.""" key = _gen_key() payload = provenance_payload("a" * 64) sig = sign_commit_ed25519(payload, key) assert sig.startswith("ed25519:"), f"expected 'ed25519:' prefix, got: {sig[:20]!r}" def test_sign_suffix_is_valid_base64url(self) -> None: """The part after 'ed25519:' is valid base64url-encoded bytes.""" key = _gen_key() payload = provenance_payload("b" * 64) sig = sign_commit_ed25519(payload, key) _, raw = decode_sig(sig) assert len(raw) == 64 # Ed25519 signature is always 64 bytes def test_verify_accepts_prefixed_signature(self) -> None: """verify_commit_ed25519 succeeds on a correctly prefixed signature.""" key = _gen_key() payload = provenance_payload("c" * 64) sig = sign_commit_ed25519(payload, key) assert sig.startswith("ed25519:") assert verify_commit_ed25519(payload, sig, _pub_bytes(key)) def test_verify_rejects_bare_base64url(self) -> None: """Bare base64url (old format, no prefix) must be rejected — no backward compat.""" key = _gen_key() payload = provenance_payload("d" * 64) # Produce a real sig, then strip the prefix — gives bare base64url. sig = sign_commit_ed25519(payload, key) _, bare_sig = split_sig(sig) assert not bare_sig.startswith("ed25519:") assert not verify_commit_ed25519(payload, bare_sig, _pub_bytes(key)) def test_verify_rejects_unknown_prefix(self) -> None: """An unknown algorithm prefix (e.g. 'mldsa65:') must return False.""" key = _gen_key() payload = provenance_payload("e" * 64) sig = sign_commit_ed25519(payload, key) # Swap the prefix to a future/unknown algorithm. _, raw = decode_sig(sig) unknown_sig = encode_sig("mldsa65", raw) assert not verify_commit_ed25519(payload, unknown_sig, _pub_bytes(key)) def test_encode_public_key_returns_ed25519_prefix(self) -> None: """encode_public_key returns 'ed25519:' for the b64 component.""" from muse.core.provenance import encode_public_key key = _gen_key() raw_bytes, b64 = encode_public_key(key) assert b64.startswith("ed25519:"), f"expected 'ed25519:' prefix, got: {b64[:20]!r}" # The raw part decodes back to raw_bytes. _, decoded = decode_pubkey(b64) assert decoded == raw_bytes def test_sign_commit_record_pubkey_prefixed(self) -> None: """sign_commit_record returns an 'ed25519:'-prefixed public key.""" key = _gen_key() result = sign_commit_record("f" * 64, "test-agent", key) assert result is not None _sig, pub_b64, _fprint = result assert pub_b64.startswith("ed25519:") def test_commit_record_has_no_signature_format_field(self) -> None: """CommitRecord must NOT have a signature_format attribute — field is deleted.""" from muse.core.commits import CommitRecord import datetime r = CommitRecord( commit_id=long_id("a" * 64), branch="dev", snapshot_id=long_id("b" * 64), committed_at=datetime.datetime.now(datetime.timezone.utc), message="test", ) assert not hasattr(r, "signature_format"), ( "signature_format field must be removed from CommitRecord; " "algorithm is now encoded in the signature value prefix" )