gabriel / muse public
test_verify_extended.py python
1,008 lines 42.5 KB
Raw
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
1 """Extended integrity tests for ``muse verify`` / ``run_verify``.
2
3 Covers gaps left by test_cmd_verify.py, test_cmd_verify_hardening.py, and
4 test_cmd_verify_shallow.py:
5
6 Signature verification (run_verify BFS path, not verify-commit):
7 S1 Valid Ed25519 signature — run_verify must NOT report a failure.
8 S2 Tampered commit payload — signature present but payload changed → kind="signature".
9 S3 Wrong signature bytes (bit-flip) — Ed25519 rejects → kind="signature".
10 S4 Unknown signature algorithm prefix (e.g. "ml-dsa-65:…") → kind="signature".
11 S5 Unknown public-key algorithm prefix (e.g. "ml-dsa-65:…") → kind="key_missing".
12 S6 Malformed public-key base64 ("ed25519:!!!") → decode_pubkey ValueError
13 → pub_bytes=b"" → kind="signature".
14 S7 Empty signer_public_key ("") → sig_algo("") == "" → kind="key_missing".
15 S8 signatures_checked counts only signed commits (not unsigned ones).
16 S9 Mixed chain: some commits signed, some unsigned — only signed ones verified.
17 S10 Error message for sig failure names agent_id and key_id.
18
19 Merge commit (parent2_commit_id):
20 M1 Merge commit: both parent chains walked, all objects verified.
21 M2 Merge commit: corrupt object in second-parent chain detected.
22 M3 Merge commit: missing second-parent commit → kind="commit".
23
24 Ref path traversal security:
25 P1 branch="../../evil" — _branch_refs cannot escape heads dir.
26 P2 branch="/absolute/path" — does not read outside the repo.
27 P3 Ref file with binary (non-UTF-8) content — decode errors handled gracefully.
28
29 IOError / TOCTOU:
30 T1 Object file deleted between object_state returning PRESENT and _rehash_object
31 reading it — OSError propagates; CLI exits with code 3.
32
33 JSON schema completeness:
34 J1 --json output includes "strict" key.
35 J2 --json "strict" is False by default, True when --strict is passed.
36 J3 --json "check_objects" key present in all branches.
37
38 Counter accuracy:
39 C1 Same object ID referenced by two different snapshots counted once.
40 C2 signatures_checked equals the number of commits with a non-empty signature.
41 C3 hash-mismatch error message contains both expected and actual short IDs.
42 """
43
44 from __future__ import annotations
45
46 import datetime
47 import json
48 import os
49 import pathlib
50 import threading
51 from collections.abc import Mapping
52 from typing import Any
53
54 import pytest
55 from tests.cli_test_helper import CliRunner, InvokeResult
56
57 from muse.core.object_store import object_path, write_object
58 from muse.core.provenance import (
59 encode_public_key,
60 provenance_payload,
61 sign_commit_ed25519,
62 sign_commit_record,
63 verify_commit_ed25519,
64 )
65 from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id
66 from muse.core.commits import (
67 CommitRecord,
68 write_commit,
69 )
70 from muse.core.snapshots import (
71 SnapshotRecord,
72 write_snapshot,
73 )
74 from muse.core.types import blob_id, encode_pubkey, long_id, short_id
75 from muse.core.verify import run_verify
76 from muse.core.paths import heads_dir, muse_dir, ref_path
77
# Shared CLI runner; the ``_invoke`` helper below drives ``muse verify`` through it.
runner = CliRunner()
# Value written as ``repo_id`` into repo.json by ``_init_repo``.
_REPO_ID = "verify-extended-test"
80
81
82 # ---------------------------------------------------------------------------
83 # Shared helpers
84 # ---------------------------------------------------------------------------
85
86
def _init_repo(path: pathlib.Path) -> pathlib.Path:
    """Create a bare repository skeleton under *path* and return *path*.

    The skeleton consists of the ``commits``, ``snapshots``, ``objects`` and
    ``refs/heads`` directories, a ``HEAD`` file pointing at
    ``refs/heads/main``, and a ``repo.json`` carrying ``_REPO_ID``.
    """
    store = muse_dir(path)

    subdirs = ("commits", "snapshots", "objects", "refs/heads")
    for name in subdirs:
        target = store / name
        target.mkdir(parents=True, exist_ok=True)

    head_file = store / "HEAD"
    head_file.write_text("ref: refs/heads/main", encoding="utf-8")

    repo_meta = {"repo_id": _REPO_ID, "domain": "code"}
    meta_file = store / "repo.json"
    meta_file.write_text(json.dumps(repo_meta), encoding="utf-8")

    return path
96
97
98
99
def _make_key() -> "Any":
    """Return a newly generated Ed25519 private key."""
    # Imported lazily so the dependency is only loaded by tests that sign.
    from cryptography.hazmat.primitives.asymmetric import ed25519

    fresh_key = ed25519.Ed25519PrivateKey.generate()
    return fresh_key
104
105
def _commit(
    root: pathlib.Path,
    *,
    branch: str = "main",
    parent_id: str | None = None,
    parent2_id: str | None = None,
    content: bytes = b"data",
    idx: int = 0,
    private_key: "Any | None" = None,
    agent_id: str = "test-agent",
) -> str:
    """Write one object, one snapshot and one commit, then point *branch* at it.

    The blob is *content* followed by *idx* as a 4-byte big-endian integer,
    so different indices give different objects.  The commit timestamp is
    2026-01-01 UTC plus *idx* hours.  If *private_key* is supplied the
    commit is signed via ``sign_commit_record`` and the record carries the
    signature, public key, key id and *agent_id*; otherwise those fields
    are empty strings.

    Returns the commit ID.
    """
    # --- blob -------------------------------------------------------------
    blob = content + idx.to_bytes(4, "big")
    blob_ref = blob_id(blob)
    write_object(root, blob_ref, blob)

    # --- snapshot ---------------------------------------------------------
    files = {f"file_{idx}.txt": blob_ref}
    snapshot_ref = compute_snapshot_id(files)
    write_snapshot(root, SnapshotRecord(snapshot_id=snapshot_ref, manifest=files))

    # --- commit metadata --------------------------------------------------
    epoch = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    when = epoch + datetime.timedelta(hours=idx)
    message = f"commit {idx}"
    parents = [candidate for candidate in (parent_id, parent2_id) if candidate]

    # The public key feeds into the commit ID, so it has to be known before
    # the ID is computed; the signature itself is produced afterwards.
    public_key = ""
    if private_key is not None:
        _unused, public_key = encode_public_key(private_key)

    commit_id = compute_commit_id(
        parent_ids=parents,
        snapshot_id=snapshot_ref,
        message=message,
        committed_at_iso=when.isoformat(),
        signer_public_key=public_key,
    )

    signature = ""
    key_id = ""
    if private_key is not None:
        signature, _, key_id = sign_commit_record(
            commit_id,
            agent_id=agent_id,
            private_key=private_key,
            committed_at=when.isoformat(),
        )

    record = CommitRecord(
        commit_id=commit_id,
        branch=branch,
        snapshot_id=snapshot_ref,
        message=message,
        committed_at=when,
        parent_commit_id=parent_id,
        parent2_commit_id=parent2_id,
        agent_id=agent_id if private_key else "",
        signature=signature,
        signer_public_key=public_key,
        signer_key_id=key_id,
    )
    write_commit(root, record)

    branch_ref = ref_path(root, branch)
    branch_ref.write_text(commit_id, encoding="utf-8")
    return commit_id
170
171
172 def _env(root: pathlib.Path) -> Mapping[str, str]:
173 return {"MUSE_REPO_ROOT": str(root)}
174
175
def _force_write_commit(root: pathlib.Path, record: "CommitRecord") -> None:
    """Overwrite the stored commit for ``record.commit_id`` unconditionally.

    Intended only for tests that must inject a tampered record after a valid
    commit has already been written with ``write_commit``.

    The file written is ``b"commit <len>\\0"`` followed by the compact JSON
    encoding of ``record.to_dict()``, at the path returned by
    ``object_path(root, record.commit_id)``.

    Args:
        root: Repository root passed through to ``object_path``.
        record: Commit record to serialise; its ``commit_id`` picks the file.
    """
    # ``json``, ``os`` and ``object_path`` are already imported at module
    # level; the earlier function-local re-imports only shadowed them.
    commit_file = object_path(root, record.commit_id)
    commit_file.parent.mkdir(parents=True, exist_ok=True)

    payload = json.dumps(record.to_dict(), separators=(",", ":")).encode("utf-8")
    header = b"commit " + str(len(payload)).encode("ascii") + b"\0"

    if commit_file.exists():
        # Make an existing file writable before replacing its contents.
        os.chmod(commit_file, 0o644)
    commit_file.write_bytes(header + payload)
191
192
def _invoke(root: pathlib.Path, *args: str) -> InvokeResult:
    """Invoke ``verify`` with *args* through the shared runner, rooted at *root*."""
    from muse.cli.app import main as cli_main

    argv = ["verify"]
    argv.extend(args)
    return runner.invoke(cli_main, argv, env=_env(root))
196
197
198 # ---------------------------------------------------------------------------
199 # S — Signature verification in run_verify BFS
200 # ---------------------------------------------------------------------------
201
202
class TestSignatureVerification:
    """Signature checks that ``run_verify`` performs while walking commits.

    Every test calls ``run_verify`` directly; the separate ``muse
    verify-commit`` plumbing command is not exercised here.
    """

    def test_s1_valid_signed_commit_passes(self, tmp_path: pathlib.Path) -> None:
        """S1: one correctly signed commit verifies cleanly and is counted once."""
        repo = _init_repo(tmp_path)
        _commit(repo, private_key=_make_key(), idx=0)

        report = run_verify(repo)

        assert report["all_ok"] is True, f"Unexpected failures: {report['failures']}"
        assert report["signatures_checked"] == 1
        assert report["failures"] == []

    def test_s2_tampered_payload_detected(self, tmp_path: pathlib.Path) -> None:
        """S2: changing agent_id after signing must yield a 'signature' failure."""
        from muse.core.commits import read_commit

        repo = _init_repo(tmp_path)
        signing_key = _make_key()
        commit_id = _commit(repo, private_key=signing_key, agent_id="real-agent", idx=0)

        stored = read_commit(repo, commit_id)
        assert stored is not None

        # Same signature and key as the stored record, but a different
        # agent_id than the one that was signed.
        forged = CommitRecord(
            commit_id=stored.commit_id,
            branch=stored.branch,
            snapshot_id=stored.snapshot_id,
            message=stored.message,
            committed_at=stored.committed_at,
            agent_id="evil-agent",
            signature=stored.signature,
            signer_public_key=stored.signer_public_key,
            signer_key_id=stored.signer_key_id,
        )
        _force_write_commit(repo, forged)

        report = run_verify(repo)

        assert report["all_ok"] is False
        signature_failures = [
            entry for entry in report["failures"] if entry["kind"] == "signature"
        ]
        assert len(signature_failures) >= 1, (
            f"Expected signature failure, got: {report['failures']}"
        )

    def test_s3_bit_flip_in_signature_bytes_detected(self, tmp_path: pathlib.Path) -> None:
        """S3: flipping a single bit of the stored signature → kind='signature'."""
        from muse.core.commits import read_commit
        from muse.core.types import decode_sig, encode_sig

        repo = _init_repo(tmp_path)
        signing_key = _make_key()
        commit_id = _commit(repo, private_key=signing_key, idx=0)

        stored = read_commit(repo, commit_id)
        assert stored is not None

        _, raw_sig = decode_sig(stored.signature)
        mutated = bytearray(raw_sig)
        mutated[32] ^= 0x01  # lowest bit of byte 32
        corrupted_sig = encode_sig("ed25519", bytes(mutated))

        forged = CommitRecord(
            commit_id=stored.commit_id,
            branch=stored.branch,
            snapshot_id=stored.snapshot_id,
            message=stored.message,
            committed_at=stored.committed_at,
            agent_id=stored.agent_id,
            signature=corrupted_sig,
            signer_public_key=stored.signer_public_key,
            signer_key_id=stored.signer_key_id,
        )
        _force_write_commit(repo, forged)

        report = run_verify(repo)

        assert report["all_ok"] is False
        failure_kinds = [entry["kind"] for entry in report["failures"]]
        assert "signature" in failure_kinds, (
            f"Expected 'signature' failure, got: {failure_kinds}"
        )

    def test_s4_unknown_signature_algorithm_reported(self, tmp_path: pathlib.Path) -> None:
        """S4: an 'ml-dsa-65:' signature with a valid ed25519 key → 'signature' only."""
        repo = _init_repo(tmp_path)
        signing_key = _make_key()
        _, public_key = encode_public_key(signing_key)

        blob = b"unknown-sig-alg"
        blob_ref = blob_id(blob)
        write_object(repo, blob_ref, blob)
        files = {"f.txt": blob_ref}
        snapshot_ref = compute_snapshot_id(files)
        write_snapshot(repo, SnapshotRecord(snapshot_id=snapshot_ref, manifest=files))

        stamp = datetime.datetime(2026, 3, 1, tzinfo=datetime.timezone.utc)
        message = "unknown alg"
        commit_id = compute_commit_id(
            parent_ids=[],
            snapshot_id=snapshot_ref,
            message=message,
            committed_at_iso=stamp.isoformat(),
            signer_public_key=public_key,
        )
        record = CommitRecord(
            commit_id=commit_id,
            branch="main",
            snapshot_id=snapshot_ref,
            message=message,
            committed_at=stamp,
            signature=f"ml-dsa-65:{'A' * 80}",  # algorithm prefix is not ed25519
            signer_public_key=public_key,  # genuine ed25519 key
            agent_id="future-agent",
        )
        write_commit(repo, record)
        (heads_dir(repo) / "main").write_text(commit_id)

        report = run_verify(repo)

        assert report["all_ok"] is False
        failure_kinds = [entry["kind"] for entry in report["failures"]]
        assert "signature" in failure_kinds, f"Expected 'signature', got: {failure_kinds}"
        assert "key_missing" not in failure_kinds

    def test_s5_unknown_pubkey_algorithm_reported_as_key_missing(self, tmp_path: pathlib.Path) -> None:
        """S5: ed25519 signature paired with an 'ml-dsa-65:' public key → 'key_missing' only."""
        repo = _init_repo(tmp_path)
        signing_key = _make_key()

        blob = b"unknown-pk-alg"
        blob_ref = blob_id(blob)
        write_object(repo, blob_ref, blob)
        files = {"f.txt": blob_ref}
        snapshot_ref = compute_snapshot_id(files)
        write_snapshot(repo, SnapshotRecord(snapshot_id=snapshot_ref, manifest=files))

        stamp = datetime.datetime(2026, 3, 2, tzinfo=datetime.timezone.utc)
        message = "unknown pk alg"
        foreign_key = f"ml-dsa-65:{'A' * 80}"
        commit_id = compute_commit_id(
            parent_ids=[],
            snapshot_id=snapshot_ref,
            message=message,
            committed_at_iso=stamp.isoformat(),
            signer_public_key=foreign_key,
        )
        signed_payload = provenance_payload(
            commit_id, agent_id="future-agent", committed_at=stamp.isoformat()
        )
        signature = sign_commit_ed25519(signed_payload, signing_key)
        record = CommitRecord(
            commit_id=commit_id,
            branch="main",
            snapshot_id=snapshot_ref,
            message=message,
            committed_at=stamp,
            signature=signature,
            signer_public_key=foreign_key,  # key carries the unknown prefix
            agent_id="future-agent",
        )
        write_commit(repo, record)
        (heads_dir(repo) / "main").write_text(commit_id)

        report = run_verify(repo)

        assert report["all_ok"] is False
        failure_kinds = [entry["kind"] for entry in report["failures"]]
        assert "key_missing" in failure_kinds, f"Expected 'key_missing', got: {failure_kinds}"
        assert "signature" not in failure_kinds

    def test_s6_malformed_pubkey_base64_causes_signature_failure(self, tmp_path: pathlib.Path) -> None:
        """S6: an 'ed25519:' public key whose body is not valid base64 is
        expected to surface as a kind='signature' failure."""
        repo = _init_repo(tmp_path)
        signing_key = _make_key()

        blob = b"bad-b64-key"
        blob_ref = blob_id(blob)
        write_object(repo, blob_ref, blob)
        files = {"f.txt": blob_ref}
        snapshot_ref = compute_snapshot_id(files)
        write_snapshot(repo, SnapshotRecord(snapshot_id=snapshot_ref, manifest=files))

        stamp = datetime.datetime(2026, 3, 3, tzinfo=datetime.timezone.utc)
        message = "bad b64 key"
        broken_key = "ed25519:!!!notvalidbase64!!!"
        commit_id = compute_commit_id(
            parent_ids=[],
            snapshot_id=snapshot_ref,
            message=message,
            committed_at_iso=stamp.isoformat(),
            signer_public_key=broken_key,
        )
        signed_payload = provenance_payload(
            commit_id, agent_id="agent", committed_at=stamp.isoformat()
        )
        signature = sign_commit_ed25519(signed_payload, signing_key)
        record = CommitRecord(
            commit_id=commit_id,
            branch="main",
            snapshot_id=snapshot_ref,
            message=message,
            committed_at=stamp,
            signature=signature,
            signer_public_key=broken_key,  # right prefix, undecodable body
            agent_id="agent",
        )
        write_commit(repo, record)
        (heads_dir(repo) / "main").write_text(commit_id)

        report = run_verify(repo)

        assert report["all_ok"] is False
        failure_kinds = [entry["kind"] for entry in report["failures"]]
        assert "signature" in failure_kinds, (
            f"Expected 'signature' failure, got: {failure_kinds}"
        )

    def test_s7_empty_signer_public_key_reported_as_key_missing(self, tmp_path: pathlib.Path) -> None:
        """S7: a signed commit with signer_public_key='' → 'key_missing', not 'signature'."""
        repo = _init_repo(tmp_path)
        signing_key = _make_key()

        blob = b"no-pk"
        blob_ref = blob_id(blob)
        write_object(repo, blob_ref, blob)
        files = {"f.txt": blob_ref}
        snapshot_ref = compute_snapshot_id(files)
        write_snapshot(repo, SnapshotRecord(snapshot_id=snapshot_ref, manifest=files))

        stamp = datetime.datetime(2026, 3, 4, tzinfo=datetime.timezone.utc)
        message = "no pk"
        commit_id = compute_commit_id(
            parent_ids=[],
            snapshot_id=snapshot_ref,
            message=message,
            committed_at_iso=stamp.isoformat(),
        )
        signed_payload = provenance_payload(commit_id, committed_at=stamp.isoformat())
        signature = sign_commit_ed25519(signed_payload, signing_key)
        record = CommitRecord(
            commit_id=commit_id,
            branch="main",
            snapshot_id=snapshot_ref,
            message=message,
            committed_at=stamp,
            signature=signature,
            signer_public_key="",  # signature present, key absent
        )
        write_commit(repo, record)
        (heads_dir(repo) / "main").write_text(commit_id)

        report = run_verify(repo)

        assert report["all_ok"] is False
        failure_kinds = [entry["kind"] for entry in report["failures"]]
        assert "key_missing" in failure_kinds, f"Expected 'key_missing', got: {failure_kinds}"
        assert "signature" not in failure_kinds

    def test_s8_unsigned_commits_not_counted(self, tmp_path: pathlib.Path) -> None:
        """S8: a chain of two unsigned commits leaves signatures_checked at 0."""
        repo = _init_repo(tmp_path)
        first = _commit(repo, idx=0)
        _commit(repo, parent_id=first, idx=1)

        report = run_verify(repo)

        assert report["all_ok"] is True
        assert report["signatures_checked"] == 0

    def test_s9_mixed_chain_counts_only_signed(self, tmp_path: pathlib.Path) -> None:
        """S9: unsigned → signed → unsigned chain: three commits checked,
        exactly one signature checked, and no failures."""
        repo = _init_repo(tmp_path)
        signing_key = _make_key()
        root_commit = _commit(repo, idx=0)
        signed_commit = _commit(repo, parent_id=root_commit, idx=1, private_key=signing_key)
        _commit(repo, parent_id=signed_commit, idx=2)

        report = run_verify(repo)

        assert report["all_ok"] is True, f"Failures: {report['failures']}"
        assert report["signatures_checked"] == 1
        assert report["commits_checked"] == 3

    def test_s10_signature_failure_error_names_agent(self, tmp_path: pathlib.Path) -> None:
        """S10: the 'signature' failure message mentions the agent_id or the
        short commit ID."""
        from muse.core.commits import read_commit
        from muse.core.types import decode_sig, encode_sig

        repo = _init_repo(tmp_path)
        signing_key = _make_key()
        commit_id = _commit(repo, private_key=signing_key, agent_id="my-special-agent", idx=0)

        stored = read_commit(repo, commit_id)
        assert stored is not None

        # Invert the first signature byte so verification cannot succeed.
        _, raw_sig = decode_sig(stored.signature)
        inverted_head = bytes([raw_sig[0] ^ 0xFF])
        corrupted_sig = encode_sig("ed25519", inverted_head + raw_sig[1:])

        forged = CommitRecord(
            commit_id=stored.commit_id,
            branch=stored.branch,
            snapshot_id=stored.snapshot_id,
            message=stored.message,
            committed_at=stored.committed_at,
            agent_id="my-special-agent",
            signature=corrupted_sig,
            signer_public_key=stored.signer_public_key,
            signer_key_id=stored.signer_key_id,
        )
        _force_write_commit(repo, forged)

        report = run_verify(repo)

        assert report["all_ok"] is False
        signature_failures = [
            entry for entry in report["failures"] if entry["kind"] == "signature"
        ]
        assert signature_failures
        error_msg = signature_failures[0]["error"]
        assert "my-special-agent" in error_msg or short_id(commit_id) in error_msg, (
            f"Error message should name agent or commit: {error_msg!r}"
        )
483
484
485 # ---------------------------------------------------------------------------
486 # M — Merge commits (parent2_commit_id)
487 # ---------------------------------------------------------------------------
488
489
class TestMergeCommits:
    """Commits with ``parent2_commit_id`` set: both parents must be verified."""

    def _make_branch_commit(
        self,
        root: pathlib.Path,
        branch: str,
        idx: int,
        parent_id: str | None = None,
    ) -> tuple[str, str]:
        """Write an unsigned commit on *branch*, update its ref, and return
        ``(commit_id, object_id)``.

        The blob content and file name are derived from *branch* and *idx*;
        the timestamp is 2026-02-01 UTC plus *idx* hours.
        """
        blob = f"branch-{branch}-{idx}".encode()
        blob_ref = blob_id(blob)
        write_object(root, blob_ref, blob)

        files = {f"{branch}_{idx}.py": blob_ref}
        snapshot_ref = compute_snapshot_id(files)
        write_snapshot(root, SnapshotRecord(snapshot_id=snapshot_ref, manifest=files))

        epoch = datetime.datetime(2026, 2, 1, tzinfo=datetime.timezone.utc)
        stamp = epoch + datetime.timedelta(hours=idx)
        message = f"{branch} commit {idx}"
        parents = [parent_id] if parent_id else []

        commit_id = compute_commit_id(
            parent_ids=parents,
            snapshot_id=snapshot_ref,
            message=message,
            committed_at_iso=stamp.isoformat(),
        )
        record = CommitRecord(
            commit_id=commit_id,
            branch=branch,
            snapshot_id=snapshot_ref,
            message=message,
            committed_at=stamp,
            parent_commit_id=parent_id,
        )
        write_commit(root, record)
        ref_path(root, branch).write_text(commit_id)
        return commit_id, blob_ref

    def test_m1_merge_commit_both_parents_walked(self, tmp_path: pathlib.Path) -> None:
        """M1: a merge of 'main' and 'feat' verifies cleanly, with at least
        three commits and three objects checked."""
        repo = _init_repo(tmp_path)

        # One commit on each branch to serve as the two parents.
        main_cid, main_obj = self._make_branch_commit(repo, "main", idx=0)
        feat_cid, feat_obj = self._make_branch_commit(repo, "feat", idx=1)

        blob = b"merge-content"
        blob_ref = blob_id(blob)
        write_object(repo, blob_ref, blob)
        files = {"merge.py": blob_ref}
        snapshot_ref = compute_snapshot_id(files)
        write_snapshot(repo, SnapshotRecord(snapshot_id=snapshot_ref, manifest=files))

        stamp = datetime.datetime(2026, 2, 1, 12, tzinfo=datetime.timezone.utc)
        message = "merge feat into main"
        merge_cid = compute_commit_id(
            parent_ids=[main_cid, feat_cid],
            snapshot_id=snapshot_ref,
            message=message,
            committed_at_iso=stamp.isoformat(),
        )
        merge_record = CommitRecord(
            commit_id=merge_cid,
            branch="main",
            snapshot_id=snapshot_ref,
            message=message,
            committed_at=stamp,
            parent_commit_id=main_cid,
            parent2_commit_id=feat_cid,
        )
        write_commit(repo, merge_record)
        (heads_dir(repo) / "main").write_text(merge_cid)

        report = run_verify(repo)

        assert report["all_ok"] is True, f"Failures: {report['failures']}"
        # main + feat + merge
        assert report["commits_checked"] >= 3
        # one object per commit
        assert report["objects_checked"] >= 3

    def test_m2_corrupt_object_in_second_parent_chain_detected(
        self, tmp_path: pathlib.Path
    ) -> None:
        """M2: overwriting the 'feat' commit's object yields an 'object'
        failure carrying that object's ID."""
        repo = _init_repo(tmp_path)

        main_cid, _ = self._make_branch_commit(repo, "main", idx=0)
        feat_cid, feat_obj = self._make_branch_commit(repo, "feat", idx=1)

        # Replace the stored bytes of the feat object with junk.
        damaged = object_path(repo, feat_obj)
        os.chmod(damaged, 0o644)
        damaged.write_bytes(b"corrupted by test")

        blob = b"merge"
        blob_ref = blob_id(blob)
        write_object(repo, blob_ref, blob)
        files = {"m.py": blob_ref}
        snapshot_ref = compute_snapshot_id(files)
        write_snapshot(repo, SnapshotRecord(snapshot_id=snapshot_ref, manifest=files))

        stamp = datetime.datetime(2026, 2, 2, tzinfo=datetime.timezone.utc)
        message = "merge"
        merge_cid = compute_commit_id(
            parent_ids=[main_cid, feat_cid],
            snapshot_id=snapshot_ref,
            message=message,
            committed_at_iso=stamp.isoformat(),
        )
        merge_record = CommitRecord(
            commit_id=merge_cid,
            branch="main",
            snapshot_id=snapshot_ref,
            message=message,
            committed_at=stamp,
            parent_commit_id=main_cid,
            parent2_commit_id=feat_cid,
        )
        write_commit(repo, merge_record)
        (heads_dir(repo) / "main").write_text(merge_cid)

        report = run_verify(repo, check_objects=True)

        assert report["all_ok"] is False
        object_failures = [
            entry for entry in report["failures"] if entry["kind"] == "object"
        ]
        assert any(entry["id"] == feat_obj for entry in object_failures), (
            f"Expected feat_obj failure, got: {object_failures}"
        )

    def test_m3_missing_second_parent_commit_reported(self, tmp_path: pathlib.Path) -> None:
        """M3: a second parent backed only by an empty stub file must be
        reported as a kind='commit' failure with that parent's ID."""
        from muse.core.commits import commit_path as _cp

        repo = _init_repo(tmp_path)

        main_cid, _ = self._make_branch_commit(repo, "main", idx=0)

        # The phantom parent exists on disk only as a zero-byte file.
        phantom_parent = long_id("d" * 64)
        stub_file = _cp(repo, phantom_parent)
        stub_file.parent.mkdir(parents=True, exist_ok=True)
        stub_file.write_bytes(b"")

        blob = b"merge-phantom"
        blob_ref = blob_id(blob)
        write_object(repo, blob_ref, blob)
        files = {"mp.py": blob_ref}
        snapshot_ref = compute_snapshot_id(files)
        write_snapshot(repo, SnapshotRecord(snapshot_id=snapshot_ref, manifest=files))

        stamp = datetime.datetime(2026, 2, 3, tzinfo=datetime.timezone.utc)
        message = "merge phantom"
        merge_cid = compute_commit_id(
            parent_ids=[main_cid, phantom_parent],
            snapshot_id=snapshot_ref,
            message=message,
            committed_at_iso=stamp.isoformat(),
        )
        merge_record = CommitRecord(
            commit_id=merge_cid,
            branch="main",
            snapshot_id=snapshot_ref,
            message=message,
            committed_at=stamp,
            parent_commit_id=main_cid,
            parent2_commit_id=phantom_parent,
        )
        write_commit(repo, merge_record)
        (heads_dir(repo) / "main").write_text(merge_cid)

        report = run_verify(repo)

        assert report["all_ok"] is False
        commit_failures = [
            entry for entry in report["failures"] if entry["kind"] == "commit"
        ]
        assert any(entry["id"] == phantom_parent for entry in commit_failures), (
            f"Expected commit failure for phantom parent: {commit_failures}"
        )
640
641
642 # ---------------------------------------------------------------------------
643 # P — Path traversal and ref security
644 # ---------------------------------------------------------------------------
645
646
class TestRefSecurity:
    """Ref handling under hostile input: path traversal and binary ref content."""

    def test_p1_path_traversal_via_branch_param_does_not_escape(
        self, tmp_path: pathlib.Path
    ) -> None:
        """P1: branch='../../evil' must not surface a file outside the heads dir.

        A decoy file named ``evil`` holding a well-formed commit ID is placed
        directly under the muse directory.  Assuming heads live at
        ``<muse_dir>/refs/heads`` (the layout ``_init_repo`` creates),
        ``heads/../../evil`` would resolve to exactly that decoy.  The test
        passes only if the decoy's ID is absent from the returned refs.
        """
        repo = _init_repo(tmp_path)

        # Plant the decoy the traversal would reach if it were followed.
        evil_id = long_id("a" * 64)
        evil_file = muse_dir(repo) / "evil"
        evil_file.write_text(evil_id, encoding="utf-8")

        from muse.core.verify import _branch_refs  # type: ignore[attr-defined]
        refs = _branch_refs(repo, branch="../../evil")

        returned_ids = [commit_id for _, commit_id in refs]
        # The decoy ID is itself well-formed, so a prefix check alone cannot
        # detect a successful traversal; compare against the planted value.
        assert evil_id not in returned_ids, (
            f"_branch_refs followed '../../evil' outside heads dir: {refs!r}"
        )
        # Anything that is returned must at least look like a commit ID.
        assert all(commit_id.startswith("sha256:") for commit_id in returned_ids)

    def test_p2_absolute_path_branch_does_not_read_outside_repo(
        self, tmp_path: pathlib.Path
    ) -> None:
        """P2: branch='/etc/passwd' must not raise and must not yield junk IDs.

        Note that with ``pathlib`` an absolute right-hand segment replaces the
        left-hand path, so a naive ``heads_dir / branch`` would address
        ``/etc/passwd`` itself.  Whatever ``_branch_refs`` does internally,
        every ID it returns must be a well-formed ``sha256:`` ID (71
        characters), which the content of ``/etc/passwd`` would not be.
        """
        repo = _init_repo(tmp_path)

        from muse.core.verify import _branch_refs  # type: ignore[attr-defined]
        try:
            refs = _branch_refs(repo, branch="/etc/passwd")
        except Exception as exc:
            # Any exception here is a test failure, reported with its message.
            pytest.fail(f"_branch_refs raised on absolute branch path: {exc}")

        for _, cid in refs:
            assert cid.startswith("sha256:") and len(cid) == 71, (
                f"Suspicious commit ID from absolute path branch: {cid!r}"
            )

    def test_p3_binary_ref_file_handled_gracefully(self, tmp_path: pathlib.Path) -> None:
        """P3: a ref file containing non-UTF-8 bytes must produce a
        kind='ref' failure rather than an exception."""
        repo = _init_repo(tmp_path)
        # Overwrite the main ref with bytes that are not valid UTF-8.
        (heads_dir(repo) / "main").write_bytes(b"\xff\xfe\x00binary\x01garbage")

        result = run_verify(repo)

        assert result["all_ok"] is False
        kinds = [f["kind"] for f in result["failures"]]
        assert "ref" in kinds, f"Expected 'ref' failure for binary content, got: {kinds}"
705
706
707 # ---------------------------------------------------------------------------
708 # T — IOError / TOCTOU
709 # ---------------------------------------------------------------------------
710
711
class TestIOErrorHandling:
    """Behaviour of ``run_verify`` when a referenced object cannot be read."""

    def test_t1_object_deleted_between_state_check_and_read(
        self, tmp_path: pathlib.Path
    ) -> None:
        """T1: an object referenced by a snapshot is removed from disk before
        ``run_verify`` runs.

        Two outcomes are accepted: ``run_verify`` raises ``OSError``, or it
        returns a result whose ``all_ok`` is False.  Returning
        ``all_ok=True`` is the only rejected outcome.
        """
        repo = _init_repo(tmp_path)

        blob = b"will be deleted"
        blob_ref = blob_id(blob)
        write_object(repo, blob_ref, blob)
        files = {"toctou.py": blob_ref}
        snapshot_ref = compute_snapshot_id(files)
        write_snapshot(repo, SnapshotRecord(snapshot_id=snapshot_ref, manifest=files))

        stamp = datetime.datetime(2026, 4, 10, tzinfo=datetime.timezone.utc)
        message = "toctou test"
        commit_id = compute_commit_id(
            parent_ids=[],
            snapshot_id=snapshot_ref,
            message=message,
            committed_at_iso=stamp.isoformat(),
        )
        record = CommitRecord(
            commit_id=commit_id,
            branch="main",
            snapshot_id=snapshot_ref,
            message=message,
            committed_at=stamp,
        )
        write_commit(repo, record)
        (heads_dir(repo) / "main").write_text(commit_id)

        # Remove the object file; chmod first so the unlink is permitted.
        doomed = object_path(repo, blob_ref)
        os.chmod(doomed, 0o644)
        os.unlink(doomed)

        try:
            report = run_verify(repo, check_objects=True)
        except OSError:
            # Propagating the I/O error is an accepted outcome.
            return

        # No exception: the missing object must show up as a failure.
        assert report["all_ok"] is False, (
            "run_verify must not report all_ok=True when an object is unreadable"
        )
756
757
758 # ---------------------------------------------------------------------------
759 # J — JSON schema completeness
760 # ---------------------------------------------------------------------------
761
762
class TestJsonSchema:
    """Shape of the ``muse verify --json`` output."""

    def test_j1_strict_field_present_in_json(self, tmp_path: pathlib.Path) -> None:
        """J1: ``--json`` exits 0 and the output contains a 'strict' key."""
        repo = _init_repo(tmp_path)
        _commit(repo, idx=0)

        outcome = _invoke(repo, "--json")

        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        assert "strict" in payload, f"'strict' missing from JSON: {list(payload.keys())}"

    def test_j2_strict_false_by_default(self, tmp_path: pathlib.Path) -> None:
        """J2: without ``--strict`` the 'strict' field is False."""
        repo = _init_repo(tmp_path)
        _commit(repo, idx=0)

        outcome = _invoke(repo, "--json")
        payload = json.loads(outcome.output)

        assert payload["strict"] is False

    def test_j2b_strict_true_when_flag_passed(self, tmp_path: pathlib.Path) -> None:
        """J2b: with ``--strict`` the 'strict' field is True."""
        repo = _init_repo(tmp_path)
        _commit(repo, idx=0)

        outcome = _invoke(repo, "--strict", "--json")
        payload = json.loads(outcome.output)

        assert payload["strict"] is True

    def test_j3_check_objects_present_in_all_branches(self, tmp_path: pathlib.Path) -> None:
        """J3: 'check_objects' is present with and without ``--no-objects``:
        True by default, False when the flag is given."""
        repo = _init_repo(tmp_path)
        _commit(repo, idx=0)

        with_objects = json.loads(_invoke(repo, "--json").output)
        without_objects = json.loads(_invoke(repo, "--no-objects", "--json").output)

        assert "check_objects" in with_objects
        assert "check_objects" in without_objects
        assert with_objects["check_objects"] is True
        assert without_objects["check_objects"] is False

    def test_j4_all_documented_fields_present(self, tmp_path: pathlib.Path) -> None:
        """J4: every field in the required set below appears in the JSON output."""
        repo = _init_repo(tmp_path)
        _commit(repo, idx=0)

        payload = json.loads(_invoke(repo, "--json").output)

        required_fields = {
            # identity and counters
            "repo_id", "refs_checked", "commits_checked", "snapshots_checked",
            "objects_checked", "signatures_checked",
            # verdict
            "all_ok", "nothing_checked", "failures", "warnings", "exit_code",
            # options echoed back
            "check_objects", "strict", "branch", "fail_fast",
            # shallow / promisor state
            "shallow_commits", "promised_objects", "is_shallow", "promisor_remotes",
            # run metadata
            "muse_version", "schema", "duration_ms", "timestamp",
        }
        missing = required_fields.difference(payload.keys())
        assert not missing, f"JSON output missing fields: {missing}"

    def test_j5_failures_list_empty_when_all_ok(self, tmp_path: pathlib.Path) -> None:
        """J5: a clean repository reports all_ok=True with 'failures' equal to []."""
        repo = _init_repo(tmp_path)
        _commit(repo, idx=0)

        payload = json.loads(_invoke(repo, "--json").output)

        assert payload["all_ok"] is True
        assert payload["failures"] == []
823
824
825 # ---------------------------------------------------------------------------
826 # C — Counter accuracy
827 # ---------------------------------------------------------------------------
828
829
class TestCounterAccuracy:
    """Counter checks: figures reported by ``run_verify`` must be exact, with no double counting."""

    @staticmethod
    def _store_blob(root: pathlib.Path, payload: bytes) -> str:
        """Write *payload* with ``write_object`` under its ``blob_id`` and return that ID."""
        oid = blob_id(payload)
        write_object(root, oid, payload)
        return oid

    @staticmethod
    def _store_snapshot(root: pathlib.Path, manifest: dict[str, str]) -> str:
        """Write a ``SnapshotRecord`` for *manifest* and return its computed snapshot ID."""
        sid = compute_snapshot_id(manifest)
        write_snapshot(root, SnapshotRecord(snapshot_id=sid, manifest=manifest))
        return sid

    def test_c1_same_object_across_two_snapshots_counted_once(
        self, tmp_path: pathlib.Path
    ) -> None:
        """C1: A blob referenced by two snapshots adds only one to objects_checked.

        Two chained commits both list ``shared.py``; the second also lists
        ``extra.py``.  The run must report exactly two objects checked.
        """
        repo = _init_repo(tmp_path)
        utc = datetime.timezone.utc
        shared_oid = self._store_blob(repo, b"shared object")

        # Root commit: its snapshot lists only the shared blob.
        first_snap = self._store_snapshot(repo, {"shared.py": shared_oid})
        first_when = datetime.datetime(2026, 5, 1, tzinfo=utc)
        first_cid = compute_commit_id(
            parent_ids=[],
            snapshot_id=first_snap,
            message="c0",
            committed_at_iso=first_when.isoformat(),
        )
        first_record = CommitRecord(
            commit_id=first_cid,
            branch="main",
            snapshot_id=first_snap,
            message="c0",
            committed_at=first_when,
        )
        write_commit(repo, first_record)

        # Child commit: a different snapshot that lists the shared blob again.
        extra_oid = self._store_blob(repo, b"extra")
        second_snap = self._store_snapshot(
            repo, {"shared.py": shared_oid, "extra.py": extra_oid}
        )
        second_when = datetime.datetime(2026, 5, 2, tzinfo=utc)
        second_cid = compute_commit_id(
            parent_ids=[first_cid],
            snapshot_id=second_snap,
            message="c1",
            committed_at_iso=second_when.isoformat(),
        )
        second_record = CommitRecord(
            commit_id=second_cid,
            branch="main",
            snapshot_id=second_snap,
            message="c1",
            committed_at=second_when,
            parent_commit_id=first_cid,
        )
        write_commit(repo, second_record)
        main_ref = heads_dir(repo) / "main"
        main_ref.write_text(second_cid)

        report = run_verify(repo, check_objects=True)

        assert report["all_ok"] is True
        # Two distinct blobs exist: the shared one and the extra one.
        assert report["objects_checked"] == 2, (
            f"Expected 2 unique objects, got {report['objects_checked']}"
        )

    def test_c2_signatures_checked_exact_count(self, tmp_path: pathlib.Path) -> None:
        """C2: In a five-commit chain where commits 0, 2 and 4 are given a
        private key, signatures_checked must be exactly 3."""
        repo = _init_repo(tmp_path)
        signing_key = _make_key()
        tip = None
        for position in range(5):
            # Only even positions receive the private key.
            tip = _commit(
                repo,
                parent_id=tip,
                idx=position,
                private_key=signing_key if position % 2 == 0 else None,
            )

        report = run_verify(repo)

        assert report["all_ok"] is True, f"Failures: {report['failures']}"
        assert report["signatures_checked"] == 3

    def test_c3_hash_mismatch_error_shows_both_ids(self, tmp_path: pathlib.Path) -> None:
        """C3: Overwriting a stored object yields an ``object`` failure whose
        error text names the expected short ID or the short ID of the
        replacement bytes, and contains "mismatch" or "corruption"."""
        repo = _init_repo(tmp_path)
        expected_oid = self._store_blob(repo, b"original content for c3")
        snap = self._store_snapshot(repo, {"c3.py": expected_oid})
        when = datetime.datetime(2026, 5, 3, tzinfo=datetime.timezone.utc)
        cid = compute_commit_id(
            parent_ids=[],
            snapshot_id=snap,
            message="c3",
            committed_at_iso=when.isoformat(),
        )
        record = CommitRecord(
            commit_id=cid,
            branch="main",
            snapshot_id=snap,
            message="c3",
            committed_at=when,
        )
        write_commit(repo, record)
        main_ref = heads_dir(repo) / "main"
        main_ref.write_text(cid)

        # Set the file mode to 0o644, then replace the stored bytes in place.
        tampered = b"corrupted replacement bytes for c3"
        stored_file = object_path(repo, expected_oid)
        os.chmod(stored_file, 0o644)
        stored_file.write_bytes(tampered)

        report = run_verify(repo, check_objects=True)

        assert report["all_ok"] is False
        object_failures = [
            entry for entry in report["failures"] if entry["kind"] == "object"
        ]
        assert object_failures
        error_msg = object_failures[0]["error"]
        # Either the expected ID or the ID of the replacement bytes is accepted.
        observed_oid = blob_id(tampered)
        assert any(
            short_id(candidate) in error_msg
            for candidate in (expected_oid, observed_oid)
        ), f"Error message should contain short ID reference: {error_msg!r}"
        # The text must also name the problem.
        assert any(word in error_msg for word in ("mismatch", "corruption")), (
            f"Error must describe the problem: {error_msg!r}"
        )

    def test_c4_commit_count_accurate_on_diamond_dag(self, tmp_path: pathlib.Path) -> None:
        """C4: In a diamond DAG (base, then A and B on top of base, then a
        merge of A and B) commits_checked must be 4, so the base commit
        is not counted once per path."""
        repo = _init_repo(tmp_path)

        # Shared ancestor, followed by two commits that each build on it.
        base_cid = self._make_raw_commit(repo, "main", idx=0, parent=None)[0]
        a_cid = self._make_raw_commit(repo, "feat-a", idx=1, parent=base_cid)[0]
        b_cid = self._make_raw_commit(repo, "feat-b", idx=2, parent=base_cid)[0]

        # Merge commit joining both sides.
        merge_oid = self._store_blob(repo, b"diamond-merge")
        merge_snap = self._store_snapshot(repo, {"m.py": merge_oid})
        merge_when = datetime.datetime(2026, 5, 10, tzinfo=datetime.timezone.utc)
        merge_cid = compute_commit_id(
            parent_ids=[a_cid, b_cid],
            snapshot_id=merge_snap,
            message="merge",
            committed_at_iso=merge_when.isoformat(),
        )
        merge_record = CommitRecord(
            commit_id=merge_cid,
            branch="main",
            snapshot_id=merge_snap,
            message="merge",
            committed_at=merge_when,
            parent_commit_id=a_cid,
            parent2_commit_id=b_cid,
        )
        write_commit(repo, merge_record)
        main_ref = heads_dir(repo) / "main"
        main_ref.write_text(merge_cid)

        report = run_verify(repo)

        assert report["all_ok"] is True
        # base + A + B + merge
        assert report["commits_checked"] == 4, (
            f"Expected 4 commits in diamond DAG, got {report['commits_checked']}"
        )

    def _make_raw_commit(
        self,
        root: pathlib.Path,
        branch: str,
        idx: int,
        parent: str | None,
    ) -> tuple[str, str]:
        """Write one blob, snapshot and commit for *branch* and point its ref at the commit.

        The blob content, manifest file name and commit message are derived
        from *branch* and *idx*; the commit time is 2026-05-01 UTC plus
        *idx* hours.  A falsy *parent* produces a commit ID computed with
        no parents.

        Returns:
            ``(commit_id, object_id)`` for the records just written.
        """
        label = f"{branch} {idx}"
        oid = self._store_blob(root, f"raw-{branch}-{idx}".encode())
        snap = self._store_snapshot(root, {f"{branch}_{idx}.py": oid})
        epoch = datetime.datetime(2026, 5, 1, tzinfo=datetime.timezone.utc)
        when = epoch + datetime.timedelta(hours=idx)
        lineage: list[str] = []
        if parent:
            lineage.append(parent)
        cid = compute_commit_id(
            parent_ids=lineage,
            snapshot_id=snap,
            message=label,
            committed_at_iso=when.isoformat(),
        )
        record = CommitRecord(
            commit_id=cid,
            branch=branch,
            snapshot_id=snap,
            message=label,
            committed_at=when,
            parent_commit_id=parent,
        )
        write_commit(root, record)
        ref_path(root, branch).write_text(cid)
        return cid, oid
File History 4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 29 days ago