"""Extended integrity tests for ``muse verify`` / ``run_verify``. Covers gaps left by test_cmd_verify.py, test_cmd_verify_hardening.py, and test_cmd_verify_shallow.py: Signature verification (run_verify BFS path, not verify-commit): S1 Valid Ed25519 signature — run_verify must NOT report a failure. S2 Tampered commit payload — signature present but payload changed → kind="signature". S3 Wrong signature bytes (bit-flip) — Ed25519 rejects → kind="signature". S4 Unknown signature algorithm prefix (e.g. "ml-dsa-65:…") → kind="signature". S5 Unknown public-key algorithm prefix (e.g. "ml-dsa-65:…") → kind="key_missing". S6 Malformed public-key base64 ("ed25519:!!!") → decode_pubkey ValueError → pub_bytes=b"" → kind="signature". S7 Empty signer_public_key ("") → sig_algo("") == "" → kind="key_missing". S8 signatures_checked counts only signed commits (not unsigned ones). S9 Mixed chain: some commits signed, some unsigned — only signed ones verified. S10 Error message for sig failure names agent_id and key_id. Merge commit (parent2_commit_id): M1 Merge commit: both parent chains walked, all objects verified. M2 Merge commit: corrupt object in second-parent chain detected. M3 Merge commit: missing second-parent commit → kind="commit". Ref path traversal security: P1 branch="../../evil" — _branch_refs cannot escape heads dir. P2 branch="/absolute/path" — does not read outside the repo. P3 Ref file with binary (non-UTF-8) content — decode errors handled gracefully. IOError / TOCTOU: T1 Object file deleted between object_state returning PRESENT and _rehash_object reading it — OSError propagates; CLI exits with code 3. JSON schema completeness: J1 --json output includes "strict" key. J2 --json "strict" is False by default, True when --strict is passed. J3 --json "check_objects" key present in all branches. Counter accuracy: C1 Same object ID referenced by two different snapshots counted once. C2 signatures_checked equals the number of commits with a non-empty signature. C3 hash-mismatch error message contains both expected and actual short IDs. """ from __future__ import annotations import datetime import json import os import pathlib import threading from collections.abc import Mapping from typing import Any import pytest from tests.cli_test_helper import CliRunner, InvokeResult from muse.core.object_store import object_path, write_object from muse.core.provenance import ( encode_public_key, provenance_payload, sign_commit_ed25519, sign_commit_record, verify_commit_ed25519, ) from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import blob_id, encode_pubkey, long_id, short_id from muse.core.verify import run_verify from muse.core.paths import heads_dir, muse_dir, ref_path runner = CliRunner() _REPO_ID = "verify-extended-test" # --------------------------------------------------------------------------- # Shared helpers # --------------------------------------------------------------------------- def _init_repo(path: pathlib.Path) -> pathlib.Path: muse = muse_dir(path) for d in ("commits", "snapshots", "objects", "refs/heads"): (muse / d).mkdir(parents=True, exist_ok=True) (muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (muse / "repo.json").write_text( json.dumps({"repo_id": _REPO_ID, "domain": "code"}), encoding="utf-8" ) return path def _make_key() -> "Any": """Generate a fresh Ed25519 private key.""" from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey return Ed25519PrivateKey.generate() def _commit( root: pathlib.Path, *, branch: str = "main", parent_id: str | None = None, parent2_id: str | None = None, content: bytes = b"data", idx: int = 0, private_key: "Any | None" = None, agent_id: str = "test-agent", ) -> str: """Write a complete commit (object + snapshot + commit record) and update branch ref. When *private_key* is given the commit is Ed25519-signed. Returns the commit_id. """ raw = content + idx.to_bytes(4, "big") obj_id = blob_id(raw) write_object(root, obj_id, raw) manifest = {f"file_{idx}.txt": obj_id} snap_id = compute_snapshot_id(manifest) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) + datetime.timedelta(hours=idx) parent_ids = [pid for pid in [parent_id, parent2_id] if pid] # signer_public_key is included in the commit_id hash — must derive it BEFORE # calling compute_commit_id so the stored record passes _verify_commit_id. pub_b64 = "" if private_key is not None: _, pub_b64 = encode_public_key(private_key) commit_id = compute_commit_id( parent_ids=parent_ids, snapshot_id=snap_id, message=f"commit {idx}", committed_at_iso=committed_at.isoformat(), signer_public_key=pub_b64, ) sig = key_id = "" if private_key is not None: sig, _, key_id = sign_commit_record( commit_id, agent_id=agent_id, private_key=private_key, committed_at=committed_at.isoformat(), ) write_commit(root, CommitRecord( commit_id=commit_id, branch=branch, snapshot_id=snap_id, message=f"commit {idx}", committed_at=committed_at, parent_commit_id=parent_id, parent2_commit_id=parent2_id, agent_id=agent_id if private_key else "", signature=sig, signer_public_key=pub_b64, signer_key_id=key_id, )) (ref_path(root, branch)).write_text(commit_id, encoding="utf-8") return commit_id def _env(root: pathlib.Path) -> Mapping[str, str]: return {"MUSE_REPO_ROOT": str(root)} def _force_write_commit(root: pathlib.Path, record: "CommitRecord") -> None: """Overwrite a commit object unconditionally, bypassing write_commit idempotency. Use only in tests that need to inject tampered records after a valid commit has already been written. """ import json as _json import os from muse.core.object_store import object_path commit_file = object_path(root, record.commit_id) commit_file.parent.mkdir(parents=True, exist_ok=True) payload = _json.dumps(record.to_dict(), separators=(",", ":")).encode() if commit_file.exists(): os.chmod(commit_file, 0o644) commit_file.write_bytes(b"commit " + str(len(payload)).encode() + b"\0" + payload) def _invoke(root: pathlib.Path, *args: str) -> InvokeResult: from muse.cli.app import main as cli_main return runner.invoke(cli_main, ["verify", *args], env=_env(root)) # --------------------------------------------------------------------------- # S — Signature verification in run_verify BFS # --------------------------------------------------------------------------- class TestSignatureVerification: """Ed25519 signature verification exercised through run_verify's BFS walk. These tests cover the signature branch inside run_verify, which is distinct from the muse verify-commit command (a separate plumbing tool). """ def test_s1_valid_signed_commit_passes(self, tmp_path: pathlib.Path) -> None: """S1: A properly signed commit must not produce any failure.""" repo = _init_repo(tmp_path) key = _make_key() _commit(repo, private_key=key, idx=0) result = run_verify(repo) assert result["all_ok"] is True, f"Unexpected failures: {result['failures']}" assert result["signatures_checked"] == 1 assert result["failures"] == [] def test_s2_tampered_payload_detected(self, tmp_path: pathlib.Path) -> None: """S2: A commit whose agent_id differs from what was signed → signature invalid.""" repo = _init_repo(tmp_path) key = _make_key() cid = _commit(repo, private_key=key, agent_id="real-agent", idx=0) # Re-read and tamper the commit record: change agent_id to something # different from what was signed. The signature still references the # original agent_id in the provenance_payload. from muse.core.commits import read_commit original = read_commit(repo, cid) assert original is not None tampered = CommitRecord( commit_id=original.commit_id, branch=original.branch, snapshot_id=original.snapshot_id, message=original.message, committed_at=original.committed_at, agent_id="evil-agent", # tampered — differs from what was signed signature=original.signature, signer_public_key=original.signer_public_key, signer_key_id=original.signer_key_id, ) _force_write_commit(repo, tampered) result = run_verify(repo) assert result["all_ok"] is False sig_failures = [f for f in result["failures"] if f["kind"] == "signature"] assert len(sig_failures) >= 1, f"Expected signature failure, got: {result['failures']}" def test_s3_bit_flip_in_signature_bytes_detected(self, tmp_path: pathlib.Path) -> None: """S3: One bit flipped in the stored signature bytes → Ed25519 rejects → kind='signature'.""" repo = _init_repo(tmp_path) key = _make_key() cid = _commit(repo, private_key=key, idx=0) from muse.core.commits import read_commit from muse.core.types import decode_sig, encode_sig original = read_commit(repo, cid) assert original is not None _, sig_bytes = decode_sig(original.signature) # Flip one bit in the middle of the signature sig_list = bytearray(sig_bytes) sig_list[32] ^= 0x01 bad_sig = encode_sig("ed25519", bytes(sig_list)) tampered = CommitRecord( commit_id=original.commit_id, branch=original.branch, snapshot_id=original.snapshot_id, message=original.message, committed_at=original.committed_at, agent_id=original.agent_id, signature=bad_sig, signer_public_key=original.signer_public_key, signer_key_id=original.signer_key_id, ) _force_write_commit(repo, tampered) result = run_verify(repo) assert result["all_ok"] is False kinds = [f["kind"] for f in result["failures"]] assert "signature" in kinds, f"Expected 'signature' failure, got: {kinds}" def test_s4_unknown_signature_algorithm_reported(self, tmp_path: pathlib.Path) -> None: """S4: sig='ml-dsa-65:…' (unknown algorithm) → kind='signature', not 'key_missing'.""" repo = _init_repo(tmp_path) key = _make_key() _, pub_b64 = encode_public_key(key) content = b"unknown-sig-alg" obj_id = blob_id(content) write_object(repo, obj_id, content) manifest = {"f.txt": obj_id} snap_id = compute_snapshot_id(manifest) write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime(2026, 3, 1, tzinfo=datetime.timezone.utc) cid = compute_commit_id( parent_ids=[], snapshot_id=snap_id, message="unknown alg", committed_at_iso=committed_at.isoformat(), signer_public_key=pub_b64, ) write_commit(repo, CommitRecord( commit_id=cid, branch="main", snapshot_id=snap_id, message="unknown alg", committed_at=committed_at, signature=f"ml-dsa-65:{'A' * 80}", # unknown prefix signer_public_key=pub_b64, # valid ed25519 key agent_id="future-agent", )) (heads_dir(repo) / "main").write_text(cid) result = run_verify(repo) assert result["all_ok"] is False kinds = [f["kind"] for f in result["failures"]] assert "signature" in kinds, f"Expected 'signature', got: {kinds}" assert "key_missing" not in kinds def test_s5_unknown_pubkey_algorithm_reported_as_key_missing(self, tmp_path: pathlib.Path) -> None: """S5: sig='ed25519:…' but pub_raw='ml-dsa-65:…' → kind='key_missing', not 'signature'.""" repo = _init_repo(tmp_path) key = _make_key() content = b"unknown-pk-alg" obj_id = blob_id(content) write_object(repo, obj_id, content) manifest = {"f.txt": obj_id} snap_id = compute_snapshot_id(manifest) write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime(2026, 3, 2, tzinfo=datetime.timezone.utc) unknown_pk = f"ml-dsa-65:{'A' * 80}" cid = compute_commit_id( parent_ids=[], snapshot_id=snap_id, message="unknown pk alg", committed_at_iso=committed_at.isoformat(), signer_public_key=unknown_pk, ) payload = provenance_payload(cid, agent_id="future-agent", committed_at=committed_at.isoformat()) valid_sig = sign_commit_ed25519(payload, key) write_commit(repo, CommitRecord( commit_id=cid, branch="main", snapshot_id=snap_id, message="unknown pk alg", committed_at=committed_at, signature=valid_sig, signer_public_key=unknown_pk, # unknown prefix on key agent_id="future-agent", )) (heads_dir(repo) / "main").write_text(cid) result = run_verify(repo) assert result["all_ok"] is False kinds = [f["kind"] for f in result["failures"]] assert "key_missing" in kinds, f"Expected 'key_missing', got: {kinds}" assert "signature" not in kinds def test_s6_malformed_pubkey_base64_causes_signature_failure(self, tmp_path: pathlib.Path) -> None: """S6: pub_raw='ed25519:!!!' (valid prefix, invalid base64) → decode_pubkey raises ValueError → pub_bytes=b'' → kind='signature'.""" repo = _init_repo(tmp_path) key = _make_key() content = b"bad-b64-key" obj_id = blob_id(content) write_object(repo, obj_id, content) manifest = {"f.txt": obj_id} snap_id = compute_snapshot_id(manifest) write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime(2026, 3, 3, tzinfo=datetime.timezone.utc) bad_pk = "ed25519:!!!notvalidbase64!!!" cid = compute_commit_id( parent_ids=[], snapshot_id=snap_id, message="bad b64 key", committed_at_iso=committed_at.isoformat(), signer_public_key=bad_pk, ) payload = provenance_payload(cid, agent_id="agent", committed_at=committed_at.isoformat()) valid_sig = sign_commit_ed25519(payload, key) write_commit(repo, CommitRecord( commit_id=cid, branch="main", snapshot_id=snap_id, message="bad b64 key", committed_at=committed_at, signature=valid_sig, signer_public_key=bad_pk, # prefix ok, content not valid base64 agent_id="agent", )) (heads_dir(repo) / "main").write_text(cid) result = run_verify(repo) assert result["all_ok"] is False kinds = [f["kind"] for f in result["failures"]] assert "signature" in kinds, f"Expected 'signature' failure, got: {kinds}" def test_s7_empty_signer_public_key_reported_as_key_missing(self, tmp_path: pathlib.Path) -> None: """S7: signer_public_key='' → sig_algo('') == '' != 'ed25519' → kind='key_missing'.""" repo = _init_repo(tmp_path) key = _make_key() content = b"no-pk" obj_id = blob_id(content) write_object(repo, obj_id, content) manifest = {"f.txt": obj_id} snap_id = compute_snapshot_id(manifest) write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime(2026, 3, 4, tzinfo=datetime.timezone.utc) cid = compute_commit_id( parent_ids=[], snapshot_id=snap_id, message="no pk", committed_at_iso=committed_at.isoformat(), ) payload = provenance_payload(cid, committed_at=committed_at.isoformat()) valid_sig = sign_commit_ed25519(payload, key) write_commit(repo, CommitRecord( commit_id=cid, branch="main", snapshot_id=snap_id, message="no pk", committed_at=committed_at, signature=valid_sig, signer_public_key="", # key rotation / missing key )) (heads_dir(repo) / "main").write_text(cid) result = run_verify(repo) assert result["all_ok"] is False kinds = [f["kind"] for f in result["failures"]] assert "key_missing" in kinds, f"Expected 'key_missing', got: {kinds}" assert "signature" not in kinds def test_s8_unsigned_commits_not_counted(self, tmp_path: pathlib.Path) -> None: """S8: Commits with empty signature field do not increment signatures_checked.""" repo = _init_repo(tmp_path) prev = _commit(repo, idx=0) # unsigned _commit(repo, parent_id=prev, idx=1) # unsigned result = run_verify(repo) assert result["all_ok"] is True assert result["signatures_checked"] == 0 def test_s9_mixed_chain_counts_only_signed(self, tmp_path: pathlib.Path) -> None: """S9: 3-commit chain: commit 0 unsigned, commit 1 signed, commit 2 unsigned. signatures_checked must be exactly 1 and all_ok must be True.""" repo = _init_repo(tmp_path) key = _make_key() c0 = _commit(repo, idx=0) # unsigned c1 = _commit(repo, parent_id=c0, idx=1, private_key=key) # signed _commit(repo, parent_id=c1, idx=2) # unsigned result = run_verify(repo) assert result["all_ok"] is True, f"Failures: {result['failures']}" assert result["signatures_checked"] == 1 assert result["commits_checked"] == 3 def test_s10_signature_failure_error_names_agent(self, tmp_path: pathlib.Path) -> None: """S10: Signature failure error message includes agent_id and key reference.""" repo = _init_repo(tmp_path) key = _make_key() cid = _commit(repo, private_key=key, agent_id="my-special-agent", idx=0) # Tamper the signature bytes so verification fails from muse.core.commits import read_commit from muse.core.types import decode_sig, encode_sig original = read_commit(repo, cid) assert original is not None _, sig_bytes = decode_sig(original.signature) bad_sig = encode_sig("ed25519", bytes([sig_bytes[0] ^ 0xFF]) + sig_bytes[1:]) _force_write_commit(repo, CommitRecord( commit_id=original.commit_id, branch=original.branch, snapshot_id=original.snapshot_id, message=original.message, committed_at=original.committed_at, agent_id="my-special-agent", signature=bad_sig, signer_public_key=original.signer_public_key, signer_key_id=original.signer_key_id, )) result = run_verify(repo) assert result["all_ok"] is False sig_failures = [f for f in result["failures"] if f["kind"] == "signature"] assert sig_failures error_msg = sig_failures[0]["error"] assert "my-special-agent" in error_msg or short_id(cid) in error_msg, ( f"Error message should name agent or commit: {error_msg!r}" ) # --------------------------------------------------------------------------- # M — Merge commits (parent2_commit_id) # --------------------------------------------------------------------------- class TestMergeCommits: """parent2_commit_id in the BFS walk — both parent chains verified.""" def _make_branch_commit( self, root: pathlib.Path, branch: str, idx: int, parent_id: str | None = None, ) -> tuple[str, str]: """Create a commit on *branch* and return (commit_id, obj_id).""" content = f"branch-{branch}-{idx}".encode() obj_id = blob_id(content) write_object(root, obj_id, content) manifest = {f"{branch}_{idx}.py": obj_id} snap_id = compute_snapshot_id(manifest) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = ( datetime.datetime(2026, 2, 1, tzinfo=datetime.timezone.utc) + datetime.timedelta(hours=idx) ) parent_ids = [parent_id] if parent_id else [] cid = compute_commit_id( parent_ids=parent_ids, snapshot_id=snap_id, message=f"{branch} commit {idx}", committed_at_iso=committed_at.isoformat(), ) write_commit(root, CommitRecord( commit_id=cid, branch=branch, snapshot_id=snap_id, message=f"{branch} commit {idx}", committed_at=committed_at, parent_commit_id=parent_id, )) (ref_path(root, branch)).write_text(cid) return cid, obj_id def test_m1_merge_commit_both_parents_walked(self, tmp_path: pathlib.Path) -> None: """M1: A merge commit with two parents; objects from both parent chains verified.""" repo = _init_repo(tmp_path) # main branch: one commit main_cid, main_obj = self._make_branch_commit(repo, "main", idx=0) # feat branch: one commit feat_cid, feat_obj = self._make_branch_commit(repo, "feat", idx=1) # Merge commit: parent1=main, parent2=feat merge_content = b"merge-content" merge_obj = blob_id(merge_content) write_object(repo, merge_obj, merge_content) manifest = {"merge.py": merge_obj} snap_id = compute_snapshot_id(manifest) write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime(2026, 2, 1, 12, tzinfo=datetime.timezone.utc) merge_cid = compute_commit_id( parent_ids=[main_cid, feat_cid], snapshot_id=snap_id, message="merge feat into main", committed_at_iso=committed_at.isoformat(), ) write_commit(repo, CommitRecord( commit_id=merge_cid, branch="main", snapshot_id=snap_id, message="merge feat into main", committed_at=committed_at, parent_commit_id=main_cid, parent2_commit_id=feat_cid, )) (heads_dir(repo) / "main").write_text(merge_cid) result = run_verify(repo) assert result["all_ok"] is True, f"Failures: {result['failures']}" # 3 distinct commits: main + feat + merge (feat also has its own branch ref) assert result["commits_checked"] >= 3 # All 3 objects must have been checked assert result["objects_checked"] >= 3 def test_m2_corrupt_object_in_second_parent_chain_detected( self, tmp_path: pathlib.Path ) -> None: """M2: Corruption in an object reachable only via parent2 is caught.""" repo = _init_repo(tmp_path) main_cid, _ = self._make_branch_commit(repo, "main", idx=0) feat_cid, feat_obj = self._make_branch_commit(repo, "feat", idx=1) # Corrupt the feat object feat_file = object_path(repo, feat_obj) os.chmod(feat_file, 0o644) feat_file.write_bytes(b"corrupted by test") # Merge with feat as parent2 merge_content = b"merge" merge_obj = blob_id(merge_content) write_object(repo, merge_obj, merge_content) manifest = {"m.py": merge_obj} snap_id = compute_snapshot_id(manifest) write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime(2026, 2, 2, tzinfo=datetime.timezone.utc) merge_cid = compute_commit_id( parent_ids=[main_cid, feat_cid], snapshot_id=snap_id, message="merge", committed_at_iso=committed_at.isoformat(), ) write_commit(repo, CommitRecord( commit_id=merge_cid, branch="main", snapshot_id=snap_id, message="merge", committed_at=committed_at, parent_commit_id=main_cid, parent2_commit_id=feat_cid, )) (heads_dir(repo) / "main").write_text(merge_cid) result = run_verify(repo, check_objects=True) assert result["all_ok"] is False object_failures = [f for f in result["failures"] if f["kind"] == "object"] assert any(f["id"] == feat_obj for f in object_failures), ( f"Expected feat_obj failure, got: {object_failures}" ) def test_m3_missing_second_parent_commit_reported(self, tmp_path: pathlib.Path) -> None: """M3: parent2_commit_id points to a nonexistent commit → kind='commit'.""" repo = _init_repo(tmp_path) main_cid, _ = self._make_branch_commit(repo, "main", idx=0) phantom_parent = long_id("d" * 64) # will be stubbed — verify must report it missing from muse.core.commits import commit_path as _cp _stub = _cp(repo, phantom_parent) _stub.parent.mkdir(parents=True, exist_ok=True) _stub.write_bytes(b"") # unreadable stub; verify walks it and reports missing merge_content = b"merge-phantom" merge_obj = blob_id(merge_content) write_object(repo, merge_obj, merge_content) manifest = {"mp.py": merge_obj} snap_id = compute_snapshot_id(manifest) write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime(2026, 2, 3, tzinfo=datetime.timezone.utc) merge_cid = compute_commit_id( parent_ids=[main_cid, phantom_parent], snapshot_id=snap_id, message="merge phantom", committed_at_iso=committed_at.isoformat(), ) write_commit(repo, CommitRecord( commit_id=merge_cid, branch="main", snapshot_id=snap_id, message="merge phantom", committed_at=committed_at, parent_commit_id=main_cid, parent2_commit_id=phantom_parent, )) (heads_dir(repo) / "main").write_text(merge_cid) result = run_verify(repo) assert result["all_ok"] is False commit_failures = [f for f in result["failures"] if f["kind"] == "commit"] assert any(f["id"] == phantom_parent for f in commit_failures), ( f"Expected commit failure for phantom parent: {commit_failures}" ) # --------------------------------------------------------------------------- # P — Path traversal and ref security # --------------------------------------------------------------------------- class TestRefSecurity: """Ref file security: path traversal, binary content, oversized files.""" def test_p1_path_traversal_via_branch_param_does_not_escape( self, tmp_path: pathlib.Path ) -> None: """P1: branch='../../evil' cannot traverse outside the heads directory. _branch_refs constructs heads_dir / branch. Python's Path resolves '..' lazily — 'heads/../../evil' normalises to '.muse/evil' which should not exist. The result must be an empty ref list (not a failure, just nothing found). """ repo = _init_repo(tmp_path) # Write a file the traversal might try to read evil_file = muse_dir(repo) / "evil" evil_file.write_text(long_id("a" * 64)) from muse.core.verify import _branch_refs # type: ignore[attr-defined] refs = _branch_refs(repo, branch="../../evil") # Must return empty — either the file didn't resolve into heads/ or # was not found. The critical requirement: no crash and no refs returned # that would cause BFS to walk attacker-controlled data as a commit ID. assert refs == [] or all(commit_id.startswith("sha256:") for _, commit_id in refs) def test_p2_absolute_path_branch_does_not_read_outside_repo( self, tmp_path: pathlib.Path ) -> None: """P2: branch='/etc/passwd' is joined to heads_dir — Path joins strip leading / on some platforms or produce a heads_dir-relative path. Either way no sensitive file is read and no crash occurs.""" repo = _init_repo(tmp_path) from muse.core.verify import _branch_refs # type: ignore[attr-defined] # Must not raise; may return [] or a ref if heads_dir//etc/passwd exists (it won't) try: refs = _branch_refs(repo, branch="/etc/passwd") except Exception as exc: pytest.fail(f"_branch_refs raised on absolute branch path: {exc}") # No valid commit ID should come from /etc/passwd content for _, cid in refs: assert cid.startswith("sha256:") and len(cid) == 71, ( f"Suspicious commit ID from absolute path branch: {cid!r}" ) def test_p3_binary_ref_file_handled_gracefully(self, tmp_path: pathlib.Path) -> None: """P3: Binary (non-UTF-8) content in a ref file is decoded with errors='replace' and produces an invalid ref ID → kind='ref' failure, no crash.""" repo = _init_repo(tmp_path) # Write binary garbage to the ref file (heads_dir(repo) / "main").write_bytes(b"\xff\xfe\x00binary\x01garbage") result = run_verify(repo) # Must not raise; the invalid ref ID should be reported assert result["all_ok"] is False kinds = [f["kind"] for f in result["failures"]] assert "ref" in kinds, f"Expected 'ref' failure for binary content, got: {kinds}" # --------------------------------------------------------------------------- # T — IOError / TOCTOU # --------------------------------------------------------------------------- class TestIOErrorHandling: """IOError propagation from _rehash_object and related paths.""" def test_t1_object_deleted_between_state_check_and_read( self, tmp_path: pathlib.Path ) -> None: """T1: Object file exists when object_state runs but is deleted before _rehash_object opens it → OSError propagates through run_verify. The CLI must exit with code 3 (INTERNAL_ERROR).""" repo = _init_repo(tmp_path) content = b"will be deleted" obj_id = blob_id(content) write_object(repo, obj_id, content) manifest = {"toctou.py": obj_id} snap_id = compute_snapshot_id(manifest) write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime(2026, 4, 10, tzinfo=datetime.timezone.utc) cid = compute_commit_id( parent_ids=[], snapshot_id=snap_id, message="toctou test", committed_at_iso=committed_at.isoformat(), ) write_commit(repo, CommitRecord( commit_id=cid, branch="main", snapshot_id=snap_id, message="toctou test", committed_at=committed_at, )) (heads_dir(repo) / "main").write_text(cid) # Delete the object after writing it (simulate TOCTOU) obj_file = object_path(repo, obj_id) os.chmod(obj_file, 0o644) os.unlink(obj_file) # run_verify itself should raise OSError (not silently swallow it) # OR handle it and produce a failure. Both are acceptable; what's NOT # acceptable is silently reporting all_ok=True. try: result = run_verify(repo, check_objects=True) # If run_verify catches the OSError internally, it must report a failure assert result["all_ok"] is False, ( "run_verify must not report all_ok=True when an object is unreadable" ) except OSError: # Also acceptable: OSError propagates to CLI level pass # --------------------------------------------------------------------------- # J — JSON schema completeness # --------------------------------------------------------------------------- class TestJsonSchema: """JSON output must include all documented fields.""" def test_j1_strict_field_present_in_json(self, tmp_path: pathlib.Path) -> None: """J1: The 'strict' key must appear in --json output.""" repo = _init_repo(tmp_path) _commit(repo, idx=0) result = _invoke(repo, "--json") assert result.exit_code == 0 data = json.loads(result.output) assert "strict" in data, f"'strict' missing from JSON: {list(data.keys())}" def test_j2_strict_false_by_default(self, tmp_path: pathlib.Path) -> None: """J2: Default invocation must have strict=False in JSON output.""" repo = _init_repo(tmp_path) _commit(repo, idx=0) data = json.loads(_invoke(repo, "--json").output) assert data["strict"] is False def test_j2b_strict_true_when_flag_passed(self, tmp_path: pathlib.Path) -> None: """J2b: --strict must set strict=True in JSON output.""" repo = _init_repo(tmp_path) _commit(repo, idx=0) data = json.loads(_invoke(repo, "--strict", "--json").output) assert data["strict"] is True def test_j3_check_objects_present_in_all_branches(self, tmp_path: pathlib.Path) -> None: """J3: 'check_objects' must appear whether or not --no-objects is passed.""" repo = _init_repo(tmp_path) _commit(repo, idx=0) d1 = json.loads(_invoke(repo, "--json").output) d2 = json.loads(_invoke(repo, "--no-objects", "--json").output) assert "check_objects" in d1 assert "check_objects" in d2 assert d1["check_objects"] is True assert d2["check_objects"] is False def test_j4_all_documented_fields_present(self, tmp_path: pathlib.Path) -> None: """J4: Every field documented in the command docstring appears in JSON.""" repo = _init_repo(tmp_path) _commit(repo, idx=0) data = json.loads(_invoke(repo, "--json").output) required_fields = { "repo_id", "refs_checked", "commits_checked", "snapshots_checked", "objects_checked", "signatures_checked", "all_ok", "nothing_checked", "check_objects", "strict", "branch", "fail_fast", "failures", "shallow_commits", "promised_objects", "is_shallow", "promisor_remotes", "muse_version", "schema", "exit_code", "duration_ms", "timestamp", "warnings", } missing = required_fields - set(data.keys()) assert not missing, f"JSON output missing fields: {missing}" def test_j5_failures_list_empty_when_all_ok(self, tmp_path: pathlib.Path) -> None: """J5: When all_ok=True the failures list must be [] (not absent).""" repo = _init_repo(tmp_path) _commit(repo, idx=0) data = json.loads(_invoke(repo, "--json").output) assert data["all_ok"] is True assert data["failures"] == [] # --------------------------------------------------------------------------- # C — Counter accuracy # --------------------------------------------------------------------------- class TestCounterAccuracy: """Verify that all counters are accurate, deduplicated, and never inflated.""" def test_c1_same_object_across_two_snapshots_counted_once( self, tmp_path: pathlib.Path ) -> None: """C1: One object ID referenced by two different snapshots must appear in objects_checked exactly once (deduplication via verified_objects set).""" repo = _init_repo(tmp_path) shared_content = b"shared object" shared_obj = blob_id(shared_content) write_object(repo, shared_obj, shared_content) # Commit 0: snapshot references shared_obj manifest0 = {"shared.py": shared_obj} snap0 = compute_snapshot_id(manifest0) write_snapshot(repo, SnapshotRecord(snapshot_id=snap0, manifest=manifest0)) committed_at0 = datetime.datetime(2026, 5, 1, tzinfo=datetime.timezone.utc) cid0 = compute_commit_id( parent_ids=[], snapshot_id=snap0, message="c0", committed_at_iso=committed_at0.isoformat(), ) write_commit(repo, CommitRecord( commit_id=cid0, branch="main", snapshot_id=snap0, message="c0", committed_at=committed_at0, )) # Commit 1: different snapshot, same shared_obj extra_content = b"extra" extra_obj = blob_id(extra_content) write_object(repo, extra_obj, extra_content) manifest1 = {"shared.py": shared_obj, "extra.py": extra_obj} snap1 = compute_snapshot_id(manifest1) write_snapshot(repo, SnapshotRecord(snapshot_id=snap1, manifest=manifest1)) committed_at1 = datetime.datetime(2026, 5, 2, tzinfo=datetime.timezone.utc) cid1 = compute_commit_id( parent_ids=[cid0], snapshot_id=snap1, message="c1", committed_at_iso=committed_at1.isoformat(), ) write_commit(repo, CommitRecord( commit_id=cid1, branch="main", snapshot_id=snap1, message="c1", committed_at=committed_at1, parent_commit_id=cid0, )) (heads_dir(repo) / "main").write_text(cid1) result = run_verify(repo, check_objects=True) assert result["all_ok"] is True # 2 distinct objects: shared_obj + extra_obj (shared_obj counted once) assert result["objects_checked"] == 2, ( f"Expected 2 unique objects, got {result['objects_checked']}" ) def test_c2_signatures_checked_exact_count(self, tmp_path: pathlib.Path) -> None: """C2: signatures_checked equals exactly the number of commits with a non-empty 'signature' field.""" repo = _init_repo(tmp_path) key = _make_key() prev = None for i in range(5): # Alternate: even-indexed commits are signed pk = key if i % 2 == 0 else None prev = _commit(repo, parent_id=prev, idx=i, private_key=pk) result = run_verify(repo) # Commits 0, 2, 4 are signed → 3 signatures_checked assert result["all_ok"] is True, f"Failures: {result['failures']}" assert result["signatures_checked"] == 3 def test_c3_hash_mismatch_error_shows_both_ids(self, tmp_path: pathlib.Path) -> None: """C3: A hash mismatch failure's error string contains both the expected short ID and the actual short ID computed from the corrupted content.""" repo = _init_repo(tmp_path) content = b"original content for c3" obj_id = blob_id(content) write_object(repo, obj_id, content) manifest = {"c3.py": obj_id} snap_id = compute_snapshot_id(manifest) write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime(2026, 5, 3, tzinfo=datetime.timezone.utc) cid = compute_commit_id( parent_ids=[], snapshot_id=snap_id, message="c3", committed_at_iso=committed_at.isoformat(), ) write_commit(repo, CommitRecord( commit_id=cid, branch="main", snapshot_id=snap_id, message="c3", committed_at=committed_at, )) (heads_dir(repo) / "main").write_text(cid) corrupt_content = b"corrupted replacement bytes for c3" obj_file = object_path(repo, obj_id) os.chmod(obj_file, 0o644) obj_file.write_bytes(corrupt_content) result = run_verify(repo, check_objects=True) assert result["all_ok"] is False obj_failures = [f for f in result["failures"] if f["kind"] == "object"] assert obj_failures error_msg = obj_failures[0]["error"] # Error must mention the expected short ID or the actual short ID actual_id = blob_id(corrupt_content) assert short_id(obj_id) in error_msg or short_id(actual_id) in error_msg, ( f"Error message should contain short ID reference: {error_msg!r}" ) # Keyword "mismatch" or "corruption" must appear assert "mismatch" in error_msg or "corruption" in error_msg, ( f"Error must describe the problem: {error_msg!r}" ) def test_c4_commit_count_accurate_on_diamond_dag(self, tmp_path: pathlib.Path) -> None: """C4: Diamond-shaped DAG (main←A, main←B, merge←A+B) — each commit counted exactly once despite two paths to common ancestors.""" repo = _init_repo(tmp_path) # Common ancestor base_cid, _ = self._make_raw_commit(repo, "main", idx=0, parent=None) # Two diverging branches a_cid, _ = self._make_raw_commit(repo, "feat-a", idx=1, parent=base_cid) b_cid, _ = self._make_raw_commit(repo, "feat-b", idx=2, parent=base_cid) # Merge merge_content = b"diamond-merge" merge_obj = blob_id(merge_content) write_object(repo, merge_obj, merge_content) manifest = {"m.py": merge_obj} snap_id = compute_snapshot_id(manifest) write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime(2026, 5, 10, tzinfo=datetime.timezone.utc) merge_cid = compute_commit_id( parent_ids=[a_cid, b_cid], snapshot_id=snap_id, message="merge", committed_at_iso=committed_at.isoformat(), ) write_commit(repo, CommitRecord( commit_id=merge_cid, branch="main", snapshot_id=snap_id, message="merge", committed_at=committed_at, parent_commit_id=a_cid, parent2_commit_id=b_cid, )) (heads_dir(repo) / "main").write_text(merge_cid) result = run_verify(repo) assert result["all_ok"] is True # 4 commits: base + A + B + merge — base must NOT be counted twice assert result["commits_checked"] == 4, ( f"Expected 4 commits in diamond DAG, got {result['commits_checked']}" ) def _make_raw_commit( self, root: pathlib.Path, branch: str, idx: int, parent: str | None, ) -> tuple[str, str]: content = f"raw-{branch}-{idx}".encode() obj_id = blob_id(content) write_object(root, obj_id, content) manifest = {f"{branch}_{idx}.py": obj_id} snap_id = compute_snapshot_id(manifest) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = ( datetime.datetime(2026, 5, 1, tzinfo=datetime.timezone.utc) + datetime.timedelta(hours=idx) ) parent_ids = [parent] if parent else [] cid = compute_commit_id( parent_ids=parent_ids, snapshot_id=snap_id, message=f"{branch} {idx}", committed_at_iso=committed_at.isoformat(), ) write_commit(root, CommitRecord( commit_id=cid, branch=branch, snapshot_id=snap_id, message=f"{branch} {idx}", committed_at=committed_at, parent_commit_id=parent, )) (ref_path(root, branch)).write_text(cid) return cid, obj_id