test_core_provenance.py
python
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b
fix: try fetch/presign before fetch/mpack to avoid Cloudfla…
Sonnet 4.6
patch
7 days ago
| 1 | """Provenance payload tests — timestamp binding and version prefix. |
| 2 | |
| 3 | These tests verify the canonical provenance signing payload format: |
| 4 | - Version prefix "muse-provenance-v2\n" |
| 5 | - committed_at timestamp included as the last field |
| 6 | - Signature detects timestamp mutation |
| 7 | """ |
| 8 | |
| 9 | from __future__ import annotations |
| 10 | |
| 11 | import pytest |
| 12 | from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey |
| 13 | from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat |
| 14 | |
| 15 | from muse.core.types import decode_pubkey, decode_sig, encode_sig, sig_algo, long_id, split_sig |
| 16 | from muse.core.provenance import ( |
| 17 | _PROV_VERSION_PREFIX, |
| 18 | provenance_payload, |
| 19 | sign_commit_ed25519, |
| 20 | sign_commit_record, |
| 21 | verify_commit_ed25519, |
| 22 | ) |
| 23 | |
| 24 | |
def _gen_key() -> Ed25519PrivateKey:
    """Create a brand-new Ed25519 private key for use inside a single test."""
    private_key = Ed25519PrivateKey.generate()
    return private_key
| 27 | |
| 28 | |
def _pub_bytes(key: Ed25519PrivateKey) -> bytes:
    """Return the public half of *key* serialized with Raw encoding and Raw format."""
    public_key = key.public_key()
    return public_key.public_bytes(Encoding.Raw, PublicFormat.Raw)
| 31 | |
| 32 | |
| 33 | # --------------------------------------------------------------------------- |
| 34 | # Payload format |
| 35 | # --------------------------------------------------------------------------- |
| 36 | |
| 37 | |
class TestProvenancePayloadV2:
    """Checks on the digest string returned by ``provenance_payload``."""

    @staticmethod
    def _sha256_of_fields(fields: list[str]) -> str:
        """Hash the version prefix plus the NUL-joined *fields* by hand.

        This rebuilds the raw payload independently of ``provenance_payload``
        so the tests can compare the two results.
        """
        import hashlib

        raw_payload = _PROV_VERSION_PREFIX + "\x00".join(fields)
        return hashlib.sha256(raw_payload.encode()).hexdigest()

    def test_payload_version_prefix(self) -> None:
        """With only a commit id, the digest equals a hand-built prefixed payload."""
        commit_id = "a" * 64
        # provenance_payload returns the SHA-256 hex of the raw bytes, so the
        # prefix is checked indirectly by hashing the expected bytes ourselves.
        hand_built = self._sha256_of_fields([commit_id] + [""] * 6)
        assert provenance_payload(commit_id) == hand_built

    def test_payload_includes_committed_at(self) -> None:
        """The committed_at value occupies the final slot of the payload."""
        commit_id = "b" * 64
        stamp = "2026-04-08T12:00:00"
        hand_built = self._sha256_of_fields([commit_id] + [""] * 5 + [stamp])
        assert provenance_payload(commit_id, committed_at=stamp) == hand_built

    def test_different_committed_at_different_payload(self) -> None:
        """Changing only committed_at changes the resulting digest."""
        commit_id = "c" * 64
        first, second = (
            provenance_payload(commit_id, committed_at=stamp)
            for stamp in ("2026-01-01T00:00:00", "2026-01-02T00:00:00")
        )
        assert first != second

    def test_empty_committed_at_still_works(self) -> None:
        """An empty committed_at is accepted (backward compat for unsigned commits)."""
        # The call itself must not raise.
        digest = provenance_payload("d" * 64, committed_at="")
        assert isinstance(digest, str)
        assert len(digest) == 64

    def test_payload_is_64_hex_chars(self) -> None:
        """The result is 64 characters long and uses lowercase hex digits only."""
        digest = provenance_payload("e" * 64, committed_at="2026-04-08T00:00:00")
        assert len(digest) == 64
        assert set(digest) <= set("0123456789abcdef")
| 83 | |
| 84 | |
class TestTimestampSignatureBinding:
    """Checks that a signature is tied to the committed_at value it was made with."""

    def test_signature_detects_timestamp_mutation(self) -> None:
        """A signature made over timestamp T1 does not verify against T2."""
        signing_key = _gen_key()
        commit_id = "f" * 64

        # Payloads for the original and the tampered timestamp.
        original_payload = provenance_payload(commit_id, committed_at="2026-04-08T12:00:00")
        signature = sign_commit_ed25519(original_payload, signing_key)
        tampered_payload = provenance_payload(commit_id, committed_at="2026-04-09T12:00:00")

        # Verification against the tampered payload has to fail.
        public_raw = _pub_bytes(signing_key)
        assert not verify_commit_ed25519(tampered_payload, signature, public_raw)

    def test_signature_validates_with_same_timestamp(self) -> None:
        """Signing and verifying over one and the same timestamp succeeds."""
        signing_key = _gen_key()
        digest = provenance_payload("g" * 64, committed_at="2026-04-08T12:00:00")
        signature = sign_commit_ed25519(digest, signing_key)
        assert verify_commit_ed25519(digest, signature, _pub_bytes(signing_key))

    def test_round_trip_sign_verify_all_fields(self) -> None:
        """sign_commit_record output verifies when every optional field is supplied."""
        signing_key = _gen_key()
        commit_id = "h" * 64
        agent_id = "test-agent"
        # The same optional fields are given to the signer and to the payload
        # builder, so they are collected once and passed to both calls.
        optional_fields = {
            "model_id": "claude-sonnet-4-6",
            "toolchain_id": "agentception/v1",
            "prompt_hash": "ab" * 32,
            "committed_at": "2026-04-08T15:30:00+00:00",
        }

        signed = sign_commit_record(commit_id, agent_id, signing_key, **optional_fields)
        assert signed is not None
        signature, encoded_pub, _ = signed

        _, public_raw = decode_pubkey(encoded_pub)
        digest = provenance_payload(commit_id, agent_id=agent_id, **optional_fields)
        assert verify_commit_ed25519(digest, signature, public_raw)

    def test_sign_commit_record_without_committed_at_still_verifiable(self) -> None:
        """A record signed without committed_at verifies against the default payload."""
        signing_key = _gen_key()
        commit_id = "i" * 64

        signed = sign_commit_record(commit_id, "agent", signing_key)
        assert signed is not None
        signature, encoded_pub, _ = signed
        _, public_raw = decode_pubkey(encoded_pub)

        # committed_at is likewise left at its default when rebuilding the payload.
        digest = provenance_payload(commit_id, agent_id="agent")
        assert verify_commit_ed25519(digest, signature, public_raw)
| 155 | |
| 156 | |
| 157 | # --------------------------------------------------------------------------- |
| 158 | # TDD: algorithm-prefixed signature values |
| 159 | # --------------------------------------------------------------------------- |
| 160 | |
| 161 | |
class TestAlgorithmPrefix:
    """The algorithm name must travel inside the stored signature value.

    This follows the same self-describing approach as sha256:-prefixed object
    IDs: a verifier dispatches on the value's prefix rather than on a separate
    integer field (signature_format: int is gone).
    """

    def test_sign_returns_ed25519_prefix(self) -> None:
        """The signer emits 'ed25519:<base64url>' rather than bare base64url."""
        signing_key = _gen_key()
        sig = sign_commit_ed25519(provenance_payload("a" * 64), signing_key)
        assert sig.startswith("ed25519:"), f"expected 'ed25519:' prefix, got: {sig[:20]!r}"

    def test_sign_suffix_is_valid_base64url(self) -> None:
        """Whatever follows 'ed25519:' decodes to signature bytes."""
        signing_key = _gen_key()
        signature = sign_commit_ed25519(provenance_payload("b" * 64), signing_key)
        _, decoded = decode_sig(signature)
        # An Ed25519 signature is always 64 bytes long.
        assert len(decoded) == 64

    def test_verify_accepts_prefixed_signature(self) -> None:
        """A properly prefixed signature passes verification."""
        signing_key = _gen_key()
        digest = provenance_payload("c" * 64)
        signature = sign_commit_ed25519(digest, signing_key)
        assert signature.startswith("ed25519:")
        assert verify_commit_ed25519(digest, signature, _pub_bytes(signing_key))

    def test_verify_rejects_bare_base64url(self) -> None:
        """The old prefix-less base64url form is refused — no backward compat."""
        signing_key = _gen_key()
        digest = provenance_payload("d" * 64)
        # Start from a genuine signature and drop its prefix to get the bare form.
        prefixed = sign_commit_ed25519(digest, signing_key)
        _, unprefixed = split_sig(prefixed)
        assert not unprefixed.startswith("ed25519:")
        assert not verify_commit_ed25519(digest, unprefixed, _pub_bytes(signing_key))

    def test_verify_rejects_unknown_prefix(self) -> None:
        """A prefix naming an unrecognised algorithm (e.g. 'mldsa65:') yields False."""
        signing_key = _gen_key()
        digest = provenance_payload("e" * 64)
        genuine = sign_commit_ed25519(digest, signing_key)
        # Re-wrap the very same signature bytes under a future/unknown algorithm name.
        _, signature_bytes = decode_sig(genuine)
        relabelled = encode_sig("mldsa65", signature_bytes)
        assert not verify_commit_ed25519(digest, relabelled, _pub_bytes(signing_key))

    def test_encode_public_key_returns_ed25519_prefix(self) -> None:
        """encode_public_key's encoded component carries the 'ed25519:' prefix."""
        from muse.core.provenance import encode_public_key

        signing_key = _gen_key()
        raw_bytes, b64 = encode_public_key(signing_key)
        assert b64.startswith("ed25519:"), f"expected 'ed25519:' prefix, got: {b64[:20]!r}"
        # Decoding the encoded form must give back the raw bytes.
        _, round_tripped = decode_pubkey(b64)
        assert round_tripped == raw_bytes

    def test_sign_commit_record_pubkey_prefixed(self) -> None:
        """The public key handed back by sign_commit_record is 'ed25519:'-prefixed."""
        signing_key = _gen_key()
        signed = sign_commit_record("f" * 64, "test-agent", signing_key)
        assert signed is not None
        _signature, encoded_pub, _fingerprint = signed
        assert encoded_pub.startswith("ed25519:")

    def test_commit_record_has_no_signature_format_field(self) -> None:
        """CommitRecord exposes no signature_format attribute — the field is deleted."""
        import datetime

        from muse.core.commits import CommitRecord

        now_utc = datetime.datetime.now(datetime.timezone.utc)
        record = CommitRecord(
            commit_id=long_id("a" * 64),
            branch="dev",
            snapshot_id=long_id("b" * 64),
            committed_at=now_utc,
            message="test",
        )
        assert not hasattr(record, "signature_format"), (
            "signature_format field must be removed from CommitRecord; "
            "algorithm is now encoded in the signature value prefix"
        )
File History
1 commit
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b
fix: try fetch/presign before fetch/mpack to avoid Cloudfla…
Sonnet 4.6
patch
7 days ago