test_provenance.py
python
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2
fix: remove commit_exists filter from have anchors — server…
Sonnet 4.6
patch
20 days ago
| 1 | """Tests for muse.core.provenance — AgentIdentity, Ed25519 signing.""" |
| 2 | |
| 3 | import datetime |
| 4 | |
| 5 | import pytest |
| 6 | from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey |
| 7 | from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat |
| 8 | |
| 9 | from muse.core.types import decode_pubkey, decode_sig, encode_sig, split_id |
| 10 | from muse.core.provenance import ( |
| 11 | AgentIdentity, |
| 12 | encode_public_key, |
| 13 | make_agent_identity, |
| 14 | provenance_payload, |
| 15 | sign_commit_ed25519, |
| 16 | sign_commit_record, |
| 17 | verify_commit_ed25519, |
| 18 | ) |
| 19 | from muse.core.commits import CommitRecord |
| 20 | |
| 21 | |
| 22 | def _gen_key() -> Ed25519PrivateKey: |
| 23 | return Ed25519PrivateKey.generate() |
| 24 | |
| 25 | |
| 26 | def _pub_bytes(key: Ed25519PrivateKey) -> bytes: |
| 27 | return key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) |
| 28 | |
| 29 | |
| 30 | # --------------------------------------------------------------------------- |
| 31 | # AgentIdentity factory |
| 32 | # --------------------------------------------------------------------------- |
| 33 | |
| 34 | |
| 35 | class TestMakeAgentIdentity: |
| 36 | def test_required_fields_present(self) -> None: |
| 37 | identity = make_agent_identity( |
| 38 | agent_id="test-agent", |
| 39 | model_id="gpt-5", |
| 40 | toolchain_id="muse-v2", |
| 41 | ) |
| 42 | assert identity["agent_id"] == "test-agent" |
| 43 | assert identity.get("model_id") == "gpt-5" |
| 44 | assert identity.get("toolchain_id") == "muse-v2" |
| 45 | |
| 46 | def test_prompt_hash_is_hex(self) -> None: |
| 47 | identity = make_agent_identity( |
| 48 | agent_id="a", |
| 49 | model_id="m", |
| 50 | toolchain_id="t", |
| 51 | prompt="system: you are a music agent", |
| 52 | ) |
| 53 | prompt_hash = identity.get("prompt_hash", "") |
| 54 | assert isinstance(prompt_hash, str) |
| 55 | assert prompt_hash.startswith("sha256:") and len(prompt_hash) == 71 |
| 56 | assert all(c in "0123456789abcdef" for c in split_id(prompt_hash)[1]) |
| 57 | |
| 58 | def test_no_prompt_gives_no_hash_key(self) -> None: |
| 59 | identity = make_agent_identity(agent_id="a", model_id="m", toolchain_id="t") |
| 60 | assert identity.get("prompt_hash", "") == "" |
| 61 | |
| 62 | def test_execution_context_hash_populated(self) -> None: |
| 63 | identity = make_agent_identity( |
| 64 | agent_id="a", |
| 65 | model_id="m", |
| 66 | toolchain_id="t", |
| 67 | execution_context='{"env": "ci", "version": "1.2.3"}', |
| 68 | ) |
| 69 | ec_hash = identity.get("execution_context_hash", "") |
| 70 | assert isinstance(ec_hash, str) |
| 71 | assert ec_hash.startswith("sha256:") and len(ec_hash) == 71 |
| 72 | |
| 73 | |
| 74 | # --------------------------------------------------------------------------- |
| 75 | # Ed25519 signing / verification |
| 76 | # --------------------------------------------------------------------------- |
| 77 | |
| 78 | |
| 79 | class TestEd25519Signing: |
| 80 | def test_sign_and_verify_succeed(self) -> None: |
| 81 | key = _gen_key() |
| 82 | payload = provenance_payload("abc123def456" * 4) |
| 83 | sig = sign_commit_ed25519(payload, key) |
| 84 | pub_bytes = _pub_bytes(key) |
| 85 | assert verify_commit_ed25519(payload, sig, pub_bytes) |
| 86 | |
| 87 | def test_wrong_key_fails(self) -> None: |
| 88 | key1 = _gen_key() |
| 89 | key2 = _gen_key() |
| 90 | payload = provenance_payload("abc123") |
| 91 | sig = sign_commit_ed25519(payload, key1) |
| 92 | assert not verify_commit_ed25519(payload, sig, _pub_bytes(key2)) |
| 93 | |
| 94 | def test_wrong_payload_fails(self) -> None: |
| 95 | key = _gen_key() |
| 96 | sig = sign_commit_ed25519(provenance_payload("commit-a"), key) |
| 97 | assert not verify_commit_ed25519(provenance_payload("commit-b"), sig, _pub_bytes(key)) |
| 98 | |
| 99 | def test_tampered_signature_fails(self) -> None: |
| 100 | key = _gen_key() |
| 101 | payload = provenance_payload("abc") |
| 102 | sig = sign_commit_ed25519(payload, key) |
| 103 | # Flip a byte in the middle of the raw signature. |
| 104 | _, raw = decode_sig(sig) |
| 105 | sig_bytes = bytearray(raw) |
| 106 | sig_bytes[32] ^= 0xFF |
| 107 | tampered = encode_sig("ed25519", bytes(sig_bytes)) |
| 108 | assert not verify_commit_ed25519(payload, tampered, _pub_bytes(key)) |
| 109 | |
| 110 | def test_signature_is_prefixed_base64url_string(self) -> None: |
| 111 | key = _gen_key() |
| 112 | sig = sign_commit_ed25519(provenance_payload("test-commit"), key) |
| 113 | assert isinstance(sig, str) |
| 114 | assert sig.startswith("ed25519:") |
| 115 | # Ed25519 signature is 64 bytes → 86 base64url chars + 8-char prefix |
| 116 | assert len(sig) == len("ed25519:") + 86 |
| 117 | |
| 118 | def test_empty_signature_fails(self) -> None: |
| 119 | key = _gen_key() |
| 120 | assert not verify_commit_ed25519(provenance_payload("x"), "", _pub_bytes(key)) |
| 121 | |
| 122 | def test_garbage_signature_fails(self) -> None: |
| 123 | key = _gen_key() |
| 124 | assert not verify_commit_ed25519(provenance_payload("x"), "!!not-base64!!", _pub_bytes(key)) |
| 125 | |
| 126 | def test_truncated_signature_fails(self) -> None: |
| 127 | key = _gen_key() |
| 128 | payload = provenance_payload("commit-x") |
| 129 | sig = sign_commit_ed25519(payload, key) |
| 130 | assert not verify_commit_ed25519(payload, sig[:40], _pub_bytes(key)) |
| 131 | |
| 132 | def test_different_keys_produce_different_sigs(self) -> None: |
| 133 | key1 = _gen_key() |
| 134 | key2 = _gen_key() |
| 135 | payload = provenance_payload("same-commit") |
| 136 | assert sign_commit_ed25519(payload, key1) != sign_commit_ed25519(payload, key2) |
| 137 | |
| 138 | |
| 139 | # --------------------------------------------------------------------------- |
| 140 | # Public key helpers |
| 141 | # --------------------------------------------------------------------------- |
| 142 | |
| 143 | |
| 144 | class TestPublicKeyHelpers: |
| 145 | def test_fingerprint_is_prefixed_and_71_chars(self) -> None: |
| 146 | from muse.core.types import public_key_fingerprint |
| 147 | key = _gen_key() |
| 148 | fp = public_key_fingerprint(_pub_bytes(key)) |
| 149 | assert fp.startswith("sha256:") |
| 150 | assert len(fp) == 71 |
| 151 | |
| 152 | def test_fingerprint_is_deterministic(self) -> None: |
| 153 | from muse.core.types import public_key_fingerprint |
| 154 | key = _gen_key() |
| 155 | pub = _pub_bytes(key) |
| 156 | assert public_key_fingerprint(pub) == public_key_fingerprint(pub) |
| 157 | |
| 158 | def test_different_keys_different_fingerprints(self) -> None: |
| 159 | from muse.core.types import public_key_fingerprint |
| 160 | k1, k2 = _gen_key(), _gen_key() |
| 161 | assert public_key_fingerprint(_pub_bytes(k1)) != public_key_fingerprint(_pub_bytes(k2)) |
| 162 | |
| 163 | def test_encode_public_key_returns_32_bytes_and_prefixed_b64(self) -> None: |
| 164 | key = _gen_key() |
| 165 | raw_bytes, b64 = encode_public_key(key) |
| 166 | assert len(raw_bytes) == 32 |
| 167 | assert isinstance(b64, str) |
| 168 | assert b64.startswith("ed25519:") |
| 169 | # No padding in the base64 part |
| 170 | assert "=" not in b64 |
| 171 | # Stripping prefix + decoding returns the raw bytes |
| 172 | _, decoded = decode_pubkey(b64) |
| 173 | assert decoded == raw_bytes |
| 174 | |
| 175 | |
| 176 | # --------------------------------------------------------------------------- |
| 177 | # sign_commit_record |
| 178 | # --------------------------------------------------------------------------- |
| 179 | |
| 180 | |
| 181 | class TestSignCommitRecord: |
| 182 | def test_sign_commit_record_returns_three_tuple(self) -> None: |
| 183 | key = _gen_key() |
| 184 | commit_id = "deadbeef" * 8 |
| 185 | result = sign_commit_record(commit_id, "test-agent", key) |
| 186 | assert result is not None |
| 187 | sig, pub_b64, fprint = result |
| 188 | assert sig != "" |
| 189 | assert pub_b64 != "" |
| 190 | assert fprint.startswith("sha256:") and len(fprint) == 71 |
| 191 | |
| 192 | def test_sign_commit_record_verifiable(self) -> None: |
| 193 | key = _gen_key() |
| 194 | commit_id = "cafebabe" * 8 |
| 195 | agent_id = "verify-agent" |
| 196 | result = sign_commit_record(commit_id, agent_id, key, model_id="claude-sonnet-4-6") |
| 197 | assert result is not None |
| 198 | sig, pub_b64, _ = result |
| 199 | _, pub_bytes = decode_pubkey(pub_b64) |
| 200 | payload = provenance_payload(commit_id, agent_id=agent_id, model_id="claude-sonnet-4-6") |
| 201 | assert verify_commit_ed25519(payload, sig, pub_bytes) |
| 202 | |
| 203 | def test_sign_commit_record_public_key_matches_private(self) -> None: |
| 204 | key = _gen_key() |
| 205 | result = sign_commit_record("aabbccdd" * 8, "agent", key) |
| 206 | assert result is not None |
| 207 | _, pub_b64, _ = result |
| 208 | raw, expected_b64 = encode_public_key(key) |
| 209 | assert pub_b64 == expected_b64 |
File History
4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2
fix: remove commit_exists filter from have anchors — server…
Sonnet 4.6
patch
20 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e
fix: rename objects→blobs in push client and all stale test…
Sonnet 4.6
patch
22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a
fix: repair four test failures from post-migration audit
Sonnet 4.6
patch
28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf
fix: unified object store migration — idempotent writes, JS…
Sonnet 4.6
minor
⚠
28 days ago