test_verify_extended.py
python
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2
fix: remove commit_exists filter from have anchors — server…
Sonnet 4.6
patch
21 days ago
| 1 | """Extended integrity tests for ``muse verify`` / ``run_verify``. |
| 2 | |
| 3 | Covers gaps left by test_cmd_verify.py, test_cmd_verify_hardening.py, and |
| 4 | test_cmd_verify_shallow.py: |
| 5 | |
| 6 | Signature verification (run_verify BFS path, not verify-commit): |
| 7 | S1 Valid Ed25519 signature — run_verify must NOT report a failure. |
| 8 | S2 Tampered commit payload — signature present but payload changed → kind="signature". |
| 9 | S3 Wrong signature bytes (bit-flip) — Ed25519 rejects → kind="signature". |
| 10 | S4 Unknown signature algorithm prefix (e.g. "ml-dsa-65:…") → kind="signature". |
| 11 | S5 Unknown public-key algorithm prefix (e.g. "ml-dsa-65:…") → kind="key_missing". |
| 12 | S6 Malformed public-key base64 ("ed25519:!!!") → decode_pubkey ValueError |
| 13 | → pub_bytes=b"" → kind="signature". |
| 14 | S7 Empty signer_public_key ("") → sig_algo("") == "" → kind="key_missing". |
| 15 | S8 signatures_checked counts only signed commits (not unsigned ones). |
| 16 | S9 Mixed chain: some commits signed, some unsigned — only signed ones verified. |
| 17 | S10 Error message for sig failure names the agent_id or the short commit ID. |
| 18 | |
| 19 | Merge commit (parent2_commit_id): |
| 20 | M1 Merge commit: both parent chains walked, all objects verified. |
| 21 | M2 Merge commit: corrupt object in second-parent chain detected. |
| 22 | M3 Merge commit: missing second-parent commit → kind="commit". |
| 23 | |
| 24 | Ref path traversal security: |
| 25 | P1 branch="../../evil" — _branch_refs cannot escape heads dir. |
| 26 | P2 branch="/absolute/path" — does not read outside the repo. |
| 27 | P3 Ref file with binary (non-UTF-8) content — decode errors handled gracefully. |
| 28 | |
| 29 | IOError / TOCTOU: |
| 30 | T1 Object file deleted between object_state returning PRESENT and _rehash_object |
| 31 | reading it — OSError propagates; CLI exits with code 3. |
| 32 | |
| 33 | JSON schema completeness: |
| 34 | J1 --json output includes "strict" key. |
| 35 | J2 --json "strict" is False by default, True when --strict is passed. |
| 36 | J3 --json "check_objects" key present in all branches. |
| 37 | |
| 38 | Counter accuracy: |
| 39 | C1 Same object ID referenced by two different snapshots counted once. |
| 40 | C2 signatures_checked equals the number of commits with a non-empty signature. |
| 41 | C3 hash-mismatch error message contains both expected and actual short IDs. |
| 42 | """ |
| 43 | |
| 44 | from __future__ import annotations |
| 45 | |
| 46 | import datetime |
| 47 | import json |
| 48 | import os |
| 49 | import pathlib |
| 50 | import threading |
| 51 | from collections.abc import Mapping |
| 52 | from typing import Any |
| 53 | |
| 54 | import pytest |
| 55 | from tests.cli_test_helper import CliRunner, InvokeResult |
| 56 | |
| 57 | from muse.core.object_store import object_path, write_object |
| 58 | from muse.core.provenance import ( |
| 59 | encode_public_key, |
| 60 | provenance_payload, |
| 61 | sign_commit_ed25519, |
| 62 | sign_commit_record, |
| 63 | verify_commit_ed25519, |
| 64 | ) |
| 65 | from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id |
| 66 | from muse.core.commits import ( |
| 67 | CommitRecord, |
| 68 | write_commit, |
| 69 | ) |
| 70 | from muse.core.snapshots import ( |
| 71 | SnapshotRecord, |
| 72 | write_snapshot, |
| 73 | ) |
| 74 | from muse.core.types import blob_id, encode_pubkey, long_id, short_id |
| 75 | from muse.core.verify import run_verify |
| 76 | from muse.core.paths import heads_dir, muse_dir, ref_path |
| 77 | |
# Shared CLI runner; the `_invoke` helper in this file routes every `muse verify` call through it.
runner = CliRunner()
# repo_id that `_init_repo` writes into repo.json for each scratch repository.
_REPO_ID = "verify-extended-test"
| 80 | |
| 81 | |
| 82 | # --------------------------------------------------------------------------- |
| 83 | # Shared helpers |
| 84 | # --------------------------------------------------------------------------- |
| 85 | |
| 86 | |
def _init_repo(path: pathlib.Path) -> pathlib.Path:
    """Lay out an empty repository skeleton under *path* and return *path*.

    Creates the ``commits``, ``snapshots``, ``objects`` and ``refs/heads``
    directories inside the directory reported by ``muse_dir(path)``, points
    ``HEAD`` at ``refs/heads/main`` and writes a minimal ``repo.json``.
    """
    dot_muse = muse_dir(path)

    for subdir in ("commits", "snapshots", "objects", "refs/heads"):
        target = dot_muse / subdir
        target.mkdir(parents=True, exist_ok=True)

    head_file = dot_muse / "HEAD"
    head_file.write_text("ref: refs/heads/main", encoding="utf-8")

    repo_meta = json.dumps({"repo_id": _REPO_ID, "domain": "code"})
    meta_file = dot_muse / "repo.json"
    meta_file.write_text(repo_meta, encoding="utf-8")

    return path
| 96 | |
| 97 | |
| 98 | |
| 99 | |
def _make_key() -> "Any":
    """Return a newly generated Ed25519 private key.

    The ``cryptography`` import is kept local to this helper.
    """
    from cryptography.hazmat.primitives.asymmetric import ed25519

    fresh_key = ed25519.Ed25519PrivateKey.generate()
    return fresh_key
| 104 | |
| 105 | |
def _commit(
    root: pathlib.Path,
    *,
    branch: str = "main",
    parent_id: str | None = None,
    parent2_id: str | None = None,
    content: bytes = b"data",
    idx: int = 0,
    private_key: "Any | None" = None,
    agent_id: str = "test-agent",
) -> str:
    """Persist one full commit (blob, snapshot, commit record) and move *branch* to it.

    *idx* is folded into the blob bytes, the manifest filename, the commit
    message and the timestamp (2026-01-01 UTC plus *idx* hours), so distinct
    indices give distinct commits.  Passing *private_key* signs the commit via
    ``sign_commit_record``; otherwise the signature, key and agent fields are
    stored empty.

    Returns:
        The commit ID that was written and stored in the branch ref.
    """
    # --- blob and single-file snapshot ------------------------------------
    blob = content + idx.to_bytes(4, "big")
    blob_oid = blob_id(blob)
    write_object(root, blob_oid, blob)
    files = {f"file_{idx}.txt": blob_oid}
    snapshot_id = compute_snapshot_id(files)
    write_snapshot(root, SnapshotRecord(snapshot_id=snapshot_id, manifest=files))

    # --- commit metadata ---------------------------------------------------
    epoch = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    when = epoch + datetime.timedelta(hours=idx)
    message = f"commit {idx}"
    parents = [candidate for candidate in (parent_id, parent2_id) if candidate]
    signed = private_key is not None

    # The public key feeds into the commit ID, so it has to be known before
    # the ID is computed; the signature in turn needs the finished ID.
    public_key = ""
    if signed:
        _, public_key = encode_public_key(private_key)

    commit_id = compute_commit_id(
        parent_ids=parents,
        snapshot_id=snapshot_id,
        message=message,
        committed_at_iso=when.isoformat(),
        signer_public_key=public_key,
    )

    signature = ""
    signer_key_id = ""
    if signed:
        signature, _, signer_key_id = sign_commit_record(
            commit_id,
            agent_id=agent_id,
            private_key=private_key,
            committed_at=when.isoformat(),
        )

    record = CommitRecord(
        commit_id=commit_id,
        branch=branch,
        snapshot_id=snapshot_id,
        message=message,
        committed_at=when,
        parent_commit_id=parent_id,
        parent2_commit_id=parent2_id,
        agent_id=agent_id if private_key else "",
        signature=signature,
        signer_public_key=public_key,
        signer_key_id=signer_key_id,
    )
    write_commit(root, record)

    # --- advance the branch ref -------------------------------------------
    branch_ref = ref_path(root, branch)
    branch_ref.write_text(commit_id, encoding="utf-8")
    return commit_id
| 170 | |
| 171 | |
def _env(root: pathlib.Path) -> Mapping[str, str]:
    """Build the environment mapping that points the CLI at *root*."""
    repo_root = str(root)
    return dict(MUSE_REPO_ROOT=repo_root)
| 174 | |
| 175 | |
def _force_write_commit(root: pathlib.Path, record: "CommitRecord") -> None:
    """Replace the stored commit object for *record* regardless of what is on disk.

    Intended for tests that must plant a tampered record over a commit that
    was already written.  The file content is ``b"commit <len>\\0"`` followed
    by the compact JSON form of ``record.to_dict()``.
    """
    target = object_path(root, record.commit_id)
    target.parent.mkdir(parents=True, exist_ok=True)

    body = json.dumps(record.to_dict(), separators=(",", ":")).encode()
    header = f"commit {len(body)}".encode() + b"\0"

    # An existing file gets its mode reset to 0o644 before being overwritten.
    if target.exists():
        os.chmod(target, 0o644)
    target.write_bytes(header + body)
| 191 | |
| 192 | |
def _invoke(root: pathlib.Path, *args: str) -> InvokeResult:
    """Run ``muse verify`` with *args* against the repository at *root*.

    The CLI entry point is imported inside the helper, and the repository
    location is supplied through the environment built by ``_env``.
    """
    from muse.cli.app import main as cli_main

    argv = ["verify", *args]
    return runner.invoke(cli_main, argv, env=_env(root))
| 196 | |
| 197 | |
| 198 | # --------------------------------------------------------------------------- |
| 199 | # S — Signature verification in run_verify BFS |
| 200 | # --------------------------------------------------------------------------- |
| 201 | |
| 202 | |
class TestSignatureVerification:
    """Signature checks performed by ``run_verify`` while it walks commits.

    Every test here drives ``run_verify`` directly; the separate
    ``muse verify-commit`` plumbing command is not exercised.
    """

    # ------------------------------------------------------------------
    # Private fixtures shared by the tests below
    # ------------------------------------------------------------------

    @staticmethod
    def _stage_snapshot(repo: pathlib.Path, content: bytes) -> str:
        """Store *content* as ``f.txt`` in a one-file snapshot; return the snapshot ID."""
        obj_id = blob_id(content)
        write_object(repo, obj_id, content)
        manifest = {"f.txt": obj_id}
        snap_id = compute_snapshot_id(manifest)
        write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))
        return snap_id

    @staticmethod
    def _reload(repo: pathlib.Path, cid: str) -> CommitRecord:
        """Read commit *cid* back from *repo*, asserting that it exists."""
        from muse.core.commits import read_commit

        stored = read_commit(repo, cid)
        assert stored is not None
        return stored

    @staticmethod
    def _flip_bits(signature: str, index: int, mask: int) -> str:
        """Return *signature* re-encoded as ed25519 with byte *index* XOR-ed by *mask*."""
        from muse.core.types import decode_sig, encode_sig

        _, raw = decode_sig(signature)
        mutated = bytearray(raw)
        mutated[index] ^= mask
        return encode_sig("ed25519", bytes(mutated))

    @staticmethod
    def _overwrite(
        repo: pathlib.Path,
        original: CommitRecord,
        *,
        agent_id: str,
        signature: str,
    ) -> None:
        """Force-write a copy of *original* carrying the given agent_id and signature."""
        replacement = CommitRecord(
            commit_id=original.commit_id,
            branch=original.branch,
            snapshot_id=original.snapshot_id,
            message=original.message,
            committed_at=original.committed_at,
            agent_id=agent_id,
            signature=signature,
            signer_public_key=original.signer_public_key,
            signer_key_id=original.signer_key_id,
        )
        _force_write_commit(repo, replacement)

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def test_s1_valid_signed_commit_passes(self, tmp_path: pathlib.Path) -> None:
        """S1: one correctly signed commit verifies cleanly and counts as one signature."""
        repo = _init_repo(tmp_path)
        _commit(repo, private_key=_make_key(), idx=0)

        result = run_verify(repo)

        assert result["all_ok"] is True, f"Unexpected failures: {result['failures']}"
        assert result["signatures_checked"] == 1
        assert result["failures"] == []

    def test_s2_tampered_payload_detected(self, tmp_path: pathlib.Path) -> None:
        """S2: swapping agent_id after signing must surface a 'signature' failure."""
        repo = _init_repo(tmp_path)
        cid = _commit(repo, private_key=_make_key(), agent_id="real-agent", idx=0)

        # Keep the stored signature (made for "real-agent") but rewrite the
        # record so that it claims a different agent.
        original = self._reload(repo, cid)
        self._overwrite(
            repo,
            original,
            agent_id="evil-agent",
            signature=original.signature,
        )

        result = run_verify(repo)

        assert result["all_ok"] is False
        sig_failures = [f for f in result["failures"] if f["kind"] == "signature"]
        assert len(sig_failures) >= 1, f"Expected signature failure, got: {result['failures']}"

    def test_s3_bit_flip_in_signature_bytes_detected(self, tmp_path: pathlib.Path) -> None:
        """S3: a single flipped bit in the stored signature yields kind='signature'."""
        repo = _init_repo(tmp_path)
        cid = _commit(repo, private_key=_make_key(), idx=0)

        original = self._reload(repo, cid)
        # Flip the lowest bit of byte 32 of the decoded signature.
        bad_sig = self._flip_bits(original.signature, 32, 0x01)
        self._overwrite(
            repo,
            original,
            agent_id=original.agent_id,
            signature=bad_sig,
        )

        result = run_verify(repo)

        assert result["all_ok"] is False
        kinds = [f["kind"] for f in result["failures"]]
        assert "signature" in kinds, f"Expected 'signature' failure, got: {kinds}"

    def test_s4_unknown_signature_algorithm_reported(self, tmp_path: pathlib.Path) -> None:
        """S4: a signature with an 'ml-dsa-65:' prefix is a 'signature' failure, not 'key_missing'."""
        repo = _init_repo(tmp_path)
        _, pub_b64 = encode_public_key(_make_key())
        snap_id = self._stage_snapshot(repo, b"unknown-sig-alg")
        committed_at = datetime.datetime(2026, 3, 1, tzinfo=datetime.timezone.utc)
        cid = compute_commit_id(
            parent_ids=[],
            snapshot_id=snap_id,
            message="unknown alg",
            committed_at_iso=committed_at.isoformat(),
            signer_public_key=pub_b64,
        )
        record = CommitRecord(
            commit_id=cid,
            branch="main",
            snapshot_id=snap_id,
            message="unknown alg",
            committed_at=committed_at,
            signature=f"ml-dsa-65:{'A' * 80}",  # algorithm prefix is not ed25519
            signer_public_key=pub_b64,  # genuine ed25519 public key
            agent_id="future-agent",
        )
        write_commit(repo, record)
        (heads_dir(repo) / "main").write_text(cid)

        result = run_verify(repo)

        assert result["all_ok"] is False
        kinds = [f["kind"] for f in result["failures"]]
        assert "signature" in kinds, f"Expected 'signature', got: {kinds}"
        assert "key_missing" not in kinds

    def test_s5_unknown_pubkey_algorithm_reported_as_key_missing(self, tmp_path: pathlib.Path) -> None:
        """S5: ed25519 signature paired with an 'ml-dsa-65:' public key → 'key_missing' only."""
        repo = _init_repo(tmp_path)
        key = _make_key()
        snap_id = self._stage_snapshot(repo, b"unknown-pk-alg")
        committed_at = datetime.datetime(2026, 3, 2, tzinfo=datetime.timezone.utc)
        unknown_pk = f"ml-dsa-65:{'A' * 80}"
        cid = compute_commit_id(
            parent_ids=[],
            snapshot_id=snap_id,
            message="unknown pk alg",
            committed_at_iso=committed_at.isoformat(),
            signer_public_key=unknown_pk,
        )
        payload = provenance_payload(
            cid,
            agent_id="future-agent",
            committed_at=committed_at.isoformat(),
        )
        valid_sig = sign_commit_ed25519(payload, key)
        record = CommitRecord(
            commit_id=cid,
            branch="main",
            snapshot_id=snap_id,
            message="unknown pk alg",
            committed_at=committed_at,
            signature=valid_sig,
            signer_public_key=unknown_pk,  # key carries the unrecognised prefix
            agent_id="future-agent",
        )
        write_commit(repo, record)
        (heads_dir(repo) / "main").write_text(cid)

        result = run_verify(repo)

        assert result["all_ok"] is False
        kinds = [f["kind"] for f in result["failures"]]
        assert "key_missing" in kinds, f"Expected 'key_missing', got: {kinds}"
        assert "signature" not in kinds

    def test_s6_malformed_pubkey_base64_causes_signature_failure(self, tmp_path: pathlib.Path) -> None:
        """S6: a public key with an ed25519 prefix but a non-base64 body → kind='signature'."""
        repo = _init_repo(tmp_path)
        key = _make_key()
        snap_id = self._stage_snapshot(repo, b"bad-b64-key")
        committed_at = datetime.datetime(2026, 3, 3, tzinfo=datetime.timezone.utc)
        bad_pk = "ed25519:!!!notvalidbase64!!!"
        cid = compute_commit_id(
            parent_ids=[],
            snapshot_id=snap_id,
            message="bad b64 key",
            committed_at_iso=committed_at.isoformat(),
            signer_public_key=bad_pk,
        )
        payload = provenance_payload(
            cid,
            agent_id="agent",
            committed_at=committed_at.isoformat(),
        )
        valid_sig = sign_commit_ed25519(payload, key)
        record = CommitRecord(
            commit_id=cid,
            branch="main",
            snapshot_id=snap_id,
            message="bad b64 key",
            committed_at=committed_at,
            signature=valid_sig,
            signer_public_key=bad_pk,  # right prefix, undecodable body
            agent_id="agent",
        )
        write_commit(repo, record)
        (heads_dir(repo) / "main").write_text(cid)

        result = run_verify(repo)

        assert result["all_ok"] is False
        kinds = [f["kind"] for f in result["failures"]]
        assert "signature" in kinds, f"Expected 'signature' failure, got: {kinds}"

    def test_s7_empty_signer_public_key_reported_as_key_missing(self, tmp_path: pathlib.Path) -> None:
        """S7: a signed commit whose signer_public_key is '' → 'key_missing', never 'signature'."""
        repo = _init_repo(tmp_path)
        key = _make_key()
        snap_id = self._stage_snapshot(repo, b"no-pk")
        committed_at = datetime.datetime(2026, 3, 4, tzinfo=datetime.timezone.utc)
        cid = compute_commit_id(
            parent_ids=[],
            snapshot_id=snap_id,
            message="no pk",
            committed_at_iso=committed_at.isoformat(),
        )
        payload = provenance_payload(cid, committed_at=committed_at.isoformat())
        valid_sig = sign_commit_ed25519(payload, key)
        record = CommitRecord(
            commit_id=cid,
            branch="main",
            snapshot_id=snap_id,
            message="no pk",
            committed_at=committed_at,
            signature=valid_sig,
            signer_public_key="",  # stands in for a rotated or missing key
        )
        write_commit(repo, record)
        (heads_dir(repo) / "main").write_text(cid)

        result = run_verify(repo)

        assert result["all_ok"] is False
        kinds = [f["kind"] for f in result["failures"]]
        assert "key_missing" in kinds, f"Expected 'key_missing', got: {kinds}"
        assert "signature" not in kinds

    def test_s8_unsigned_commits_not_counted(self, tmp_path: pathlib.Path) -> None:
        """S8: a chain of two unsigned commits leaves signatures_checked at zero."""
        repo = _init_repo(tmp_path)
        first = _commit(repo, idx=0)  # no key → unsigned
        _commit(repo, parent_id=first, idx=1)  # no key → unsigned

        result = run_verify(repo)

        assert result["all_ok"] is True
        assert result["signatures_checked"] == 0

    def test_s9_mixed_chain_counts_only_signed(self, tmp_path: pathlib.Path) -> None:
        """S9: unsigned → signed → unsigned chain: one signature checked, three commits, all ok."""
        repo = _init_repo(tmp_path)
        key = _make_key()
        base = _commit(repo, idx=0)  # unsigned
        middle = _commit(repo, parent_id=base, idx=1, private_key=key)  # signed
        _commit(repo, parent_id=middle, idx=2)  # unsigned

        result = run_verify(repo)

        assert result["all_ok"] is True, f"Failures: {result['failures']}"
        assert result["signatures_checked"] == 1
        assert result["commits_checked"] == 3

    def test_s10_signature_failure_error_names_agent(self, tmp_path: pathlib.Path) -> None:
        """S10: the 'signature' failure text mentions the agent_id or the short commit ID."""
        repo = _init_repo(tmp_path)
        cid = _commit(repo, private_key=_make_key(), agent_id="my-special-agent", idx=0)

        # Invert the first signature byte so that verification has to fail.
        original = self._reload(repo, cid)
        bad_sig = self._flip_bits(original.signature, 0, 0xFF)
        self._overwrite(
            repo,
            original,
            agent_id="my-special-agent",
            signature=bad_sig,
        )

        result = run_verify(repo)

        assert result["all_ok"] is False
        sig_failures = [f for f in result["failures"] if f["kind"] == "signature"]
        assert sig_failures
        error_msg = sig_failures[0]["error"]
        assert "my-special-agent" in error_msg or short_id(cid) in error_msg, (
            f"Error message should name agent or commit: {error_msg!r}"
        )
| 483 | |
| 484 | |
| 485 | # --------------------------------------------------------------------------- |
| 486 | # M — Merge commits (parent2_commit_id) |
| 487 | # --------------------------------------------------------------------------- |
| 488 | |
| 489 | |
class TestMergeCommits:
    """Merge commits: ``run_verify`` must follow ``parent2_commit_id`` as well as the first parent."""

    def _make_branch_commit(
        self,
        root: pathlib.Path,
        branch: str,
        idx: int,
        parent_id: str | None = None,
    ) -> tuple[str, str]:
        """Write an unsigned one-file commit on *branch*; return ``(commit_id, obj_id)``.

        The blob content, manifest filename and message are derived from
        *branch* and *idx*; the timestamp is 2026-02-01 UTC plus *idx* hours.
        The branch ref is updated to the new commit.
        """
        payload = f"branch-{branch}-{idx}".encode()
        obj_id = blob_id(payload)
        write_object(root, obj_id, payload)

        files = {f"{branch}_{idx}.py": obj_id}
        snap_id = compute_snapshot_id(files)
        write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=files))

        base_time = datetime.datetime(2026, 2, 1, tzinfo=datetime.timezone.utc)
        when = base_time + datetime.timedelta(hours=idx)
        message = f"{branch} commit {idx}"
        cid = compute_commit_id(
            parent_ids=[parent_id] if parent_id else [],
            snapshot_id=snap_id,
            message=message,
            committed_at_iso=when.isoformat(),
        )
        record = CommitRecord(
            commit_id=cid,
            branch=branch,
            snapshot_id=snap_id,
            message=message,
            committed_at=when,
            parent_commit_id=parent_id,
        )
        write_commit(root, record)
        ref_path(root, branch).write_text(cid)
        return cid, obj_id

    @staticmethod
    def _write_merge(
        root: pathlib.Path,
        *,
        content: bytes,
        filename: str,
        message: str,
        committed_at: datetime.datetime,
        first_parent: str,
        second_parent: str,
    ) -> str:
        """Write a two-parent commit on ``main``, point the ``main`` head at it, return its ID."""
        merge_obj = blob_id(content)
        write_object(root, merge_obj, content)
        manifest = {filename: merge_obj}
        snap_id = compute_snapshot_id(manifest)
        write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))

        merge_cid = compute_commit_id(
            parent_ids=[first_parent, second_parent],
            snapshot_id=snap_id,
            message=message,
            committed_at_iso=committed_at.isoformat(),
        )
        record = CommitRecord(
            commit_id=merge_cid,
            branch="main",
            snapshot_id=snap_id,
            message=message,
            committed_at=committed_at,
            parent_commit_id=first_parent,
            parent2_commit_id=second_parent,
        )
        write_commit(root, record)
        (heads_dir(root) / "main").write_text(merge_cid)
        return merge_cid

    def test_m1_merge_commit_both_parents_walked(self, tmp_path: pathlib.Path) -> None:
        """M1: with a healthy two-parent merge, commits and objects from both sides are counted."""
        repo = _init_repo(tmp_path)

        # One commit on each branch, then a merge with main first and feat second.
        main_cid, _ = self._make_branch_commit(repo, "main", idx=0)
        feat_cid, _ = self._make_branch_commit(repo, "feat", idx=1)
        self._write_merge(
            repo,
            content=b"merge-content",
            filename="merge.py",
            message="merge feat into main",
            committed_at=datetime.datetime(2026, 2, 1, 12, tzinfo=datetime.timezone.utc),
            first_parent=main_cid,
            second_parent=feat_cid,
        )

        result = run_verify(repo)

        assert result["all_ok"] is True, f"Failures: {result['failures']}"
        # main + feat + merge are three distinct commits (feat keeps its own ref too).
        assert result["commits_checked"] >= 3
        # Each of those commits contributes one object.
        assert result["objects_checked"] >= 3

    def test_m2_corrupt_object_in_second_parent_chain_detected(
        self, tmp_path: pathlib.Path
    ) -> None:
        """M2: a corrupted blob belonging to the second parent is reported as kind='object'."""
        repo = _init_repo(tmp_path)

        main_cid, _ = self._make_branch_commit(repo, "main", idx=0)
        feat_cid, feat_obj = self._make_branch_commit(repo, "feat", idx=1)

        # Overwrite the feat blob on disk (mode reset first so the write succeeds).
        damaged = object_path(repo, feat_obj)
        os.chmod(damaged, 0o644)
        damaged.write_bytes(b"corrupted by test")

        # The merge lists feat as its second parent.
        self._write_merge(
            repo,
            content=b"merge",
            filename="m.py",
            message="merge",
            committed_at=datetime.datetime(2026, 2, 2, tzinfo=datetime.timezone.utc),
            first_parent=main_cid,
            second_parent=feat_cid,
        )

        result = run_verify(repo, check_objects=True)

        assert result["all_ok"] is False
        object_failures = [f for f in result["failures"] if f["kind"] == "object"]
        assert any(f["id"] == feat_obj for f in object_failures), (
            f"Expected feat_obj failure, got: {object_failures}"
        )

    def test_m3_missing_second_parent_commit_reported(self, tmp_path: pathlib.Path) -> None:
        """M3: a second parent that cannot be read as a commit is reported with kind='commit'."""
        repo = _init_repo(tmp_path)

        main_cid, _ = self._make_branch_commit(repo, "main", idx=0)

        # The phantom parent exists only as a zero-byte file at its commit path.
        phantom_parent = long_id("d" * 64)
        from muse.core.commits import commit_path as _cp

        stub_file = _cp(repo, phantom_parent)
        stub_file.parent.mkdir(parents=True, exist_ok=True)
        stub_file.write_bytes(b"")

        self._write_merge(
            repo,
            content=b"merge-phantom",
            filename="mp.py",
            message="merge phantom",
            committed_at=datetime.datetime(2026, 2, 3, tzinfo=datetime.timezone.utc),
            first_parent=main_cid,
            second_parent=phantom_parent,
        )

        result = run_verify(repo)

        assert result["all_ok"] is False
        commit_failures = [f for f in result["failures"] if f["kind"] == "commit"]
        assert any(f["id"] == phantom_parent for f in commit_failures), (
            f"Expected commit failure for phantom parent: {commit_failures}"
        )
| 640 | |
| 641 | |
| 642 | # --------------------------------------------------------------------------- |
| 643 | # P — Path traversal and ref security |
| 644 | # --------------------------------------------------------------------------- |
| 645 | |
| 646 | |
class TestRefSecurity:
    """Ref file security: path traversal via the branch name and binary ref content."""

    def test_p1_path_traversal_via_branch_param_does_not_escape(
        self, tmp_path: pathlib.Path
    ) -> None:
        """P1: branch='../../evil' must not surface a commit ID planted outside heads/.

        A file named ``evil`` holding a well-formed commit ID is planted
        directly inside the muse directory.  If ``_branch_refs`` followed the
        ``..`` segments it would read that file and hand the planted ID back.
        The test therefore checks for the planted ID explicitly: a bare
        "looks like sha256:..." check would be satisfied by the planted value
        itself and could never detect the escape.
        """
        repo = _init_repo(tmp_path)

        # Plant a syntactically valid ID where a traversal would land.
        planted_id = long_id("a" * 64)
        evil_file = muse_dir(repo) / "evil"
        evil_file.write_text(planted_id)

        from muse.core.verify import _branch_refs  # type: ignore[attr-defined]
        refs = _branch_refs(repo, branch="../../evil")

        # Anything that is returned must at least look like a commit ID ...
        assert all(commit_id.startswith("sha256:") for _, commit_id in refs)
        # ... and must never be the attacker-controlled value read from
        # outside the heads directory.
        leaked = [name for name, commit_id in refs if commit_id == planted_id]
        assert not leaked, (
            f"_branch_refs followed '../../evil' out of heads/ and returned the planted ID: {refs!r}"
        )

    def test_p2_absolute_path_branch_does_not_read_outside_repo(
        self, tmp_path: pathlib.Path
    ) -> None:
        """P2: branch='/etc/passwd' must neither crash nor yield a bogus commit ID.

        With pathlib, joining a base path with an absolute right-hand operand
        discards the base, so a naive ``heads_dir / branch`` would address the
        real ``/etc/passwd``.  How ``_branch_refs`` builds the path is not
        visible from this test; whatever it does, every ref it returns must be
        a well-formed ``sha256:`` ID of 71 characters.
        """
        repo = _init_repo(tmp_path)

        from muse.core.verify import _branch_refs  # type: ignore[attr-defined]
        # The call itself must not raise; an empty result is acceptable.
        try:
            refs = _branch_refs(repo, branch="/etc/passwd")
        except Exception as exc:
            pytest.fail(f"_branch_refs raised on absolute branch path: {exc}")
        # Content of /etc/passwd can never pass as a commit ID.
        for _, cid in refs:
            assert cid.startswith("sha256:") and len(cid) == 71, (
                f"Suspicious commit ID from absolute path branch: {cid!r}"
            )

    def test_p3_binary_ref_file_handled_gracefully(self, tmp_path: pathlib.Path) -> None:
        """P3: non-UTF-8 bytes in a ref file produce a kind='ref' failure instead of a crash."""
        repo = _init_repo(tmp_path)
        # Replace the main ref with bytes that are not valid UTF-8.
        (heads_dir(repo) / "main").write_bytes(b"\xff\xfe\x00binary\x01garbage")

        result = run_verify(repo)

        # run_verify returned normally; the unusable ref must be reported.
        assert result["all_ok"] is False
        kinds = [f["kind"] for f in result["failures"]]
        assert "ref" in kinds, f"Expected 'ref' failure for binary content, got: {kinds}"
| 705 | |
| 706 | |
| 707 | # --------------------------------------------------------------------------- |
| 708 | # T — IOError / TOCTOU |
| 709 | # --------------------------------------------------------------------------- |
| 710 | |
| 711 | |
class TestIOErrorHandling:
    """Behaviour of ``run_verify`` when a referenced object file cannot be read."""

    def test_t1_object_deleted_between_state_check_and_read(
        self, tmp_path: pathlib.Path
    ) -> None:
        """T1: A committed object whose file has vanished must never verify clean.

        The test builds a one-commit repo, removes the object file from the
        store, then runs ``run_verify`` with ``check_objects=True``.  Two
        outcomes are accepted:

          * ``run_verify`` raises ``OSError`` (left for the caller to handle);
          * ``run_verify`` returns a result whose ``all_ok`` is False.

        Returning ``all_ok=True`` is the only rejected outcome.  Note that
        the file is removed before ``run_verify`` starts, so this approximates
        the TOCTOU window rather than reproducing it mid-run.
        """
        repo = _init_repo(tmp_path)

        # One blob, one snapshot referencing it, one root commit on main.
        payload = b"will be deleted"
        obj_id = blob_id(payload)
        write_object(repo, obj_id, payload)

        files = {"toctou.py": obj_id}
        snap_id = compute_snapshot_id(files)
        write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest=files))

        when = datetime.datetime(2026, 4, 10, tzinfo=datetime.timezone.utc)
        message = "toctou test"
        cid = compute_commit_id(
            parent_ids=[],
            snapshot_id=snap_id,
            message=message,
            committed_at_iso=when.isoformat(),
        )
        record = CommitRecord(
            commit_id=cid,
            branch="main",
            snapshot_id=snap_id,
            message=message,
            committed_at=when,
        )
        write_commit(repo, record)
        (heads_dir(repo) / "main").write_text(cid)

        # Remove the object file.  The chmod comes first presumably because
        # the store writes objects read-only — TODO confirm.
        victim = object_path(repo, obj_id)
        os.chmod(victim, 0o644)
        os.unlink(victim)

        try:
            result = run_verify(repo, check_objects=True)
        except OSError:
            # Acceptable: the error propagates to the caller (CLI level).
            return

        # run_verify handled the problem itself, so it must report a failure.
        assert result["all_ok"] is False, (
            "run_verify must not report all_ok=True when an object is unreadable"
        )
| 756 | |
| 757 | |
| 758 | # --------------------------------------------------------------------------- |
| 759 | # J — JSON schema completeness |
| 760 | # --------------------------------------------------------------------------- |
| 761 | |
| 762 | |
class TestJsonSchema:
    """Schema checks for the JSON document emitted by ``--json``."""

    @staticmethod
    def _one_commit_repo(tmp_path: pathlib.Path) -> pathlib.Path:
        """Create a fresh repo under *tmp_path* with a single commit (idx=0)."""
        repo = _init_repo(tmp_path)
        _commit(repo, idx=0)
        return repo

    @staticmethod
    def _json_output(repo: pathlib.Path, *flags: str) -> dict[str, object]:
        """Invoke the command with *flags* followed by ``--json``; parse its output."""
        parsed: dict[str, object] = json.loads(_invoke(repo, *flags, "--json").output)
        return parsed

    def test_j1_strict_field_present_in_json(self, tmp_path: pathlib.Path) -> None:
        """J1: A successful ``--json`` run includes the 'strict' key."""
        repo = self._one_commit_repo(tmp_path)
        result = _invoke(repo, "--json")
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert "strict" in data, f"'strict' missing from JSON: {list(data.keys())}"

    def test_j2_strict_false_by_default(self, tmp_path: pathlib.Path) -> None:
        """J2: Without ``--strict`` the JSON reports strict=False."""
        data = self._json_output(self._one_commit_repo(tmp_path))
        assert data["strict"] is False

    def test_j2b_strict_true_when_flag_passed(self, tmp_path: pathlib.Path) -> None:
        """J2b: Passing ``--strict`` makes the JSON report strict=True."""
        data = self._json_output(self._one_commit_repo(tmp_path), "--strict")
        assert data["strict"] is True

    def test_j3_check_objects_present_in_all_branches(self, tmp_path: pathlib.Path) -> None:
        """J3: 'check_objects' is emitted with and without ``--no-objects``."""
        repo = self._one_commit_repo(tmp_path)
        d1 = self._json_output(repo)
        d2 = self._json_output(repo, "--no-objects")

        # Presence first, then the value expected for each invocation.
        assert "check_objects" in d1
        assert "check_objects" in d2
        assert d1["check_objects"] is True
        assert d2["check_objects"] is False

    def test_j4_all_documented_fields_present(self, tmp_path: pathlib.Path) -> None:
        """J4: Every field in the required set below appears in the JSON output."""
        data = self._json_output(self._one_commit_repo(tmp_path))
        required_fields = {
            # counters and identity
            "repo_id", "refs_checked", "commits_checked", "snapshots_checked",
            "objects_checked", "signatures_checked",
            # outcome
            "all_ok", "nothing_checked", "failures", "warnings", "exit_code",
            # echoed options
            "check_objects", "strict", "branch", "fail_fast",
            # shallow / promisor state
            "shallow_commits", "promised_objects", "is_shallow", "promisor_remotes",
            # envelope metadata
            "muse_version", "schema", "duration_ms", "timestamp",
        }
        missing = required_fields.difference(data.keys())
        assert not missing, f"JSON output missing fields: {missing}"

    def test_j5_failures_list_empty_when_all_ok(self, tmp_path: pathlib.Path) -> None:
        """J5: A clean run reports all_ok=True and an explicit empty failures list."""
        data = self._json_output(self._one_commit_repo(tmp_path))
        assert data["all_ok"] is True
        assert data["failures"] == []
| 823 | |
| 824 | |
| 825 | # --------------------------------------------------------------------------- |
| 826 | # C — Counter accuracy |
| 827 | # --------------------------------------------------------------------------- |
| 828 | |
| 829 | |
class TestCounterAccuracy:
    """Counters reported by ``run_verify`` must count each item exactly once."""

    @staticmethod
    def _put_blob(repo: pathlib.Path, content: bytes) -> str:
        """Write *content* to the object store; return the ID from ``blob_id``."""
        obj_id = blob_id(content)
        write_object(repo, obj_id, content)
        return obj_id

    @staticmethod
    def _put_snapshot(repo: pathlib.Path, manifest: dict[str, str]) -> str:
        """Write a snapshot for *manifest*; return the ID from ``compute_snapshot_id``."""
        snap_id = compute_snapshot_id(manifest)
        write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))
        return snap_id

    def test_c1_same_object_across_two_snapshots_counted_once(
        self, tmp_path: pathlib.Path
    ) -> None:
        """C1: An object shared by two snapshots contributes 1 to objects_checked.

        Commit 0 references only the shared object; commit 1 (its child)
        references the shared object plus one extra.  With main pointing at
        commit 1 the expected ``objects_checked`` is 2, not 3.
        """
        repo = _init_repo(tmp_path)
        utc = datetime.timezone.utc
        shared_obj = self._put_blob(repo, b"shared object")

        # Commit 0: root commit whose snapshot holds only the shared object.
        snap0 = self._put_snapshot(repo, {"shared.py": shared_obj})
        when0 = datetime.datetime(2026, 5, 1, tzinfo=utc)
        cid0 = compute_commit_id(
            parent_ids=[],
            snapshot_id=snap0,
            message="c0",
            committed_at_iso=when0.isoformat(),
        )
        write_commit(repo, CommitRecord(
            commit_id=cid0,
            branch="main",
            snapshot_id=snap0,
            message="c0",
            committed_at=when0,
        ))

        # Commit 1: child of commit 0; new snapshot reusing the shared object.
        extra_obj = self._put_blob(repo, b"extra")
        snap1 = self._put_snapshot(
            repo, {"shared.py": shared_obj, "extra.py": extra_obj}
        )
        when1 = datetime.datetime(2026, 5, 2, tzinfo=utc)
        cid1 = compute_commit_id(
            parent_ids=[cid0],
            snapshot_id=snap1,
            message="c1",
            committed_at_iso=when1.isoformat(),
        )
        write_commit(repo, CommitRecord(
            commit_id=cid1,
            branch="main",
            snapshot_id=snap1,
            message="c1",
            committed_at=when1,
            parent_commit_id=cid0,
        ))
        (heads_dir(repo) / "main").write_text(cid1)

        result = run_verify(repo, check_objects=True)

        assert result["all_ok"] is True
        # Two distinct objects exist: the shared one (counted once) and the extra.
        assert result["objects_checked"] == 2, (
            f"Expected 2 unique objects, got {result['objects_checked']}"
        )

    def test_c2_signatures_checked_exact_count(self, tmp_path: pathlib.Path) -> None:
        """C2: signatures_checked equals the number of commits created with a key.

        Five chained commits are created; those at even indices (0, 2, 4)
        receive the private key, the others receive none.
        """
        repo = _init_repo(tmp_path)
        key = _make_key()

        tip = None
        for position in range(5):
            signing_key = None if position % 2 else key
            tip = _commit(repo, parent_id=tip, idx=position, private_key=signing_key)

        result = run_verify(repo)

        assert result["all_ok"] is True, f"Failures: {result['failures']}"
        # Three commits were given a key, so three signatures are expected.
        assert result["signatures_checked"] == 3

    def test_c3_hash_mismatch_error_shows_both_ids(self, tmp_path: pathlib.Path) -> None:
        """C3: A corrupted object yields an 'object' failure with a useful message.

        The first 'object' failure's error text must contain the expected
        short ID or the short ID of the corrupted content, and must include
        the word "mismatch" or "corruption".

        NOTE(review): the test name says "both ids" but the assertion
        accepts either one — confirm the intended strictness.
        """
        repo = _init_repo(tmp_path)
        obj_id = self._put_blob(repo, b"original content for c3")
        snap_id = self._put_snapshot(repo, {"c3.py": obj_id})

        when = datetime.datetime(2026, 5, 3, tzinfo=datetime.timezone.utc)
        cid = compute_commit_id(
            parent_ids=[],
            snapshot_id=snap_id,
            message="c3",
            committed_at_iso=when.isoformat(),
        )
        write_commit(repo, CommitRecord(
            commit_id=cid,
            branch="main",
            snapshot_id=snap_id,
            message="c3",
            committed_at=when,
        ))
        (heads_dir(repo) / "main").write_text(cid)

        # Overwrite the stored bytes so the content no longer hashes to obj_id.
        corrupt_content = b"corrupted replacement bytes for c3"
        stored = object_path(repo, obj_id)
        os.chmod(stored, 0o644)
        stored.write_bytes(corrupt_content)

        result = run_verify(repo, check_objects=True)

        assert result["all_ok"] is False
        obj_failures = [
            failure for failure in result["failures"] if failure["kind"] == "object"
        ]
        assert obj_failures
        error_msg = obj_failures[0]["error"]

        candidate_ids = (short_id(obj_id), short_id(blob_id(corrupt_content)))
        assert any(candidate in error_msg for candidate in candidate_ids), (
            f"Error message should contain short ID reference: {error_msg!r}"
        )
        assert any(word in error_msg for word in ("mismatch", "corruption")), (
            f"Error must describe the problem: {error_msg!r}"
        )

    def test_c4_commit_count_accurate_on_diamond_dag(self, tmp_path: pathlib.Path) -> None:
        """C4: In a diamond DAG every commit is counted exactly once.

        Shape: base → A, base → B, merge(A, B).  The base is reachable via
        both parents of the merge, yet ``commits_checked`` must be 4.
        """
        repo = _init_repo(tmp_path)

        # Common ancestor, then one commit on each of two diverging branches.
        base_cid = self._make_raw_commit(repo, "main", idx=0, parent=None)[0]
        a_cid = self._make_raw_commit(repo, "feat-a", idx=1, parent=base_cid)[0]
        b_cid = self._make_raw_commit(repo, "feat-b", idx=2, parent=base_cid)[0]

        # Merge commit with A as first parent and B as second parent.
        merge_obj = self._put_blob(repo, b"diamond-merge")
        snap_id = self._put_snapshot(repo, {"m.py": merge_obj})
        when = datetime.datetime(2026, 5, 10, tzinfo=datetime.timezone.utc)
        merge_cid = compute_commit_id(
            parent_ids=[a_cid, b_cid],
            snapshot_id=snap_id,
            message="merge",
            committed_at_iso=when.isoformat(),
        )
        write_commit(repo, CommitRecord(
            commit_id=merge_cid,
            branch="main",
            snapshot_id=snap_id,
            message="merge",
            committed_at=when,
            parent_commit_id=a_cid,
            parent2_commit_id=b_cid,
        ))
        (heads_dir(repo) / "main").write_text(merge_cid)

        result = run_verify(repo)

        assert result["all_ok"] is True
        # base + A + B + merge; the shared base must not be double-counted.
        assert result["commits_checked"] == 4, (
            f"Expected 4 commits in diamond DAG, got {result['commits_checked']}"
        )

    def _make_raw_commit(
        self,
        root: pathlib.Path,
        branch: str,
        idx: int,
        parent: str | None,
    ) -> tuple[str, str]:
        """Write one blob, snapshot and commit for *branch*; point its ref at it.

        The commit timestamp is 2026-05-01 UTC plus *idx* hours.  A falsy
        *parent* produces a root commit (empty ``parent_ids``).

        Returns:
            ``(commit_id, object_id)`` of the records written.
        """
        obj_id = self._put_blob(root, f"raw-{branch}-{idx}".encode())
        snap_id = self._put_snapshot(root, {f"{branch}_{idx}.py": obj_id})

        epoch = datetime.datetime(2026, 5, 1, tzinfo=datetime.timezone.utc)
        committed_at = epoch + datetime.timedelta(hours=idx)
        message = f"{branch} {idx}"

        cid = compute_commit_id(
            parent_ids=[parent] if parent else [],
            snapshot_id=snap_id,
            message=message,
            committed_at_iso=committed_at.isoformat(),
        )
        write_commit(root, CommitRecord(
            commit_id=cid,
            branch=branch,
            snapshot_id=snap_id,
            message=message,
            committed_at=committed_at,
            parent_commit_id=parent,
        ))
        ref_path(root, branch).write_text(cid)
        return cid, obj_id
File History
4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2
fix: remove commit_exists filter from have anchors — server…
Sonnet 4.6
patch
21 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e
fix: rename objects→blobs in push client and all stale test…
Sonnet 4.6
patch
22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a
fix: repair four test failures from post-migration audit
Sonnet 4.6
patch
28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf
fix: unified object store migration — idempotent writes, JS…
Sonnet 4.6
minor
⚠
29 days ago