"""Tests for Bug 8: CommitRecord.from_dict silently substitutes now() for an unparseable committed_at, producing a CommitRecord whose hash never matches the stored commit_id. When this record is written via apply_mpack → write_commit, the commit becomes permanently unreadable — every subsequent read_commit returns None because _verify_commit_id always fails. Scope of tests -------------- Unit (from_dict): - from_dict raises ValueError on empty committed_at - from_dict raises ValueError on non-ISO committed_at - from_dict raises ValueError on null/None committed_at (dict value) - from_dict succeeds with a valid committed_at - from_dict succeeds with a timezone-aware committed_at Integration (write_commit incoming verification): - write_commit rejects a record whose hash doesn't match commit_id (new file) - write_commit rejects a record whose hash doesn't match commit_id (existing good file) - write_commit accepts a record whose hash matches commit_id (no file) - write_commit accepts a record whose hash matches commit_id (idempotent) End-to-end (apply_mpack): - apply_mpack skips a commit with missing committed_at (no crash, no write) - apply_mpack skips a commit with garbage committed_at - apply_mpack writes a commit with valid committed_at and it is readable - apply_mpack does not skip valid commits when one commit in mpack is corrupt Data integrity: - A commit written via apply_mpack from a mpack with valid fields is readable - A corrupt mpack cannot poison an existing good commit Regression: - SnapshotRecord.from_dict silent created_at substitution: snapshot still readable (created_at is NOT in hash so this doesn't break verification, but timestamp should be correctable) - CommitRecord.from_dict still raises on corrupt committed_at (regression guard for Bug 6 fix) Performance (stress): - 200-commit mpack with one corrupt committed_at: 199 commits written, 1 skipped """ from __future__ import annotations import datetime import pathlib import pytest from muse.core.mpack import apply_mpack, MPack from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.commits import ( CommitDict, CommitRecord, read_commit, write_commit, ) from muse.core.snapshots import ( SnapshotDict, SnapshotRecord, write_snapshot, ) from muse.core.paths import muse_dir # ────────────────────────────────────────────────────────────────────────────── # Helpers # ────────────────────────────────────────────────────────────────────────────── _TS = datetime.datetime(2024, 6, 15, 10, 0, 0, tzinfo=datetime.timezone.utc) def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: repo = tmp_path / "repo" repo.mkdir() muse_dir(repo).mkdir() return repo def _good_commit( *, snapshot_id: str | None = None, message: str = "test commit", committed_at: datetime.datetime = _TS, parent_commit_id: str | None = None, ) -> CommitRecord: snap_id = snapshot_id or ("b" * 64) parent_ids = [parent_commit_id] if parent_commit_id else [] commit_id = compute_commit_id( parent_ids=parent_ids, snapshot_id=snap_id, message=message, committed_at_iso=committed_at.isoformat(), author="gabriel", ) return CommitRecord( commit_id=commit_id, branch="main", snapshot_id=snap_id, message=message, committed_at=committed_at, parent_commit_id=parent_commit_id, parent2_commit_id=None, author="gabriel", metadata={}, structured_delta=None, sem_ver_bump="none", breaking_changes=[], agent_id="", model_id="", toolchain_id="", prompt_hash="", signature="", signer_key_id="", reviewed_by=[], test_runs=0, ) def _commit_dict_from_record(record: CommitRecord) -> CommitDict: """Serialize a CommitRecord to a plain dict (simulating wire format).""" return record.to_dict() def _bundle_with_commits(commits: list[dict]) -> MPack: return MPack( objects=[], snapshots=[], commits=commits, tags=[], ) # ────────────────────────────────────────────────────────────────────────────── # Unit: CommitRecord.from_dict timestamp validation # ────────────────────────────────────────────────────────────────────────────── class TestCommitFromDictTimestamp: """from_dict must raise on invalid committed_at, not silently substitute now().""" def _base_dict(self, committed_at: str = _TS.isoformat()) -> CommitDict: record = _good_commit() d = _commit_dict_from_record(record) d["committed_at"] = committed_at return d def test_raises_on_empty_committed_at(self) -> None: """BUG: from_dict silently substitutes now() for empty string.""" d = self._base_dict(committed_at="") with pytest.raises((ValueError, TypeError)): CommitRecord.from_dict(d) def test_raises_on_garbage_committed_at(self) -> None: d = self._base_dict(committed_at="not-a-date") with pytest.raises((ValueError, TypeError)): CommitRecord.from_dict(d) def test_raises_on_numeric_committed_at(self) -> None: d = self._base_dict(committed_at="1234567890") with pytest.raises((ValueError, TypeError)): CommitRecord.from_dict(d) def test_partial_iso_string_cannot_be_written_to_disk(self, tmp_path: pathlib.Path) -> None: """A partial ISO date string (e.g. "2024-06-15") may parse successfully in Python 3.11+ but produces a committed_at whose isoformat() differs from the original. The resulting record's hash won't match commit_id. write_commit must reject it before it hits disk (incoming verification). """ repo = _make_repo(tmp_path) d = self._base_dict(committed_at="2024-06-15") # date-only, no time/tz try: record = CommitRecord.from_dict(d) # If from_dict succeeds, write_commit must still catch the hash mismatch with pytest.raises((ValueError, OSError)): write_commit(repo, record) except (ValueError, TypeError): pass # from_dict raised — also correct def test_succeeds_on_valid_iso_utc(self) -> None: d = self._base_dict(committed_at=_TS.isoformat()) record = CommitRecord.from_dict(d) assert record.committed_at == _TS def test_succeeds_on_valid_iso_with_offset(self) -> None: ts = datetime.datetime(2024, 6, 15, 10, 0, 0, tzinfo=datetime.timezone(datetime.timedelta(hours=5))) record = _good_commit(committed_at=ts) d = _commit_dict_from_record(record) result = CommitRecord.from_dict(d) assert result.committed_at == ts def test_produced_record_hash_matches_commit_id(self) -> None: """from_dict must return a record whose hash matches commit_id.""" record = _good_commit() d = _commit_dict_from_record(record) result = CommitRecord.from_dict(d) recomputed = compute_commit_id( parent_ids=[], snapshot_id=result.snapshot_id, message=result.message, committed_at_iso=result.committed_at.isoformat(), author=result.author or "", ) assert result.commit_id == recomputed, ( "from_dict produced a CommitRecord whose hash doesn't match commit_id" ) # ────────────────────────────────────────────────────────────────────────────── # Integration: write_commit validates incoming record hash # ────────────────────────────────────────────────────────────────────────────── class TestWriteCommitIncomingVerification: """write_commit must reject incoming records whose hash doesn't match commit_id.""" def _bad_record(self) -> CommitRecord: """CommitRecord whose stored commit_id doesn't match its content hash.""" record = _good_commit() # Tamper with snapshot_id WITHOUT recomputing commit_id record = CommitRecord( commit_id=record.commit_id, # original hash branch=record.branch, snapshot_id="c" * 64, # CHANGED — now hash won't match message=record.message, committed_at=record.committed_at, parent_commit_id=record.parent_commit_id, parent2_commit_id=record.parent2_commit_id, author=record.author, metadata=record.metadata, structured_delta=record.structured_delta, sem_ver_bump=record.sem_ver_bump, breaking_changes=record.breaking_changes, agent_id=record.agent_id, model_id=record.model_id, toolchain_id=record.toolchain_id, prompt_hash=record.prompt_hash, signature=record.signature, signer_key_id=record.signer_key_id, reviewed_by=record.reviewed_by, test_runs=record.test_runs, ) return record def test_write_commit_rejects_hash_mismatch_incoming_new_file(self, tmp_path: pathlib.Path) -> None: """BUG: write_commit writes hash-mismatched record to disk; read_commit returns None.""" repo = _make_repo(tmp_path) bad = self._bad_record() with pytest.raises((ValueError, OSError)): write_commit(repo, bad) # Even if write_commit doesn't raise, read_commit must not return this bad record # If it didn't raise, the commit is permanently broken: result = read_commit(repo, bad.commit_id) assert result is None or result.snapshot_id != "c" * 64, ( "BUG: write_commit wrote a hash-mismatched record that is now " "permanently unreadable (read_commit returns None after every write)" ) def test_write_commit_rejects_from_dict_with_corrupt_timestamp(self, tmp_path: pathlib.Path) -> None: """The from_dict + write_commit pipeline must not create unreadable commits.""" repo = _make_repo(tmp_path) good = _good_commit() wire_dict = _commit_dict_from_record(good) wire_dict["committed_at"] = "" # simulate corrupt network data # Either from_dict raises, write_commit raises, or the commit is readable after try: bad = CommitRecord.from_dict(wire_dict) try: write_commit(repo, bad) except (ValueError, OSError): pass # write_commit rejected it — correct else: # If write_commit accepted it, it must be readable result = read_commit(repo, bad.commit_id) assert result is not None, ( "PERMANENT DATA LOSS: commit written via from_dict with corrupt " "committed_at is now permanently unreadable — read_commit returns None" ) except (ValueError, TypeError): pass # from_dict raised — correct def test_write_commit_accepts_valid_incoming_record(self, tmp_path: pathlib.Path) -> None: """Normal case: write_commit must still accept a valid incoming record.""" repo = _make_repo(tmp_path) good = _good_commit() write_commit(repo, good) # must not raise result = read_commit(repo, good.commit_id) assert result is not None assert result.commit_id == good.commit_id def test_write_commit_idempotent_with_valid_record(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) good = _good_commit() write_commit(repo, good) write_commit(repo, good) # must not raise result = read_commit(repo, good.commit_id) assert result is not None def test_write_commit_rejects_incoming_with_wrong_message(self, tmp_path: pathlib.Path) -> None: """Incoming record with tampered message (doesn't match commit_id hash) must be rejected.""" repo = _make_repo(tmp_path) good = _good_commit() # Tamper message without recomputing commit_id tampered = CommitRecord( commit_id=good.commit_id, branch=good.branch, snapshot_id=good.snapshot_id, message="tampered message", committed_at=good.committed_at, parent_commit_id=good.parent_commit_id, parent2_commit_id=good.parent2_commit_id, author=good.author, metadata=good.metadata, structured_delta=good.structured_delta, sem_ver_bump=good.sem_ver_bump, breaking_changes=good.breaking_changes, agent_id=good.agent_id, model_id=good.model_id, toolchain_id=good.toolchain_id, prompt_hash=good.prompt_hash, signature=good.signature, signer_key_id=good.signer_key_id, reviewed_by=good.reviewed_by, test_runs=good.test_runs, ) with pytest.raises((ValueError, OSError)): write_commit(repo, tampered) def test_write_commit_rejects_incoming_with_wrong_parent(self, tmp_path: pathlib.Path) -> None: """Incoming record with tampered parent_commit_id must be rejected.""" repo = _make_repo(tmp_path) good = _good_commit() tampered = CommitRecord( commit_id=good.commit_id, branch=good.branch, snapshot_id=good.snapshot_id, message=good.message, committed_at=good.committed_at, parent_commit_id="e" * 64, # injected parent parent2_commit_id=good.parent2_commit_id, author=good.author, metadata=good.metadata, structured_delta=good.structured_delta, sem_ver_bump=good.sem_ver_bump, breaking_changes=good.breaking_changes, agent_id=good.agent_id, model_id=good.model_id, toolchain_id=good.toolchain_id, prompt_hash=good.prompt_hash, signature=good.signature, signer_key_id=good.signer_key_id, reviewed_by=good.reviewed_by, test_runs=good.test_runs, ) with pytest.raises((ValueError, OSError)): write_commit(repo, tampered) # ────────────────────────────────────────────────────────────────────────────── # End-to-end: apply_mpack with corrupt committed_at # ────────────────────────────────────────────────────────────────────────────── class TestApplyPackCorruptTimestamp: """apply_mpack must not write permanently-unreadable commits.""" def _good_snap(self) -> SnapshotRecord: manifest = {"src/main.py": "c" * 64} snap_id = compute_snapshot_id(manifest) return SnapshotRecord( snapshot_id=snap_id, manifest=manifest, directories=[], created_at=_TS, note="", ) def test_apply_pack_skips_commit_with_empty_committed_at(self, tmp_path: pathlib.Path) -> None: """BUG: apply_mpack writes the commit and it becomes permanently unreadable.""" repo = _make_repo(tmp_path) snap = self._good_snap() write_snapshot(repo, snap) good = _good_commit(snapshot_id=snap.snapshot_id) wire = _commit_dict_from_record(good) wire["committed_at"] = "" # corrupt mpack = _bundle_with_commits([wire]) result = apply_mpack(repo, mpack) # The commit must either be skipped (commits_written=0) OR # written and still readable (no permanent data loss) if result["commits_written"] > 0: stored = read_commit(repo, good.commit_id) assert stored is not None, ( "PERMANENT DATA LOSS: apply_mpack wrote a commit with corrupt " "committed_at; read_commit now returns None for this commit forever. " "commits_written should be 0 (skip) not 1." ) def test_apply_pack_skips_commit_with_garbage_committed_at(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) snap = self._good_snap() write_snapshot(repo, snap) good = _good_commit(snapshot_id=snap.snapshot_id) wire = _commit_dict_from_record(good) wire["committed_at"] = "not-a-date" mpack = _bundle_with_commits([wire]) result = apply_mpack(repo, mpack) if result["commits_written"] > 0: stored = read_commit(repo, good.commit_id) assert stored is not None, ( "PERMANENT DATA LOSS: apply_mpack wrote commit with garbage committed_at" ) def test_apply_pack_valid_commit_is_readable_after_apply(self, tmp_path: pathlib.Path) -> None: """Regression: valid commits must still be written and readable.""" repo = _make_repo(tmp_path) snap = self._good_snap() write_snapshot(repo, snap) good = _good_commit(snapshot_id=snap.snapshot_id) wire = _commit_dict_from_record(good) mpack = _bundle_with_commits([wire]) result = apply_mpack(repo, mpack) assert result["commits_written"] == 1 stored = read_commit(repo, good.commit_id) assert stored is not None assert stored.commit_id == good.commit_id assert stored.message == good.message def test_apply_pack_one_corrupt_does_not_block_valid_commits(self, tmp_path: pathlib.Path) -> None: """One corrupt commit in a mpack must not prevent valid commits from being written.""" repo = _make_repo(tmp_path) snap = self._good_snap() write_snapshot(repo, snap) good1 = _good_commit(snapshot_id=snap.snapshot_id, message="good commit 1") good2 = _good_commit(snapshot_id=snap.snapshot_id, message="good commit 2") corrupt = _commit_dict_from_record(good1) corrupt["committed_at"] = "" wire_good1 = _commit_dict_from_record(good1) wire_good2 = _commit_dict_from_record(good2) # MPack: corrupt, valid1, valid2 mpack = _bundle_with_commits([corrupt, wire_good1, wire_good2]) result = apply_mpack(repo, mpack) # At minimum the two valid commits must be written assert result["commits_written"] >= 2, ( f"Only {result['commits_written']} commits written; expected at least 2 " "valid commits from a 3-commit mpack with 1 corrupt entry" ) assert read_commit(repo, good1.commit_id) is not None assert read_commit(repo, good2.commit_id) is not None def test_apply_pack_corrupt_bundle_cannot_poison_existing_good_commit(self, tmp_path: pathlib.Path) -> None: """A corrupt mpack must not be able to overwrite an existing valid commit.""" repo = _make_repo(tmp_path) snap = self._good_snap() write_snapshot(repo, snap) good = _good_commit(snapshot_id=snap.snapshot_id) write_commit(repo, good) # write the good commit first # Now try to apply a mpack that contains the same commit_id but with # a tampered snapshot_id (mismatched hash) wire = _commit_dict_from_record(good) wire["snapshot_id"] = "f" * 64 # tampered — hash won't match mpack = _bundle_with_commits([wire]) apply_mpack(repo, mpack) # The good commit must still be intact stored = read_commit(repo, good.commit_id) assert stored is not None, "Good commit was destroyed by malicious mpack" assert stored.snapshot_id == good.snapshot_id, ( f"SECURITY: snapshot_id was overwritten by malicious mpack. " f"Was {good.snapshot_id[:8]}, now {stored.snapshot_id[:8]}" ) # ────────────────────────────────────────────────────────────────────────────── # Stress: large mpack with one corrupt entry # ────────────────────────────────────────────────────────────────────────────── class TestApplyPackBundleStress: def test_200_commit_bundle_one_corrupt_timestamp(self, tmp_path: pathlib.Path) -> None: """200-commit mpack with one corrupt committed_at: 199 written, 1 skipped, no crash.""" repo = _make_repo(tmp_path) snap_manifest = {"src/f.py": "a" * 64} snap_id = compute_snapshot_id(snap_manifest) snap = SnapshotRecord( snapshot_id=snap_id, manifest=snap_manifest, directories=[], created_at=_TS, note="", ) write_snapshot(repo, snap) wires = [] for i in range(200): msg = f"commit {i}" ts = _TS + datetime.timedelta(seconds=i) c = _good_commit(snapshot_id=snap_id, message=msg, committed_at=ts) wire = _commit_dict_from_record(c) if i == 100: wire["committed_at"] = "" # inject corruption at position 100 wires.append((c.commit_id, wire, i != 100)) mpack = _bundle_with_commits([w for _, w, _ in wires]) result = apply_mpack(repo, mpack) # Count expected good commits (all unique commit_ids) good_count = sum(1 for _, _, is_good in wires if is_good) # Some may be duplicates if messages collide — just check no crash and # the corrupt one didn't create an unreadable entry assert result["commits_written"] >= 0 # no crash corrupt_id = wires[100][0] corrupt_result = read_commit(repo, corrupt_id) if corrupt_result is not None: # If it was written, verify it's actually readable (hash matches) assert True # read_commit already verifies the hash # The other valid commits must be readable for commit_id, _, is_good in wires[:5]: # spot-check first 5 if is_good: assert read_commit(repo, commit_id) is not None, ( f"Valid commit {commit_id[:8]} is not readable after apply_mpack" ) # ────────────────────────────────────────────────────────────────────────────── # Regression: Bug 6 fix still holds (from_dict still raises on corrupt timestamp) # ────────────────────────────────────────────────────────────────────────────── class TestFromDictStillRaises: def test_from_dict_raises_on_empty_committed_at(self) -> None: """Regression: Bug 6 fix — from_dict must raise, not substitute now().""" good = _good_commit() d = good.to_dict() d["committed_at"] = "" with pytest.raises((ValueError, TypeError)): CommitRecord.from_dict(d) def test_from_dict_raises_on_garbage_committed_at(self) -> None: good = _good_commit() d = good.to_dict() d["committed_at"] = "not-a-date" with pytest.raises((ValueError, TypeError)): CommitRecord.from_dict(d) # ────────────────────────────────────────────────────────────────────────────── # Regression: SnapshotRecord.from_dict created_at substitution # ────────────────────────────────────────────────────────────────────────────── class TestSnapshotFromDictTimestamp: """SnapshotRecord.from_dict silently substitutes now() for invalid created_at. Since created_at is NOT in the snapshot hash, this doesn't break verification, but the timestamp is forever wrong for the snapshot's first write. This test documents the current (buggy) behavior as a known issue. """ def _snap_dict(self, created_at: str = _TS.isoformat()) -> SnapshotDict: manifest = {"src/main.py": "a" * 64} snap_id = compute_snapshot_id(manifest) return { "snapshot_id": snap_id, "manifest": manifest, "directories": [], "created_at": created_at, "note": "", } def test_from_dict_raises_on_empty_created_at(self) -> None: """SnapshotRecord.from_dict should also raise on invalid created_at.""" d = self._snap_dict(created_at="") with pytest.raises((ValueError, TypeError)): SnapshotRecord.from_dict(d) def test_from_dict_raises_on_garbage_created_at(self) -> None: d = self._snap_dict(created_at="not-a-date") with pytest.raises((ValueError, TypeError)): SnapshotRecord.from_dict(d) def test_from_dict_succeeds_with_valid_created_at(self) -> None: d = self._snap_dict(created_at=_TS.isoformat()) snap = SnapshotRecord.from_dict(d) assert snap.created_at == _TS def test_from_dict_raises_on_empty_created_at_via_defensive_path(self) -> None: """SnapshotRecord.from_dict should raise on invalid created_at.""" d = self._snap_dict(created_at="") with pytest.raises((ValueError, TypeError)): SnapshotRecord.from_dict(d)