"""Phase 1.5 — Commit record integrity on re-read. Tests cover: - write_commit idempotency: silent drop of duplicate ID - write_commit collision detection: existing file is corrupt → CRITICAL + overwrite - write_commit integrity violation: existing record has mismatched commit_id - read_commit: WARNING→CRITICAL upgrade for corrupt files - read_commit_result: discriminated union (ok / not_found / corrupt) - read_snapshot / read_snapshot_result: same guarantees - get_all_commits / get_all_tags: CRITICAL on corrupt (previously silent) - list_releases: CRITICAL on corrupt (previously silent) - verify-pack integration after write_commit - Concurrent write with same ID: first writer always wins (idempotency at scale) - Regression: corrupt file must log CRITICAL (level 50), never WARNING (level 30) """ from __future__ import annotations import datetime import json import logging import pathlib import threading import pytest from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.types import Manifest, fake_id, long_id from muse.core.paths import muse_dir _REPO_ID = fake_id("repo") from muse.core.object_store import object_path as _obj_path from muse.core.semver import SemVerTag from muse.core.commits import ( CommitReadCorrupt, CommitReadNotFound, CommitReadOk, CommitRecord, commit_read_is_corrupt, commit_read_is_not_found, commit_read_is_ok, get_all_commits, read_commit, read_commit_result, write_commit, ) from muse.core.snapshots import ( SnapshotReadCorrupt, SnapshotReadNotFound, SnapshotReadOk, SnapshotRecord, read_snapshot, read_snapshot_result, snapshot_read_is_corrupt, snapshot_read_is_ok, write_snapshot, ) from muse.core.tags import ( TagRecord, get_all_tags, tag_path, write_tag, ) from muse.core.releases import ( ReleaseRecord, list_releases, release_path as _release_path, write_release, ) # --------------------------------------------------------------------------- # Helpers # 
# ---------------------------------------------------------------------------


def _make_commit(
    root: pathlib.Path,
    message: str = "msg",
    branch: str = "main",
    parent: str | None = None,
    write: bool = True,
) -> CommitRecord:
    """Build a content-addressed ``CommitRecord`` and optionally persist it.

    The ID comes from ``compute_commit_id`` over the same fields stored on the
    record, so every record is meant to pass ``_verify_commit_id`` on
    read-back.

    Args:
        root: Repository root handed to ``write_commit``.
        message: Commit message; it also feeds the commit hash.
        branch: Branch name stored on the record.
        parent: Optional parent commit ID.
        write: ``False`` builds the record without persisting it — useful for
            concurrent or idempotent write scenarios.

    Returns:
        The constructed ``CommitRecord``.
    """
    timestamp = datetime.datetime(2026, 3, 1, tzinfo=datetime.timezone.utc)
    snapshot_id = compute_snapshot_id({})
    commit_id = compute_commit_id(
        parent_ids=[parent] if parent else [],
        snapshot_id=snapshot_id,
        message=message,
        committed_at_iso=timestamp.isoformat(),
        author="tester",
    )
    record = CommitRecord(
        commit_id=commit_id,
        branch=branch,
        snapshot_id=snapshot_id,
        message=message,
        committed_at=timestamp,
        author="tester",
        parent_commit_id=parent,
        parent2_commit_id=None,
    )
    if write:
        write_commit(root, record)
    return record


def _make_snapshot(
    root: pathlib.Path, manifest: Manifest | None = None
) -> SnapshotRecord:
    """Write and return a ``SnapshotRecord`` with a content-addressed ID.

    Pass distinct ``manifest`` dicts to get distinct snapshot_ids — e.g.
    ``{"file-A.py": "a" * 64}`` vs ``{"file-B.py": "b" * 64}``.  A missing or
    empty manifest is stored as ``{}``.
    """
    entries = manifest or {}
    record = SnapshotRecord(
        snapshot_id=compute_snapshot_id(entries),
        manifest=entries,
        created_at=datetime.datetime(2026, 3, 1, tzinfo=datetime.timezone.utc),
    )
    write_snapshot(root, record)
    return record


def _make_tag(root: pathlib.Path, tag_name: str) -> TagRecord:
    """Write and return a ``TagRecord`` named ``tag_name`` for ``_REPO_ID``."""
    record = TagRecord(
        repo_id=_REPO_ID,
        tag_id=fake_id(tag_name),
        commit_id=fake_id("tag-commit"),
        tag=tag_name,
        created_at=datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc),
    )
    write_tag(root, record)
    return record


def _make_release(root: pathlib.Path, tag: str, semver: SemVerTag) -> ReleaseRecord:
    """Write and return a stable-channel ``ReleaseRecord`` for ``tag``."""
    record = ReleaseRecord(
        repo_id=_REPO_ID,
        release_id=fake_id(f"{tag}-release"),
        tag=tag,
        semver=semver,
        channel="stable",
        commit_id=fake_id("release-commit"),
        snapshot_id=fake_id(tag),
        title=tag,
        body="",
        changelog=[],
    )
    write_release(root, record)
    return record


def _tag_path(root: pathlib.Path, tag_id: str) -> pathlib.Path:
    """Return the on-disk path of ``tag_id`` within the test repository."""
    return tag_path(root, _REPO_ID, tag_id)


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------


@pytest.fixture()
def repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Lay out a minimal repository under ``tmp_path`` and return its root."""
    store = muse_dir(tmp_path)
    layout = (
        ("commits",),
        ("snapshots",),
        ("refs", "heads"),
        ("tags",),
        ("releases",),
    )
    for parts in layout:
        store.joinpath(*parts).mkdir(parents=True)
    store.joinpath("repo.json").write_text(json.dumps({"repo_id": _REPO_ID}))
    store.joinpath("HEAD").write_text("ref: refs/heads/main\n")
    store.joinpath("refs", "heads", "main").write_text("")
    return tmp_path


# ===========================================================================
# 1. write_commit — idempotency
# ===========================================================================


class TestWriteCommitIdempotency:
    def test_first_writer_wins(self, repo: pathlib.Path) -> None:
        """A record whose incoming hash is wrong is rejected before any overwrite.

        The old "first writer wins via silent drop" path is superseded by
        incoming hash verification: a record whose commit_id doesn't match its
        content hash raises ValueError immediately — the good file on disk is
        never touched.
        """
        original = _make_commit(repo, message="first-wins")
        # Same commit_id, different content: the hash cannot match, so
        # write_commit must raise before touching disk.
        forged = CommitRecord(
            commit_id=original.commit_id,
            branch="main",
            snapshot_id=original.snapshot_id,
            message="second-attempt",
            committed_at=original.committed_at,
            author="tester",
            parent_commit_id=None,
            parent2_commit_id=None,
        )
        with pytest.raises(ValueError):
            write_commit(repo, forged)

        on_disk = read_commit(repo, original.commit_id)
        assert on_disk is not None
        assert on_disk.message == "first-wins", "bad incoming record must not overwrite good file"

    def test_exact_duplicate_emits_no_critical(
        self, repo: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        """Writing the exact same record twice must not log CRITICAL."""
        record = _make_commit(repo, message="exact-dup-no-critical")
        with caplog.at_level(logging.DEBUG, logger="muse.core.store"):
            write_commit(repo, record)
        assert all(rec.levelno < logging.CRITICAL for rec in caplog.records)

    def test_idempotent_round_trip_preserves_all_fields(self, repo: pathlib.Path) -> None:
        record = _make_commit(repo, message="preserve-me", branch="feat/x")
        write_commit(repo, record)  # second write — must be completely harmless

        on_disk = read_commit(repo, record.commit_id)
        assert on_disk is not None
        assert on_disk.message == "preserve-me"
        assert on_disk.branch == "feat/x"


# ===========================================================================
# 2.
# write_commit — corrupt existing file is skipped; corruption surfaces on read
# ===========================================================================


class TestWriteCommitCorruptExistingFile:
    """A damaged object file is left in place by a repeated ``write_commit``.

    Each test writes a commit, damages the stored bytes, calls ``write_commit``
    again with the same record and then checks that ``read_commit`` returns
    ``None`` for that ID.
    """

    def test_corrupt_existing_is_skipped_by_write_commit(self, repo: pathlib.Path) -> None:
        """write_commit is idempotent: if object_path exists, it is skipped.

        Corruption written to object_path after the initial write is NOT
        repaired by a subsequent write_commit call — first writer wins.
        read_commit detects the corruption and returns None.
        """
        c = _make_commit(repo, message="original-overwrite")
        # Simulate disk corruption after the initial write.
        _obj_path(repo, c.commit_id).write_bytes(b"\xff\xfe\x00bad-data\x99")

        write_commit(repo, c)  # idempotent — skips, file already exists

        assert read_commit(repo, c.commit_id) is None  # corruption detected at read time

    def test_empty_existing_is_skipped_by_write_commit(self, repo: pathlib.Path) -> None:
        """Zero-byte commit file is not repaired by write_commit (idempotent)."""
        c = _make_commit(repo, message="after-crash-overwrite")
        _obj_path(repo, c.commit_id).write_bytes(b"")

        write_commit(repo, c)  # skips — file exists

        assert read_commit(repo, c.commit_id) is None  # empty file is corrupt

    def test_truncated_existing_is_skipped_by_write_commit(self, repo: pathlib.Path) -> None:
        """Truncated commit file is not repaired by write_commit (idempotent)."""
        c = _make_commit(repo, message="after-truncation-overwrite")
        path = _obj_path(repo, c.commit_id)
        good_bytes = path.read_bytes()
        # Keep only the first half of the valid bytes.
        path.write_bytes(good_bytes[: len(good_bytes) // 2])

        write_commit(repo, c)  # skips — file exists

        assert read_commit(repo, c.commit_id) is None  # truncated file is corrupt


# ===========================================================================
# 3.
# write_commit — store integrity violation
# ===========================================================================


class TestWriteCommitIntegrityViolation:
    """Bytes that belong to a different commit are caught when the record is read."""

    def test_commit_id_mismatch_detected_at_read_time(self, repo: pathlib.Path) -> None:
        """Impostor bytes at object_path are detected by read_commit, not write_commit.

        write_commit is idempotent: if object_path exists, it is skipped
        regardless of content.  Hash verification happens at read time —
        read_commit returns None when the stored payload's commit_id doesn't
        match the recomputed hash.
        """
        c_legit = _make_commit(repo, message="legitimate-mismatch")
        c_impostor = _make_commit(repo, message="impostor-mismatch")

        # Overwrite c_legit's object file with the impostor's JSON payload,
        # framed as "commit <length>\0<payload>".
        payload = json.dumps(c_impostor.to_dict(), separators=(",", ":")).encode()
        header = f"commit {len(payload)}\0".encode()
        _obj_path(repo, c_legit.commit_id).write_bytes(header + payload)

        # write_commit skips — file exists, no OSError raised.
        write_commit(repo, c_legit)

        # read_commit detects the hash mismatch and returns None.
        assert read_commit(repo, c_legit.commit_id) is None


# ===========================================================================
# 4.
# read_commit — CRITICAL log for corrupt
# ===========================================================================


class TestReadCommitCriticalLogging:
    """Log-level expectations for ``read_commit`` on corrupt, missing and valid files."""

    def test_corrupt_file_logs_critical(
        self, repo: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        record = _make_commit(repo, message="garbage-payload")
        _obj_path(repo, record.commit_id).write_bytes(b"\x00\x01garbage\xff")

        with caplog.at_level(logging.CRITICAL, logger="muse.core.store"):
            outcome = read_commit(repo, record.commit_id)

        assert outcome is None
        critical_records = [
            rec for rec in caplog.records if rec.levelno >= logging.CRITICAL
        ]
        assert critical_records, "Must log CRITICAL for corrupt commit file"
        assert any("Corrupt" in rec.message for rec in critical_records)

    def test_missing_file_returns_none_no_log(
        self, repo: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        with caplog.at_level(logging.DEBUG, logger="muse.core.store"):
            outcome = read_commit(repo, fake_id("missing-commit"))

        assert outcome is None
        assert all(rec.levelno < logging.WARNING for rec in caplog.records)

    def test_valid_file_returns_record_no_critical(
        self, repo: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        record = _make_commit(repo, message="clean-read")

        with caplog.at_level(logging.CRITICAL, logger="muse.core.store"):
            outcome = read_commit(repo, record.commit_id)

        assert outcome is not None
        assert outcome.message == "clean-read"
        assert all(rec.levelno < logging.CRITICAL for rec in caplog.records)

    def test_corrupt_log_references_filename(
        self, repo: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        record = _make_commit(repo, message="not-msgpack-content")
        _obj_path(repo, record.commit_id).write_bytes(b"not-msgpack")

        with caplog.at_level(logging.CRITICAL, logger="muse.core.store"):
            read_commit(repo, record.commit_id)

        logged_text = " ".join(rec.message + str(rec.args) for rec in caplog.records)
        bare_hex = long_id(record.commit_id, strip=True)
        # The 8-char prefix alone decides the outcome: whenever the full hex
        # is present, its prefix is present too.
        assert bare_hex[:8] in logged_text


# ===========================================================================
# 5.
# read_commit_result — discriminated union
# ===========================================================================


class TestReadCommitResult:
    """Checks on the ok / not_found / corrupt results of ``read_commit_result``."""

    def test_ok_status_on_valid_record(self, repo: pathlib.Path) -> None:
        record = _make_commit(repo, message="typed-ok")
        outcome = read_commit_result(repo, record.commit_id)
        assert commit_read_is_ok(outcome)
        assert isinstance(outcome["commit"], CommitRecord)
        assert outcome["commit"].message == "typed-ok"

    def test_not_found_status_when_missing(self, repo: pathlib.Path) -> None:
        outcome = read_commit_result(repo, fake_id("ff-missing-commit"))
        assert commit_read_is_not_found(outcome)

    def test_corrupt_status_on_bad_bytes(
        self, repo: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        record = _make_commit(repo, message="corrupt-bytes")
        _obj_path(repo, record.commit_id).write_bytes(b"\xff\x00garbage")

        with caplog.at_level(logging.CRITICAL, logger="muse.core.store"):
            outcome = read_commit_result(repo, record.commit_id)

        assert commit_read_is_corrupt(outcome)
        assert outcome["path"] != ""
        assert outcome["error"] != ""
        critical_records = [
            rec for rec in caplog.records if rec.levelno >= logging.CRITICAL
        ]
        assert critical_records

    def test_corrupt_result_path_contains_commit_id(self, repo: pathlib.Path) -> None:
        record = _make_commit(repo, message="path-in-corrupt")
        _obj_path(repo, record.commit_id).write_bytes(b"")

        outcome = read_commit_result(repo, record.commit_id)
        assert commit_read_is_corrupt(outcome)

        # object_path splits the 64-char hex at position 2 (dir prefix), so
        # the full hex is never a contiguous substring.  Check the first 2
        # chars (the dir component) and the following chars (the filename).
        bare_hex = long_id(record.commit_id, strip=True)
        reported_path = outcome["path"]
        assert bare_hex[:2] in reported_path
        assert bare_hex[2:10] in reported_path

    def test_ok_result_roundtrips_all_metadata(self, repo: pathlib.Path) -> None:
        # Build with a real content-addressed ID so _verify_commit_id passes.
        snapshot_id = fake_id("snap-meta-roundtrip")
        timestamp = datetime.datetime(2026, 3, 15, tzinfo=datetime.timezone.utc)
        commit_id = compute_commit_id(
            parent_ids=[],
            snapshot_id=snapshot_id,
            message="full metadata",
            committed_at_iso=timestamp.isoformat(),
            author="alice",
        )
        write_commit(
            repo,
            CommitRecord(
                commit_id=commit_id,
                branch="dev",
                snapshot_id=snapshot_id,
                message="full metadata",
                committed_at=timestamp,
                author="alice",
                parent_commit_id=None,
                parent2_commit_id=None,
                metadata={"key": "val"},
            ),
        )

        outcome = read_commit_result(repo, commit_id)
        assert commit_read_is_ok(outcome)
        loaded = outcome["commit"]
        assert loaded.branch == "dev"
        assert loaded.author == "alice"
        assert loaded.metadata == {"key": "val"}

    def test_status_field_is_string(self, repo: pathlib.Path) -> None:
        """Status values are plain strings — easy for agents to pattern-match."""
        record = _make_commit(repo, message="status-str-check")
        outcome = read_commit_result(repo, record.commit_id)
        assert isinstance(outcome["status"], str)

    def test_not_found_has_only_status_key(self, repo: pathlib.Path) -> None:
        outcome = read_commit_result(repo, fake_id("90-not-existing"))
        assert set(outcome.keys()) == {"status"}

    def test_three_outcomes_are_mutually_exclusive(self, repo: pathlib.Path) -> None:
        """Confirm all three outcome strings are distinct and unambiguous."""
        healthy = _make_commit(repo, message="outcome-ok")
        damaged = _make_commit(repo, message="outcome-corrupt")
        _obj_path(repo, damaged.commit_id).write_bytes(b"bad")

        lookups = (healthy.commit_id, fake_id("cc-missing"), damaged.commit_id)
        statuses = {read_commit_result(repo, cid)["status"] for cid in lookups}
        assert statuses == {"ok", "not_found", "corrupt"}


# ===========================================================================
# 6.
# read_snapshot / read_snapshot_result
# ===========================================================================


class TestReadSnapshotIntegrity:
    """Snapshot reads: corrupt, missing and valid files."""

    def test_corrupt_snapshot_logs_critical(
        self, repo: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        snap = _make_snapshot(repo, manifest={"snap-critical.py": fake_id("oid-a")})
        _obj_path(repo, snap.snapshot_id).write_bytes(b"\xde\xad\xbe\xef")

        with caplog.at_level(logging.CRITICAL, logger="muse.core.store"):
            outcome = read_snapshot(repo, snap.snapshot_id)

        assert outcome is None
        assert any(rec.levelno >= logging.CRITICAL for rec in caplog.records)

    def test_snapshot_result_ok(self, repo: pathlib.Path) -> None:
        snap = _make_snapshot(repo, manifest={"snap-ok.py": fake_id("oid-b")})
        outcome = read_snapshot_result(repo, snap.snapshot_id)
        assert snapshot_read_is_ok(outcome)
        assert isinstance(outcome["snapshot"], SnapshotRecord)

    def test_snapshot_result_not_found(self, repo: pathlib.Path) -> None:
        outcome = read_snapshot_result(repo, fake_id("no-snap"))
        assert outcome["status"] == "not_found"
        assert set(outcome.keys()) == {"status"}

    def test_snapshot_result_corrupt(
        self, repo: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        snap = _make_snapshot(repo, manifest={"snap-corrupt.py": fake_id("oid-c")})
        _obj_path(repo, snap.snapshot_id).write_bytes(b"garbage-bytes\x00")

        with caplog.at_level(logging.CRITICAL, logger="muse.core.store"):
            outcome = read_snapshot_result(repo, snap.snapshot_id)

        assert snapshot_read_is_corrupt(outcome)
        assert outcome["path"] != ""
        assert outcome["error"] != ""

    def test_missing_snapshot_no_log(
        self, repo: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        with caplog.at_level(logging.DEBUG, logger="muse.core.store"):
            outcome = read_snapshot(repo, fake_id("missing"))

        assert outcome is None
        assert all(rec.levelno < logging.WARNING for rec in caplog.records)


# ===========================================================================
# 7.
# get_all_commits — CRITICAL on corrupt (previously silent)
# ===========================================================================


class TestGetAllCommitsCorruptLogging:
    """``get_all_commits`` drops corrupt records and reports them at CRITICAL."""

    def test_one_corrupt_skipped_with_critical(
        self, repo: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        """Corrupt commit is skipped; good commits returned; CRITICAL emitted."""
        healthy = _make_commit(repo, message="good-survives")
        damaged = _make_commit(repo, message="will-corrupt")
        _obj_path(repo, damaged.commit_id).write_bytes(b"\xff\x00")

        with caplog.at_level(logging.CRITICAL, logger="muse.core.store"):
            listed = get_all_commits(repo)

        listed_ids = {commit.commit_id for commit in listed}
        assert healthy.commit_id in listed_ids, "good commit must still appear"
        assert damaged.commit_id not in listed_ids, "corrupt commit must be excluded"
        assert any(rec.levelno >= logging.CRITICAL for rec in caplog.records)

    def test_all_corrupt_returns_empty_with_critical(
        self, repo: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        written = [_make_commit(repo, message=f"c{i}") for i in range(3)]
        for record in written:
            _obj_path(repo, record.commit_id).write_bytes(b"bad")

        with caplog.at_level(logging.CRITICAL, logger="muse.core.store"):
            listed = get_all_commits(repo)

        assert listed == []
        critical_records = [
            rec for rec in caplog.records if rec.levelno >= logging.CRITICAL
        ]
        assert len(critical_records) == 3

    def test_empty_store_returns_empty_no_log(
        self, repo: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        with caplog.at_level(logging.DEBUG, logger="muse.core.store"):
            listed = get_all_commits(repo)

        assert listed == []
        assert all(rec.levelno < logging.WARNING for rec in caplog.records)

    def test_mixed_good_and_corrupt_correct_count(self, repo: pathlib.Path) -> None:
        good = [_make_commit(repo, message=f"g{i}") for i in range(5)]
        bad = [_make_commit(repo, message=f"b{i}") for i in range(3)]
        for record in bad:
            _obj_path(repo, record.commit_id).write_bytes(b"corrupt")

        listed = get_all_commits(repo)
        assert len(listed) == len(good)
# ===========================================================================
# 8. get_all_tags — CRITICAL on corrupt (previously silent)
# ===========================================================================


class TestGetAllTagsCorruptLogging:
    """``get_all_tags`` drops corrupt tag files and reports them at CRITICAL."""

    def test_corrupt_tag_skipped_with_critical(
        self, repo: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        _make_tag(repo, "v1.0.0")
        damaged = _make_tag(repo, "v2.0.0")
        _tag_path(repo, damaged.tag_id).write_bytes(b"\x00bad")

        with caplog.at_level(logging.CRITICAL, logger="muse.core.store"):
            listed = get_all_tags(repo, _REPO_ID)

        names = {entry.tag for entry in listed}
        assert "v1.0.0" in names
        assert "v2.0.0" not in names
        assert any(rec.levelno >= logging.CRITICAL for rec in caplog.records)

    def test_good_tags_all_returned(self, repo: pathlib.Path) -> None:
        _make_tag(repo, "v0.1")
        _make_tag(repo, "v0.2")
        assert len(get_all_tags(repo, _REPO_ID)) == 2

    def test_all_corrupt_tags_returns_empty_with_critical(
        self, repo: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        for name in ("v1", "v2", "v3"):
            written = _make_tag(repo, name)
            _tag_path(repo, written.tag_id).write_bytes(b"bad")

        with caplog.at_level(logging.CRITICAL, logger="muse.core.store"):
            listed = get_all_tags(repo, _REPO_ID)

        assert listed == []
        critical_records = [
            rec for rec in caplog.records if rec.levelno >= logging.CRITICAL
        ]
        assert len(critical_records) == 3


# ===========================================================================
# 9.
# list_releases — CRITICAL on corrupt (previously silent)
# ===========================================================================


class TestListReleasesCorruptLogging:
    """``list_releases`` drops corrupt release files and reports them at CRITICAL."""

    def test_corrupt_release_skipped_with_critical(
        self, repo: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        """One good and one corrupt release: only the good one is listed."""
        good = _make_release(
            repo, "v1.0.0", SemVerTag(major=1, minor=0, patch=0, pre="", build="")
        )
        bad = _make_release(
            repo, "v2.0.0", SemVerTag(major=2, minor=0, patch=0, pre="", build="")
        )
        # Damage the second release file after it was written.
        _release_path(repo, _REPO_ID, bad.release_id).write_bytes(b"\xff\x00garbage")

        with caplog.at_level(logging.CRITICAL, logger="muse.core.store"):
            releases = list_releases(repo, _REPO_ID)

        ids = {r.release_id for r in releases}
        assert good.release_id in ids
        assert bad.release_id not in ids
        assert any(r.levelno >= logging.CRITICAL for r in caplog.records)

    def test_all_releases_good_returns_all(self, repo: pathlib.Path) -> None:
        """Two intact releases are both returned."""
        _make_release(repo, "v1.0.0", SemVerTag(major=1, minor=0, patch=0, pre="", build=""))
        _make_release(repo, "v1.1.0", SemVerTag(major=1, minor=1, patch=0, pre="", build=""))
        releases = list_releases(repo, _REPO_ID)
        assert len(releases) == 2


# ===========================================================================
# 10. verify-pack integration after write_commit
# ===========================================================================


class TestVerifyPackAfterWriteCommit:
    """CLI round-trip for a commit written through ``write_commit``."""

    def test_cmd_read_commit_roundtrip(self, repo: pathlib.Path) -> None:
        """``muse read-commit`` must succeed for every written commit."""
        # Imported inside the test, as in the original, so the CLI helper is
        # only loaded when this test runs.
        from tests.cli_test_helper import CliRunner

        c = _make_commit(repo, message="plumbing-check")
        runner = CliRunner()
        result = runner.invoke(
            None,
            ["read-commit", c.commit_id, "--json"],
            env={"MUSE_REPO_ROOT": str(repo)},
        )
        assert result.exit_code == 0

        # The command output is parsed with the module-level ``json`` import.
        data = json.loads(result.output)
        assert data["commit_id"] == c.commit_id
        assert data["message"] == "plumbing-check"


# ===========================================================================
# 11.
# Concurrent idempotency — 50 threads race to write the same commit
# ===========================================================================


class TestConcurrentIdempotentWrite:
    """Many threads calling ``write_commit`` at once must not raise or lose data."""

    def test_50_threads_same_commit_id_first_wins(self, repo: pathlib.Path) -> None:
        """50 threads writing the EXACT same commit — idempotent, exactly one file written."""
        # In a content-addressed system, identical content → identical commit_id.
        shared = _make_commit(repo, message="concurrent-idempotent", write=False)
        failures: list[Exception] = []

        def write_one() -> None:
            try:
                write_commit(repo, shared)
            except Exception as exc:
                failures.append(exc)

        workers = [threading.Thread(target=write_one) for _ in range(50)]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        assert not failures, f"Unexpected errors in same-ID concurrent writes: {failures[:3]}"
        on_disk = read_commit(repo, shared.commit_id)
        assert on_disk is not None
        assert on_disk.commit_id == shared.commit_id
        assert on_disk.message == "concurrent-idempotent"

    def test_50_threads_distinct_ids_all_survive(self, repo: pathlib.Path) -> None:
        """50 threads writing distinct commit IDs must all persist without errors."""
        failures: list[Exception] = []

        def write_unique(i: int) -> None:
            # _make_commit uses compute_commit_id so the hash always matches content.
            record = _make_commit(repo, message=f"unique {i}", write=False)
            try:
                write_commit(repo, record)
            except Exception as exc:
                failures.append(exc)

        workers = [
            threading.Thread(target=write_unique, args=(index,)) for index in range(50)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        assert not failures, f"Unexpected errors in distinct-ID concurrent writes: {failures[:3]}"
        assert len(get_all_commits(repo)) == 50


# ===========================================================================
# 12.
# Regression: WARNING→CRITICAL upgrade is permanent
# ===========================================================================


class TestRegressionCorruptLevelUpgrade:
    """Confirm the upgrade from WARNING to CRITICAL is permanent and precise."""

    def test_corrupt_commit_logs_at_critical_not_warning(
        self, repo: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        record = _make_commit(repo, message="level-upgrade-commit")
        _obj_path(repo, record.commit_id).write_bytes(b"trash")

        with caplog.at_level(logging.DEBUG, logger="muse.core.store"):
            read_commit(repo, record.commit_id)

        levels = [rec.levelno for rec in caplog.records]
        assert logging.CRITICAL in levels, (
            f"Expected CRITICAL (50) but got levels: {levels}"
        )
        assert logging.WARNING not in levels, (
            "Must not downgrade corruption to WARNING — only CRITICAL is acceptable"
        )

    def test_corrupt_snapshot_logs_at_critical_not_warning(
        self, repo: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        snap = _make_snapshot(repo, manifest={"snap-level.py": fake_id("oid-d")})
        _obj_path(repo, snap.snapshot_id).write_bytes(b"bad")

        with caplog.at_level(logging.DEBUG, logger="muse.core.store"):
            read_snapshot(repo, snap.snapshot_id)

        levels = [rec.levelno for rec in caplog.records]
        assert logging.CRITICAL in levels
        assert logging.WARNING not in levels

    def test_get_all_commits_logs_corrupt_at_critical(
        self, repo: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        record = _make_commit(repo, message="level-upgrade-get-all")
        _obj_path(repo, record.commit_id).write_bytes(b"trash")

        with caplog.at_level(logging.DEBUG, logger="muse.core.store"):
            get_all_commits(repo)

        levels = [rec.levelno for rec in caplog.records]
        assert logging.CRITICAL in levels
        assert logging.WARNING not in levels

    def test_get_all_tags_logs_corrupt_at_critical(
        self, repo: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        written = _make_tag(repo, "v-crit")
        _tag_path(repo, written.tag_id).write_bytes(b"trash")

        with caplog.at_level(logging.DEBUG, logger="muse.core.store"):
            get_all_tags(repo, _REPO_ID)

        levels = [rec.levelno for rec in caplog.records]
        assert logging.CRITICAL in levels
        assert logging.WARNING not in levels