gabriel / muse public
test_security_agent_impersonation.py python
984 lines 39.2 KB
Raw
1 """Agent impersonation security tests — Ed25519 provenance signing.
2
3 Attack surface
4 --------------
5 Commit records in Muse carry identity fields that are NEVER validated against
6 the authenticated user: ``author``, ``agent_id``, ``model_id``, etc.
7 Any caller with access to the CLI can write commits claiming to be authored by
8 anyone — human, agent, or a previously-trusted identity.
9
10 Signing model
11 -------------
12 Commits signed with ``--sign`` use Ed25519 (same keypair as MSign request
13 authentication). The ``signer_public_key`` field embeds the signer's raw
14 public key bytes (base64url, 43 chars) so verification is fully offline.
15
16 The signed input is ``provenance_payload(commit_id, author, agent_id, ...)``
17 — a SHA-256 digest that binds content identity to authorship claims. Any
18 mutation of author, agent_id, model_id, toolchain_id, or prompt_hash after
19 signing is detected by ``run_verify``.
20 """
21
22 from __future__ import annotations
23
24 import datetime
25 import json
26 import pathlib
27
28 import pytest
29 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
30 from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
31
32 from muse.core.object_store import object_path as _obj_path
33 from muse.core.provenance import (
34 encode_public_key,
35 provenance_payload,
36 sign_commit_ed25519,
37 sign_commit_record,
38 verify_commit_ed25519,
39 )
40 from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as _hash_snapshot
41 from muse.core.validation import sanitize_provenance
42 from muse.core.verify import VerifyResult, run_verify
43 from muse.core.types import Manifest, MsgpackDict, b64url_encode, blob_id, decode_pubkey, encode_sig, fake_id, public_key_fingerprint, split_id
44 from muse.core.paths import ref_path, muse_dir
45
46
def _signer_key_id(pub_bytes: bytes) -> str:
    """Return the key id for *pub_bytes* as computed by ``public_key_fingerprint``."""
    fingerprint = public_key_fingerprint(pub_bytes)
    return fingerprint
49
50
51 # ---------------------------------------------------------------------------
52 # Helpers
53 # ---------------------------------------------------------------------------
54
def _gen_key() -> Ed25519PrivateKey:
    """Create and return a brand-new Ed25519 private key."""
    fresh_key = Ed25519PrivateKey.generate()
    return fresh_key
57
58
def _pub_bytes(key: Ed25519PrivateKey) -> bytes:
    """Return the public half of *key* serialized with Raw encoding / Raw format."""
    public_half = key.public_key()
    return public_half.public_bytes(Encoding.Raw, PublicFormat.Raw)
61
62
def _pub_b64(key: Ed25519PrivateKey) -> str:
    """Return the second element of the pair produced by ``encode_public_key(key)``."""
    encoded_pair = encode_public_key(key)
    _unused, encoded = encoded_pair
    return encoded
66
67
def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Lay out a bare-bones ``.muse/`` tree under *tmp_path* and return *tmp_path*.

    Creates the ``objects``, ``commits``, ``snapshots`` and ``refs/heads``
    directories, then writes ``repo.json`` and a ``HEAD`` pointing at ``main``.
    """
    store = muse_dir(tmp_path)
    subdirs = ("objects", "commits", "snapshots", "refs/heads")
    for name in subdirs:
        (store / name).mkdir(parents=True, exist_ok=True)

    # Insertion order keeps repo.json written before HEAD, as before.
    seed_files = {
        "repo.json": '{"repo_id": "test-repo"}',
        "HEAD": "ref: refs/heads/main\n",
    }
    for filename, text in seed_files.items():
        (store / filename).write_text(text, encoding="utf-8")
    return tmp_path
76
77
# Fixed, timezone-aware timestamp used by the commit-building helpers in this module.
_COMMITTED_AT = datetime.datetime(
    year=2026, month=1, day=1, tzinfo=datetime.timezone.utc
)
79
80
def _make_real_commit_id(
    snapshot_id: str | None = None,
    parent: str | None = None,
    message: str = "test",
    author: str = "",
    signer_public_key: str = "",
) -> str:
    """Compute the commit id for the given fields at the fixed ``_COMMITTED_AT`` time.

    A falsy *snapshot_id* falls back to ``_EMPTY_SNAP_ID``; a falsy *parent*
    yields an empty parent list.
    """
    if not snapshot_id:
        snapshot_id = _EMPTY_SNAP_ID

    parent_ids = []
    if parent:
        parent_ids.append(parent)

    return compute_commit_id(
        parent_ids=parent_ids,
        snapshot_id=snapshot_id,
        message=message,
        committed_at_iso=_COMMITTED_AT.isoformat(),
        author=author,
        signer_public_key=signer_public_key,
    )
98
99
def _v7_sig(
    commit_id: str,
    key: Ed25519PrivateKey,
    *,
    author: str = "",
    agent_id: str = "",
    model_id: str = "",
    toolchain_id: str = "",
    prompt_hash: str = "",
    committed_at: str = _COMMITTED_AT.isoformat(),
) -> str:
    """Sign the ``provenance_payload`` of the given fields with *key*.

    All provenance fields are keyword-only and default to the empty string;
    *committed_at* defaults to the module's fixed timestamp.
    """
    provenance_fields = {
        "author": author,
        "agent_id": agent_id,
        "model_id": model_id,
        "toolchain_id": toolchain_id,
        "prompt_hash": prompt_hash,
        "committed_at": committed_at,
    }
    digest = provenance_payload(commit_id, **provenance_fields)
    return sign_commit_ed25519(digest, key)
122
123
def _write_commit(
    root: pathlib.Path,
    commit_id: str,
    *,
    snapshot_id: str | None = None,
    parent: str | None = None,
    message: str = "test",
    author: str = "",
    agent_id: str = "",
    model_id: str = "",
    toolchain_id: str = "",
    prompt_hash: str = "",
    signature: str = "",
    signer_public_key: str = "",
    signer_key_id: str = "",
) -> None:
    """Serialize a commit record and store it at ``_obj_path(root, commit_id)``.

    The file content is the header ``commit <len>\\0`` followed by the compact
    JSON encoding of the record.  A falsy *snapshot_id* is replaced by
    ``_EMPTY_SNAP_ID``.

    For the record to be content-hash-valid, callers are expected to pass a
    *commit_id* obtained from ``_make_real_commit_id`` with the same
    ``snapshot_id``, ``parent``, ``message``, ``author`` and
    ``signer_public_key`` values used here.
    """
    snap = snapshot_id if snapshot_id else _EMPTY_SNAP_ID

    # Key order is part of the serialized bytes, so it is kept fixed.
    fields = {
        "commit_id": commit_id,
        "repo_id": "test-repo",
        "branch": "main",
        "snapshot_id": snap,
        "message": message,
        "committed_at": _COMMITTED_AT.isoformat(),
        "parent_commit_id": parent,
        "parent2_commit_id": None,
        "author": author,
        "metadata": {},
        "structured_delta": None,
        "sem_ver_bump": "none",
        "breaking_changes": [],
        "agent_id": agent_id,
        "model_id": model_id,
        "toolchain_id": toolchain_id,
        "prompt_hash": prompt_hash,
        "signature": signature,
        "signer_public_key": signer_public_key,
        "signer_key_id": signer_key_id,
        "reviewed_by": [],
        "test_runs": 0,
    }
    body = json.dumps(fields, separators=(",", ":")).encode()
    header = f"commit {len(body)}\0".encode()

    target = _obj_path(root, commit_id)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(header + body)
176
177
# Snapshot id that ``_hash_snapshot`` yields for an empty ``{}`` input.
# ``_make_real_commit_id`` and ``_write_commit`` fall back to it when no
# snapshot_id is supplied (the name is resolved at call time, so defining it
# after those helpers is fine).
_EMPTY_SNAP_ID = _hash_snapshot({})
179
180
def _write_snapshot(root: pathlib.Path, snapshot_id: str) -> None:
    """Store an empty-manifest snapshot record at ``_obj_path(root, snapshot_id)``.

    The file content is the header ``snapshot <len>\\0`` followed by the
    compact JSON encoding of the record.
    """
    fields = {
        "snapshot_id": snapshot_id,
        "manifest": {},
        "directories": {},
        "created_at": "2026-01-01T00:00:00+00:00",
    }
    body = json.dumps(fields, separators=(",", ":")).encode()
    header = f"snapshot {len(body)}\0".encode()

    target = _obj_path(root, snapshot_id)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(header + body)
195
196
def _set_branch_ref(root: pathlib.Path, branch: str, commit_id: str) -> None:
    """Write *commit_id* into the ref file that ``ref_path(root, branch)`` names."""
    target = ref_path(root, branch)
    # Create the parent directories of the ref file if they are missing.
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(commit_id, encoding="utf-8")
201
202
def _fake_commit_id(seed: str = "a") -> str:
    """Return ``fake_id(seed)`` — a placeholder commit id derived from *seed*."""
    placeholder = fake_id(seed)
    return placeholder
205
206
207 # ===========================================================================
208 # Author field sanitization
209 # ===========================================================================
210
211
class TestAuthorSanitization:
    """sanitize_provenance must strip control chars from the author field."""

    def test_clean_author_unchanged(self) -> None:
        """A plain ASCII author passes through untouched."""
        assert sanitize_provenance("gabriel") == "gabriel"

    def test_esc_in_author_stripped(self) -> None:
        """ESC bytes are removed while the visible text survives."""
        raw = "\x1b[31mfake-human\x1b[0m"
        clean = sanitize_provenance(raw)
        assert "\x1b" not in clean
        assert "fake-human" in clean

    def test_newline_in_author_stripped(self) -> None:
        """A newline embedded in the author must not survive sanitization."""
        # The input has to contain a real "\n"; otherwise the assertion below
        # is vacuously true.  The address is a placeholder.
        raw = "gabriel\nlinus@example.com"
        assert "\n" in raw
        clean = sanitize_provenance(raw)
        assert "\n" not in clean

    def test_cr_in_author_stripped(self) -> None:
        """Carriage returns are removed."""
        raw = "gabriel\r\nlinus"
        clean = sanitize_provenance(raw)
        assert "\r" not in clean

    def test_bel_in_author_stripped(self) -> None:
        """The BEL control character is removed."""
        raw = "gabriel\x07linus"
        clean = sanitize_provenance(raw)
        assert "\x07" not in clean

    @pytest.mark.parametrize("char_val", range(0x00, 0x20))
    def test_c0_control_chars_stripped(self, char_val: int) -> None:
        """Every C0 control character (0x00-0x1F) is removed."""
        char = chr(char_val)
        raw = f"prefix{char}suffix"
        result = sanitize_provenance(raw)
        assert char not in result

    def test_del_stripped(self) -> None:
        """DEL (0x7F) is removed."""
        assert "\x7f" not in sanitize_provenance("a\x7fb")

    def test_author_cap_at_256_chars(self) -> None:
        """A 256-char pre-truncated clean author keeps its full length."""
        long_author = "a" * 300
        stored = sanitize_provenance(long_author[:256])
        assert len(stored) == 256

    def test_unicode_author_preserved(self) -> None:
        """Non-ASCII printable text is left intact."""
        raw = "gabriel (加布里埃尔)"
        assert sanitize_provenance(raw) == raw

    def test_email_style_author_preserved(self) -> None:
        """A ``Name <address>`` author is left intact."""
        # Placeholder address; the value needs an "@" to be email-style at all.
        raw = "Gabriel <gabriel@example.com>"
        assert sanitize_provenance(raw) == raw
261
262
263 # ===========================================================================
264 # commit_id includes author — v2 formula binds identity into the hash
265 # ===========================================================================
266
267
class TestAuthorInCommitId:
    """author IS part of commit_id in the v2 formula — by design.

    The commit id is content-addressed over fields that include the author
    and the signer's public key, so changing either yields a different id.
    On top of that, the Ed25519 signature is computed over
    ``provenance_payload``, which makes post-signing edits of provenance
    fields detectable.
    """

    def test_different_authors_produce_different_commit_ids(self) -> None:
        """Changing only the author changes the commit id."""
        stamp = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc).isoformat()
        snapshot = fake_id("snap")
        ids_by_author = {
            name: compute_commit_id(
                parent_ids=[],
                snapshot_id=snapshot,
                message="msg",
                committed_at_iso=stamp,
                author=name,
            )
            for name in ("gabriel", "linus")
        }
        assert ids_by_author["gabriel"] != ids_by_author["linus"]

    def test_v7_author_mutation_detected_via_payload(self) -> None:
        """Ed25519 (v7): a signature over one author does not verify for another."""
        signing_key = _gen_key()
        cid = _fake_commit_id("v7-author-mutation")

        signed_payload = provenance_payload(cid, author="gabriel", agent_id="cursor-bot")
        signature = sign_commit_ed25519(signed_payload, signing_key)
        public_raw = _pub_bytes(signing_key)

        # Sanity check: the untouched payload verifies.
        assert verify_commit_ed25519(signed_payload, signature, public_raw)

        # Same commit, different author -> different payload -> rejected.
        tampered_payload = provenance_payload(cid, author="[email protected]", agent_id="cursor-bot")
        assert not verify_commit_ed25519(tampered_payload, signature, public_raw)

    def test_forged_signature_fails_verification(self) -> None:
        """A well-formed signature the key never produced is rejected."""
        signing_key = _gen_key()
        digest = provenance_payload(_fake_commit_id("real-commit"))
        # Right framing, all-zero signature bytes.
        zero_sig = encode_sig("ed25519", b"\x00" * 64)

        assert not verify_commit_ed25519(digest, zero_sig, _pub_bytes(signing_key))

    def test_wrong_key_fails_verification(self) -> None:
        """A signature made with one key is rejected under another key."""
        signer = _gen_key()
        stranger = _gen_key()
        digest = provenance_payload(_fake_commit_id("commit"))
        signature = sign_commit_ed25519(digest, signer)

        assert not verify_commit_ed25519(digest, signature, _pub_bytes(stranger))

    def test_empty_signature_is_falsy(self) -> None:
        """The empty string (the 'unsigned' sentinel) never verifies."""
        signing_key = _gen_key()
        digest = provenance_payload("commit_id")
        assert not verify_commit_ed25519(digest, "", _pub_bytes(signing_key))
323
324
325 # ===========================================================================
326 # muse verify — Ed25519 signature verification
327 # ===========================================================================
328
329
class TestVerifySignatures:
    """run_verify must check Ed25519 signatures on signed commits."""

    def test_valid_signature_passes(self, tmp_path: pathlib.Path) -> None:
        """A correctly signed commit is counted once and yields no failures."""
        repo = _make_repo(tmp_path)
        _write_snapshot(repo, _EMPTY_SNAP_ID)

        signing_key = _gen_key()
        encoded_pub = _pub_b64(signing_key)
        cid = _make_real_commit_id(
            snapshot_id=_EMPTY_SNAP_ID, message="sign-pass", signer_public_key=encoded_pub
        )
        signature = _v7_sig(cid, signing_key, agent_id="bot-v1")

        _write_commit(
            repo,
            cid,
            snapshot_id=_EMPTY_SNAP_ID,
            message="sign-pass",
            agent_id="bot-v1",
            signature=signature,
            signer_public_key=encoded_pub,
            signer_key_id=_signer_key_id(_pub_bytes(signing_key)),
        )
        _set_branch_ref(repo, "main", cid)

        report = run_verify(repo, check_objects=False)
        assert report["signatures_checked"] == 1
        assert report["all_ok"]
        assert report["failures"] == []

    def test_forged_signature_detected(self, tmp_path: pathlib.Path) -> None:
        """A well-formed but bogus signature is reported as INVALID."""
        repo = _make_repo(tmp_path)
        _write_snapshot(repo, _EMPTY_SNAP_ID)

        signing_key = _gen_key()
        encoded_pub = _pub_b64(signing_key)
        cid = _make_real_commit_id(
            snapshot_id=_EMPTY_SNAP_ID, message="forged", signer_public_key=encoded_pub
        )
        # Right framing, all-zero bytes: never produced by the key.
        bogus_signature = encode_sig("ed25519", b"\x00" * 64)

        _write_commit(
            repo,
            cid,
            snapshot_id=_EMPTY_SNAP_ID,
            message="forged",
            agent_id="bot-v1",
            signature=bogus_signature,
            signer_public_key=encoded_pub,
            signer_key_id=_signer_key_id(_pub_bytes(signing_key)),
        )
        _set_branch_ref(repo, "main", cid)

        report = run_verify(repo, check_objects=False)
        assert report["signatures_checked"] == 1
        assert not report["all_ok"]
        signature_failures = [
            entry for entry in report["failures"] if entry["kind"] == "signature"
        ]
        assert len(signature_failures) == 1
        assert "INVALID" in signature_failures[0]["error"]

    def test_missing_public_key_reported_as_failure(self, tmp_path: pathlib.Path) -> None:
        """A v7 commit with signature but no signer_public_key is a key_missing failure."""
        repo = _make_repo(tmp_path)
        _write_snapshot(repo, _EMPTY_SNAP_ID)

        signing_key = _gen_key()
        cid = _make_real_commit_id(snapshot_id=_EMPTY_SNAP_ID, message="orphan-key")
        signature = _v7_sig(cid, signing_key, agent_id="bot-v2")

        _write_commit(
            repo,
            cid,
            snapshot_id=_EMPTY_SNAP_ID,
            message="orphan-key",
            agent_id="bot-v2",
            signature=signature,
            signer_public_key="",  # deliberately absent
        )
        _set_branch_ref(repo, "main", cid)

        report = run_verify(repo, check_objects=False)
        missing_key_failures = [
            entry for entry in report["failures"] if entry["kind"] == "key_missing"
        ]
        assert len(missing_key_failures) == 1

    def test_unsigned_commit_not_flagged(self, tmp_path: pathlib.Path) -> None:
        """A commit with no signature field is not a verification failure."""
        repo = _make_repo(tmp_path)
        _write_snapshot(repo, _EMPTY_SNAP_ID)

        cid = _make_real_commit_id(snapshot_id=_EMPTY_SNAP_ID, message="unsigned")
        _write_commit(repo, cid, snapshot_id=_EMPTY_SNAP_ID, message="unsigned")
        _set_branch_ref(repo, "main", cid)

        report = run_verify(repo, check_objects=False)
        assert report["signatures_checked"] == 0
        assert report["all_ok"]
        assert report["failures"] == []

    def test_verify_result_has_signatures_checked_field(
        self, tmp_path: pathlib.Path
    ) -> None:
        """The verify result exposes an integer ``signatures_checked`` entry."""
        repo = _make_repo(tmp_path)
        _write_snapshot(repo, _EMPTY_SNAP_ID)
        cid = _make_real_commit_id(snapshot_id=_EMPTY_SNAP_ID, message="field-check")
        _write_commit(repo, cid, snapshot_id=_EMPTY_SNAP_ID, message="field-check")
        _set_branch_ref(repo, "main", cid)

        report = run_verify(repo, check_objects=False)
        assert "signatures_checked" in report
        assert isinstance(report["signatures_checked"], int)

    def test_multiple_commits_signed_all_valid(self, tmp_path: pathlib.Path) -> None:
        """A four-commit chain signed by one key verifies in full."""
        repo = _make_repo(tmp_path)
        _write_snapshot(repo, _EMPTY_SNAP_ID)

        signing_key = _gen_key()
        encoded_pub = _pub_b64(signing_key)
        chain: list[str] = []
        for index in range(4):
            label = f"chain-{index}"
            previous = chain[-1] if chain else None
            cid = _make_real_commit_id(
                snapshot_id=_EMPTY_SNAP_ID,
                parent=previous,
                message=label,
                signer_public_key=encoded_pub,
            )
            chain.append(cid)
            signature = _v7_sig(cid, signing_key, agent_id="multi-bot")
            _write_commit(
                repo,
                cid,
                snapshot_id=_EMPTY_SNAP_ID,
                parent=previous,
                message=label,
                agent_id="multi-bot",
                signature=signature,
                signer_public_key=encoded_pub,
                signer_key_id=_signer_key_id(_pub_bytes(signing_key)),
            )
        _set_branch_ref(repo, "main", chain[-1])

        report = run_verify(repo, check_objects=False)
        assert report["signatures_checked"] == 4
        assert report["all_ok"]

    def test_mixed_signed_unsigned_commits(self, tmp_path: pathlib.Path) -> None:
        """A chain with signed + unsigned commits — only signed ones are checked."""
        repo = _make_repo(tmp_path)
        _write_snapshot(repo, _EMPTY_SNAP_ID)

        signing_key = _gen_key()

        # Root of the chain: no signature at all.
        plain_id = _make_real_commit_id(snapshot_id=_EMPTY_SNAP_ID, message="unsigned")
        _write_commit(repo, plain_id, snapshot_id=_EMPTY_SNAP_ID, message="unsigned")

        # Child commit: signed.
        encoded_pub = _pub_b64(signing_key)
        child_id = _make_real_commit_id(
            snapshot_id=_EMPTY_SNAP_ID,
            parent=plain_id,
            message="signed",
            signer_public_key=encoded_pub,
        )
        signature = _v7_sig(child_id, signing_key, agent_id="selective-bot")
        _write_commit(
            repo,
            child_id,
            snapshot_id=_EMPTY_SNAP_ID,
            parent=plain_id,
            message="signed",
            agent_id="selective-bot",
            signature=signature,
            signer_public_key=encoded_pub,
            signer_key_id=_signer_key_id(_pub_bytes(signing_key)),
        )

        _set_branch_ref(repo, "main", child_id)
        report = run_verify(repo, check_objects=False)
        assert report["signatures_checked"] == 1
        assert report["all_ok"]
491
492
493 # ===========================================================================
494 # Signing and verification round-trip
495 # ===========================================================================
496
497
class TestSigningRoundTrip:
    """sign_commit_ed25519 / verify_commit_ed25519 round-trips."""

    def test_sign_then_verify_succeeds(self) -> None:
        """A signature verifies against the payload and key that produced it."""
        signing_key = _gen_key()
        digest = provenance_payload(_fake_commit_id("round-trip"))
        signature = sign_commit_ed25519(digest, signing_key)
        assert verify_commit_ed25519(digest, signature, _pub_bytes(signing_key))

    def test_truncated_signature_fails(self) -> None:
        """Only the first 40 characters of a signature do not verify."""
        signing_key = _gen_key()
        digest = provenance_payload(_fake_commit_id("trunc"))
        clipped = sign_commit_ed25519(digest, signing_key)[:40]
        assert not verify_commit_ed25519(digest, clipped, _pub_bytes(signing_key))

    def test_empty_signature_fails(self) -> None:
        """The empty string never verifies."""
        signing_key = _gen_key()
        digest = provenance_payload("anything")
        assert not verify_commit_ed25519(digest, "", _pub_bytes(signing_key))

    def test_garbage_signature_fails(self) -> None:
        """A string that is not valid base64 never verifies."""
        signing_key = _gen_key()
        digest = provenance_payload("x")
        assert not verify_commit_ed25519(digest, "!not-b64!", _pub_bytes(signing_key))

    def test_key_fingerprint_is_canonical_sha256_prefixed(self) -> None:
        """The key id is ``sha256:`` followed by 64 lowercase hex digits."""
        signing_key = _gen_key()
        key_id = _signer_key_id(_pub_bytes(signing_key))
        assert key_id.startswith("sha256:")
        _, digest_hex = split_id(key_id)
        assert len(digest_hex) == 64
        assert set(digest_hex) <= set("0123456789abcdef")

    def test_different_keys_produce_different_sigs(self) -> None:
        """Two distinct keys sign the same payload differently."""
        first_key = _gen_key()
        second_key = _gen_key()
        digest = provenance_payload(_fake_commit_id("diff-keys"))
        first_sig = sign_commit_ed25519(digest, first_key)
        second_sig = sign_commit_ed25519(digest, second_key)
        assert first_sig != second_sig

    def test_different_commit_ids_produce_different_sigs(self) -> None:
        """One key signs two different commit ids differently."""
        signing_key = _gen_key()
        sig_c1 = sign_commit_ed25519(provenance_payload(_fake_commit_id("c1")), signing_key)
        sig_c2 = sign_commit_ed25519(provenance_payload(_fake_commit_id("c2")), signing_key)
        assert sig_c1 != sig_c2
540
541
542 # ===========================================================================
543 # Impersonation scenarios — end-to-end
544 # ===========================================================================
545
546
class TestImpersonationScenarios:
    """End-to-end scenarios that demonstrate and prove the attack surfaces."""

    def test_author_override_produces_warning(
        self, tmp_path: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        """commit.py must emit a warning when --author is explicitly supplied."""
        # NOTE(review): this test logs the warning itself on the
        # "muse.cli.commands.commit" logger rather than invoking the commit
        # command, so it only checks the message text and capture wiring.
        import logging

        raw_author: str | None = "[email protected]"
        max_author_len = 256
        author = sanitize_provenance(raw_author[:max_author_len]) if raw_author else None

        commit_logger = logging.getLogger("muse.cli.commands.commit")
        with caplog.at_level(logging.WARNING, logger="muse.cli.commands.commit"):
            commit_logger.warning(
                "⚠️ --author override supplied: %r — this is not verified against "
                "the stored identity and may allow impersonation.",
                author,
            )

        captured = [record.message for record in caplog.records]
        assert any("--author override" in text for text in captured)
        assert any("impersonation" in text for text in captured)

    def test_esc_injection_in_author_sanitized(self) -> None:
        """An attacker cannot store ESC sequences in the author field."""
        hostile = "\x1b[31m [email protected] \x1b[0m"
        stored = sanitize_provenance(hostile[:256])
        assert "\x1b" not in stored

    def test_long_author_truncated(self) -> None:
        """An author pre-truncated to 256 chars stays within 256 after sanitizing."""
        oversized = "a" * 10_000
        stored = sanitize_provenance(oversized[:256])
        assert len(stored) <= 256

    def test_two_different_commit_ids_different_authors(self) -> None:
        """Two commits differing ONLY in author produce different commit_ids (v2 formula)."""
        stamp = "2026-01-01T00:00:00+00:00"
        snapshot = "b" * 64
        ids_by_author = {
            name: compute_commit_id(
                parent_ids=[],
                snapshot_id=snapshot,
                message="Add verse",
                committed_at_iso=stamp,
                author=name,
            )
            for name in ("gabriel", "linus")
        }
        assert ids_by_author["gabriel"] != ids_by_author["linus"]

    def test_forged_commit_bypasses_verify_when_no_signature(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A commit with no signature is not signature-checked."""
        repo = _make_repo(tmp_path)
        _write_snapshot(repo, _EMPTY_SNAP_ID)

        cid = _make_real_commit_id(snapshot_id=_EMPTY_SNAP_ID, message="no-sig")
        _write_commit(
            repo,
            cid,
            snapshot_id=_EMPTY_SNAP_ID,
            message="no-sig",
            agent_id="bot",
            signature="",  # empty signature means nothing to check
        )
        _set_branch_ref(repo, "main", cid)

        report = run_verify(repo, check_objects=False)
        assert report["signatures_checked"] == 0

    def test_verify_json_output_includes_signatures_checked(
        self, tmp_path: pathlib.Path
    ) -> None:
        """The verify result survives a JSON round-trip with ``signatures_checked`` intact."""
        repo = _make_repo(tmp_path)
        _write_snapshot(repo, _EMPTY_SNAP_ID)
        cid = _make_real_commit_id(snapshot_id=_EMPTY_SNAP_ID, message="json-check")
        _write_commit(repo, cid, snapshot_id=_EMPTY_SNAP_ID, message="json-check")
        _set_branch_ref(repo, "main", cid)

        report = run_verify(repo, check_objects=False)
        round_tripped = json.loads(json.dumps(dict(report)))
        assert "signatures_checked" in round_tripped
        assert round_tripped["signatures_checked"] == 0
626
627
628 # ===========================================================================
629 # Fuzzing
630 # ===========================================================================
631
632
class TestFuzzedImpersonationPayloads:
    """Seeded pseudo-random cases for author sanitization and forged signatures."""

    @pytest.mark.parametrize("seed", range(15))
    def test_random_control_char_in_author_stripped(self, seed: int) -> None:
        """A pseudo-randomly chosen C0 control character is removed from the author."""
        import random

        rng = random.Random(seed)
        char = chr(rng.randint(0x00, 0x1F))
        payload = f"Author {char} Name"
        result = sanitize_provenance(payload)
        assert char not in result

    @pytest.mark.parametrize("seed", range(5))
    def test_random_forged_ed25519_signature_always_fails(self, seed: int) -> None:
        """64 pseudo-random bytes are never accepted as an Ed25519 signature."""
        import random

        rng = random.Random(seed + 300)
        key = _gen_key()
        pub = _pub_bytes(key)
        payload = provenance_payload(_fake_commit_id(f"fuzz-{seed}"))
        random_bytes = bytes(rng.randint(0, 255) for _ in range(64))

        # Frame the bytes the same way the other forged-signature tests in this
        # module do (``encode_sig("ed25519", ...)``), so the forgery has the
        # expected signature format and only its bytes are wrong.
        framed = encode_sig("ed25519", random_bytes)
        assert not verify_commit_ed25519(payload, framed, pub)

        # The bare base64url form used previously must be rejected as well.
        # The assertion is unconditional: a guard on equality with the real
        # signature would only add a path where nothing is checked.
        bare = b64url_encode(random_bytes)
        assert not verify_commit_ed25519(payload, bare, pub)
657
658
659 # ===========================================================================
660 # provenance_payload — unit tests
661 # ===========================================================================
662
663
class TestProvenancePayload:
    """Unit tests for :func:`provenance_payload`."""

    def test_is_64_hex_chars(self) -> None:
        """The payload is 64 lowercase hex characters."""
        digest = provenance_payload("c" * 64)
        assert len(digest) == 64
        assert set(digest) <= set("0123456789abcdef")

    def test_deterministic(self) -> None:
        """Identical inputs give identical payloads."""
        first = provenance_payload("cid", author="alice", agent_id="bot")
        second = provenance_payload("cid", author="alice", agent_id="bot")
        assert first == second

    def test_different_commit_id_different_payload(self) -> None:
        """Changing only the commit id changes the payload."""
        with_aaa = provenance_payload("aaa", author="alice", agent_id="bot")
        with_bbb = provenance_payload("bbb", author="alice", agent_id="bot")
        assert with_aaa != with_bbb

    def test_different_author_different_payload(self) -> None:
        """Changing only the author changes the payload."""
        as_alice = provenance_payload("cid", author="alice", agent_id="bot")
        as_linus = provenance_payload("cid", author="linus", agent_id="bot")
        assert as_alice != as_linus

    def test_different_agent_id_different_payload(self) -> None:
        """Changing only the agent id changes the payload."""
        agent_v1 = provenance_payload("cid", author="alice", agent_id="bot-v1")
        agent_v2 = provenance_payload("cid", author="alice", agent_id="bot-v2")
        assert agent_v1 != agent_v2

    def test_different_model_id_different_payload(self) -> None:
        """Changing only the model id changes the payload."""
        model_a = provenance_payload("cid", agent_id="bot", model_id="claude-3")
        model_b = provenance_payload("cid", agent_id="bot", model_id="gpt-4")
        assert model_a != model_b

    def test_different_toolchain_id_different_payload(self) -> None:
        """Changing only the toolchain id changes the payload."""
        chain_v1 = provenance_payload("cid", agent_id="bot", toolchain_id="cursor-v1")
        chain_v2 = provenance_payload("cid", agent_id="bot", toolchain_id="cursor-v2")
        assert chain_v1 != chain_v2

    def test_different_prompt_hash_different_payload(self) -> None:
        """Changing only the prompt hash changes the payload."""
        hash_aa = provenance_payload("cid", prompt_hash="aa" * 32)
        hash_bb = provenance_payload("cid", prompt_hash="bb" * 32)
        assert hash_aa != hash_bb

    def test_separator_injection_consistent(self) -> None:
        """Null-byte separator: payload is consistent for same raw bytes."""
        first = provenance_payload("cid", author="a\x00b", agent_id="")
        second = provenance_payload("cid", author="a\x00b", agent_id="")
        # Same inputs, embedded NUL included, must hash identically.
        assert first == second

    def test_empty_fields_produce_valid_payload(self) -> None:
        """A commit id alone, with every optional field defaulted, still yields 64 chars."""
        digest = provenance_payload("c" * 64)
        assert len(digest) == 64

    def test_not_same_as_bare_commit_id_hash(self) -> None:
        """provenance_payload ≠ fake_id(commit_id) — it binds more fields."""
        cid = "d" * 64
        assert provenance_payload(cid) != fake_id(cid)

    def test_v7_author_mutation_detected(self) -> None:
        """Ed25519 (v7): mutating author makes the signature invalid."""
        signing_key = _gen_key()
        cid = _fake_commit_id("v7-author-mutation")

        signed_payload = provenance_payload(cid, author="gabriel", agent_id="cursor-bot")
        signature = sign_commit_ed25519(signed_payload, signing_key)

        tampered_payload = provenance_payload(cid, author="[email protected]", agent_id="cursor-bot")
        assert not verify_commit_ed25519(tampered_payload, signature, _pub_bytes(signing_key))

    def test_v7_agent_id_mutation_detected(self) -> None:
        """Ed25519 (v7): mutating agent_id makes the signature invalid."""
        signing_key = _gen_key()
        cid = _fake_commit_id("v7-agent-mutation")
        signed_payload = provenance_payload(cid, author="alice", agent_id="real-bot")
        signature = sign_commit_ed25519(signed_payload, signing_key)
        tampered_payload = provenance_payload(cid, author="alice", agent_id="fake-bot")
        assert not verify_commit_ed25519(tampered_payload, signature, _pub_bytes(signing_key))

    def test_v7_model_id_mutation_detected(self) -> None:
        """Ed25519 (v7): mutating model_id makes the signature invalid."""
        signing_key = _gen_key()
        cid = _fake_commit_id("v7-model-mutation")
        signed_payload = provenance_payload(cid, agent_id="bot", model_id="claude-3")
        signature = sign_commit_ed25519(signed_payload, signing_key)
        tampered_payload = provenance_payload(cid, agent_id="bot", model_id="gpt-4")
        assert not verify_commit_ed25519(tampered_payload, signature, _pub_bytes(signing_key))

    def test_v7_toolchain_mutation_detected(self) -> None:
        """Ed25519 (v7): mutating toolchain_id makes the signature invalid."""
        signing_key = _gen_key()
        cid = _fake_commit_id("v7-toolchain-mutation")
        signed_payload = provenance_payload(cid, agent_id="bot", toolchain_id="cursor-v1")
        signature = sign_commit_ed25519(signed_payload, signing_key)
        tampered_payload = provenance_payload(cid, agent_id="bot", toolchain_id="cursor-v2")
        assert not verify_commit_ed25519(tampered_payload, signature, _pub_bytes(signing_key))

    def test_sign_commit_record_uses_provenance_payload(self) -> None:
        """sign_commit_record signs provenance_payload, not bare commit_id."""
        signing_key = _gen_key()
        cid = _fake_commit_id("sign-record-test")
        outcome = sign_commit_record(cid, "my-bot", signing_key, author="alice", model_id="claude-opus")
        assert outcome is not None
        signature, encoded_pub, _ = outcome

        _, public_raw = decode_pubkey(encoded_pub)
        full_payload = provenance_payload(cid, author="alice", agent_id="my-bot", model_id="claude-opus")
        assert verify_commit_ed25519(full_payload, signature, public_raw)

        # A payload built from the commit id alone is a different digest, so
        # the same signature must be rejected for it.
        id_only_payload = provenance_payload(cid)
        assert not verify_commit_ed25519(id_only_payload, signature, public_raw)
774
775
776 # ===========================================================================
777 # muse verify — v7 provenance payload verification
778 # ===========================================================================
779
780
781 class TestVerifySignaturesV7:
782 """run_verify must verify v7 commits against provenance_payload (Ed25519)."""
783
def test_v7_valid_signature_passes(self, tmp_path: pathlib.Path) -> None:
    """A v7 commit signed over author + agent_id verifies cleanly."""
    repo = _make_repo(tmp_path)
    _write_snapshot(repo, _EMPTY_SNAP_ID)

    signing_key = _gen_key()
    encoded_pub = _pub_b64(signing_key)
    cid = _make_real_commit_id(
        snapshot_id=_EMPTY_SNAP_ID,
        message="v7-valid",
        author="alice",
        signer_public_key=encoded_pub,
    )
    signature = _v7_sig(cid, signing_key, author="alice", agent_id="v7-bot")

    _write_commit(
        repo,
        cid,
        snapshot_id=_EMPTY_SNAP_ID,
        message="v7-valid",
        author="alice",
        agent_id="v7-bot",
        signature=signature,
        signer_public_key=encoded_pub,
        signer_key_id=_signer_key_id(_pub_bytes(signing_key)),
    )
    _set_branch_ref(repo, "main", cid)

    report = run_verify(repo, check_objects=False)
    assert report["signatures_checked"] == 1
    # The failure list is the assertion message so a red run shows the cause.
    assert report["all_ok"], report["failures"]
808
def test_v7_author_mutation_detected_by_verify(
    self, tmp_path: pathlib.Path
) -> None:
    """After mutating author on disk, run_verify must report a commit-level failure."""
    repo = _make_repo(tmp_path)
    _write_snapshot(repo, _EMPTY_SNAP_ID)

    signing_key = _gen_key()
    encoded_pub = _pub_b64(signing_key)
    key_id = _signer_key_id(_pub_bytes(signing_key))
    cid = _make_real_commit_id(
        snapshot_id=_EMPTY_SNAP_ID,
        message="v7-author-tamper",
        author="gabriel",
        signer_public_key=encoded_pub,
    )
    signature = _v7_sig(cid, signing_key, author="gabriel", agent_id="v7-bot")

    # Honest record first, then the same record with the author swapped.
    for recorded_author in ("gabriel", "[email protected]"):
        _write_commit(
            repo,
            cid,
            snapshot_id=_EMPTY_SNAP_ID,
            message="v7-author-tamper",
            author=recorded_author,
            agent_id="v7-bot",
            signature=signature,
            signer_public_key=encoded_pub,
            signer_key_id=key_id,
        )
        if recorded_author == "gabriel":
            _set_branch_ref(repo, "main", cid)

    report = run_verify(repo, check_objects=False)
    # With the v2 formula the author is part of the commit id hash, so the
    # tampered record fails content-hash verification and is reported as a
    # commit-level failure rather than a signature failure.
    assert not report["all_ok"]
    commit_failures = [
        entry for entry in report["failures"] if entry["kind"] == "commit"
    ]
    assert len(commit_failures) == 1
845
def test_v7_agent_id_mutation_detected_by_verify(
    self, tmp_path: pathlib.Path
) -> None:
    """Rewriting a signed v7 commit with another agent_id fails verification.

    The original signature and public key are kept on the rewritten record,
    so ``run_verify`` is expected to report exactly one ``"signature"``
    failure whose error text contains ``INVALID``.
    """
    repo_root = _make_repo(tmp_path)
    _write_snapshot(repo_root, _EMPTY_SNAP_ID)

    signing_key = _gen_key()
    encoded_pub = _pub_b64(signing_key)
    head_id = _make_real_commit_id(
        snapshot_id=_EMPTY_SNAP_ID,
        message="v7-agentid-tamper",
        author="alice",
        signer_public_key=encoded_pub,
    )
    signature = _v7_sig(head_id, signing_key, author="alice", agent_id="real-v7-bot")

    def store(agent_id: str) -> None:
        # The two writes differ only in the agent_id recorded on disk.
        _write_commit(
            repo_root,
            head_id,
            snapshot_id=_EMPTY_SNAP_ID,
            message="v7-agentid-tamper",
            author="alice",
            agent_id=agent_id,
            signature=signature,
            signer_public_key=encoded_pub,
            signer_key_id=_signer_key_id(_pub_bytes(signing_key)),
        )

    store("real-v7-bot")
    _set_branch_ref(repo_root, "main", head_id)

    # Tamper: same signature and public key, different agent_id.
    store("fake-v7-bot")

    report = run_verify(repo_root, check_objects=False)
    assert not report["all_ok"]
    signature_failures = [
        entry for entry in report["failures"] if entry["kind"] == "signature"
    ]
    assert len(signature_failures) == 1
    assert "INVALID" in signature_failures[0]["error"]
879
def test_v7_forged_signature_detected(self, tmp_path: pathlib.Path) -> None:
    """A well-formed but all-zero Ed25519 signature is reported as INVALID.

    The record carries a genuine public key and key id; only the signature
    bytes are bogus, so the signature is still checked and must fail.
    """
    repo_root = _make_repo(tmp_path)
    _write_snapshot(repo_root, _EMPTY_SNAP_ID)

    signing_key = _gen_key()
    encoded_pub = _pub_b64(signing_key)
    head_id = _make_real_commit_id(
        snapshot_id=_EMPTY_SNAP_ID,
        message="v7-forged",
        author="alice",
        signer_public_key=encoded_pub,
    )

    # Correct format, wrong bytes: 64 zero bytes tagged as ed25519.
    forged_signature = encode_sig("ed25519", b"\x00" * 64)
    _write_commit(
        repo_root,
        head_id,
        snapshot_id=_EMPTY_SNAP_ID,
        message="v7-forged",
        author="alice",
        agent_id="v7-forged-bot",
        signature=forged_signature,
        signer_public_key=encoded_pub,
        signer_key_id=_signer_key_id(_pub_bytes(signing_key)),
    )
    _set_branch_ref(repo_root, "main", head_id)

    report = run_verify(repo_root, check_objects=False)
    assert report["signatures_checked"] == 1
    assert not report["all_ok"]
    assert any("INVALID" in entry["error"] for entry in report["failures"])
900
def test_v7_mixed_chain_verifies(self, tmp_path: pathlib.Path) -> None:
    """An unsigned parent followed by a signed child verifies without failures.

    Only the child carries a signature, so ``run_verify`` is expected to
    count exactly one checked signature for the two-commit chain.
    """
    repo_root = _make_repo(tmp_path)
    _write_snapshot(repo_root, _EMPTY_SNAP_ID)

    # Parent: no signature, no signer fields.
    parent_id = _make_real_commit_id(
        snapshot_id=_EMPTY_SNAP_ID, message="unsigned-base"
    )
    _write_commit(
        repo_root, parent_id, snapshot_id=_EMPTY_SNAP_ID, message="unsigned-base"
    )

    # Child: signed, pointing at the unsigned parent.
    signing_key = _gen_key()
    encoded_pub = _pub_b64(signing_key)
    tip_id = _make_real_commit_id(
        snapshot_id=_EMPTY_SNAP_ID,
        parent=parent_id,
        message="signed-child",
        author="alice",
        signer_public_key=encoded_pub,
    )
    signature = _v7_sig(tip_id, signing_key, author="alice", agent_id="chain-bot")
    _write_commit(
        repo_root,
        tip_id,
        snapshot_id=_EMPTY_SNAP_ID,
        parent=parent_id,
        message="signed-child",
        author="alice",
        agent_id="chain-bot",
        signature=signature,
        signer_public_key=encoded_pub,
        signer_key_id=_signer_key_id(_pub_bytes(signing_key)),
    )
    _set_branch_ref(repo_root, "main", tip_id)

    report = run_verify(repo_root, check_objects=False)
    assert report["signatures_checked"] == 1
    assert report["all_ok"], report["failures"]
927
@pytest.mark.parametrize("field,value", [
    ("author", "injected-author"),
    ("agent_id", "injected-agent"),
    ("model_id", "injected-model"),
    ("toolchain_id", "injected-toolchain"),
    ("prompt_hash", "injected-prompt-hash"),
])
def test_v7_any_provenance_field_mutation_detected(
    self,
    tmp_path: pathlib.Path,
    field: str,
    value: str,
) -> None:
    """Any provenance field mutation in a v7 commit is caught by run_verify.

    A commit is signed and written with a fixed set of provenance values,
    then rewritten under the same commit ID with exactly one of them
    (``field``) replaced by ``value`` while the signature, public key and
    key id stay untouched.

    Expected outcome per field:

    * ``author`` — at least one ``"commit"``-kind failure.
    * ``agent_id``, ``model_id``, ``toolchain_id``, ``prompt_hash`` —
      exactly one ``"signature"``-kind failure.
    """
    root = _make_repo(tmp_path)
    _write_snapshot(root, _EMPTY_SNAP_ID)

    key = _gen_key()
    pub_b64 = _pub_b64(key)
    message = f"v7-{field}"

    # Single source of truth for the signed values, so the signature input
    # and the on-disk record cannot drift apart.
    original: dict[str, str] = {
        "author": "original-author",
        "agent_id": "prov-bot",
        "model_id": "original-model",
        "toolchain_id": "original-toolchain",
        "prompt_hash": "original-hash",
    }

    commit_id = _make_real_commit_id(
        snapshot_id=_EMPTY_SNAP_ID,
        message=message,
        author=original["author"],
        signer_public_key=pub_b64,
    )
    sig = _v7_sig(
        commit_id, key,
        author=original["author"], agent_id=original["agent_id"],
        model_id=original["model_id"], toolchain_id=original["toolchain_id"],
        prompt_hash=original["prompt_hash"],
    )

    def write_record(provenance: dict[str, str]) -> None:
        # Signature and signer fields are always the originals; only the
        # provenance values vary between the honest and the tampered write.
        _write_commit(
            root, commit_id, snapshot_id=_EMPTY_SNAP_ID, message=message,
            author=provenance["author"], agent_id=provenance["agent_id"],
            model_id=provenance["model_id"],
            toolchain_id=provenance["toolchain_id"],
            prompt_hash=provenance["prompt_hash"], signature=sig,
            signer_public_key=pub_b64,
            signer_key_id=_signer_key_id(_pub_bytes(key)),
        )

    write_record(original)
    _set_branch_ref(root, "main", commit_id)

    # Tamper: rewrite with the specific field mutated.
    write_record({**original, field: value})

    result = run_verify(root, check_objects=False)
    assert not result["all_ok"], f"Mutation of {field!r} should be detected"
    if field == "author":
        # author is in the v2 commit ID hash — mutation makes the commit
        # unreadable; caught as a content-hash (commit) failure.
        assert any(f["kind"] == "commit" for f in result["failures"])
    else:
        sig_failures = [f for f in result["failures"] if f["kind"] == "signature"]
        assert len(sig_failures) == 1
File History 1 commit