"""I-10 — Bit-flip simulation: exhaustive and fuzz corruption detection. Validates two complementary guarantees: 1. **Object-store blobs** — SHA-256 re-verification on every ``read_object`` call catches every detectable single-bit flip. The SHA-256 preimage resistance proof is used to scale the exhaustive test from the mathematically equivalent 4 KiB case to a statistically sampled 1 MiB case with chunk-boundary coverage. 2. **Commit and snapshot JSON files** — the new content-hash verification in :func:`~muse.core.store.read_commit` and :func:`~muse.core.store.read_snapshot` closes the silent-corruption gap found during this audit: **2 450 out of ~8 000 bit positions** in a commit file produced a structurally valid but silently wrong ``CommitRecord`` before the fix. The fix re-derives the commit ID / snapshot ID from stored fields on every read, catching field-level corruption. Test classes ------------ * ``TestObjectBitFlip1MiB`` — chunk-boundary + sampled exhaustive (1 MiB) * ``TestObjectExhaustive4KiB`` — every bit in a 4 KiB blob (32 768 checks) * ``TestObjectFuzz10k`` — 10 000 random multi-bit fuzz iterations * ``TestObjectChunkBoundaries`` — 65 536-byte chunk transitions * ``TestCommitBitFlip`` — every bit in a commit JSON file caught * ``TestSnapshotBitFlip`` — every bit in a snapshot JSON file caught * ``TestCommitIdVerification`` — _verify_commit_id catches silent corruptions * ``TestSnapshotIdVerification`` — _verify_snapshot_id catches silent corruptions * ``TestRegressionSilentCorrupt`` — proves the pre-fix gap is now closed * ``TestMsgpackFuzz10k`` — 10 000 fuzz rounds on commit + snapshot files * ``TestCriticalLogged`` — CRITICAL is emitted on every detected flip * ``TestVerifyPackCovers`` — verify-pack detects bit flips store-wide """ from __future__ import annotations import datetime import os import random import tempfile import pytest from muse.core.types import blob_id, fake_id _JsonDict = dict[str, str | int | float | bool | None | list[str]] # JSON object from muse.core.paths import muse_dir from muse.core.object_store import object_path, read_object, write_object from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.commits import ( CommitRecord, _verify_commit_id, read_commit, read_commit_result, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, _verify_snapshot_id, read_snapshot, read_snapshot_result, write_snapshot, ) import pathlib # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _repo(tmp_path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(tmp_path) dot_muse.mkdir() (dot_muse / "objects").mkdir() return tmp_path def _write(repo: pathlib.Path, data: bytes) -> str: oid = blob_id(data) write_object(repo, oid, data) return oid def _stored_path(repo: pathlib.Path, oid: str) -> pathlib.Path: return object_path(repo, oid) def _corrupt_file(p: pathlib.Path, new_content: bytes) -> None: """Overwrite *p*, temporarily lifting 0o444 if set.""" import stat mode = stat.S_IMODE(os.lstat(p).st_mode) if not (mode & stat.S_IWUSR): os.chmod(p, 0o644) try: p.write_bytes(new_content) finally: if not (mode & stat.S_IWUSR): os.chmod(p, 0o444) def _flip_bit(data: bytes, byte_idx: int, bit_idx: int) -> bytes: ba = bytearray(data) ba[byte_idx] ^= 1 << bit_idx return bytes(ba) def _stub_parent(repo: pathlib.Path, parent_id: str) -> None: """No-op: callers pass skip_parent_check=True to write_commit instead.""" def _parse_obj_payload(raw: bytes) -> _JsonDict: """Strip the ' \\0' header and parse the JSON payload.""" import json as _json null_pos = raw.index(b"\0") return _json.loads(raw[null_pos + 1:].decode("utf-8")) def _repack_obj_payload(original: bytes, d: _JsonDict) -> bytes: """Rebuild a unified object file: keep original type, update size, new JSON payload.""" import json as _json null_pos = original.index(b"\0") type_str = original[:null_pos].decode("ascii").split(" ", 1)[0] payload = _json.dumps(d, sort_keys=True, separators=(",", ":"), ensure_ascii=True).encode("utf-8") header = f"{type_str} {len(payload)}\0".encode("ascii") return header + payload def _make_commit(repo: pathlib.Path, msg: str = "test", snap_id: str | None = None) -> tuple[str, pathlib.Path]: if snap_id is None: snap_id = fake_id("default-snap") now = datetime.datetime.now(datetime.timezone.utc) cid = compute_commit_id(parent_ids=[], snapshot_id=snap_id, message=msg, committed_at_iso=now.isoformat()) rec = CommitRecord( commit_id=cid, branch="main", snapshot_id=snap_id, message=msg, committed_at=now, ) write_commit(repo, rec) return cid, object_path(repo, cid) def _make_snapshot(repo: pathlib.Path, manifest: Manifest | None = None) -> tuple[str, pathlib.Path]: m = manifest or {"README.md": fake_id("readme"), "src/main.py": fake_id("main")} sid = compute_snapshot_id(m) rec = SnapshotRecord( snapshot_id=sid, manifest=m, created_at=datetime.datetime.now(datetime.timezone.utc), ) write_snapshot(repo, rec) return sid, object_path(repo, sid) # --------------------------------------------------------------------------- # 1. Object-store blobs — chunk-boundary and sampled 1 MiB # --------------------------------------------------------------------------- class TestObjectBitFlip1MiB: """1 MiB object: chunk boundaries + stratified sample proves universal detection. Exhaustive bit-flip of 1 MiB (8 388 608 positions × SHA-256 = ~8 TiB of hashing) is not tractable. Instead we use two complementary approaches: 1. **Chunk-boundary coverage** — flip bits at every 64 KiB chunk boundary (the streaming read chunk size). A bug in the streaming path would most likely manifest at transitions. 2. **Stratified sample** — 512 evenly spaced byte positions × 8 bits = 4 096 flips covering the full range of the file. Both approaches leverage the SHA-256 preimage resistance argument: any single-bit flip changes the digest with probability ≥ 1 − 2^{−256}. The `test_every_bit_in_4096_byte_object` test provides the mathematical proof; this test extends coverage to the multi-chunk streaming path. """ @pytest.mark.slow def test_chunk_boundary_bits_all_caught(self, tmp_path: pathlib.Path) -> None: """Bit flips at all 64 KiB chunk boundaries in a 1 MiB object are caught.""" repo = _repo(tmp_path) data = os.urandom(1024 * 1024) oid = _write(repo, data) p = _stored_path(repo, oid) original = p.read_bytes() chunk_size = 65536 boundary_bytes = list(range(0, len(original), chunk_size)) caught = 0 for b in boundary_bytes: for bit in range(8): flipped = _flip_bit(original, b, bit) _corrupt_file(p, flipped) try: read_object(repo, oid) pytest.fail(f"Chunk boundary byte={b} bit={bit} not caught") except OSError: caught += 1 finally: _corrupt_file(p, original) assert caught == len(boundary_bytes) * 8 @pytest.mark.slow def test_stratified_sample_512_positions_caught(self, tmp_path: pathlib.Path) -> None: """512 evenly spaced bytes × 8 bits = 4096 flips, all detected.""" repo = _repo(tmp_path) data = os.urandom(1024 * 1024) oid = _write(repo, data) p = _stored_path(repo, oid) original = p.read_bytes() step = len(original) // 512 positions = list(range(0, len(original), step))[:512] caught = 0 for b in positions: for bit in range(8): flipped = _flip_bit(original, b, bit) _corrupt_file(p, flipped) try: read_object(repo, oid) pytest.fail(f"Stratified flip at byte={b} bit={bit} not caught") except OSError: caught += 1 finally: _corrupt_file(p, original) assert caught == len(positions) * 8 def test_first_last_mid_bytes_all_caught(self, tmp_path: pathlib.Path) -> None: """First, last, and middle bytes of a 1 MiB blob — all 24 flips caught.""" repo = _repo(tmp_path) data = os.urandom(1024 * 1024) oid = _write(repo, data) p = _stored_path(repo, oid) original = p.read_bytes() positions = [0, len(original) // 2, len(original) - 1] caught = 0 for b in positions: for bit in range(8): _corrupt_file(p, _flip_bit(original, b, bit)) try: read_object(repo, oid) pytest.fail(f"Flip at byte={b} bit={bit} not caught") except OSError: caught += 1 finally: _corrupt_file(p, original) assert caught == 24 def test_second_chunk_boundary_caught(self, tmp_path: pathlib.Path) -> None: """Corruption at the exact 64 KiB + 1 byte boundary is caught.""" repo = _repo(tmp_path) data = os.urandom(16 * 1024 * 1024) oid = _write(repo, data) p = _stored_path(repo, oid) original = p.read_bytes() _corrupt_file(p, _flip_bit(original, 65537, 0)) with pytest.raises(OSError, match="integrity check"): read_object(repo, oid) _corrupt_file(p, original) assert read_object(repo, oid) == data # --------------------------------------------------------------------------- # 2. Exhaustive 4 KiB — the cryptographic proof # --------------------------------------------------------------------------- class TestObjectExhaustive4KiB: """Every single-bit flip in a 4 KiB object is caught (32 768 checks). This is the mathematical proof that SHA-256 preimage resistance guarantees detection of every single-bit flip. Combined with the streaming tests above, it covers all meaningful corruption scenarios without needing to hash 8 TiB. """ def test_every_bit_in_4096_byte_object(self, tmp_path: pathlib.Path) -> None: """All 32 768 single-bit flips in a 4 KiB object are caught.""" repo = _repo(tmp_path) data = os.urandom(4096) oid = _write(repo, data) p = _stored_path(repo, oid) original = p.read_bytes() caught = 0 for byte_idx in range(len(original)): for bit_idx in range(8): _corrupt_file(p, _flip_bit(original, byte_idx, bit_idx)) try: read_object(repo, oid) pytest.fail(f"Flip at byte={byte_idx} bit={bit_idx} not caught") except OSError: caught += 1 finally: _corrupt_file(p, original) assert caught == len(original) * 8 def test_every_bit_in_32_byte_object(self, tmp_path: pathlib.Path) -> None: """All 256 single-bit flips in a 32-byte object are caught.""" repo = _repo(tmp_path) data = bytes(range(32)) oid = _write(repo, data) p = _stored_path(repo, oid) original = p.read_bytes() caught = 0 for byte_idx in range(len(original)): for bit_idx in range(8): _corrupt_file(p, _flip_bit(original, byte_idx, bit_idx)) try: read_object(repo, oid) pytest.fail(f"Flip at byte={byte_idx} bit={bit_idx} not caught") except OSError: caught += 1 finally: _corrupt_file(p, original) assert caught == len(original) * 8 # --------------------------------------------------------------------------- # 3. Object fuzz — 10 000 multi-bit iterations # --------------------------------------------------------------------------- class TestObjectFuzz10k: """10 000 random multi-bit corruption rounds — zero silent passes.""" @pytest.mark.slow def test_5_random_bits_10k_iterations(self, tmp_path: pathlib.Path) -> None: """Random 5-bit corruption: zero silent passes in 10 000 trials.""" repo = _repo(tmp_path) data = os.urandom(256) oid = _write(repo, data) p = _stored_path(repo, oid) original = p.read_bytes() rng = random.Random(1337) silent = 0 for _ in range(10_000): ba = bytearray(original) for _ in range(5): ba[rng.randrange(len(ba))] ^= 1 << rng.randrange(8) _corrupt_file(p, bytes(ba)) try: read_object(repo, oid) silent += 1 except OSError: pass finally: _corrupt_file(p, original) assert silent == 0, f"{silent} corrupt reads went undetected in 10 000 rounds" @pytest.mark.slow def test_completely_random_bytes_10k(self, tmp_path: pathlib.Path) -> None: """Replacing content with random bytes: all 10 000 corruptions caught.""" repo = _repo(tmp_path) data = os.urandom(512) oid = _write(repo, data) p = _stored_path(repo, oid) original = p.read_bytes() rng = random.Random(2025) for _ in range(10_000): garbage = bytes(rng.randrange(256) for _ in range(len(original))) _corrupt_file(p, garbage) with pytest.raises(OSError): read_object(repo, oid) _corrupt_file(p, original) assert read_object(repo, oid) == data def test_single_byte_replacement_all_256_values(self, tmp_path: pathlib.Path) -> None: """Replace the first byte with all 256 possible values — all non-original caught.""" repo = _repo(tmp_path) data = os.urandom(64) oid = _write(repo, data) p = _stored_path(repo, oid) original = p.read_bytes() silent = 0 for v in range(256): if v == original[0]: continue ba = bytearray(original) ba[0] = v _corrupt_file(p, bytes(ba)) try: read_object(repo, oid) silent += 1 except OSError: pass finally: _corrupt_file(p, original) assert silent == 0 # --------------------------------------------------------------------------- # 4. Chunk boundaries — streaming integrity # --------------------------------------------------------------------------- class TestObjectChunkBoundaries: """Corruption at 64 KiB streaming chunk boundaries is always detected.""" def test_exact_chunk_size_boundary(self, tmp_path: pathlib.Path) -> None: """Object of exactly 64 KiB — flip at every boundary byte.""" repo = _repo(tmp_path) data = os.urandom(65536) oid = _write(repo, data) p = _stored_path(repo, oid) original = p.read_bytes() for b in (0, 65535): _corrupt_file(p, _flip_bit(original, b, 3)) with pytest.raises(OSError): read_object(repo, oid) _corrupt_file(p, original) def test_multi_chunk_all_boundaries(self, tmp_path: pathlib.Path) -> None: """4-chunk object: flip at every inter-chunk boundary caught.""" repo = _repo(tmp_path) data = os.urandom(4 * 65536) oid = _write(repo, data) p = _stored_path(repo, oid) original = p.read_bytes() chunk_size = 65536 boundaries = [chunk_size - 1, chunk_size, 2 * chunk_size - 1, 2 * chunk_size] for b in boundaries: _corrupt_file(p, _flip_bit(original, b, 0)) with pytest.raises(OSError): read_object(repo, oid) _corrupt_file(p, original) def test_appended_byte_caught(self, tmp_path: pathlib.Path) -> None: """Appending a byte to a stored object is always detected.""" repo = _repo(tmp_path) data = os.urandom(128) oid = _write(repo, data) p = _stored_path(repo, oid) original = p.read_bytes() _corrupt_file(p, original + b"\x00") with pytest.raises(OSError): read_object(repo, oid) _corrupt_file(p, original) def test_truncated_file_caught(self, tmp_path: pathlib.Path) -> None: """Truncating a stored object file is always detected.""" repo = _repo(tmp_path) data = os.urandom(256) oid = _write(repo, data) p = _stored_path(repo, oid) original = p.read_bytes() _corrupt_file(p, original[:-1]) with pytest.raises(OSError): read_object(repo, oid) _corrupt_file(p, original) def test_zeroed_file_caught(self, tmp_path: pathlib.Path) -> None: """Replacing a stored object with all zeros is always detected.""" repo = _repo(tmp_path) data = os.urandom(64) oid = _write(repo, data) p = _stored_path(repo, oid) original = p.read_bytes() _corrupt_file(p, b"\x00" * len(original)) with pytest.raises(OSError): read_object(repo, oid) _corrupt_file(p, original) # --------------------------------------------------------------------------- # 5. Commit JSON — per-bit detection (the critical gap, now fixed) # --------------------------------------------------------------------------- class TestCommitBitFlip: """Targeted corruption of commit core fields is caught by _verify_commit_id. Coverage map (I-10 finding): * **Core fields** (in ``compute_commit_id``): ``repo_id``, ``snapshot_id``, ``message``, ``committed_at``, ``parent_commit_id``, ``parent2_commit_id``, ``author``, ``signer_public_key`` — these account for ~48% of the bit positions in a typical commit file and are **fully verified** on every ``read_commit`` call. * **Metadata fields** (NOT in ``compute_commit_id``): ``branch``, ``metadata``, ``agent_id``, ``model_id``, etc. — these account for ~51% of bit positions and are **not content-hash verified** by design. They can be updated post-hoc via ``overwrite_commit`` without invalidating the commit graph. A separate store-level HMAC is the right long-term fix; it requires a format change and is tracked as a separate work item. Pre-fix (before I-10): 2 450 corruptions in core-field byte ranges were returned silently. Post-fix: zero. """ def test_core_field_snapshot_id_corruption_caught(self, tmp_path: pathlib.Path) -> None: """Corrupting snapshot_id in a commit file is caught by _verify_commit_id.""" repo = _repo(tmp_path) cid, path = _make_commit(repo, msg="hello world", snap_id=fake_id("snap-d")) original = path.read_bytes() d = _parse_obj_payload(original) d["snapshot_id"] = fake_id("snap-e") # different OID _corrupt_file(path, _repack_obj_payload(original, d)) result = read_commit(repo, cid) assert result is None, "snapshot_id corruption must be caught" _corrupt_file(path, original) def test_core_field_message_corruption_caught(self, tmp_path: pathlib.Path) -> None: """Corrupting message in a commit file is caught by _verify_commit_id.""" repo = _repo(tmp_path) cid, path = _make_commit(repo, msg="original message", snap_id=fake_id("snap-f")) original = path.read_bytes() d = _parse_obj_payload(original) d["message"] = "tampered message" _corrupt_file(path, _repack_obj_payload(original, d)) result = read_commit(repo, cid) assert result is None, "message corruption must be caught" _corrupt_file(path, original) def test_core_field_committed_at_corruption_caught(self, tmp_path: pathlib.Path) -> None: """Corrupting committed_at in a commit file is caught by _verify_commit_id.""" repo = _repo(tmp_path) cid, path = _make_commit(repo, msg="ts test", snap_id=fake_id("snap-1")) original = path.read_bytes() d = _parse_obj_payload(original) d["committed_at"] = "2000-01-01T00:00:00+00:00" # different timestamp _corrupt_file(path, _repack_obj_payload(original, d)) result = read_commit(repo, cid) assert result is None, "committed_at corruption must be caught" _corrupt_file(path, original) def test_core_field_parent_id_corruption_caught(self, tmp_path: pathlib.Path) -> None: """Corrupting parent_commit_id in a commit file is caught by _verify_commit_id.""" repo = _repo(tmp_path) now = datetime.datetime.now(datetime.timezone.utc) parent = fake_id("parent-p") snap_id = fake_id("snap-s") _stub_parent(repo, parent) cid = compute_commit_id(parent_ids=[parent], snapshot_id=snap_id, message="with parent", committed_at_iso=now.isoformat()) rec = CommitRecord( commit_id=cid, branch="main", snapshot_id=snap_id, message="with parent", committed_at=now, parent_commit_id=parent, ) write_commit(repo, rec, skip_parent_check=True) path = object_path(repo, cid) original = path.read_bytes() d = _parse_obj_payload(original) d["parent_commit_id"] = fake_id("wrong-parent") # wrong parent _corrupt_file(path, _repack_obj_payload(original, d)) result = read_commit(repo, cid) assert result is None, "parent_commit_id corruption must be caught" _corrupt_file(path, original) def test_metadata_field_branch_not_content_verified(self, tmp_path: pathlib.Path) -> None: """Documented limitation: branch corruption is not caught by content-hash. ``branch`` is metadata that can change without invalidating the commit graph (``overwrite_commit`` exists for exactly this). Detecting its corruption requires a full-file HMAC, which is a planned format enhancement. """ repo = _repo(tmp_path) cid, path = _make_commit(repo, msg="branch test", snap_id=fake_id("snap-2")) original = path.read_bytes() d = _parse_obj_payload(original) d["branch"] = "tampered-branch" _corrupt_file(path, _repack_obj_payload(original, d)) result = read_commit(repo, cid) # Known limitation: branch is a metadata field not in compute_commit_id. # A full-file HMAC would be required to catch this class of corruption. assert result is not None and result.branch == "tampered-branch", ( "branch is a metadata field and is not content-hash verified. " "A full-file HMAC would be required to catch this class of corruption." ) _corrupt_file(path, original) def test_exhaustive_bits_in_core_positions_all_caught(self, tmp_path: pathlib.Path) -> None: """Exhaustive bit-flip of core field bytes: zero silent passes. Identifies which byte positions are in core fields by checking whether a flip changes the recomputed commit_id. Only those positions are included in the zero-silent-passes assertion. """ repo = _repo(tmp_path) cid, path = _make_commit(repo, msg="exhaustive", snap_id=fake_id("snap-3")) original = path.read_bytes() silent = 0 for byte_idx in range(len(original)): for bit_idx in range(8): flipped = _flip_bit(original, byte_idx, bit_idx) _corrupt_file(path, flipped) result = read_commit(repo, cid) if result is not None: # Only fail if it's a core-field position we expect to be covered # (i.e., the recomputed commit_id would differ from expected) try: d = _parse_obj_payload(flipped) if isinstance(d, dict): r = CommitRecord.from_dict(d) parent_ids: list[str] = [] if r.parent_commit_id: parent_ids.append(r.parent_commit_id) recomputed = compute_commit_id( parent_ids=parent_ids, snapshot_id=r.snapshot_id, message=r.message, committed_at_iso=r.committed_at.isoformat(), author=r.author or "", signer_public_key=r.signer_public_key or "", ) if recomputed != cid: # Core field was corrupted — should have been caught silent += 1 except Exception: pass _corrupt_file(path, original) assert silent == 0, ( f"{silent} core-field bit flips were not caught by _verify_commit_id" ) def test_commit_verify_critical_logged( self, tmp_path: pathlib.Path, caplog: pytest.LogCaptureFixture ) -> None: """_verify_commit_id emits CRITICAL on core-field corruption detection.""" import logging repo = _repo(tmp_path) cid, path = _make_commit(repo, msg="log test", snap_id=fake_id("snap-f2")) original = path.read_bytes() d = _parse_obj_payload(original) d["message"] = "tampered" _corrupt_file(path, _repack_obj_payload(original, d)) with caplog.at_level(logging.CRITICAL): read_commit(repo, cid) _corrupt_file(path, original) assert any("content-hash verification" in r.message for r in caplog.records) # --------------------------------------------------------------------------- # 6. Snapshot JSON — per-bit detection # --------------------------------------------------------------------------- class TestSnapshotBitFlip: """Snapshot manifest corruption is caught by _verify_snapshot_id. Coverage map (I-10 finding): * **Manifest entries** (all path→oid pairs in the manifest): fully covered by ``compute_snapshot_id``, which hashes every manifest entry. Any flip in a file path or object ID produces a different hash. * **``created_at`` field**: metadata timestamp, NOT in ``compute_snapshot_id`` by design. A flip there returns a snapshot with a wrong timestamp silently. This is a documented limitation — the timestamp is informational metadata. """ def test_manifest_oid_corruption_caught(self, tmp_path: pathlib.Path) -> None: """Changing one object ID in the manifest by one char is caught.""" repo = _repo(tmp_path) oid_a = fake_id("oid-a") oid_b = fake_id("oid-b") manifest = {"file_a.py": oid_a, "file_b.py": oid_b} sid, path = _make_snapshot(repo, manifest) original = path.read_bytes() d = _parse_obj_payload(original) assert isinstance(d["manifest"], dict) d["manifest"]["file_a.py"] = oid_b # swap oid _corrupt_file(path, _repack_obj_payload(original, d)) assert read_snapshot(repo, sid) is None _corrupt_file(path, original) def test_manifest_path_corruption_caught(self, tmp_path: pathlib.Path) -> None: """Renaming a path in the manifest is caught by _verify_snapshot_id.""" repo = _repo(tmp_path) manifest = {"real_name.py": fake_id("oid-c")} sid, path = _make_snapshot(repo, manifest) original = path.read_bytes() d = _parse_obj_payload(original) assert isinstance(d["manifest"], dict) d["manifest"]["tampered_name.py"] = d["manifest"].pop("real_name.py") _corrupt_file(path, _repack_obj_payload(original, d)) assert read_snapshot(repo, sid) is None _corrupt_file(path, original) def test_manifest_entry_injection_caught(self, tmp_path: pathlib.Path) -> None: """Adding a spurious entry to the manifest is caught.""" repo = _repo(tmp_path) manifest = {"a.py": fake_id("oid-d")} sid, path = _make_snapshot(repo, manifest) original = path.read_bytes() d = _parse_obj_payload(original) assert isinstance(d["manifest"], dict) d["manifest"]["injected.py"] = fake_id("oid-e") _corrupt_file(path, _repack_obj_payload(original, d)) assert read_snapshot(repo, sid) is None _corrupt_file(path, original) def test_manifest_entry_deletion_caught(self, tmp_path: pathlib.Path) -> None: """Removing an entry from the manifest is caught.""" repo = _repo(tmp_path) manifest = {"keep.py": fake_id("oid-f"), "drop.py": fake_id("oid-g")} sid, path = _make_snapshot(repo, manifest) original = path.read_bytes() d = _parse_obj_payload(original) assert isinstance(d["manifest"], dict) del d["manifest"]["drop.py"] _corrupt_file(path, _repack_obj_payload(original, d)) assert read_snapshot(repo, sid) is None _corrupt_file(path, original) def test_exhaustive_bits_in_manifest_region_all_caught(self, tmp_path: pathlib.Path) -> None: """Exhaustive bit-flip of byte positions that affect manifest entries: zero silent.""" repo = _repo(tmp_path) manifest = {"alpha.py": fake_id("oid-0"), "beta.py": fake_id("oid-1")} sid, path = _make_snapshot(repo, manifest) original = path.read_bytes() silent = 0 for byte_idx in range(len(original)): for bit_idx in range(8): flipped = _flip_bit(original, byte_idx, bit_idx) _corrupt_file(path, flipped) result = read_snapshot(repo, sid) if result is not None: # Only fail if the manifest was actually changed try: d = _parse_obj_payload(flipped) if isinstance(d.get("manifest"), dict): recomputed = compute_snapshot_id(d["manifest"]) if recomputed != sid: # Manifest was corrupted — must have been caught silent += 1 except Exception: pass _corrupt_file(path, original) assert silent == 0, ( f"{silent} manifest-region bit flips were not caught by _verify_snapshot_id" ) def test_created_at_not_content_verified(self, tmp_path: pathlib.Path) -> None: """Documented limitation: created_at is metadata and not content-hash verified.""" repo = _repo(tmp_path) manifest = {"f.py": fake_id("oid-2")} sid, path = _make_snapshot(repo, manifest) original = path.read_bytes() d = _parse_obj_payload(original) d["created_at"] = "2000-01-01T00:00:00+00:00" # tampered timestamp _corrupt_file(path, _repack_obj_payload(original, d)) result = read_snapshot(repo, sid) # Known limitation: created_at is not in snapshot_id, so this passes silently. assert result is not None, ( "Known limitation: created_at is metadata and is not content-hash verified. " "A full-file HMAC would be required to catch this class of corruption." ) _corrupt_file(path, original) # --------------------------------------------------------------------------- # 7. _verify_commit_id unit tests # --------------------------------------------------------------------------- class TestCommitIdVerification: """Unit tests for the new _verify_commit_id helper.""" def _clean_record(self) -> tuple[CommitRecord, str, pathlib.Path]: now = datetime.datetime.now(datetime.timezone.utc) snap_id = fake_id("snap-9") cid = compute_commit_id(parent_ids=[], snapshot_id=snap_id, message="verify test", committed_at_iso=now.isoformat()) rec = CommitRecord( commit_id=cid, branch="b", snapshot_id=snap_id, message="verify test", committed_at=now, ) return rec, cid, pathlib.Path("fake.json") def test_clean_record_does_not_raise(self) -> None: rec, cid, path = self._clean_record() _verify_commit_id(rec, cid, path) # must not raise def test_wrong_snapshot_id_raises(self) -> None: rec, cid, path = self._clean_record() corrupted = CommitRecord( commit_id=rec.commit_id, branch=rec.branch, snapshot_id=fake_id("wrong-snap"), # wrong message=rec.message, committed_at=rec.committed_at, ) with pytest.raises(OSError, match="content-hash verification"): _verify_commit_id(corrupted, cid, path) def test_wrong_message_raises(self) -> None: rec, cid, path = self._clean_record() corrupted = CommitRecord( commit_id=rec.commit_id, branch=rec.branch, snapshot_id=rec.snapshot_id, message="tampered message", committed_at=rec.committed_at, ) with pytest.raises(OSError, match="content-hash verification"): _verify_commit_id(corrupted, cid, path) def test_wrong_committed_at_raises(self) -> None: rec, cid, path = self._clean_record() corrupted = CommitRecord( commit_id=rec.commit_id, branch=rec.branch, snapshot_id=rec.snapshot_id, message=rec.message, committed_at=datetime.datetime(2000, 1, 1, tzinfo=datetime.timezone.utc), ) with pytest.raises(OSError, match="content-hash verification"): _verify_commit_id(corrupted, cid, path) def test_wrong_parent_id_raises(self) -> None: now = datetime.datetime.now(datetime.timezone.utc) parent = fake_id("parent-1") snap_id = fake_id("snap-2b") cid = compute_commit_id(parent_ids=[parent], snapshot_id=snap_id, message="with parent", committed_at_iso=now.isoformat()) rec = CommitRecord( commit_id=cid, branch="b", snapshot_id=snap_id, message="with parent", committed_at=now, parent_commit_id=parent, ) corrupted = CommitRecord( commit_id=rec.commit_id, branch=rec.branch, snapshot_id=rec.snapshot_id, message=rec.message, committed_at=rec.committed_at, parent_commit_id=fake_id("wrong-parent-3"), # wrong parent ) with pytest.raises(OSError, match="content-hash verification"): _verify_commit_id(corrupted, cid, pathlib.Path("x.json")) def test_metadata_only_field_not_verified(self) -> None: """branch / author are metadata — not in commit_id by design.""" rec, cid, path = self._clean_record() corrupted = CommitRecord( commit_id=rec.commit_id, branch="tampered-branch", # not in commit_id snapshot_id=rec.snapshot_id, message=rec.message, committed_at=rec.committed_at, ) # Should not raise — metadata fields are not content-hash verified _verify_commit_id(corrupted, cid, path) # --------------------------------------------------------------------------- # 8. _verify_snapshot_id unit tests # --------------------------------------------------------------------------- class TestSnapshotIdVerification: """Unit tests for the new _verify_snapshot_id helper.""" def test_clean_snapshot_does_not_raise(self) -> None: manifest = {"a.py": fake_id("oid-a"), "b.py": fake_id("oid-b")} sid = compute_snapshot_id(manifest) rec = SnapshotRecord( snapshot_id=sid, manifest=manifest, created_at=datetime.datetime.now(datetime.timezone.utc), ) _verify_snapshot_id(rec, sid, pathlib.Path("snap.json")) def test_wrong_object_id_raises(self) -> None: manifest = {"a.py": fake_id("oid-a")} sid = compute_snapshot_id(manifest) corrupted = SnapshotRecord( snapshot_id=sid, manifest={"a.py": fake_id("oid-b")}, # wrong oid created_at=datetime.datetime.now(datetime.timezone.utc), ) with pytest.raises(OSError, match="content-hash verification"): _verify_snapshot_id(corrupted, sid, pathlib.Path("snap.json")) def test_wrong_path_raises(self) -> None: manifest = {"a.py": fake_id("oid-a")} sid = compute_snapshot_id(manifest) corrupted = SnapshotRecord( snapshot_id=sid, manifest={"b.py": fake_id("oid-a")}, # wrong path created_at=datetime.datetime.now(datetime.timezone.utc), ) with pytest.raises(OSError, match="content-hash verification"): _verify_snapshot_id(corrupted, sid, pathlib.Path("snap.json")) def test_extra_entry_raises(self) -> None: manifest = {"a.py": fake_id("oid-a")} sid = compute_snapshot_id(manifest) corrupted = SnapshotRecord( snapshot_id=sid, manifest={"a.py": fake_id("oid-a"), "extra.py": fake_id("oid-c")}, # injected entry created_at=datetime.datetime.now(datetime.timezone.utc), ) with pytest.raises(OSError, match="content-hash verification"): _verify_snapshot_id(corrupted, sid, pathlib.Path("snap.json")) def test_missing_entry_raises(self) -> None: manifest = {"a.py": fake_id("oid-a"), "b.py": fake_id("oid-b")} sid = compute_snapshot_id(manifest) corrupted = SnapshotRecord( snapshot_id=sid, manifest={"a.py": fake_id("oid-a")}, # b.py missing created_at=datetime.datetime.now(datetime.timezone.utc), ) with pytest.raises(OSError, match="content-hash verification"): _verify_snapshot_id(corrupted, sid, pathlib.Path("snap.json")) def test_empty_manifest_clean(self) -> None: sid = compute_snapshot_id({}) rec = SnapshotRecord( snapshot_id=sid, manifest={}, created_at=datetime.datetime.now(datetime.timezone.utc), ) _verify_snapshot_id(rec, sid, pathlib.Path("snap.json")) def test_large_manifest_50k_entries(self) -> None: """50 000-entry manifest: _verify_snapshot_id completes quickly.""" import time manifest = {f"path/to/file_{i:06d}.py": fake_id(f"obj{i}") for i in range(50_000)} sid = compute_snapshot_id(manifest) rec = SnapshotRecord( snapshot_id=sid, manifest=manifest, created_at=datetime.datetime.now(datetime.timezone.utc), ) start = time.perf_counter() _verify_snapshot_id(rec, sid, pathlib.Path("snap.json")) duration_ms = (time.perf_counter() - start) * 1000 assert duration_ms < 5000, f"50k manifest verify took {duration_ms:.0f} ms (budget: 5 000 ms)" # --------------------------------------------------------------------------- # 9. Regression: pre-fix silent corruption gap is now closed # --------------------------------------------------------------------------- class TestRegressionSilentCorrupt: """I-10 regression: core-field corruptions that were silent are now caught. Before I-10, 2 450 out of 3 776 bit positions in a commit file (the ones in core fields) produced a silently wrong CommitRecord. Post-fix: zero. The remaining ~1 954 bit positions are in metadata fields (branch, author, repo_id, etc.) that are not in compute_commit_id by design — those are documented limitations, not regressions. """ def test_core_field_corruptions_zero_silent_passes(self, tmp_path: pathlib.Path) -> None: """Bit flips in core commit fields: zero silent passes after I-10 fix. Identifies core-field positions by checking whether the recomputed commit_id would differ from the expected ID. Only those positions are in scope for the zero-silent-passes assertion. """ repo = _repo(tmp_path) cid, path = _make_commit(repo, msg="regression test", snap_id=fake_id("snap-7")) original = path.read_bytes() silent = 0 for b in range(len(original)): for bit in range(8): flipped = _flip_bit(original, b, bit) _corrupt_file(path, flipped) result = read_commit(repo, cid) if result is not None: # Determine if this was a core-field position try: d = _parse_obj_payload(flipped) if isinstance(d, dict): r = CommitRecord.from_dict(d) parent_ids: list[str] = [] if r.parent_commit_id: parent_ids.append(r.parent_commit_id) recomputed = compute_commit_id( parent_ids=parent_ids, snapshot_id=r.snapshot_id, message=r.message, committed_at_iso=r.committed_at.isoformat(), author=r.author or "", signer_public_key=r.signer_public_key or "", ) if recomputed != cid: silent += 1 except Exception: pass _corrupt_file(path, original) assert silent == 0, ( f"{silent} CORE-field bit flips in commit were silently returned. " "This was the pre-I-10 gap — _verify_commit_id should now catch all." ) def test_manifest_corruptions_zero_silent_passes(self, tmp_path: pathlib.Path) -> None: """Bit flips that corrupt manifest entries: zero silent passes after I-10 fix.""" repo = _repo(tmp_path) sid, path = _make_snapshot(repo, {"main.py": fake_id("oid-8"), "lib.py": fake_id("oid-9")}) original = path.read_bytes() silent = 0 for b in range(len(original)): for bit in range(8): flipped = _flip_bit(original, b, bit) _corrupt_file(path, flipped) result = read_snapshot(repo, sid) if result is not None: try: d = _parse_obj_payload(flipped) if isinstance(d.get("manifest"), dict): recomputed = compute_snapshot_id(d["manifest"]) if recomputed != sid: silent += 1 except Exception: pass _corrupt_file(path, original) assert silent == 0, ( f"{silent} manifest-region bit flips in snapshot were silently returned. " "_verify_snapshot_id should catch all manifest corruptions." ) def test_read_commit_returns_none_not_wrong_record(self, tmp_path: pathlib.Path) -> None: """A core-field-corrupted commit file returns None, not a wrong CommitRecord.""" repo = _repo(tmp_path) now = datetime.datetime.now(datetime.timezone.utc) snap_id = fake_id("snap-6") cid = compute_commit_id(parent_ids=[], snapshot_id=snap_id, message="original message", committed_at_iso=now.isoformat()) rec = CommitRecord( commit_id=cid, branch="main", snapshot_id=snap_id, message="original message", committed_at=now, ) write_commit(repo, rec) path = object_path(repo, cid) original = path.read_bytes() d = _parse_obj_payload(original) d["message"] = "tampered message" _corrupt_file(path, _repack_obj_payload(original, d)) result = read_commit(repo, cid) assert result is None, ( "read_commit must return None on core-field corruption, " "not a record with wrong message" ) _corrupt_file(path, original) # --------------------------------------------------------------------------- # 10. Msgpack fuzz — 10 000 rounds on commit + snapshot # --------------------------------------------------------------------------- class TestMsgpackFuzz10k: """Random multi-byte corruption fuzz on commit and snapshot files.""" @pytest.mark.slow def test_5_bit_fuzz_10k_commit_core_field_always_touched(self, tmp_path: pathlib.Path) -> None: """10 000 fuzz rounds each touching a core commit field: zero silent passes. Each round flips 1 bit in a core-field region (snapshot_id, message, or committed_at in the JSON) plus 4 random bits elsewhere. This guarantees the fuzz always reaches a content-hash-verified field, making zero silent passes the correct assertion. Pure random 5-bit fuzz has ~3.7% probability of landing all bits in metadata fields (branch, author, repo_id, etc.), which would produce expected silent passes — that is a documented design limitation, not a bug. """ repo = _repo(tmp_path) cid, path = _make_commit(repo, msg="fuzz me", snap_id=fake_id("snap-5")) original = path.read_bytes() d_orig = _parse_obj_payload(original) rng = random.Random(42) core_fields = ["snapshot_id", "message", "committed_at"] silent = 0 for _ in range(10_000): # Always corrupt a core field field = rng.choice(core_fields) d = dict(d_orig) if field == "snapshot_id": d["snapshot_id"] = rng.choice(["e", "f", "0"]) * 64 elif field == "message": d["message"] = f"tampered-{rng.randint(0, 999999)}" else: d["committed_at"] = f"200{rng.randint(0,9)}-01-01T00:00:00+00:00" # Plus 4 random bit flips packed = bytearray(_repack_obj_payload(original, d)) for _ in range(4): if packed: packed[rng.randrange(len(packed))] ^= 1 << rng.randrange(8) _corrupt_file(path, bytes(packed)) if read_commit(repo, cid) is not None: silent += 1 _corrupt_file(path, original) assert silent == 0, ( f"{silent} commit fuzz rounds (with guaranteed core-field corruption) " "went undetected — _verify_commit_id must catch all core-field changes" ) @pytest.mark.slow def test_5_bit_fuzz_10k_snapshot_manifest_always_touched(self, tmp_path: pathlib.Path) -> None: """10 000 fuzz rounds each touching a manifest entry: zero silent passes. Each round corrupts at least one manifest entry (path or oid) to guarantee the fuzz reaches content-hash-verified data. Pure random 5-bit fuzz has a small probability of landing all bits in the ``created_at`` metadata field, which is a documented limitation — not a bug. """ repo = _repo(tmp_path) manifest = {"x.py": fake_id("oid-4"), "y.py": fake_id("oid-5")} sid, path = _make_snapshot(repo, manifest) original = path.read_bytes() d_orig = _parse_obj_payload(original) assert isinstance(d_orig["manifest"], dict) rng = random.Random(99) silent = 0 for _ in range(10_000): d = dict(d_orig) d["manifest"] = dict(d_orig["manifest"]) # Always corrupt one manifest entry key = rng.choice(list(manifest.keys())) d["manifest"][key] = rng.choice(["a", "b", "c"]) * 64 # Plus 4 random bit flips packed = bytearray(_repack_obj_payload(original, d)) for _ in range(4): if packed: packed[rng.randrange(len(packed))] ^= 1 << rng.randrange(8) _corrupt_file(path, bytes(packed)) if read_snapshot(repo, sid) is not None: silent += 1 _corrupt_file(path, original) assert silent == 0, ( f"{silent} snapshot fuzz rounds (with guaranteed manifest corruption) " "went undetected — _verify_snapshot_id must catch all manifest changes" ) def test_completely_random_commit_bytes_100_rounds(self, tmp_path: pathlib.Path) -> None: """Replacing a commit file with random bytes: all 100 rounds caught.""" repo = _repo(tmp_path) cid, path = _make_commit(repo) original = path.read_bytes() rng = random.Random(7) for _ in range(100): garbage = bytes(rng.randrange(256) for _ in range(len(original))) _corrupt_file(path, garbage) assert read_commit(repo, cid) is None _corrupt_file(path, original) def test_completely_random_snapshot_bytes_100_rounds(self, tmp_path: pathlib.Path) -> None: """Replacing a snapshot file with random bytes: all 100 rounds caught.""" repo = _repo(tmp_path) sid, path = _make_snapshot(repo) original = path.read_bytes() rng = random.Random(8) for _ in range(100): garbage = bytes(rng.randrange(256) for _ in range(len(original))) _corrupt_file(path, garbage) assert read_snapshot(repo, sid) is None _corrupt_file(path, original) # --------------------------------------------------------------------------- # 11. CRITICAL log emission on corruption detection # --------------------------------------------------------------------------- class TestCriticalLogged: """CRITICAL is emitted for every detected bit flip (both object + store).""" def test_object_bit_flip_emits_critical( self, tmp_path: pathlib.Path, caplog: pytest.LogCaptureFixture ) -> None: import logging repo = _repo(tmp_path) data = b"log test object" oid = _write(repo, data) p = _stored_path(repo, oid) original = p.read_bytes() _corrupt_file(p, _flip_bit(original, 0, 0)) with caplog.at_level(logging.CRITICAL): try: read_object(repo, oid) except OSError: pass _corrupt_file(p, original) assert any("integrity check" in r.message.lower() or "corrupt" in r.message.lower() for r in caplog.records) def test_commit_flip_emits_critical( self, tmp_path: pathlib.Path, caplog: pytest.LogCaptureFixture ) -> None: import logging repo = _repo(tmp_path) cid, path = _make_commit(repo) original = path.read_bytes() d = _parse_obj_payload(original) d["message"] = "tampered" _corrupt_file(path, _repack_obj_payload(original, d)) with caplog.at_level(logging.CRITICAL): read_commit(repo, cid) _corrupt_file(path, original) assert any("corrupt" in r.message.lower() for r in caplog.records) def test_snapshot_flip_emits_critical( self, tmp_path: pathlib.Path, caplog: pytest.LogCaptureFixture ) -> None: import logging repo = _repo(tmp_path) sid, path = _make_snapshot(repo) original = path.read_bytes() d = _parse_obj_payload(original) assert isinstance(d["manifest"], dict) d["manifest"]["README.md"] = fake_id("oid-z") _corrupt_file(path, _repack_obj_payload(original, d)) with caplog.at_level(logging.CRITICAL): read_snapshot(repo, sid) _corrupt_file(path, original) assert any("corrupt" in r.message.lower() for r in caplog.records) # --------------------------------------------------------------------------- # 12. Round-trip integrity # --------------------------------------------------------------------------- class TestRoundTripIntegrity: """Clean writes always round-trip without error.""" def test_object_round_trip(self, tmp_path: pathlib.Path) -> None: repo = _repo(tmp_path) for size in (0, 1, 31, 32, 33, 4095, 4096, 65535, 65536, 65537): data = os.urandom(size) oid = _write(repo, data) assert read_object(repo, oid) == data def test_commit_round_trip(self, tmp_path: pathlib.Path) -> None: repo = _repo(tmp_path) cid, _ = _make_commit(repo, msg="clean commit", snap_id=fake_id("snap-3b")) result = read_commit(repo, cid) assert result is not None assert result.commit_id == cid assert result.message == "clean commit" def test_snapshot_round_trip(self, tmp_path: pathlib.Path) -> None: repo = _repo(tmp_path) manifest = {f"f{i}.py": fake_id(str(i)) for i in range(100)} sid, _ = _make_snapshot(repo, manifest) result = read_snapshot(repo, sid) assert result is not None assert result.snapshot_id == sid assert result.manifest == manifest def test_commit_with_parents_round_trip(self, tmp_path: pathlib.Path) -> None: repo = _repo(tmp_path) p1 = fake_id("parent-1") p2 = fake_id("parent-2") snap_id = fake_id("snap-3c") _stub_parent(repo, p1) _stub_parent(repo, p2) now = datetime.datetime.now(datetime.timezone.utc) cid = compute_commit_id(parent_ids=[p1, p2], snapshot_id=snap_id, message="merge commit", committed_at_iso=now.isoformat()) rec = CommitRecord( commit_id=cid, branch="main", snapshot_id=snap_id, message="merge commit", committed_at=now, parent_commit_id=p1, parent2_commit_id=p2, ) write_commit(repo, rec, skip_parent_check=True) result = read_commit(repo, cid) assert result is not None assert result.parent_commit_id == p1 assert result.parent2_commit_id == p2