"""Tests for Bug 9: write_snapshot accepts incoming SnapshotRecord whose snapshot_id doesn't match the hash of its manifest — creating a file that read_snapshot always reports as corrupt (hash mismatch), permanently unreadable. The symmetrical fix to Bug 8b (write_commit incoming verification): both write_commit and write_snapshot must verify the incoming record hash before touching disk. Attack scenarios: - apply_mpack receives a mpack where snapshot_id is wrong (corruption/attack) - A manually-constructed SnapshotRecord is passed with mismatched snapshot_id - An adversary injects a snapshot that checksums fine for the wrong ID Scope of tests -------------- Unit (write_snapshot incoming hash verification): - write_snapshot rejects incoming record with wrong snapshot_id (new file) - write_snapshot rejects incoming record with wrong snapshot_id (existing good file) - write_snapshot accepts valid incoming record (new file) - write_snapshot is idempotent on valid record (second call skips) - write_snapshot rejects incoming record with one wrong manifest entry - write_snapshot rejects incoming record with extra injected manifest entry - write_snapshot rejects incoming record with missing manifest entry - write_snapshot rejects incoming record with wrong directories hash Integration (apply_mpack with corrupt snapshot_id): - apply_mpack skips snapshot with wrong snapshot_id - apply_mpack does not skip valid snapshots when one is corrupt - apply_mpack: written snapshot must be readable via read_snapshot - apply_mpack: corrupt snapshot_id in mpack cannot poison existing good snapshot Security: - A mpack cannot substitute a manifest that passes a different snapshot's hash - Injected manifest entries are rejected before reaching disk Stress: - 100-snapshot mpack with one corrupt snapshot_id: 99 written, 1 skipped """ from __future__ import annotations import datetime import pathlib import pytest from muse.core.mpack import apply_mpack, MPack, SnapshotDeltaDict from muse.core.paths import muse_dir from muse.core.ids import hash_snapshot as compute_snapshot_id from muse.core.types import Manifest, NULL_COMMIT_ID, blob_id, fake_id, long_id from muse.core.snapshots import ( SnapshotRecord, read_snapshot, write_snapshot, ) _TS = datetime.datetime(2024, 6, 15, 10, 0, 0, tzinfo=datetime.timezone.utc) def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: repo = tmp_path / "repo" repo.mkdir() muse_dir(repo).mkdir() return repo def _good_snap(manifest: Manifest | None = None) -> SnapshotRecord: m = manifest or {"src/main.py": long_id("a" * 64)} snap_id = compute_snapshot_id(m) return SnapshotRecord( snapshot_id=snap_id, manifest=m, directories=[], created_at=_TS, note="", ) def _snap_with_wrong_id(manifest: Manifest | None = None) -> SnapshotRecord: """SnapshotRecord whose snapshot_id doesn't match the hash of its manifest.""" m = manifest or {"src/main.py": long_id("a" * 64)} return SnapshotRecord( snapshot_id=long_id("f" * 64), # wrong — doesn't match manifest hash manifest=m, directories=[], created_at=_TS, note="", ) # ────────────────────────────────────────────────────────────────────────────── # Unit: write_snapshot incoming hash verification # ────────────────────────────────────────────────────────────────────────────── class TestWriteSnapshotIncomingVerification: def test_rejects_wrong_snapshot_id_new_file(self, tmp_path: pathlib.Path) -> None: """BUG: write_snapshot writes the bad record; read_snapshot returns None forever.""" repo = _make_repo(tmp_path) bad = _snap_with_wrong_id() with pytest.raises((ValueError, OSError)): write_snapshot(repo, bad) def test_rejects_wrong_snapshot_id_existing_good_file(self, tmp_path: pathlib.Path) -> None: """write_snapshot must not write over an existing good file with a bad incoming record. (The bad record has the same snapshot_id as good, but different manifest.) """ repo = _make_repo(tmp_path) good = _good_snap() write_snapshot(repo, good) # Construct bad record with SAME snapshot_id but different manifest bad = SnapshotRecord( snapshot_id=good.snapshot_id, # same ID manifest={"other.py": long_id("b" * 64)}, # different content — hash won't match directories=[], created_at=_TS, note="", ) with pytest.raises((ValueError, OSError)): write_snapshot(repo, bad) # Good file must still be intact stored = read_snapshot(repo, good.snapshot_id) assert stored is not None assert stored.manifest == good.manifest def test_accepts_valid_incoming_record(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) good = _good_snap() write_snapshot(repo, good) # must not raise stored = read_snapshot(repo, good.snapshot_id) assert stored is not None assert stored.snapshot_id == good.snapshot_id def test_idempotent_on_valid_record(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) good = _good_snap() write_snapshot(repo, good) write_snapshot(repo, good) # second call must not raise assert read_snapshot(repo, good.snapshot_id) is not None def test_rejects_incoming_with_wrong_object_id(self, tmp_path: pathlib.Path) -> None: """Incoming snapshot with a tampered object ID must be rejected.""" repo = _make_repo(tmp_path) good = _good_snap({"src/main.py": long_id("a" * 64)}) tampered = SnapshotRecord( snapshot_id=good.snapshot_id, # original hash manifest={"src/main.py": long_id("b" * 64)}, # different object ID directories=[], created_at=_TS, note="", ) with pytest.raises((ValueError, OSError)): write_snapshot(repo, tampered) def test_rejects_incoming_with_injected_manifest_entry(self, tmp_path: pathlib.Path) -> None: """Incoming snapshot with an injected extra file must be rejected.""" repo = _make_repo(tmp_path) good = _good_snap({"src/main.py": long_id("a" * 64)}) tampered = SnapshotRecord( snapshot_id=good.snapshot_id, manifest={"src/main.py": long_id("a" * 64), "malicious.sh": long_id("e" * 64)}, # injected directories=[], created_at=_TS, note="", ) with pytest.raises((ValueError, OSError)): write_snapshot(repo, tampered) def test_rejects_incoming_with_missing_manifest_entry(self, tmp_path: pathlib.Path) -> None: """Incoming snapshot with a removed file entry must be rejected.""" repo = _make_repo(tmp_path) good = _good_snap({"src/a.py": long_id("a" * 64), "src/b.py": long_id("b" * 64)}) tampered = SnapshotRecord( snapshot_id=good.snapshot_id, manifest={"src/a.py": long_id("a" * 64)}, # missing src/b.py directories=[], created_at=_TS, note="", ) with pytest.raises((ValueError, OSError)): write_snapshot(repo, tampered) def test_rejects_incoming_with_wrong_directories_hash(self, tmp_path: pathlib.Path) -> None: """Incoming snapshot with different directories list must be rejected.""" repo = _make_repo(tmp_path) manifest = {"src/main.py": long_id("a" * 64)} snap_id = compute_snapshot_id(manifest, ["src"]) good = SnapshotRecord( snapshot_id=snap_id, manifest=manifest, directories=["src"], created_at=_TS, note="", ) write_snapshot(repo, good) # good write tampered = SnapshotRecord( snapshot_id=snap_id, manifest=manifest, directories=["src", "malicious"], # different directories created_at=_TS, note="", ) with pytest.raises((ValueError, OSError)): write_snapshot(repo, tampered) # ────────────────────────────────────────────────────────────────────────────── # Integration: apply_mpack with corrupt snapshot_id # ────────────────────────────────────────────────────────────────────────────── def _bundle_with_snapshots(snapshots: list[SnapshotDeltaDict]) -> MPack: return MPack(objects=[], snapshots=snapshots, commits=[], tags=[]) def _to_delta(snap: SnapshotRecord) -> SnapshotDeltaDict: """Convert a SnapshotRecord to a standalone SnapshotDeltaDict for mpack construction.""" return SnapshotDeltaDict( snapshot_id=snap.snapshot_id, parent_snapshot_id=None, delta_add=dict(snap.manifest), delta_remove=[], ) class TestApplyPackCorruptSnapshotId: def test_apply_pack_skips_snapshot_with_wrong_snapshot_id(self, tmp_path: pathlib.Path) -> None: """apply_mpack must not write a snapshot with mismatched snapshot_id.""" repo = _make_repo(tmp_path) good = _good_snap() wire = _to_delta(good) wire["snapshot_id"] = long_id("f" * 64) # mismatch mpack = _bundle_with_snapshots([wire]) apply_mpack(repo, mpack) # The corrupt entry must not be on disk, or if on disk must be unreadable result = read_snapshot(repo, long_id("f" * 64)) assert result is None, ( "SECURITY: apply_mpack wrote a snapshot with mismatched snapshot_id; " "the file is on disk but permanently unreadable." ) def test_apply_pack_valid_snapshot_is_readable(self, tmp_path: pathlib.Path) -> None: """Regression: valid snapshots must still be written and readable.""" repo = _make_repo(tmp_path) good = _good_snap() mpack = _bundle_with_snapshots([_to_delta(good)]) result = apply_mpack(repo, mpack) assert result["snapshots_written"] == 1 stored = read_snapshot(repo, good.snapshot_id) assert stored is not None assert stored.manifest == good.manifest def test_apply_pack_one_corrupt_does_not_block_valid_snapshots(self, tmp_path: pathlib.Path) -> None: """One corrupt snapshot in a mpack must not block the valid ones.""" repo = _make_repo(tmp_path) good1 = _good_snap({"a.py": long_id("a" * 64)}) good2 = _good_snap({"b.py": long_id("b" * 64)}) corrupt_wire = _to_delta(good1) corrupt_wire["snapshot_id"] = NULL_COMMIT_ID # mismatch mpack = _bundle_with_snapshots([ corrupt_wire, _to_delta(good1), _to_delta(good2), ]) result = apply_mpack(repo, mpack) assert result["snapshots_written"] >= 2 assert read_snapshot(repo, good1.snapshot_id) is not None assert read_snapshot(repo, good2.snapshot_id) is not None def test_apply_pack_corrupt_bundle_cannot_poison_existing_good_snapshot(self, tmp_path: pathlib.Path) -> None: """A malicious mpack must not be able to overwrite an existing valid snapshot.""" repo = _make_repo(tmp_path) good = _good_snap({"src/main.py": long_id("a" * 64)}) write_snapshot(repo, good) # write the good snapshot first # MPack: same snapshot_id, different (injected) delta_add — hash won't match wire = SnapshotDeltaDict( snapshot_id=good.snapshot_id, parent_snapshot_id=None, delta_add={"src/main.py": long_id("b" * 64)}, # tampered delta_remove=[], ) mpack = _bundle_with_snapshots([wire]) apply_mpack(repo, mpack) stored = read_snapshot(repo, good.snapshot_id) assert stored is not None, "Good snapshot was destroyed by malicious mpack" assert stored.manifest == good.manifest, ( f"SECURITY: manifest was overwritten by malicious mpack. " f"Expected {good.manifest}, got {stored.manifest}" ) # ────────────────────────────────────────────────────────────────────────────── # Stress: large mpack with one corrupt snapshot_id # ────────────────────────────────────────────────────────────────────────────── class TestApplyPackSnapshotStress: def test_100_snapshot_bundle_one_corrupt(self, tmp_path: pathlib.Path) -> None: """100-snapshot mpack, one with wrong snapshot_id: 99 written, no crash.""" repo = _make_repo(tmp_path) snaps = [_good_snap({f"src/f{i}.py": fake_id(f"obj-{i}")}) for i in range(100)] wires = [] corrupt_id = None for i, snap in enumerate(snaps): wire = _to_delta(snap) if i == 50: wire["snapshot_id"] = fake_id("corrupt-snap-50") corrupt_id = fake_id("corrupt-snap-50") wires.append((snap.snapshot_id, wire, i != 50)) mpack = _bundle_with_snapshots([w for _, w, _ in wires]) result = apply_mpack(repo, mpack) assert result["snapshots_written"] >= 0 # no crash # corrupt entry must not be readable if corrupt_id: assert read_snapshot(repo, corrupt_id) is None # spot-check first 5 valid snapshots for snap_id, _, is_good in wires[:5]: if is_good: assert read_snapshot(repo, snap_id) is not None, ( f"Valid snapshot {snap_id[:8]} not readable after apply_mpack" )