"""I-1: Read-time SHA-256 verification in read_object. Every read now re-verifies the digest of the bytes returned against the object_id. This test suite: Unit — confirms clean objects pass, corrupt objects raise OSError. Unit — MAX_FILE_BYTES size limit enforcement. Unit — CRITICAL log emission on corruption. Integration — confirms the cat-object command surfaces the error. Integration — verify-pack catches bit-flipped LOCAL STORE objects. Integration — verify-object --all audits the full local store. Perf — 256 MiB object re-hash must complete within 500 ms on NVMe. Stress — bit-flips at every byte position; multi-bit and random fuzz. Regression — write→corrupt→read round-trip never silently returns bad data. """ from __future__ import annotations from collections.abc import Mapping type _ObjPayload = dict[str, str | bytes] import json import logging import os import pathlib import random import struct import tempfile import time from typing import TypedDict import msgpack import pytest from muse.core.object_store import ( objects_dir, object_path, read_object, write_object, ) from muse.core.paths import muse_dir from muse.core.validation import MAX_FILE_BYTES from muse.core.types import Manifest, blob_id, fake_id from tests.cli_test_helper import CliRunner, InvokeResult # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _repo(tmp_path: pathlib.Path) -> pathlib.Path: """Minimal .muse/ skeleton.""" muse_dir(tmp_path).mkdir() return tmp_path def _write(repo: pathlib.Path, data: bytes) -> str: oid = blob_id(data) write_object(repo, oid, data) return oid def _stored_path(repo: pathlib.Path, oid: str) -> pathlib.Path: return object_path(repo, oid) def _flip_bit(data: bytes, byte_idx: int, bit_idx: int) -> bytes: """Return *data* with one bit flipped at position (byte_idx, bit_idx).""" ba = bytearray(data) ba[byte_idx] ^= 1 << bit_idx return bytes(ba) 
def _corrupt_file(p: pathlib.Path, new_content: bytes) -> None:
    """Overwrite *p* with *new_content*, temporarily lifting the 0o444 guard.

    Object files are written with mode 0o444 (read-only) to enforce
    content-addressability at the OS level.  Corruption tests must override
    that protection to simulate disk errors, cosmic-ray bit flips, etc.  The
    permission is restored to 0o444 after the write, even if the write fails.
    """
    os.chmod(p, 0o644)
    try:
        p.write_bytes(new_content)
    finally:
        os.chmod(p, 0o444)


def _corrupt_stored(repo: pathlib.Path, oid: str, byte_idx: int = 0, bit_idx: int = 0) -> None:
    """Flip one bit in the on-disk object file.

    *byte_idx* / *bit_idx* address the bit within the stored file's bytes.
    """
    p = _stored_path(repo, oid)
    data = p.read_bytes()
    _corrupt_file(p, _flip_bit(data, byte_idx, bit_idx))


def _invoke(repo: pathlib.Path, *args: str) -> InvokeResult:
    """Run the ``cat-object`` CLI command with *args*, pointing MUSE_REPO_ROOT at *repo*."""
    runner = CliRunner()
    env = {"MUSE_REPO_ROOT": str(repo)}
    return runner.invoke(None, ["cat-object", *args], env=env)


# ---------------------------------------------------------------------------
# Unit: happy path — clean objects always pass
# ---------------------------------------------------------------------------


class TestCleanObjectsPass:
    """Objects that were written and never modified read back unchanged."""

    def test_empty_bytes(self, tmp_path: pathlib.Path) -> None:
        """The empty payload round-trips."""
        repo = _repo(tmp_path)
        oid = _write(repo, b"")
        assert read_object(repo, oid) == b""

    def test_small_ascii(self, tmp_path: pathlib.Path) -> None:
        """A short ASCII payload round-trips."""
        repo = _repo(tmp_path)
        data = b"hello muse"
        oid = _write(repo, data)
        assert read_object(repo, oid) == data

    def test_binary_blob(self, tmp_path: pathlib.Path) -> None:
        """A payload covering every byte value round-trips."""
        repo = _repo(tmp_path)
        data = bytes(range(256)) * 100
        oid = _write(repo, data)
        assert read_object(repo, oid) == data

    def test_1_mib_object(self, tmp_path: pathlib.Path) -> None:
        """A 1 MiB random payload round-trips."""
        repo = _repo(tmp_path)
        data = os.urandom(1024 * 1024)
        oid = _write(repo, data)
        assert read_object(repo, oid) == data

    def test_read_twice_same_result(self, tmp_path: pathlib.Path) -> None:
        """Two consecutive reads of the same object return equal bytes."""
        repo = _repo(tmp_path)
        data = b"idempotent read"
        oid = _write(repo, data)
        assert read_object(repo, oid) == read_object(repo, oid)

    def test_absent_returns_none(self, tmp_path: pathlib.Path) -> None:
        """Reading an ID that was never written yields None, not an error."""
        repo = _repo(tmp_path)
        absent = fake_id("absent")
        assert read_object(repo, absent) is None


# ---------------------------------------------------------------------------
# Unit: single-bit corruption always raises OSError
# ---------------------------------------------------------------------------


class TestSingleBitCorruption:
    """Any modification of the stored file makes read_object raise OSError."""

    def test_flip_first_byte_first_bit(self, tmp_path: pathlib.Path) -> None:
        """Flipping bit 0 of byte 0 is detected."""
        repo = _repo(tmp_path)
        oid = _write(repo, b"critical data")
        _corrupt_stored(repo, oid, byte_idx=0, bit_idx=0)
        with pytest.raises(OSError, match="integrity check"):
            read_object(repo, oid)

    def test_flip_first_byte_last_bit(self, tmp_path: pathlib.Path) -> None:
        """Flipping bit 7 of byte 0 is detected."""
        repo = _repo(tmp_path)
        oid = _write(repo, b"critical data")
        _corrupt_stored(repo, oid, byte_idx=0, bit_idx=7)
        with pytest.raises(OSError, match="integrity check"):
            read_object(repo, oid)

    def test_flip_last_byte(self, tmp_path: pathlib.Path) -> None:
        """A flip at index len(data) - 1 of the stored file is detected."""
        repo = _repo(tmp_path)
        data = b"end matters too"
        oid = _write(repo, data)
        _corrupt_stored(repo, oid, byte_idx=len(data) - 1, bit_idx=3)
        with pytest.raises(OSError, match="integrity check"):
            read_object(repo, oid)

    def test_flip_middle_byte(self, tmp_path: pathlib.Path) -> None:
        """A flip at index len(data) // 2 of the stored file is detected."""
        repo = _repo(tmp_path)
        data = b"middle byte flip"
        oid = _write(repo, data)
        mid = len(data) // 2
        _corrupt_stored(repo, oid, byte_idx=mid, bit_idx=4)
        with pytest.raises(OSError, match="integrity check"):
            read_object(repo, oid)

    def test_error_message_contains_expected_prefix(self, tmp_path: pathlib.Path) -> None:
        """The error names the object (first 8 ID chars) and the failed check."""
        repo = _repo(tmp_path)
        oid = _write(repo, b"check message content")
        _corrupt_stored(repo, oid, byte_idx=0, bit_idx=1)
        with pytest.raises(OSError) as exc_info:
            read_object(repo, oid)
        msg = str(exc_info.value)
        assert oid[:8] in msg
        assert "SHA-256" in msg or "integrity" in msg.lower()

    def test_error_suggests_verify_pack(self, tmp_path: pathlib.Path) -> None:
        """The error message points the user at ``verify-pack``."""
        repo = _repo(tmp_path)
        oid = _write(repo, b"suggest remedy")
        _corrupt_stored(repo, oid, byte_idx=0, bit_idx=0)
        with pytest.raises(OSError) as exc_info:
            read_object(repo, oid)
        assert "verify-pack" in str(exc_info.value)

    def test_clean_sibling_unaffected(self, tmp_path: pathlib.Path) -> None:
        """Corruption of one object must not affect a sibling object."""
        repo = _repo(tmp_path)
        oid_a = _write(repo, b"object a - clean")
        oid_b = _write(repo, b"object b - will corrupt")
        _corrupt_stored(repo, oid_b, byte_idx=0, bit_idx=0)
        # b is corrupt
        with pytest.raises(OSError):
            read_object(repo, oid_b)
        # a is still fine
        assert read_object(repo, oid_a) == b"object a - clean"

    def test_truncated_file_raises(self, tmp_path: pathlib.Path) -> None:
        """A file truncated to zero bytes is caught by the hash check."""
        repo = _repo(tmp_path)
        data = b"will be truncated"
        oid = _write(repo, data)
        _corrupt_file(_stored_path(repo, oid), b"")
        with pytest.raises(OSError, match="integrity check"):
            read_object(repo, oid)

    def test_fully_zeroed_file_raises(self, tmp_path: pathlib.Path) -> None:
        """A file overwritten with NUL bytes of the same length is caught."""
        repo = _repo(tmp_path)
        data = b"zeroed out"
        oid = _write(repo, data)
        _corrupt_file(_stored_path(repo, oid), b"\x00" * len(data))
        with pytest.raises(OSError, match="integrity check"):
            read_object(repo, oid)

    def test_appended_byte_raises(self, tmp_path: pathlib.Path) -> None:
        """A byte appended to the end is caught."""
        repo = _repo(tmp_path)
        data = b"exact bytes"
        oid = _write(repo, data)
        _corrupt_file(_stored_path(repo, oid), data + b"\xff")
        with pytest.raises(OSError, match="integrity check"):
            read_object(repo, oid)

    def test_prepended_byte_raises(self, tmp_path: pathlib.Path) -> None:
        """A byte prepended to the start is caught."""
        repo = _repo(tmp_path)
        data = b"exact bytes"
        oid = _write(repo, data)
        _corrupt_file(_stored_path(repo, oid), b"\x00" + data)
        with pytest.raises(OSError, match="integrity check"):
            read_object(repo, oid)


# ---------------------------------------------------------------------------
# Stress: exhaustive single-bit sweep
# ---------------------------------------------------------------------------


class TestExhaustiveBitFlip:
    """Sweep every bit position of a stored object, one flip at a time."""

    def test_every_bit_in_32_byte_object(self, tmp_path: pathlib.Path) -> None:
        """Every one of the 256 single-bit flips in a 32-byte payload is caught."""
        repo = _repo(tmp_path)
        data = bytes(range(32))
        oid = _write(repo, data)
        p = _stored_path(repo, oid)
        original = p.read_bytes()

        caught = 0
        for byte_idx in range(len(original)):
            for bit_idx in range(8):
                flipped = _flip_bit(original, byte_idx, bit_idx)
                _corrupt_file(p, flipped)
                try:
                    read_object(repo, oid)
                    # A single-bit flip that still hashes to the same SHA-256
                    # is computationally infeasible — if this branch is hit,
                    # the read skipped verification, so fail.
                    pytest.fail(
                        f"Bit flip at byte={byte_idx} bit={bit_idx} "
                        "was not caught — corrupt data returned silently"
                    )
                except OSError:
                    caught += 1
                finally:
                    # Always restore the pristine file for the next iteration.
                    _corrupt_file(p, original)

        assert caught == len(original) * 8

    @pytest.mark.slow
    def test_every_bit_in_4096_byte_object(self, tmp_path: pathlib.Path) -> None:
        """Every bit flip in a 4 KiB object is caught (32 768 checks)."""
        repo = _repo(tmp_path)
        data = os.urandom(4096)
        oid = _write(repo, data)
        p = _stored_path(repo, oid)
        original = p.read_bytes()

        for byte_idx in range(len(original)):
            for bit_idx in range(8):
                flipped = _flip_bit(original, byte_idx, bit_idx)
                _corrupt_file(p, flipped)
                with pytest.raises(OSError):
                    read_object(repo, oid)
                _corrupt_file(p, original)


# ---------------------------------------------------------------------------
# Stress: multi-bit and random fuzz
# ---------------------------------------------------------------------------


class TestFuzzCorruption:
    """Seeded random corruption patterns must never be read back silently."""

    def test_5_random_bits_1000_iterations(self, tmp_path: pathlib.Path) -> None:
        """Random 5-bit corruption: zero silent passes in 1000 trials.

        Five toggles is an odd count, so even when positions repeat and cancel
        pairwise at least one bit always ends up different from the original.
        """
        repo = _repo(tmp_path)
        data = os.urandom(256)
        oid = _write(repo, data)
        p = _stored_path(repo, oid)
        original = p.read_bytes()

        rng = random.Random(42)
        silent_passes = 0
        for _ in range(1000):
            ba = bytearray(original)
            for _flip in range(5):
                idx = rng.randrange(len(ba))
                bit = rng.randrange(8)
                ba[idx] ^= 1 << bit
            _corrupt_file(p, bytes(ba))
            try:
                read_object(repo, oid)
                silent_passes += 1
            except OSError:
                pass
            finally:
                _corrupt_file(p, original)

        assert silent_passes == 0, f"{silent_passes} corrupt reads went undetected"

    def test_completely_random_content_1000_iterations(self, tmp_path: pathlib.Path) -> None:
        """Replacing the file with entirely random bytes is always caught."""
        repo = _repo(tmp_path)
        data = os.urandom(128)
        oid = _write(repo, data)
        p = _stored_path(repo, oid)
        original = p.read_bytes()

        rng = random.Random(99)
        for _ in range(1000):
            garbage = bytes(rng.randrange(256) for _ in range(len(original)))
            _corrupt_file(p, garbage)
            with pytest.raises(OSError):
                read_object(repo, oid)
            _corrupt_file(p, original)

    def test_struct_pack_corruption(self, tmp_path: pathlib.Path) -> None:
        """Struct-level 4-byte word corruption is always caught."""
        repo = _repo(tmp_path)
        data = b"struct corruption test " * 10
        oid = _write(repo, data)
        p = _stored_path(repo, oid)
        original = p.read_bytes()

        # Step through every complete 4-byte word of the stored file.
        for word_offset in range(0, len(original) - 3, 4):
            ba = bytearray(original)
            # XOR one 32-bit word
            word = struct.unpack_from(">I", ba, word_offset)[0]
            struct.pack_into(">I", ba, word_offset, word ^ 0xDEADBEEF)
            _corrupt_file(p, bytes(ba))
            with pytest.raises(OSError):
                read_object(repo, oid)
            _corrupt_file(p, original)


# ---------------------------------------------------------------------------
# Integration: cat-object surfaces the error
# ---------------------------------------------------------------------------


class TestCatObjectIntegration:
    """The ``cat-object`` CLI command must surface read-time integrity errors."""

    def test_cat_clean_object_json(self, tmp_path: pathlib.Path) -> None:
        """``cat-object --json`` on a clean object exits 0 and reports its ID."""
        repo = _repo(tmp_path)
        data = b"cat-object integration test"
        oid = _write(repo, data)
        r = _invoke(repo, "--json", oid)
        assert r.exit_code == 0
        d = json.loads(r.output)
        assert d["object_id"] == oid

    def test_cat_corrupt_object_errors(self, tmp_path: pathlib.Path) -> None:
        """cat-object raw mode on a bit-flipped object must exit non-zero.

        The --json (info) mode only checks file existence/size — it
        intentionally does not read content.  The raw mode MUST verify the
        hash before streaming any bytes to stdout.
        """
        repo = _repo(tmp_path)
        data = b"will be corrupted for cat-object test"
        oid = _write(repo, data)
        _corrupt_stored(repo, oid, byte_idx=0, bit_idx=0)
        # Raw mode (no --json) is the mode that reads and streams bytes.
        r = _invoke(repo, oid)
        assert r.exit_code != 0

    def test_cat_corrupt_object_no_raw_data_in_output(self, tmp_path: pathlib.Path) -> None:
        """A corrupt object must NEVER have its raw bytes echoed to stdout."""
        repo = _repo(tmp_path)
        sentinel = b"TOP_SECRET_PAYLOAD_MUST_NOT_LEAK"
        oid = _write(repo, sentinel)
        _corrupt_stored(repo, oid, byte_idx=0, bit_idx=0)
        r = _invoke(repo, oid)
        # The flip above alters byte 0 of the stored file, so leaked corrupt
        # bytes could start with "UOP_..." rather than "TOP_...".  Search for
        # a marker that lies wholly outside the flipped byte; otherwise a leak
        # of the corrupt content would go unnoticed.  The marker is also a
        # substring of "TOP_SECRET_PAYLOAD", so an unmodified leak is still
        # caught.
        assert b"SECRET_PAYLOAD" not in r.stdout_bytes


# ---------------------------------------------------------------------------
# Regression: write → corrupt → read never returns bad data
# ---------------------------------------------------------------------------


class TestRegressionSilentCorruption:
    """Write → corrupt → read must raise instead of returning altered bytes."""

    def test_concurrent_read_after_corruption(self, tmp_path: pathlib.Path) -> None:
        """Simulate a read race: write clean, corrupt disk, read — must raise.

        The corrupt-then-read sequence runs in a worker thread which is
        joined before the outcome is checked.
        """
        import threading

        repo = _repo(tmp_path)
        data = os.urandom(4096)
        oid = _write(repo, data)

        results: list[str] = []

        def corrupt_then_read() -> None:
            _corrupt_stored(repo, oid, byte_idx=100, bit_idx=3)
            try:
                read_object(repo, oid)
                results.append("silent_pass")
            except OSError:
                results.append("caught")

        t = threading.Thread(target=corrupt_then_read)
        t.start()
        t.join()
        assert "silent_pass" not in results, "Corrupt data returned silently in thread"
        # An exception other than OSError would kill the worker and leave
        # *results* empty; require the positive outcome so that cannot pass.
        assert results == ["caught"], f"Worker thread did not observe an OSError: {results!r}"

    def test_large_object_stream_integrity(self, tmp_path: pathlib.Path) -> None:
        """16 MiB object: corruption in the second chunk boundary is caught."""
        repo = _repo(tmp_path)
        # 16 MiB — forces multiple 64 KiB streaming chunks
        data = os.urandom(16 * 1024 * 1024)
        oid = _write(repo, data)
        p = _stored_path(repo, oid)
        original = p.read_bytes()
        # Corrupt a byte at the second chunk boundary (64 KiB + 1)
        _corrupt_file(p, _flip_bit(original, 65537, 0))
        with pytest.raises(OSError, match="integrity check"):
            read_object(repo, oid)
        # Restore and confirm clean read works
        _corrupt_file(p, original)
        assert read_object(repo, oid) == data


# ---------------------------------------------------------------------------
# Gap 2: CRITICAL log emission on corruption
# ---------------------------------------------------------------------------


class TestCriticalLogOnCorruption:
    """Corruption must be logged at CRITICAL — agents parsing structured logs
    must receive a severity signal, not just a silent Python exception."""

    def test_critical_logged_on_bit_flip(
        self, tmp_path: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        """A bit flip produces a CRITICAL record that names the object."""
        repo = _repo(tmp_path)
        data = b"must log at critical"
        oid = _write(repo, data)
        _corrupt_stored(repo, oid, byte_idx=0, bit_idx=0)

        with caplog.at_level(logging.CRITICAL, logger="muse.core.object_store"):
            with pytest.raises(OSError):
                read_object(repo, oid)

        critical_records = [r for r in caplog.records if r.levelno >= logging.CRITICAL]
        assert critical_records, "No CRITICAL log emitted on bit-flip corruption"
        assert any(oid[:8] in r.getMessage() for r in critical_records), (
            "CRITICAL log does not include the object ID"
        )

    def test_critical_message_mentions_corruption(
        self, tmp_path: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        """The captured log text mentions corruption or integrity."""
        repo = _repo(tmp_path)
        data = b"critical message check"
        oid = _write(repo, data)
        _corrupt_stored(repo, oid, byte_idx=5, bit_idx=2)

        with caplog.at_level(logging.CRITICAL, logger="muse.core.object_store"):
            with pytest.raises(OSError):
                read_object(repo, oid)

        messages = " ".join(r.getMessage() for r in caplog.records)
        assert "corrupt" in messages.lower() or "integrity" in messages.lower(), (
            f"CRITICAL log does not mention corruption: {messages!r}"
        )

    def test_no_critical_on_clean_read(
        self, tmp_path: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        """A clean read must NOT emit CRITICAL — no false alarms."""
        repo = _repo(tmp_path)
        data = b"clean - no alarm"
        oid = _write(repo, data)

        with caplog.at_level(logging.CRITICAL, logger="muse.core.object_store"):
            result = read_object(repo, oid)

        assert result == data
        critical_records = [r for r in caplog.records if r.levelno >= logging.CRITICAL]
        assert critical_records == [], f"False CRITICAL alarm on clean read: {critical_records}"


# ---------------------------------------------------------------------------
# Gap 3: MAX_FILE_BYTES size limit enforcement
# ---------------------------------------------------------------------------


class TestMaxFileBytesLimit:
    """read_object must reject objects that exceed MAX_FILE_BYTES before
    reading their content into memory — preventing OOM on pathological input.

    NOTE(review): these tests patch ``pathlib.Path.stat`` and therefore assume
    read_object obtains the file size through it — confirm against the
    implementation.
    """

    def test_oversized_object_raises_oserror(self, tmp_path: pathlib.Path) -> None:
        """A file exceeding MAX_FILE_BYTES raises OSError before any read."""
        from unittest.mock import patch as _patch, MagicMock

        repo = _repo(tmp_path)
        data = b"placeholder"
        oid = _write(repo, data)

        # Inject a fake stat result with an inflated st_size so we don't
        # need to allocate gigabytes of real data.
        mock_stat = MagicMock()
        mock_stat.st_size = MAX_FILE_BYTES + 1

        with _patch.object(pathlib.Path, "stat", return_value=mock_stat):
            with pytest.raises(OSError, match="MiB read limit"):
                read_object(repo, oid)

    def test_exactly_max_size_allowed(self, tmp_path: pathlib.Path) -> None:
        """An object exactly at MAX_FILE_BYTES must be readable (boundary check)."""
        from unittest.mock import patch as _patch, MagicMock

        repo = _repo(tmp_path)
        data = b"boundary"
        oid = _write(repo, data)

        # st_size == MAX_FILE_BYTES: the guard is strict greater-than, so this
        # should not fire.  The actual (small) file is then read and verified.
        mock_stat = MagicMock()
        mock_stat.st_size = MAX_FILE_BYTES

        with _patch.object(pathlib.Path, "stat", return_value=mock_stat):
            result = read_object(repo, oid)

        assert result == data

    def test_error_message_includes_mib_limit(self, tmp_path: pathlib.Path) -> None:
        """The OSError message must include the configured MiB limit."""
        from unittest.mock import patch as _patch, MagicMock

        repo = _repo(tmp_path)
        data = b"size limit error msg"
        oid = _write(repo, data)

        mock_stat = MagicMock()
        mock_stat.st_size = MAX_FILE_BYTES + 1024

        with _patch.object(pathlib.Path, "stat", return_value=mock_stat):
            with pytest.raises(OSError) as exc_info:
                read_object(repo, oid)

        assert "MiB" in str(exc_info.value), (
            f"Error message does not include MiB limit: {exc_info.value}"
        )


# ---------------------------------------------------------------------------
# Gap 4: verify-pack catches bit-flipped LOCAL STORE objects
# ---------------------------------------------------------------------------


def _full_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Create a minimal repo with objects, HEAD and config for CLI invocation."""
    dot_muse = muse_dir(tmp_path)
    dot_muse.mkdir()
    for d in ("objects", "commits", "snapshots", "refs/heads"):
        (dot_muse / d).mkdir(parents=True, exist_ok=True)
    (dot_muse / "HEAD").write_text("ref: refs/heads/main\n", encoding="utf-8")
    (dot_muse / "repo.json").write_text(
        '{"repo_id": "test-repo", "domain": "code", "default_branch": "main"}',
        encoding="utf-8",
    )
    return tmp_path


class _BundleSnapEntry(TypedDict, total=False):
    # One snapshot entry of a test pack: its ID plus its manifest.
    snapshot_id: str
    manifest: Manifest


# Metadata block shared by every pack built in this module.
_BUNDLE_META = {
    "mode": "full",
    "base_commits": [],
    "created_at": "2026-01-01T00:00:00Z",
}


class _BundleDict(TypedDict):
    # Keys mirror the dict literals built by _make_pack / _snap_bundle; the
    # object list is stored under "blobs".
    blobs: list[_ObjPayload]
    snapshots: list[_BundleSnapEntry]
    commits: list[Mapping[str, str]]
    meta: Mapping[str, object]


def _good_bundle_obj(data: bytes) -> _ObjPayload:
    """Return a pack object entry whose ``object_id`` matches *data*."""
    oid = blob_id(data)
    return {"object_id": oid, "content": data}


def _make_pack(objects: list[_ObjPayload]) -> bytes:
    """Serialise a pack containing only *objects* (no snapshots or commits)."""
    mpack: _BundleDict = {"blobs": objects, "snapshots": [], "commits": [], "meta": _BUNDLE_META}
    packed: bytes = msgpack.packb(mpack, use_bin_type=True)
    return packed


def _snap_bundle(snap_id: str, manifest: Manifest) -> bytes:
    """Build a mpack whose snapshot references objects in the LOCAL STORE."""
    mpack: _BundleDict = {
        "blobs": [],
        "snapshots": [{"snapshot_id": snap_id, "manifest": manifest}],
        "commits": [],
        "meta": _BUNDLE_META,
    }
    packed: bytes = msgpack.packb(mpack, use_bin_type=True)
    return packed


class TestVerifyPackLocalStoreIntegrity:
    """verify-pack must catch SHA-256 mismatches in LOCAL STORE objects that
    are referenced by mpack snapshots — not just objects inside the mpack.

    Before the fix: has_object() (existence check only) was used.
    After the fix: read_object() (hash-verified read) is used — a bit-flipped
    local store object is reported as a failure.
    """

    def _vp(
        self,
        repo: pathlib.Path,
        extra_args: list[str],
        env_root: pathlib.Path | None = None,
    ) -> InvokeResult:
        """Run ``verify-pack --json`` with *extra_args*.

        MUSE_REPO_ROOT is set to *env_root*, falling back to *repo*.
        """
        env_root = env_root or repo
        runner = CliRunner()
        return runner.invoke(
            None,
            ["verify-pack", "--json"] + extra_args,
            env={"MUSE_REPO_ROOT": str(env_root)},
        )

    def test_clean_local_store_object_passes(self, tmp_path: pathlib.Path) -> None:
        """A snapshot referencing a clean local store object must verify OK."""
        repo = _full_repo(tmp_path)
        data = b"clean local object"
        oid = blob_id(data)
        write_object(repo, oid, data)

        snap_id = fake_id("snap1")
        bf = tmp_path / "mpack.muse"
        bf.write_bytes(_snap_bundle(snap_id, {"file.py": oid}))

        r = self._vp(repo, ["--file", str(bf)])
        assert r.exit_code == 0, f"Expected 0 but got {r.exit_code}: {r.output}"
        d = json.loads(r.output)
        assert d["all_ok"] is True

    def test_bit_flipped_local_store_object_fails(self, tmp_path: pathlib.Path) -> None:
        """A snapshot referencing a bit-flipped local store object must fail.

        This is the core regression test for the has_object→read_object fix.
        Before the fix, verify-pack would report all_ok=True even when the
        local store contained a corrupt object.
        """
        repo = _full_repo(tmp_path)
        data = b"will be bit-flipped in local store"
        oid = blob_id(data)
        write_object(repo, oid, data)

        # Flip a bit in the locally stored object.
        stored = object_path(repo, oid)
        original = stored.read_bytes()
        _corrupt_file(stored, _flip_bit(original, 0, 0))

        snap_id = fake_id("snap2")
        bf = tmp_path / "mpack.muse"
        bf.write_bytes(_snap_bundle(snap_id, {"code.py": oid}))

        r = self._vp(repo, ["--file", str(bf)])
        assert r.exit_code != 0, (
            "verify-pack reported all_ok=True on a bit-flipped local store object "
            "(regression: has_object() was used instead of read_object())"
        )
        d = json.loads(r.output)
        assert d["all_ok"] is False
        assert any(
            "integrity" in f["error"].lower() or "sha-256" in f["error"].lower()
            for f in d["failures"]
        ), f"No integrity error in failures: {d['failures']}"

    def test_zeroed_local_store_object_fails(self, tmp_path: pathlib.Path) -> None:
        """Zeroing the stored file content is caught by verify-pack."""
        repo = _full_repo(tmp_path)
        data = b"will be zeroed"
        oid = blob_id(data)
        write_object(repo, oid, data)
        _corrupt_file(object_path(repo, oid), b"\x00" * len(data))

        snap_id = fake_id("snap3")
        bf = tmp_path / "mpack.muse"
        bf.write_bytes(_snap_bundle(snap_id, {"z.py": oid}))

        r = self._vp(repo, ["--file", str(bf)])
        d = json.loads(r.output)
        assert d["all_ok"] is False

    def test_no_local_flag_skips_local_check(self, tmp_path: pathlib.Path) -> None:
        """--no-local skips local store checks entirely — corrupt local object not reported."""
        repo = _full_repo(tmp_path)
        data = b"corrupt but skipped"
        oid = blob_id(data)
        write_object(repo, oid, data)
        stored = object_path(repo, oid)
        _corrupt_file(stored, _flip_bit(stored.read_bytes(), 0, 0))

        snap_id = fake_id("snap4")
        bf = tmp_path / "mpack.muse"
        bf.write_bytes(_snap_bundle(snap_id, {"s.py": oid}))

        r = self._vp(repo, ["--file", str(bf), "--no-local"])
        # --no-local skips the integrity check; the object is "missing" from
        # the mpack (not present in bundle_object_ids) and local check is skipped,
        # so the snapshot manifest entry reports a missing object — not a corruption.
        d = json.loads(r.output)
        # Either all_ok (if no manifest check happens) or a "missing" failure —
        # crucially NOT an integrity/SHA-256 failure.
        if not d["all_ok"]:
            for f in d["failures"]:
                assert "integrity" not in f["error"].lower(), (
                    "--no-local should not report integrity failures"
                )


# ---------------------------------------------------------------------------
# Gap 5: verify-object --all audits the full local store
# ---------------------------------------------------------------------------


class TestVerifyObjectAllCorrupt:
    """muse verify-object --all must surface bit-flipped objects across the
    entire local store — not just objects passed on the CLI."""

    def _vobj(
        self, repo: pathlib.Path, args: list[str]
    ) -> InvokeResult:
        """Run ``verify-object --json`` with *args* against *repo*."""
        runner = CliRunner()
        return runner.invoke(
            None,
            ["verify-object", "--json"] + args,
            env={"MUSE_REPO_ROOT": str(repo)},
        )

    def test_verify_all_clean_store_passes(self, tmp_path: pathlib.Path) -> None:
        """Five clean objects: exit 0, all_ok, and all five are counted."""
        repo = _full_repo(tmp_path)
        for i in range(5):
            write_object(repo, blob_id(f"obj{i}".encode()), f"obj{i}".encode())

        r = self._vobj(repo, ["--all"])
        assert r.exit_code == 0
        d = json.loads(r.output)
        assert d["all_ok"] is True
        assert d["checked"] == 5

    def test_verify_all_catches_single_bit_flip(self, tmp_path: pathlib.Path) -> None:
        """--all must detect a bit-flip in one object among many clean ones."""
        repo = _full_repo(tmp_path)
        oids: list[str] = []
        for i in range(10):
            data = f"object-{i}".encode()
            oid = blob_id(data)
            write_object(repo, oid, data)
            oids.append(oid)

        # Corrupt exactly one object.
        corrupt_oid = oids[4]
        p = object_path(repo, corrupt_oid)
        _corrupt_file(p, _flip_bit(p.read_bytes(), 0, 0))

        r = self._vobj(repo, ["--all"])
        assert r.exit_code != 0
        d = json.loads(r.output)
        assert d["all_ok"] is False
        assert d["failed"] == 1
        assert any(res["object_id"] == corrupt_oid for res in d["results"] if not res["ok"])

    def test_verify_all_catches_multiple_corruptions(self, tmp_path: pathlib.Path) -> None:
        """--all must catch ALL corrupt objects, not just the first."""
        repo = _full_repo(tmp_path)
        oids: list[str] = []
        for i in range(6):
            data = f"multi-corrupt-{i}".encode()
            oid = blob_id(data)
            write_object(repo, oid, data)
            oids.append(oid)

        # Corrupt three objects.
        corrupt = {oids[0], oids[2], oids[5]}
        for oid in corrupt:
            p = object_path(repo, oid)
            _corrupt_file(p, _flip_bit(p.read_bytes(), 0, 0))

        r = self._vobj(repo, ["--all"])
        d = json.loads(r.output)
        assert d["all_ok"] is False
        failed_ids = {res["object_id"] for res in d["results"] if not res["ok"]}
        assert failed_ids == corrupt, f"Expected {corrupt}, got {failed_ids}"

    def test_verify_explicit_id_corrupt(self, tmp_path: pathlib.Path) -> None:
        """Passing a corrupt object ID explicitly must detect the failure."""
        repo = _full_repo(tmp_path)
        data = b"explicit check"
        oid = blob_id(data)
        write_object(repo, oid, data)
        p = object_path(repo, oid)
        _corrupt_file(p, _flip_bit(p.read_bytes(), 3, 1))

        r = self._vobj(repo, [oid])
        assert r.exit_code != 0
        d = json.loads(r.output)
        assert d["all_ok"] is False


# ---------------------------------------------------------------------------
# Gap 6: Performance benchmark — 256 MiB re-hash < 500 ms
# ---------------------------------------------------------------------------


class TestPerformanceBenchmark:
    """The read-time re-hash must not introduce unacceptable latency.

    Plan requirement: overhead of re-hashing a 256 MiB object must be < 500 ms
    on modern hardware (streaming SHA-256 is I/O-bound, not CPU-bound — the
    bottleneck is NVMe throughput, not the hash itself).

    Objects are written under pytest's ``tmp_path``.  NOTE(review): the plan
    assumes this is a tmpfs / in-memory filesystem, so that the test measures
    pure CPU and memory bandwidth (the lower bound).  ``tmp_path`` lives under
    the system temp directory, which is memory-backed only on some hosts —
    confirm on the CI runners.  Per the plan, real NVMe latency may be higher
    but SHA-256 itself is expected to add < 50 ms for 256 MiB on a modern CPU.
    """

    @pytest.mark.perf
    def test_256_mib_hash_under_500ms(self, tmp_path: pathlib.Path) -> None:
        """read_object on a 256 MiB blob must complete within 500 ms."""
        repo = _repo(tmp_path)
        size = 256 * 1024 * 1024 - 64  # leave room for the blob header
        data = os.urandom(size)
        oid = _write(repo, data)

        # Only the read (including its re-hash) is timed, not the write.
        start = time.perf_counter()
        result = read_object(repo, oid)
        duration_ms = (time.perf_counter() - start) * 1000

        assert result == data, "256 MiB object content corrupted"
        assert duration_ms < 500, (
            f"read_object re-hash took {duration_ms:.1f} ms on a 256 MiB object "
            f"— exceeds the 500 ms budget.  SHA-256 performance regression detected."
        )

    @pytest.mark.perf
    def test_1_mib_hash_under_10ms(self, tmp_path: pathlib.Path) -> None:
        """1 MiB object hash must be sub-10 ms — baseline for small commits."""
        repo = _repo(tmp_path)
        data = os.urandom(1024 * 1024)
        oid = _write(repo, data)

        start = time.perf_counter()
        result = read_object(repo, oid)
        duration_ms = (time.perf_counter() - start) * 1000

        assert result == data
        assert duration_ms < 10, (
            f"1 MiB read_object took {duration_ms:.1f} ms — performance regression"
        )