""" Tests for data-integrity behaviour of write_commit / write_snapshot. === Current architecture: idempotent writes, detection at read time === write_commit and write_snapshot are both idempotent: if the object already exists at object_path, the call returns immediately without modifying disk. This means: - Corruption that lands at object_path is NOT repaired by write_commit or write_snapshot. - Corruption IS detected at read time: read_commit and read_snapshot recompute the hash from stored fields and return None on mismatch. === Coverage === Unit — write_commit skips clean existing record (no regression) Unit — write_commit skips on corrupt object (idempotent) Unit — read_commit returns None for corrupt snapshot_id field Unit — read_commit returns None for corrupt message field Unit — read_commit returns None for corrupt parent_commit_id field Unit — write_commit skips on content-level hash mismatch (no OSError) Unit — write_snapshot skips clean existing record (no regression) Unit — read_snapshot returns None for corrupt manifest Data — parent chain (A→B→C): corrupt B → B unreadable, A/C readable Data — corrupting one snapshot does not affect sibling snapshots Security — corrupt snapshot_id in commit is rejected at read time Security — injected manifest entry is rejected at read time Stress — 20 concurrent write_commit calls are all idempotent Stress — 20 concurrent write_snapshot calls are all idempotent Stress — 50 sequential commits all written and readable Regression — write_commit new file works Regression — write_snapshot new file works Regression — write_commit idempotent on clean file Regression — write_snapshot idempotent on clean file """ from __future__ import annotations import datetime import json as _json import pathlib import threading import pytest from muse.core.types import Manifest, fake_id from muse.core.object_store import object_path as _obj_path from muse.core.paths import muse_dir # 
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

_DEFAULT_BLOB = fake_id("default-blob")
_DEFAULT_SNAP = fake_id("default-snap")

# Field overrides merged into a serialized commit to corrupt it.
_CorruptField = dict[str, str | int | None]


def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Create the ``objects/sha256`` directory under ``muse_dir(tmp_path)``.

    Returns *tmp_path* itself so it can be used directly as the repo root.
    """
    (muse_dir(tmp_path) / "objects" / "sha256").mkdir(parents=True, exist_ok=True)
    return tmp_path


def _ts(year: int = 2024) -> str:
    """Return an ISO-8601 UTC timestamp for midnight on 1 January of *year*."""
    return f"{year}-01-01T00:00:00+00:00"


def _good_commit(
    snapshot_id: str | None = None,
    message: str = "test commit",
    parent_commit_id: str | None = None,
    ts: str | None = None,
) -> "CommitRecord":
    """Build a CommitRecord whose ``commit_id`` is derived from its own fields.

    ``commit_id`` is computed with ``hash_commit`` from exactly the values
    stored in the returned record (parents, snapshot, message, timestamp,
    author).  Falsy ``snapshot_id`` / ``ts`` fall back to ``_DEFAULT_SNAP``
    and ``_ts()``; a falsy ``parent_commit_id`` means "no parent".
    """
    from muse.core.commits import CommitRecord
    from muse.core.ids import hash_commit

    snap_id = snapshot_id or _DEFAULT_SNAP
    timestamp = ts or _ts()
    parent_ids = [parent_commit_id] if parent_commit_id else []
    commit_id = hash_commit(
        parent_ids=parent_ids,
        snapshot_id=snap_id,
        message=message,
        committed_at_iso=timestamp,
        author="gabriel",
    )
    return CommitRecord(
        commit_id=commit_id,
        branch="main",
        snapshot_id=snap_id,
        message=message,
        committed_at=datetime.datetime.fromisoformat(timestamp),
        parent_commit_id=parent_commit_id,
        parent2_commit_id=None,
        author="gabriel",
        metadata={},
    )


def _good_snapshot(manifest: Manifest | None = None) -> "SnapshotRecord":
    """Build a SnapshotRecord whose ``snapshot_id`` is ``hash_snapshot(manifest)``.

    Only ``None`` selects the default one-file manifest; an explicitly passed
    empty manifest is honoured rather than silently replaced.
    """
    from muse.core.ids import hash_snapshot
    from muse.core.snapshots import SnapshotRecord

    if manifest is None:
        manifest = {"src/main.py": _DEFAULT_BLOB}
    snapshot_id = hash_snapshot(manifest)
    return SnapshotRecord(snapshot_id=snapshot_id, manifest=manifest, directories={})


def _write_corrupt_commit(
    repo: pathlib.Path,
    good: "CommitRecord",
    corrupt_field: _CorruptField,
) -> None:
    """Write a corrupt commit object to object_path (valid header, wrong field values).

    The payload is *good* serialized to JSON with *corrupt_field* merged on
    top, stored under ``good.commit_id`` so the stored fields no longer match
    the ID they are filed under.
    """
    base = {
        "commit_id": good.commit_id,
        "repo_id": "test-repo",
        "branch": "main",
        "snapshot_id": good.snapshot_id,
        "message": good.message,
        "committed_at": good.committed_at.isoformat(),
        "parent_commit_id": good.parent_commit_id,
        "parent2_commit_id": None,
        "author": "gabriel",
        "metadata": {},
        "reviewed_by": [],
    }
    base.update(corrupt_field)
    payload = _json.dumps(base, separators=(",", ":")).encode()
    path = _obj_path(repo, good.commit_id)
    path.parent.mkdir(parents=True, exist_ok=True)
    # Header is "<kind> <payload length>\0", followed by the JSON payload.
    path.write_bytes(f"commit {len(payload)}\0".encode() + payload)


def _write_corrupt_snapshot(
    repo: pathlib.Path,
    good: "SnapshotRecord",
    corrupt_manifest: Manifest,
) -> None:
    """Write a corrupt snapshot object to object_path (valid header, wrong manifest).

    The object is stored under ``good.snapshot_id`` but carries
    *corrupt_manifest* instead of the manifest that ID was computed from.
    """
    record = {
        "snapshot_id": good.snapshot_id,
        "manifest": corrupt_manifest,
        "directories": {},
    }
    payload = _json.dumps(record, separators=(",", ":")).encode()
    path = _obj_path(repo, good.snapshot_id)
    path.parent.mkdir(parents=True, exist_ok=True)
    # Header is "<kind> <payload length>\0", followed by the JSON payload.
    path.write_bytes(f"snapshot {len(payload)}\0".encode() + payload)


# =============================================================================
# 1. UNIT — write_commit idempotency and corruption detection
# =============================================================================


class TestWriteCommitHashVerification:
    def test_idempotent_skip_clean_record(self, tmp_path: pathlib.Path) -> None:
        """Regression: write_commit on a clean existing file still returns fast."""
        from muse.core.commits import read_commit, write_commit

        repo = _make_repo(tmp_path)
        good = _good_commit()
        write_commit(repo, good)
        stored = _obj_path(repo, good.commit_id)
        before = stored.read_bytes()

        write_commit(repo, good)  # second call: must not raise, must not change data

        assert stored.read_bytes() == before, "second write_commit must not change stored bytes"
        result = read_commit(repo, good.commit_id)
        assert result is not None
        assert result.commit_id == good.commit_id

    def test_corrupt_snapshot_id_detected_at_read(self, tmp_path: pathlib.Path) -> None:
        """
        A commit object with a corrupt snapshot_id is detected at read time.

        write_commit is idempotent: it skips if object_path exists, so a
        pre-existing corrupt file is NOT overwritten.  read_commit recomputes
        the hash and returns None when snapshot_id doesn't match commit_id.
        """
        from muse.core.commits import read_commit, write_commit

        repo = _make_repo(tmp_path)
        good = _good_commit()
        _write_corrupt_commit(repo, good, {"snapshot_id": fake_id("attacker-snapshot")})

        write_commit(repo, good)  # skips — object already exists

        result = read_commit(repo, good.commit_id)
        assert result is None, (
            "read_commit must detect corrupt snapshot_id via hash verification "
            "and return None — not silently serve corrupt content."
        )

    def test_corrupt_message_detected_at_read(self, tmp_path: pathlib.Path) -> None:
        """A commit with a corrupt message is detected at read time."""
        from muse.core.commits import read_commit, write_commit

        repo = _make_repo(tmp_path)
        good = _good_commit(message="original message")
        _write_corrupt_commit(repo, good, {"message": "CORRUPTED MESSAGE"})

        write_commit(repo, good)  # skips — object already exists

        result = read_commit(repo, good.commit_id)
        assert result is None, "read_commit must detect corrupt message via hash verification"

    def test_corrupt_parent_commit_id_detected_at_read(self, tmp_path: pathlib.Path) -> None:
        """A commit with a corrupt parent_commit_id is detected at read time."""
        from muse.core.commits import read_commit, write_commit

        repo = _make_repo(tmp_path)
        good = _good_commit(parent_commit_id=None)
        _write_corrupt_commit(repo, good, {"parent_commit_id": fake_id("injected-parent")})

        write_commit(repo, good)  # skips — object already exists

        result = read_commit(repo, good.commit_id)
        assert result is None, "read_commit must detect corrupt parent_commit_id via hash verification"

    def test_content_hash_mismatch_skipped_not_raised(self, tmp_path: pathlib.Path) -> None:
        """
        write_commit is always idempotent — never raises OSError for
        content-level mismatches.

        A corrupt object at object_path is silently skipped.  Hash mismatches
        are detected later by read_commit.
        """
        from muse.core.commits import read_commit, write_commit

        repo = _make_repo(tmp_path)
        good_a = _good_commit(message="commit A", ts=_ts(2024))
        good_b = _good_commit(message="commit B", ts=_ts(2025))

        # Write B's data under A's object_path (commit_id field mismatch)
        _write_corrupt_commit(
            repo,
            good_a,
            {
                "commit_id": good_b.commit_id,
                "message": good_b.message,
                "snapshot_id": good_b.snapshot_id,
            },
        )

        # write_commit must NOT raise — it skips (idempotent)
        write_commit(repo, good_a)

        # read_commit detects the hash mismatch and returns None
        result = read_commit(repo, good_a.commit_id)
        assert result is None, "read_commit must detect commit_id field mismatch"


# =============================================================================
# 2. UNIT — write_snapshot idempotency and corruption detection
# =============================================================================


class TestWriteSnapshotHashVerification:
    def test_idempotent_skip_clean_snapshot(self, tmp_path: pathlib.Path) -> None:
        """Regression: write_snapshot on a clean existing file still skips correctly."""
        from muse.core.snapshots import read_snapshot, write_snapshot

        repo = _make_repo(tmp_path)
        good = _good_snapshot()
        write_snapshot(repo, good)
        stored = _obj_path(repo, good.snapshot_id)
        before = stored.read_bytes()

        write_snapshot(repo, good)  # second call: idempotent

        assert stored.read_bytes() == before, "second write_snapshot must not change stored bytes"
        result = read_snapshot(repo, good.snapshot_id)
        assert result is not None
        assert result.snapshot_id == good.snapshot_id

    def test_corrupt_object_id_in_manifest_detected_at_read(self, tmp_path: pathlib.Path) -> None:
        """
        A snapshot with a wrong object ID for a file is detected at read time.

        write_snapshot is idempotent: pre-existing corrupt object is skipped.
        read_snapshot recomputes the manifest hash and returns None on mismatch.
        """
        from muse.core.snapshots import read_snapshot, write_snapshot

        repo = _make_repo(tmp_path)
        blob = fake_id("main-blob")
        good = _good_snapshot({"src/main.py": blob})
        _write_corrupt_snapshot(repo, good, {"src/main.py": fake_id("wrong-blob")})

        write_snapshot(repo, good)  # skips — object already exists

        result = read_snapshot(repo, good.snapshot_id)
        assert result is None, "read_snapshot must detect corrupt manifest object ID"

    def test_extra_manifest_entry_detected_at_read(self, tmp_path: pathlib.Path) -> None:
        """An extra file in the manifest (hash mismatch) is detected at read time."""
        from muse.core.snapshots import read_snapshot, write_snapshot

        repo = _make_repo(tmp_path)
        blob = fake_id("main-blob")
        good = _good_snapshot({"src/main.py": blob})
        _write_corrupt_snapshot(
            repo,
            good,
            {
                "src/main.py": blob,
                "INJECTED_FILE.py": fake_id("injected-blob"),
            },
        )

        write_snapshot(repo, good)  # skips

        result = read_snapshot(repo, good.snapshot_id)
        assert result is None, "read_snapshot must detect injected manifest entry"

    def test_empty_manifest_detected_at_read(self, tmp_path: pathlib.Path) -> None:
        """A snapshot with an empty manifest (should have files) is detected at read time."""
        from muse.core.snapshots import read_snapshot, write_snapshot

        repo = _make_repo(tmp_path)
        good = _good_snapshot(
            {
                "src/main.py": fake_id("main-blob"),
                "src/utils.py": fake_id("utils-blob"),
            }
        )
        _write_corrupt_snapshot(repo, good, {})  # manifest wiped

        write_snapshot(repo, good)  # skips

        result = read_snapshot(repo, good.snapshot_id)
        assert result is None, "read_snapshot must detect empty manifest (hash mismatch)"

    def test_missing_manifest_entry_detected_at_read(self, tmp_path: pathlib.Path) -> None:
        """A snapshot with a missing file entry is detected at read time."""
        from muse.core.snapshots import read_snapshot, write_snapshot

        repo = _make_repo(tmp_path)
        main_blob = fake_id("main-blob")
        good = _good_snapshot(
            {
                "src/main.py": main_blob,
                "src/utils.py": fake_id("utils-blob"),
            }
        )
        _write_corrupt_snapshot(repo, good, {"src/main.py": main_blob})  # utils.py missing

        write_snapshot(repo, good)  # skips

        result = read_snapshot(repo, good.snapshot_id)
        assert result is None, "read_snapshot must detect missing manifest entry"


# =============================================================================
# 3. DATA INTEGRITY — full commit → snapshot chain
# =============================================================================


class TestCommitSnapshotChain:
    def test_clean_commit_and_snapshot_both_readable(self, tmp_path: pathlib.Path) -> None:
        """Clean commit and snapshot written correctly are both readable."""
        from muse.core.commits import read_commit, write_commit
        from muse.core.snapshots import read_snapshot, write_snapshot

        repo = _make_repo(tmp_path)
        good_snap = _good_snapshot({"src/main.py": fake_id("main-blob")})
        good_commit = _good_commit(snapshot_id=good_snap.snapshot_id)

        write_snapshot(repo, good_snap)
        write_commit(repo, good_commit)

        commit = read_commit(repo, good_commit.commit_id)
        assert commit is not None
        snap = read_snapshot(repo, commit.snapshot_id)
        assert snap is not None

    def test_parent_chain_corrupt_middle_unreadable(self, tmp_path: pathlib.Path) -> None:
        """A→B→C chain: corrupt B's object → B unreadable; A and C still readable."""
        from muse.core.commits import read_commit, write_commit

        repo = _make_repo(tmp_path)
        commit_a = _good_commit(message="commit A", ts=_ts(2022))
        commit_b = _good_commit(
            message="commit B",
            parent_commit_id=commit_a.commit_id,
            ts=_ts(2023),
        )
        commit_c = _good_commit(
            message="commit C",
            parent_commit_id=commit_b.commit_id,
            ts=_ts(2024),
        )

        write_commit(repo, commit_a)
        write_commit(repo, commit_b)
        write_commit(repo, commit_c)

        # Corrupt B by overwriting its object with a bad payload
        _write_corrupt_commit(repo, commit_b, {"snapshot_id": fake_id("wrong-snap")})

        assert read_commit(repo, commit_a.commit_id) is not None, "A must be readable"
        assert read_commit(repo, commit_b.commit_id) is None, "B must be unreadable after corruption"
        assert read_commit(repo, commit_c.commit_id) is not None, "C must be readable"

    def test_corrupting_one_snapshot_does_not_affect_siblings(self, tmp_path: pathlib.Path) -> None:
        """Corrupting one snapshot leaves sibling snapshots readable."""
        from muse.core.snapshots import read_snapshot, write_snapshot

        repo = _make_repo(tmp_path)
        snap_a = _good_snapshot({"a.py": fake_id("a-blob")})
        snap_b = _good_snapshot({"b.py": fake_id("b-blob")})
        snap_c = _good_snapshot({"c.py": fake_id("c-blob")})

        write_snapshot(repo, snap_a)
        write_snapshot(repo, snap_b)
        write_snapshot(repo, snap_c)

        _write_corrupt_snapshot(repo, snap_b, {"b.py": fake_id("wrong-blob")})

        assert read_snapshot(repo, snap_a.snapshot_id) is not None, "snap_a must be readable"
        assert read_snapshot(repo, snap_b.snapshot_id) is None, "snap_b must be unreadable after corruption"
        assert read_snapshot(repo, snap_c.snapshot_id) is not None, "snap_c must be readable"


# =============================================================================
# 4. SECURITY — corrupt fields cannot forge content
# =============================================================================


class TestSecurityCorruptFields:
    def test_corrupt_snapshot_id_in_commit_rejected_at_read(self, tmp_path: pathlib.Path) -> None:
        """
        An attacker who corrupts a commit's snapshot_id cannot make Muse read
        different content — the hash mismatch is detected by read_commit.
        """
        from muse.core.commits import read_commit, write_commit

        repo = _make_repo(tmp_path)
        good = _good_commit()
        attacker_snapshot = fake_id("attacker-snapshot")
        _write_corrupt_commit(repo, good, {"snapshot_id": attacker_snapshot})

        write_commit(repo, good)  # skips — object exists

        result = read_commit(repo, good.commit_id)
        assert result is None, (
            "Corrupt commit with attacker's snapshot_id must be rejected at read "
            "time — hash verification must detect the field substitution."
        )

    def test_injected_manifest_entry_rejected_at_read(self, tmp_path: pathlib.Path) -> None:
        """
        An injected file in the manifest (hash mismatch) is rejected at read time.
        """
        from muse.core.snapshots import read_snapshot, write_snapshot

        repo = _make_repo(tmp_path)
        blob = fake_id("main-blob")
        good = _good_snapshot({"src/main.py": blob})
        _write_corrupt_snapshot(
            repo,
            good,
            {
                "src/main.py": blob,
                "malicious_backdoor.py": fake_id("backdoor-blob"),
            },
        )

        write_snapshot(repo, good)  # skips

        result = read_snapshot(repo, good.snapshot_id)
        assert result is None, (
            "Snapshot with injected manifest entry must be rejected at read time."
        )


# =============================================================================
# 5. STRESS — concurrent writes are idempotent
# =============================================================================


class TestStressConcurrentWrite:
    def test_concurrent_write_commit_all_idempotent(self, tmp_path: pathlib.Path) -> None:
        """20 concurrent write_commit calls on a good commit are all idempotent."""
        from muse.core.commits import read_commit, write_commit

        repo = _make_repo(tmp_path)
        good = _good_commit()
        errors: list[Exception] = []

        def worker() -> None:
            # An exception escaping a thread never reaches pytest, so record
            # it here and fail the test from the main thread instead.
            try:
                write_commit(repo, good)
            except Exception as exc:
                errors.append(exc)

        threads = [threading.Thread(target=worker) for _ in range(20)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert not errors, f"concurrent write_commit calls raised: {errors!r}"
        result = read_commit(repo, good.commit_id)
        assert result is not None, "After 20 concurrent write_commit calls, commit must be readable"
        assert result.snapshot_id == good.snapshot_id

    def test_concurrent_write_snapshot_all_idempotent(self, tmp_path: pathlib.Path) -> None:
        """20 concurrent write_snapshot calls on a good snapshot are all idempotent."""
        from muse.core.snapshots import read_snapshot, write_snapshot

        repo = _make_repo(tmp_path)
        good = _good_snapshot()
        errors: list[Exception] = []

        def worker() -> None:
            # See test_concurrent_write_commit_all_idempotent: thread
            # exceptions must be surfaced explicitly.
            try:
                write_snapshot(repo, good)
            except Exception as exc:
                errors.append(exc)

        threads = [threading.Thread(target=worker) for _ in range(20)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert not errors, f"concurrent write_snapshot calls raised: {errors!r}"
        result = read_snapshot(repo, good.snapshot_id)
        assert result is not None, "After 20 concurrent write_snapshot calls, snapshot must be readable"

    def test_50_sequential_commits_all_readable(self, tmp_path: pathlib.Path) -> None:
        """50 different commits all written and readable."""
        from muse.core.commits import read_commit, write_commit

        for i in range(50):
            # Each commit goes into its own repo directory under tmp_path.
            repo = _make_repo(tmp_path / str(i))
            good = _good_commit(message=f"commit {i}", ts=f"202{i % 10}-01-01T00:00:00+00:00")
            write_commit(repo, good)
            result = read_commit(repo, good.commit_id)
            assert result is not None, f"commit {i} not readable"


# =============================================================================
# 6. REGRESSION — normal write paths still work
# =============================================================================


class TestRegression:
    def test_write_commit_new_file_works(self, tmp_path: pathlib.Path) -> None:
        """A commit written to an empty store can be read back."""
        from muse.core.commits import read_commit, write_commit

        repo = _make_repo(tmp_path)
        good = _good_commit()
        write_commit(repo, good)
        assert read_commit(repo, good.commit_id) is not None

    def test_write_snapshot_new_file_works(self, tmp_path: pathlib.Path) -> None:
        """A snapshot written to an empty store can be read back."""
        from muse.core.snapshots import read_snapshot, write_snapshot

        repo = _make_repo(tmp_path)
        good = _good_snapshot()
        write_snapshot(repo, good)
        assert read_snapshot(repo, good.snapshot_id) is not None

    def test_write_commit_idempotent_on_clean_file(self, tmp_path: pathlib.Path) -> None:
        """Ten repeated write_commit calls leave the commit readable."""
        from muse.core.commits import read_commit, write_commit

        repo = _make_repo(tmp_path)
        good = _good_commit()
        write_commit(repo, good)
        for _ in range(10):
            write_commit(repo, good)
        assert read_commit(repo, good.commit_id) is not None

    def test_write_snapshot_idempotent_on_clean_file(self, tmp_path: pathlib.Path) -> None:
        """Ten repeated write_snapshot calls leave the snapshot readable."""
        from muse.core.snapshots import read_snapshot, write_snapshot

        repo = _make_repo(tmp_path)
        good = _good_snapshot()
        write_snapshot(repo, good)
        for _ in range(10):
            write_snapshot(repo, good)
        assert read_snapshot(repo, good.snapshot_id) is not None