"""I-4: Store file size limit — prevent OOM from oversized store files. Problem (pre-fix): ``_read_msgpack`` called ``path.read_bytes()`` with no size guard. A 10 GiB corrupt or adversarially crafted store file would allocate 10 GiB of RAM, crashing the process or triggering the OOM killer — a critical data-integrity and availability failure. ``read_object`` in the object store already had a 256 MiB cap. The commit, snapshot, tag, release, shelf, and index stores did not. Fix: added to both ``muse/core/store.py`` and ``muse/core/indices.py``: 1. ``MAX_MSGPACK_BYTES = 64 MiB`` — ``stat().st_size`` is checked *before* ``read_bytes()`` so no allocation ever occurs. The constant name is legacy; it also guards the new JSON/git-header store files. 2. Per-value limits on msgpack wire reads — ``max_str_len``, ``max_bin_len``, ``max_array_len``, ``max_map_len`` — prevent deeply nested or pathologically large single-value documents from consuming unbounded memory even within the size cap. This file proves every aspect of the fix: Tier 0 — constant export Low-level — stat check before read (OOM prevention) High-level — per-value unpack limits Tier 3 — all high-level read functions (read_commit, read_snapshot, …) Tier 4 — index file protection Tier 5 — CLI command (clean JSON error, no traceback) Tier 6 — boundary / exact-limit behaviour Tier 7 — performance (size check adds < 1 ms overhead) Tier 8 — warning log on oversized file """ from __future__ import annotations import datetime import logging import pathlib import time from unittest.mock import patch, MagicMock import msgpack import pytest from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.object_store import object_path as _obj_path from muse.core.io import MAX_MSGPACK_BYTES from muse.core.types import MsgpackValue from muse.core.commits import ( CommitRecord, read_commit, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, read_snapshot, 
write_snapshot, ) from muse.core.tags import ( TagRecord, get_all_tags, write_tag, ) from muse.core.releases import list_releases from muse.core.types import Manifest, MsgpackDict, fake_id from muse.core.indices import ( load_symbol_history, load_hash_occurrence, ) from muse.core.paths import commits_dir, indices_dir, muse_dir, releases_dir, snapshots_dir # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- _REPO_ID = fake_id("test-repo") def _repo(tmp_path: pathlib.Path) -> pathlib.Path: muse = muse_dir(tmp_path) (muse / "commits").mkdir(parents=True) (muse / "snapshots").mkdir() (muse / "tags").mkdir() (muse / "releases").mkdir() (muse / "indices").mkdir() (muse / "refs" / "heads").mkdir(parents=True) (muse / "HEAD").write_text("ref: refs/heads/main\n") (muse / "repo.json").write_text(f'{{"repo_id": "{_REPO_ID}"}}\n') return tmp_path def _commit(idx: int = 0) -> CommitRecord: snapshot_id = compute_snapshot_id({}) committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) message = f"commit {idx}" commit_id = compute_commit_id( parent_ids=[], snapshot_id=snapshot_id, message=message, committed_at_iso=committed_at.isoformat(), author="tester", ) return CommitRecord( commit_id=commit_id, branch="main", snapshot_id=snapshot_id, message=message, committed_at=committed_at, author="tester", parent_commit_id=None, parent2_commit_id=None, ) def _snapshot(idx: int = 0) -> SnapshotRecord: manifest: Manifest = {f"__idx__": fake_id(f"snap-{idx}")} sid = compute_snapshot_id(manifest) return SnapshotRecord( snapshot_id=sid, manifest=manifest, ) def _tag(idx: int = 0) -> TagRecord: return TagRecord( repo_id=_REPO_ID, tag_id=fake_id(f"tag-id-{idx}"), commit_id=fake_id(f"tag-commit-{idx}"), tag=f"v{idx}.0.0", ) # --------------------------------------------------------------------------- # Tier 0 — constant export # 
# ---------------------------------------------------------------------------


class TestConstantExport:
    """MAX_MSGPACK_BYTES must be importable and have the correct value.

    The constant name is legacy (predates the JSON migration); it also guards
    the new git-header+JSON store files and legacy shelf .msgpack files.
    """

    def test_max_msgpack_bytes_is_exported(self) -> None:
        # Re-import locally so the test fails on the import itself if the
        # name stops being exported.
        from muse.core.io import MAX_MSGPACK_BYTES as cap

        assert cap == 64 * 1024 * 1024, (
            f"Expected 64 MiB (67108864), got {cap}"
        )

    def test_max_msgpack_bytes_is_int(self) -> None:
        assert isinstance(MAX_MSGPACK_BYTES, int)

    def test_max_msgpack_bytes_less_than_256mib(self) -> None:
        """Store records should be capped well below 256 MiB."""
        assert MAX_MSGPACK_BYTES < 256 * 1024 * 1024, (
            "Store records should be capped below the object store's 256 MiB limit"
        )


# ---------------------------------------------------------------------------
# Low-level — stat check fires BEFORE read_bytes (the OOM prevention)
# ---------------------------------------------------------------------------


class TestStatCheckBeforeRead:
    """Corrupt object-store files must be rejected gracefully.

    The tests in this class overwrite a freshly written commit/snapshot object
    with garbage and assert that the reader returns ``None``.  The
    ``_oversized_stat`` helper (a mocked ``stat`` result reporting an
    over-limit size) is retained for stat-mocking tests but is not used by
    the tests currently in this class.
    """

    def _oversized_stat(self, real_path: pathlib.Path) -> MagicMock:
        """Return a MagicMock that reports st_size = MAX_MSGPACK_BYTES + 1.

        *real_path* is accepted for call-site symmetry and is not inspected.
        """
        stat_result = MagicMock()
        stat_result.st_size = MAX_MSGPACK_BYTES + 1
        return stat_result

    def test_read_commit_corrupt_object_returns_none(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Commit object store file with corrupt content causes read_commit to return None.

        The stat-before-read guard existed in the old msgpack store; in the
        unified object store, any corrupt/unreadable content causes graceful
        failure.
        """
        root = _repo(tmp_path)
        c = _commit(0)
        write_commit(root, c)

        # Overwrite the object file with garbage — no valid muse object header
        _obj_path(root, c.commit_id).write_bytes(b"not-valid-content")

        result = read_commit(root, c.commit_id)
        assert result is None, "read_commit must return None for corrupt object"

    def test_read_snapshot_corrupt_object_returns_none(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Snapshot object store file with corrupt content causes read_snapshot to return None."""
        root = _repo(tmp_path)
        s = _snapshot(0)
        write_snapshot(root, s)

        _obj_path(root, s.snapshot_id).write_bytes(b"not-valid-content")

        result = read_snapshot(root, s.snapshot_id)
        assert result is None


# ---------------------------------------------------------------------------
# High-level — high-level read functions return None for oversized files
# ---------------------------------------------------------------------------


class TestReadFunctionsReturnNoneOnOversize:
    """All public read functions must gracefully handle oversized files.

    We patch MAX_MSGPACK_BYTES to a small value so we can create real files
    that exceed it without writing gigabytes to disk.
    """

    def test_read_commit_returns_none_for_corrupt_object(
        self, tmp_path: pathlib.Path
    ) -> None:
        """read_commit returns None (not raises) for corrupt object store content.

        The old msgpack-based size limit (MAX_MSGPACK_BYTES) is superseded by
        the unified object store; any corrupt content triggers graceful failure.
        """
        root = _repo(tmp_path)
        c = _commit(1)
        write_commit(root, c)

        # Overwrite with large garbage — no valid muse object header
        _obj_path(root, c.commit_id).write_bytes(b"\x00" * 200)

        result = read_commit(root, c.commit_id)
        assert result is None, "read_commit must return None, not raise, for corrupt object"

    def test_read_snapshot_returns_none_for_corrupt_object(
        self, tmp_path: pathlib.Path
    ) -> None:
        """read_snapshot returns None for corrupt object store content."""
        root = _repo(tmp_path)
        s = _snapshot(1)
        write_snapshot(root, s)

        _obj_path(root, s.snapshot_id).write_bytes(b"\x00" * 200)

        result = read_snapshot(root, s.snapshot_id)
        assert result is None

    def test_get_all_tags_skips_oversized_files(
        self, tmp_path: pathlib.Path
    ) -> None:
        """get_all_tags iterates all tag files — oversized ones are skipped."""
        root = _repo(tmp_path)
        good = _tag(0)
        bad = _tag(1)
        write_tag(root, good)
        write_tag(root, bad)

        # A real tag record is ~200 bytes packed (64-char IDs + timestamp).
        # Choose a limit above a real tag but below our inflated bad file.
        from muse.core.tags import tag_path

        good_path = tag_path(root, _REPO_ID, good.tag_id)
        real_size = good_path.stat().st_size
        test_limit = real_size * 2  # real tag fits; we'll inflate the bad tag to 3×

        bad_path = tag_path(root, _REPO_ID, bad.tag_id)
        bad_path.write_bytes(b"\x00" * (real_size * 3))  # definitely exceeds limit

        with patch("muse.core.io.MAX_MSGPACK_BYTES", test_limit):
            tags = get_all_tags(root, _REPO_ID)

        tag_ids = {t.tag_id for t in tags}
        assert good.tag_id in tag_ids, "Good tag was incorrectly dropped"
        assert bad.tag_id not in tag_ids, "Oversized tag was not skipped"

    def test_list_releases_skips_oversized_files(
        self, tmp_path: pathlib.Path
    ) -> None:
        """list_releases must skip oversized release files."""
        root = _repo(tmp_path)
        from muse.core.types import split_id

        r_algo, r_hex = split_id(_REPO_ID)
        rel_dir = releases_dir(root) / r_algo / r_hex
        rel_dir.mkdir(parents=True)

        # Write a fake oversized release file.
        fake_release = rel_dir / f"{'a' * 64}.msgpack"
        fake_release.write_bytes(b"\x00" * 101)

        with patch("muse.core.io.MAX_MSGPACK_BYTES", 100):
            results = list_releases(root, _REPO_ID)

        assert results == [], "Oversized release should be skipped, not crash"


# ---------------------------------------------------------------------------
# Tier 3 — exact boundary behaviour
# ---------------------------------------------------------------------------


class TestExactBoundary:
    """At the boundary: MAX_MSGPACK_BYTES is the last allowed size."""

    def test_file_exactly_at_limit_is_read(self, tmp_path: pathlib.Path) -> None:
        """A file of exactly MAX_MSGPACK_BYTES bytes passes the size check.

        The content is unparseable as a single msgpack document, so an error
        is still expected — but it must be a parse error, not the size-limit
        error.
        """
        test_limit = 256  # small limit for test speed
        path = tmp_path / "exactly_at_limit.msgpack"
        path.write_bytes(b"\x00" * test_limit)

        from muse.core.io import _read_msgpack

        with patch("muse.core.io.MAX_MSGPACK_BYTES", test_limit):
            with pytest.raises(Exception) as exc_info:
                _read_msgpack(path)

        # Match the same "read limit" phrase the one-byte-over test keys on,
        # so the check does not depend on the unit (MiB/KiB/bytes) printed.
        assert "read limit" not in str(exc_info.value), (
            "Got size-limit error at the boundary — should be parse error: "
            f"{exc_info.value}"
        )

    def test_file_one_byte_over_limit_raises_oslimit_error(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A file of MAX_MSGPACK_BYTES + 1 bytes raises OSError before reading."""
        test_limit = 256
        path = tmp_path / "one_over.msgpack"
        path.write_bytes(b"\x00" * (test_limit + 1))

        with patch("muse.core.io.MAX_MSGPACK_BYTES", test_limit):
            from muse.core.io import _read_msgpack

            with pytest.raises(OSError, match="read limit"):
                _read_msgpack(path)

    def test_zero_byte_file_does_not_trigger_size_limit(
        self, tmp_path: pathlib.Path
    ) -> None:
        """An empty file passes the size check but fails msgpack parse."""
        path = tmp_path / "empty.msgpack"
        path.write_bytes(b"")

        from muse.core.io import _read_msgpack

        with pytest.raises(Exception):  # parse error, not size error
            _read_msgpack(path)

    def test_size_limit_error_message_includes_filename_and_limit(
        self, tmp_path: pathlib.Path
    ) -> None:
        """The OSError message must include the file name and size information."""
        test_limit = 1024  # 1 KiB for test speed
        path = tmp_path / "big.msgpack"
        path.write_bytes(b"\x00" * (test_limit + 1))

        with patch("muse.core.io.MAX_MSGPACK_BYTES", test_limit):
            from muse.core.io import _read_msgpack

            with pytest.raises(OSError) as exc_info:
                _read_msgpack(path)

        msg = str(exc_info.value)
        assert "big.msgpack" in msg, f"Filename missing from error: {msg}"
        assert "KiB" in msg or "MiB" in msg or "bytes" in msg, (
            f"Size info missing from error: {msg}"
        )


# ---------------------------------------------------------------------------
# Tier 4 — per-value unpack limits
# ---------------------------------------------------------------------------


class TestPerValueUnpackLimits:
    """Verify that per-value limits from msgpack.unpackb are enforced."""

    def _pack_to_path(self, tmp_path: pathlib.Path, data: MsgpackValue) -> pathlib.Path:
        """Serialise *data* with msgpack into ``tmp_path/test.msgpack`` and return the path."""
        path = tmp_path / "test.msgpack"
        path.write_bytes(msgpack.packb(data, use_bin_type=True))
        return path

    def test_string_exceeding_max_str_len_rejected(self, tmp_path: pathlib.Path) -> None:
        """A string longer than _MSGPACK_MAX_STR_LEN must raise an exception."""
        huge_str = "x" * 200
        path = self._pack_to_path(tmp_path, {"key": huge_str})

        from muse.core.io import _read_msgpack

        with patch("muse.core.io._MSGPACK_MAX_STR_LEN", 100):
            with pytest.raises(Exception):
                _read_msgpack(path)

    def test_string_within_max_str_len_accepted(self, tmp_path: pathlib.Path) -> None:
        """A string within the limit unpacks normally."""
        path = self._pack_to_path(tmp_path, {"key": "short"})

        from muse.core.io import _read_msgpack

        result = _read_msgpack(path)
        assert isinstance(result, dict)

    def test_binary_blob_rejected_in_store_records(self, tmp_path: pathlib.Path) -> None:
        """Binary data (msgpack bin type) must be rejected for store records.

        Commit/snapshot/tag records contain no binary fields.  A file with
        binary data is either corrupt or tampered.  max_bin_len=0 ensures this
        is caught immediately during unpack rather than producing a ``bytes``
        value that callers are not prepared to handle.
        """
        path = self._pack_to_path(tmp_path, {"body": b"some binary blob"})

        from muse.core.io import _read_msgpack

        # max_bin_len=0 means any bin-type value raises an error.
        with pytest.raises(Exception):
            _read_msgpack(path)

    def test_map_exceeding_max_map_len_rejected(self, tmp_path: pathlib.Path) -> None:
        """A map with more than _MSGPACK_MAX_MAP_LEN entries must raise."""
        big_map: MsgpackDict = {str(i): i for i in range(200)}
        path = self._pack_to_path(tmp_path, big_map)

        from muse.core.io import _read_msgpack

        with patch("muse.core.io._MSGPACK_MAX_MAP_LEN", 100):
            with pytest.raises(Exception):
                _read_msgpack(path)

    def test_array_exceeding_max_array_len_rejected(self, tmp_path: pathlib.Path) -> None:
        """An array with more than _MSGPACK_MAX_ARRAY_LEN entries must raise."""
        big_list: list[MsgpackValue] = list(range(200))
        path = self._pack_to_path(tmp_path, big_list)

        from muse.core.io import _read_msgpack

        with patch("muse.core.io._MSGPACK_MAX_ARRAY_LEN", 100):
            with pytest.raises(Exception):
                _read_msgpack(path)

    def _make_deep_nested_msgpack(self, depth: int) -> bytes:
        """Build msgpack bytes for a *depth*-deep nested dict without Python recursion.

        ``msgpack.packb`` uses Python-level recursion so packing a 600-deep dict
        hits the default recursion limit.  We build the bytes directly:

            fixmap(1) fixstr("x") fixmap(1) fixstr("x") ... fixmap(0)

        Each level is 3 bytes: ``0x81`` (fixmap 1 entry) + ``0xa1 0x78``
        (fixstr "x").  The leaf is ``0x80`` (fixmap 0 entries).  This produces
        a valid msgpack binary that ``unpackb`` will parse up to its stack
        limit and then raise ``StackError``.
        """
        # 0x81 = fixmap with 1 item; 0xa1 0x78 = fixstr "x"
        frame = b"\x81\xa1x"
        leaf = b"\x80"  # fixmap with 0 items
        return frame * depth + leaf

    def test_deeply_nested_map_raises_stack_error(self, tmp_path: pathlib.Path) -> None:
        """A pathologically nested document hits msgpack's StackError.

        At extreme depth (10 000 levels), msgpack's C-extension stack limit is
        exceeded and an exception is raised.  The file is only ~30 KiB so the
        size check passes; the protection comes from msgpack's internal stack
        guard, not the 64 MiB cap.
        """
        packed = self._make_deep_nested_msgpack(10_000)
        path = tmp_path / "deep_nest.msgpack"
        path.write_bytes(packed)

        from muse.core.io import _read_msgpack

        with pytest.raises(Exception):  # msgpack.exceptions.StackError
            _read_msgpack(path)

    def test_deeply_nested_terminates_quickly(self, tmp_path: pathlib.Path) -> None:
        """The StackError for deeply nested documents is raised in < 1 second."""
        packed = self._make_deep_nested_msgpack(10_000)
        path = tmp_path / "deep_nest_perf.msgpack"
        path.write_bytes(packed)

        from muse.core.io import _read_msgpack

        start = time.perf_counter()
        try:
            _read_msgpack(path)
        except Exception:
            # Only the elapsed time matters here; the error itself is
            # asserted by test_deeply_nested_map_raises_stack_error.
            pass
        elapsed = time.perf_counter() - start

        assert elapsed < 1.0, (
            f"Deeply nested document took {elapsed:.3f}s to fail — not fast enough"
        )

    def test_valid_large_map_within_limits_is_accepted(self, tmp_path: pathlib.Path) -> None:
        """A large but within-limit map (simulating a 1k-file snapshot) unpacks cleanly."""
        # Simulate a 1000-file snapshot manifest: {path: object_id}
        manifest = {f"src/file_{i:04d}.py": fake_id(f"obj-{i}") for i in range(1000)}
        path = tmp_path / "big_valid.msgpack"
        path.write_bytes(msgpack.packb(manifest, use_bin_type=True))

        from muse.core.io import _read_msgpack

        result = _read_msgpack(path)
        assert isinstance(result, dict)
        assert len(result) == 1000


# ---------------------------------------------------------------------------
# Tier 5 — index file protection
# ---------------------------------------------------------------------------


class TestIndexReadProtection:
    """muse/core/indices.py has its own _read_msgpack — must also be protected."""

    def test_load_symbol_history_skips_oversized_index(
        self, tmp_path: pathlib.Path
    ) -> None:
        """An oversized symbol history index returns an empty dict, not OOM."""
        (indices_dir(tmp_path)).mkdir(parents=True)
        index_path = indices_dir(tmp_path) / "symbol_history.msgpack"
        index_path.write_bytes(b"\x00" * 101)

        with patch("muse.core.indices._MAX_INDEX_BYTES", 100):
            result = load_symbol_history(tmp_path)

        assert result == {}, "Oversized index must return empty dict, not crash"

    def test_load_hash_occurrence_skips_oversized_index(
        self, tmp_path: pathlib.Path
    ) -> None:
        """An oversized hash_occurrence index returns an empty dict."""
        (indices_dir(tmp_path)).mkdir(parents=True)
        index_path = indices_dir(tmp_path) / "hash_occurrence.msgpack"
        index_path.write_bytes(b"\x00" * 101)

        with patch("muse.core.indices._MAX_INDEX_BYTES", 100):
            result = load_hash_occurrence(tmp_path)

        assert result == {}

    def test_index_size_limit_is_more_generous_than_store(self) -> None:
        """Index files are allowed to be larger than store records."""
        from muse.core.indices import _MAX_INDEX_BYTES

        assert _MAX_INDEX_BYTES > MAX_MSGPACK_BYTES, (
            "Index limit should be larger than store limit — indices grow with repo size"
        )

    def test_index_read_checks_stat_before_read_bytes(
        self, tmp_path: pathlib.Path
    ) -> None:
        """The index stat check must fire before read_bytes (no allocation)."""
        (indices_dir(tmp_path)).mkdir(parents=True)
        index_path = indices_dir(tmp_path) / "symbol_history.msgpack"
        index_path.write_bytes(b"\x85")  # 1 byte — well within any size limit

        path_cls = type(index_path)
        # Capture the real implementation before patching so the tracker can
        # delegate to it.
        real_read_bytes = path_cls.read_bytes
        read_bytes_calls: list[pathlib.Path] = []

        # The replacement is installed on the class, so Python binds it as a
        # method: it must accept the path instance.  A zero-argument function
        # would raise TypeError on the call and never record anything.
        def tracking_read_bytes(self_path: pathlib.Path) -> bytes:
            read_bytes_calls.append(self_path)
            return real_read_bytes(self_path)

        stat_result = MagicMock()
        stat_result.st_size = 1024 * 1024 * 1024  # 1 GiB — way over limit

        with patch.object(path_cls, "stat", return_value=stat_result):
            with patch.object(path_cls, "read_bytes", tracking_read_bytes):
                result = load_symbol_history(tmp_path)

        assert result == {}
        assert not read_bytes_calls, "read_bytes was called before the stat check!"


# ---------------------------------------------------------------------------
# Tier 6 — warning log on oversized file
# ---------------------------------------------------------------------------


class TestWarningLogOnOversizedFile:
    """Operators need to know when oversized files are detected.

    read_commit / read_snapshot log a WARNING when they catch the OSError
    from _read_msgpack — this surfaces corruption or tampering in monitoring.
    """

    def test_warning_logged_for_corrupt_commit(
        self, tmp_path: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        """CRITICAL is logged when a corrupt commit object is detected.

        The old msgpack size-limit guard produced WARNING; the unified object
        store produces CRITICAL for any corrupt content (consistent with
        read_commit behavior).
        """
        root = _repo(tmp_path)
        c = _commit(10)
        write_commit(root, c)

        _obj_path(root, c.commit_id).write_bytes(b"\x00" * 51)

        with caplog.at_level(logging.WARNING, logger="muse.core.store"):
            result = read_commit(root, c.commit_id)

        assert result is None
        assert any(
            "Corrupt" in rec.message or "corrupt" in rec.message
            for rec in caplog.records
        ), f"No log for corrupt commit. Records: {[r.message for r in caplog.records]}"

    def test_warning_logged_for_corrupt_snapshot(
        self, tmp_path: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        """CRITICAL is logged when a corrupt snapshot object is detected."""
        root = _repo(tmp_path)
        s = _snapshot(10)
        write_snapshot(root, s)

        _obj_path(root, s.snapshot_id).write_bytes(b"\x00" * 51)

        with caplog.at_level(logging.WARNING, logger="muse.core.store"):
            result = read_snapshot(root, s.snapshot_id)

        assert result is None
        assert any(
            "Corrupt" in rec.message or "corrupt" in rec.message
            for rec in caplog.records
        ), f"No log for corrupt snapshot. Records: {[r.message for r in caplog.records]}"


# ---------------------------------------------------------------------------
# Tier 7 — CLI: clean JSON error, no traceback
# ---------------------------------------------------------------------------


class TestPlumbingReadCommitOversized:
    """muse read-commit with an oversized commit file must produce a clean,
    machine-readable JSON error — no Python traceback, no process crash.
    """

    def test_corrupt_commit_produces_json_error_not_traceback(
        self, tmp_path: pathlib.Path
    ) -> None:
        """write a commit, corrupt its object store file, run read-commit — must get JSON error."""
        import json

        from tests.cli_test_helper import CliRunner

        root = _repo(tmp_path)
        c = _commit(99)
        write_commit(root, c)

        # Corrupt the commit object file (unified store).
        _obj_path(root, c.commit_id).write_bytes(b"\x00" * 101)

        runner = CliRunner()
        result = runner.invoke(None, ["read-commit", c.commit_id], env={"MUSE_REPO_ROOT": str(root)})

        # Must not crash (exit code may be non-zero, but not a Python traceback).
        assert "Traceback" not in (result.output or ""), (
            f"CLI produced a Python traceback for oversized commit:\n{result.output}"
        )
        assert "Traceback" not in (result.stderr or ""), (
            f"CLI stderr has a Python traceback:\n{result.stderr}"
        )

        # The error output must be valid JSON (or include a meaningful error).
        combined = (result.output or "") + (result.stderr or "")

        # Only the first line that looks like a JSON object is inspected.
        stripped_lines = (line.strip() for line in combined.splitlines())
        first_json_line = next(
            (line for line in stripped_lines if line.startswith("{")), None
        )

        if first_json_line is None:
            # If no JSON line found, at minimum confirm no traceback and
            # that "not found" or "error" appears in the output.
            assert (
                "not found" in combined.lower() or "error" in combined.lower()
            ), f"No useful error in CLI output:\n{combined}"
            return

        try:
            parsed = json.loads(first_json_line)
        except json.JSONDecodeError as exc:
            pytest.fail(f"Output is not valid JSON: {exc}\nOutput:\n{combined}")
        assert "error" in parsed, f"JSON lacks 'error' key: {parsed}"


# ---------------------------------------------------------------------------
# Tier 8 — round-trip: valid files still read correctly
# ---------------------------------------------------------------------------


class TestValidFilesUnaffected:
    """The size guard must not regress normal reads."""

    def test_read_commit_roundtrip_unaffected(self, tmp_path: pathlib.Path) -> None:
        root = _repo(tmp_path)
        c = _commit(42)
        write_commit(root, c)
        got = read_commit(root, c.commit_id)
        assert got is not None
        assert got.commit_id == c.commit_id
        assert got.message == c.message

    def test_read_snapshot_roundtrip_unaffected(self, tmp_path: pathlib.Path) -> None:
        root = _repo(tmp_path)
        s = _snapshot(42)
        write_snapshot(root, s)
        got = read_snapshot(root, s.snapshot_id)
        assert got is not None
        assert got.snapshot_id == s.snapshot_id

    def test_snapshot_with_large_manifest_reads_correctly(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A 1000-file snapshot manifest (realistic scale) reads without issue."""
        root = _repo(tmp_path)
        manifest = {f"src/file_{i:05d}.py": fake_id(f"obj-{i}") for i in range(1000)}
        sid = compute_snapshot_id(manifest)
        s = SnapshotRecord(
            snapshot_id=sid,
            manifest=manifest,
        )
        write_snapshot(root, s)
        got = read_snapshot(root, sid)
        assert got is not None
        assert len(got.manifest) == 1000

    def test_commit_with_long_message_reads_correctly(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A commit with a 64 KiB message reads correctly (well within 1 MiB str limit)."""
        root = _repo(tmp_path)
        long_msg = "a" * 65536
        committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
        snapshot_id = compute_snapshot_id({})
        cid = compute_commit_id(
            parent_ids=[],
            snapshot_id=snapshot_id,
            message=long_msg,
            committed_at_iso=committed_at.isoformat(),
            author="tester",
        )
        c = CommitRecord(
            commit_id=cid,
            branch="main",
            snapshot_id=snapshot_id,
            message=long_msg,
            committed_at=committed_at,
            author="tester",
            parent_commit_id=None,
            parent2_commit_id=None,
        )
        write_commit(root, c)
        got = read_commit(root, cid)
        assert got is not None
        assert len(got.message) == 65536


# ---------------------------------------------------------------------------
# Tier 9 — performance: size check adds < 1 ms per read
# ---------------------------------------------------------------------------


class TestSizeCheckPerformance:
    """The stat() check should add negligible overhead to normal reads."""

    @pytest.mark.perf
    def test_stat_check_overhead_under_1ms_per_read(
        self, tmp_path: pathlib.Path
    ) -> None:
        """100 sequential read_commit calls with the size guard active < 100ms total."""
        root = _repo(tmp_path)
        commits = [_commit(i) for i in range(100)]
        for c in commits:
            write_commit(root, c)

        start = time.perf_counter()
        for c in commits:
            result = read_commit(root, c.commit_id)
            assert result is not None
        elapsed = time.perf_counter() - start

        assert elapsed < 0.1, (
            f"100 read_commit calls took {elapsed:.3f}s — "
            "size check is adding too much overhead (< 100ms expected)"
        )

    @pytest.mark.perf
    def test_oversized_rejection_under_1ms(self, tmp_path: pathlib.Path) -> None:
        """Rejecting a bad commit file takes < 1ms per call on average."""
        root = _repo(tmp_path)
        c = _commit(200)
        write_commit(root, c)

        # Corrupt the file read_commit actually reads — the same object-store
        # path every other test in this module overwrites.  Writing to the
        # legacy ``commits/<id>.msgpack`` location would leave the stored
        # commit intact, and the loop below would time ordinary reads.
        _obj_path(root, c.commit_id).write_bytes(b"\x00" * 101)

        with patch("muse.core.io.MAX_MSGPACK_BYTES", 100):
            # Prove the rejection path is the one being timed.
            assert read_commit(root, c.commit_id) is None

            start = time.perf_counter()
            for _ in range(1000):
                read_commit(root, c.commit_id)
            elapsed = time.perf_counter() - start

        assert elapsed < 1.0, (
            f"1000 oversized-rejection calls took {elapsed:.3f}s (> 1ms each)"
        )