"""Tests for _InvariantFileCache — the persistent per-file AST analysis cache.

Coverage
--------
The cache lives at ``.muse/cache/invariants.json`` and maps a file's content
hash (SHA-256) to the ``_FileData`` struct produced by a single ``ast.parse``
pass.  On a warm cache, ``muse code invariants`` skips every ``ast.parse``
call — O(N×R) → O(1).

Tier 1 — Unit
    In-memory operations: get/put/prune/size/empty/dirty flag.  No I/O.

Tier 2 — Integration
    Real filesystem via ``tmp_path``.  Verifies the correct on-disk path,
    save/load round-trip fidelity, dirty-flag lifecycle, and no-op behaviour.

Tier 5 — Data integrity
    Adversarial on-disk state: corrupt bytes, wrong version, missing keys,
    non-dict entries.  Also verifies atomic write (no ``.tmp`` leftover after
    a successful save), that the startup sweep removes an orphaned temp file,
    and that a cache file at the old ``.muse/`` root location is ignored.

Tier 6 — Performance
    Asserts that a warm cache skips ``ast.parse`` entirely (zero calls).
    The mechanism — I/O patching — is more reliable than wall-clock ratios
    across CI hardware and proves the exact property we care about.

Tier 7 — Security
    Mode-000 cache file: ``load()`` must return empty and never raise
    (skipped where file permissions are not enforced for the current user).
    Deeply nested JSON payload: ``load()`` must not crash or hang.
"""

from __future__ import annotations

import ast
import json
import os
import pathlib
import stat
import time
from collections.abc import Mapping
from unittest.mock import patch

import pytest

from muse.core.paths import muse_dir
from muse.plugins.code._invariants import (
    _FileData,
    _InvariantFileCache,
    _FILE_CACHE_VERSION,
)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

# ``os.getuid`` only exists on Unix.  Evaluating it unguarded inside a
# ``skipif`` decorator would raise ``AttributeError`` at import time elsewhere
# and take the whole module down during collection.  The mode-000 test is
# meaningless both for root (which bypasses permission bits) and on platforms
# without POSIX uids, so both cases are folded into one skip condition.
_PERMISSIONS_NOT_ENFORCED = not hasattr(os, "getuid") or os.getuid() == 0


def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Return a minimal repo root with ``.muse/cache/`` created."""
    (muse_dir(tmp_path) / "cache").mkdir(parents=True)
    return tmp_path


def _file_data(
    imports: list[str] | None = None,
    fns: list[str] | None = None,
    classes: list[str] | None = None,
    has_all: bool = False,
    complexity: dict[str, int] | None = None,
) -> _FileData:
    """Build a minimal ``_FileData`` struct for testing.

    Every omitted argument falls back to an empty container (or ``False``
    for *has_all*); ``from_module`` and ``from_name`` are always empty.
    """
    return _FileData(
        raw_module_imports=imports or [],
        from_module=[],
        from_name=[],
        top_level_fns=fns or [],
        top_level_classes=classes or [],
        has_all=has_all,
        complexity=complexity or {},
    )


def _cache_path(root: pathlib.Path) -> pathlib.Path:
    """Return the expected on-disk location of the cache file under *root*."""
    return muse_dir(root) / "cache" / "invariants.json"


def _write_cache_doc(root: pathlib.Path, doc: Mapping[str, object]) -> None:
    """Serialise *doc* as JSON and write it directly to the cache file.

    Bypasses ``_InvariantFileCache.save()`` so tests can plant arbitrary
    (including invalid) on-disk state.
    """
    _cache_path(root).write_bytes(json.dumps(doc).encode())


# ---------------------------------------------------------------------------
# Tier 1 — Unit (in-memory, no filesystem)
# ---------------------------------------------------------------------------


class TestUnit:
    """In-memory get/put/prune/size/empty operations — no I/O."""

    def test_get_miss_returns_none(self) -> None:
        cache = _InvariantFileCache.empty()
        assert cache.get("no_such_hash") is None

    def test_put_then_get_hit(self) -> None:
        cache = _InvariantFileCache.empty()
        fd = _file_data(imports=["os"], fns=["main"])
        cache.put("abc123", fd)
        assert cache.get("abc123") == fd

    def test_put_marks_dirty(self) -> None:
        cache = _InvariantFileCache.empty()
        assert not cache._dirty
        cache.put("id1", _file_data())
        assert cache._dirty

    def test_put_same_key_overwrites(self) -> None:
        cache = _InvariantFileCache.empty()
        cache.put("k", _file_data(fns=["old"]))
        cache.put("k", _file_data(fns=["new"]))
        entry = cache.get("k")
        # Fail with a clear assertion rather than a TypeError on a miss.
        assert entry is not None
        assert entry["top_level_fns"] == ["new"]
        assert cache.size == 1

    def test_different_keys_independent(self) -> None:
        cache = _InvariantFileCache.empty()
        a = _file_data(fns=["alpha"])
        b = _file_data(fns=["beta"])
        cache.put("k_a", a)
        cache.put("k_b", b)
        assert cache.get("k_a") == a
        assert cache.get("k_b") == b

    def test_size_starts_zero(self) -> None:
        assert _InvariantFileCache.empty().size == 0

    def test_size_grows_with_put(self) -> None:
        cache = _InvariantFileCache.empty()
        cache.put("x", _file_data())
        cache.put("y", _file_data())
        assert cache.size == 2

    def test_prune_removes_stale_sets_dirty(self) -> None:
        cache = _InvariantFileCache.empty()
        cache.put("keep", _file_data())
        cache.put("drop", _file_data())
        # Reset the flag so the assertion below observes prune(), not put().
        cache._dirty = False
        cache.prune({"keep"})
        assert cache.get("keep") is not None
        assert cache.get("drop") is None
        assert cache._dirty

    def test_prune_noop_when_all_live(self) -> None:
        cache = _InvariantFileCache.empty()
        cache.put("keep", _file_data())
        cache._dirty = False
        cache.prune({"keep", "other"})
        assert not cache._dirty

    def test_empty_cache_dir_is_none(self) -> None:
        cache = _InvariantFileCache.empty()
        assert cache._cache_dir is None

    def test_empty_save_is_noop(self, tmp_path: pathlib.Path) -> None:
        cache = _InvariantFileCache.empty()
        cache.put("id", _file_data())
        cache.save()
        assert not any(tmp_path.rglob("invariants.json"))


# ---------------------------------------------------------------------------
# Tier 2 — Integration (real filesystem)
# ---------------------------------------------------------------------------


class TestIntegration:
    """Real filesystem via ``tmp_path``."""

    def test_load_missing_file_returns_empty(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        cache = _InvariantFileCache.load(root)
        assert cache.size == 0

    def test_save_creates_file_at_correct_path(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        cache = _InvariantFileCache.load(root)
        cache.put("h1", _file_data(fns=["compute"]))
        cache.save()
        assert _cache_path(root).is_file()

    def test_save_load_round_trip_preserves_all_fields(
        self, tmp_path: pathlib.Path
    ) -> None:
        root = _make_repo(tmp_path)
        fd = _file_data(
            imports=["os", "sys"],
            fns=["compute", "validate"],
            classes=["MyClass"],
            has_all=True,
            complexity={"billing.py::compute": 5},
        )
        cache = _InvariantFileCache.load(root)
        cache.put("deadbeef", fd)
        cache.save()

        loaded = _InvariantFileCache.load(root)
        result = loaded.get("deadbeef")
        assert result is not None
        assert result["raw_module_imports"] == ["os", "sys"]
        assert result["top_level_fns"] == ["compute", "validate"]
        assert result["top_level_classes"] == ["MyClass"]
        assert result["has_all"] is True
        assert result["complexity"] == {"billing.py::compute": 5}

    def test_save_noop_when_not_dirty(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        cache = _InvariantFileCache.load(root)
        cache.save()
        assert not _cache_path(root).exists()

    def test_dirty_false_after_successful_save(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        cache = _InvariantFileCache.load(root)
        cache.put("h", _file_data())
        cache.save()
        assert not cache._dirty

    def test_second_save_does_not_update_mtime(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        cache = _InvariantFileCache.load(root)
        cache.put("h", _file_data())
        cache.save()
        mtime1 = _cache_path(root).stat().st_mtime_ns
        cache.save()  # not dirty — must not touch the file
        mtime2 = _cache_path(root).stat().st_mtime_ns
        assert mtime1 == mtime2

    def test_load_without_muse_dir_returns_empty(self, tmp_path: pathlib.Path) -> None:
        # No .muse/ at all — cache_dir is None, returns empty gracefully.
        cache = _InvariantFileCache.load(tmp_path)
        assert cache.size == 0
        assert cache._cache_dir is None

    def test_multiple_entries_round_trip(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        cache = _InvariantFileCache.load(root)
        for i in range(10):
            cache.put(f"hash_{i}", _file_data(fns=[f"fn_{i}"]))
        cache.save()

        loaded = _InvariantFileCache.load(root)
        assert loaded.size == 10
        for i in range(10):
            entry = loaded.get(f"hash_{i}")
            # Fail with a clear assertion rather than a TypeError on a miss.
            assert entry is not None
            assert entry["top_level_fns"] == [f"fn_{i}"]


# ---------------------------------------------------------------------------
# Tier 5 — Data integrity
# ---------------------------------------------------------------------------


class TestDataIntegrity:
    """Adversarial on-disk state."""

    def test_corrupt_bytes_returns_empty(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        _cache_path(root).write_bytes(b"not valid JSON !!!")
        cache = _InvariantFileCache.load(root)
        assert cache.size == 0

    def test_wrong_version_returns_empty(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        _write_cache_doc(root, {"version": 999, "entries": {}})
        cache = _InvariantFileCache.load(root)
        assert cache.size == 0

    def test_missing_entries_key_returns_empty(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        _write_cache_doc(root, {"version": _FILE_CACHE_VERSION})
        cache = _InvariantFileCache.load(root)
        assert cache.size == 0

    def test_invalid_entry_skipped_valid_survives(
        self, tmp_path: pathlib.Path
    ) -> None:
        root = _make_repo(tmp_path)
        # Spelled out as a literal (not via ``_file_data``) so the test pins
        # the on-disk key names independently of the in-memory struct.
        good = {
            "raw_module_imports": ["os"],
            "from_module": [],
            "from_name": [],
            "top_level_fns": ["run"],
            "top_level_classes": [],
            "has_all": False,
            "complexity": {},
        }
        doc = {
            "version": _FILE_CACHE_VERSION,
            "entries": {
                "good_hash": good,
                "bad_hash": ["not", "a", "dict"],  # non-dict value — entry skipped
            },
        }
        _write_cache_doc(root, doc)
        cache = _InvariantFileCache.load(root)
        assert cache.get("good_hash") is not None
        assert cache.size == 1

    def test_non_dict_entry_value_skipped(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        doc = {
            "version": _FILE_CACHE_VERSION,
            "entries": {"bad_hash": "not_a_dict"},
        }
        _write_cache_doc(root, doc)
        cache = _InvariantFileCache.load(root)
        assert cache.size == 0

    def test_no_tmp_file_leftover_after_save(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        cache = _InvariantFileCache.load(root)
        cache.put("h", _file_data())
        cache.save()
        cache_dir = muse_dir(root) / "cache"
        assert not any(cache_dir.glob("*.tmp"))

    def test_orphaned_tmp_swept_on_startup(self, tmp_path: pathlib.Path) -> None:
        """A stale ``.invariants_*.tmp`` left by a crash is removed by the startup sweep."""
        from muse.core.repo import _cleanup_muse_dir_temps

        root = _make_repo(tmp_path)
        dot_muse = muse_dir(root)
        orphan = dot_muse / "cache" / ".invariants_abc123.tmp"
        orphan.write_bytes(b"stale")
        _cleanup_muse_dir_temps(dot_muse)
        assert not orphan.exists()

    def test_old_location_file_is_ignored(self, tmp_path: pathlib.Path) -> None:
        """A ``code_invariants_cache.json`` at the old ``.muse/`` root is not loaded."""
        root = _make_repo(tmp_path)
        old_location = muse_dir(root) / "code_invariants_cache.json"
        stale = {
            "version": _FILE_CACHE_VERSION,
            "entries": {
                "stale_hash": {
                    "raw_module_imports": ["stale"],
                    "from_module": [],
                    "from_name": [],
                    "top_level_fns": ["stale_fn"],
                    "top_level_classes": [],
                    "has_all": False,
                    "complexity": {},
                }
            },
        }
        old_location.write_bytes(json.dumps(stale).encode())
        cache = _InvariantFileCache.load(root)
        assert cache.get("stale_hash") is None


# ---------------------------------------------------------------------------
# Tier 6 — Performance (warm path skips ast.parse)
# ---------------------------------------------------------------------------


class TestPerformance:
    """Warm cache must not call ``ast.parse``."""

    def test_warm_cache_skips_ast_parse(self, tmp_path: pathlib.Path) -> None:
        """Pre-populated cache: ``_build_file_data`` must not call ``ast.parse``."""
        from muse.core.object_store import write_object
        from muse.core.types import blob_id
        from muse.plugins.code._invariants import _build_file_data

        root = _make_repo(tmp_path)
        src = b"def compute(x: int) -> int:\n return x * 2\n"
        oid = blob_id(src)
        write_object(root, oid, src)
        manifest = {"billing.py": oid}

        # Cold run — populates cache in memory.
        cold_cache = _InvariantFileCache.load(root)
        _build_file_data(manifest, root, cold_cache)
        # Guard against a vacuous pass: if the cold run cached nothing (and
        # parsed nothing), the warm run would also record zero parse calls.
        assert cold_cache.size > 0, (
            "cold run did not populate the cache — "
            "the warm-path check below would be vacuous"
        )
        cold_cache.save()

        # Warm run — patch ast.parse to detect any call.
        warm_cache = _InvariantFileCache.load(root)
        parse_calls: list[str] = []
        original_parse = ast.parse

        def counting_parse(
            source: str | bytes, *args: str | int, **kwargs: str | int
        ) -> ast.AST:
            # Record the call, then delegate so behaviour is otherwise unchanged.
            parse_calls.append("called")
            return original_parse(source, *args, **kwargs)

        with patch("muse.plugins.code._invariants.ast.parse", counting_parse):
            _build_file_data(manifest, root, warm_cache)

        assert parse_calls == [], (
            f"ast.parse called {len(parse_calls)} time(s) on warm cache — "
            "cold run should have populated the cache"
        )


# ---------------------------------------------------------------------------
# Tier 7 — Security
# ---------------------------------------------------------------------------


class TestSecurity:
    """Untrusted cache content and unreadable files."""

    @pytest.mark.skipif(
        _PERMISSIONS_NOT_ENFORCED,
        reason="file permissions are not enforced (root, or no POSIX uid support)",
    )
    def test_mode_000_file_returns_empty_no_raise(self, tmp_path: pathlib.Path) -> None:
        """An unreadable cache file must be handled gracefully — never raises."""
        root = _make_repo(tmp_path)
        cache_file = _cache_path(root)
        _write_cache_doc(root, {"version": _FILE_CACHE_VERSION, "entries": {}})
        cache_file.chmod(0o000)
        try:
            cache = _InvariantFileCache.load(root)
            assert cache.size == 0
        finally:
            cache_file.chmod(0o644)  # restore so tmp_path cleanup succeeds

    def test_deeply_nested_payload_does_not_crash(self, tmp_path: pathlib.Path) -> None:
        """A pathologically nested JSON structure must not crash or hang.

        The load code accepts any dict as a ``_FileData`` (filling in defaults
        for missing fields), so deeply nested dicts won't be *rejected* — but
        they must not raise an exception or exhaust the stack.
        """
        root = _make_repo(tmp_path)
        nested: str | Mapping[str, object] = "leaf"
        for _ in range(200):
            nested = {"k": nested}
        doc = {
            "version": _FILE_CACHE_VERSION,
            "entries": {"bomb": nested},
        }
        _write_cache_doc(root, doc)
        # Must complete without raising — size is 1 (loaded with empty defaults).
        cache = _InvariantFileCache.load(root)
        assert isinstance(cache, _InvariantFileCache)