"""Phase 1.6 — Linux-kernel scale snapshot manifest. Tests cover: - 75 000-file cold walk correctness and timing (< 30 s on tmpfs) - Warm walk (cache hit, no changes) < 500 ms - Partial change: only modified files re-hashed - Memory ceiling: RSS stays flat during large-file streaming - Deep directory nesting (100 levels) — no recursion errors - Inode-based cache invalidation: atomic file replacement detected - Concurrent walks: two processes walking simultaneously - Cache size limit (MAX_CACHE_BYTES) enforced on load - Cache format: JSON v3 on disk - Cache fsync + mkstemp atomicity: no fixed .tmp collision - Deleted-file prune: stale cache entries removed after walk - Empty repo: no crash, empty manifest - Path safety: symlinks excluded, non-regular files excluded """ from __future__ import annotations import json import os import pathlib import resource import stat import tempfile import threading import time import json as _json import pytest from muse.core.snapshot import build_snapshot_manifest, walk_workdir from muse.core.types import blob_id from muse.core.paths import muse_dir, stat_cache_path as _stat_cache_path from muse.core.stat_cache import ( MAX_CACHE_BYTES, FileCacheEntry, StatCache, _CACHE_VERSION, load_cache, ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_muse_dir(root: pathlib.Path) -> pathlib.Path: d = muse_dir(root) d.mkdir(exist_ok=True) (d / "cache").mkdir(exist_ok=True) return d def _write(path: pathlib.Path, content: str = "x") -> pathlib.Path: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content, encoding="utf-8") return path def _create_files_flat(root: pathlib.Path, n: int, size_bytes: int = 256) -> None: """Create *n* files under *root* with *size_bytes* of content each.""" content = b"x" * size_bytes for i in range(n): p = root / f"file_{i:06d}.txt" p.write_bytes(content) def 
_create_files_deep(root: pathlib.Path, depth: int, files_per_level: int = 2) -> None: """Create a directory tree *depth* levels deep with files at each level.""" cur = root for level in range(depth): cur = cur / f"d{level:03d}" cur.mkdir(exist_ok=True) for f in range(files_per_level): (cur / f"f{f}.txt").write_bytes(b"level" + str(level).encode()) # --------------------------------------------------------------------------- # 1. Format — JSON v3 on disk # --------------------------------------------------------------------------- class TestCacheFormat: def test_cache_file_is_json(self, tmp_path: pathlib.Path) -> None: dot_muse = _make_muse_dir(tmp_path) f = _write(tmp_path / "a.py", "hello") cache = StatCache.load(dot_muse) cache.get_object_hash(tmp_path, f) cache.save() cache_path = _stat_cache_path(dot_muse.parent) assert cache_path.is_file() assert cache_path.suffix == ".json" raw = _json.loads(cache_path.read_bytes()) assert raw["version"] == _CACHE_VERSION def test_cache_entry_has_ino_field(self, tmp_path: pathlib.Path) -> None: dot_muse = _make_muse_dir(tmp_path) f = _write(tmp_path / "b.py", "world") cache = StatCache.load(dot_muse) cache.get_object_hash(tmp_path, f) cache.save() raw = _json.loads((_stat_cache_path(dot_muse.parent)).read_bytes()) entry = raw["entries"]["b.py"] assert "ino" in entry assert isinstance(entry["ino"], int) assert entry["ino"] > 0 def test_version_mismatch_returns_empty(self, tmp_path: pathlib.Path) -> None: dot_muse = _make_muse_dir(tmp_path) # Write a v99 cache — should be discarded bad = _json.dumps({"version": 99, "entries": {}}).encode() (_stat_cache_path(dot_muse.parent)).write_bytes(bad) cache = StatCache.load(dot_muse) assert len(cache._entries) == 0 def test_corrupt_cache_returns_empty(self, tmp_path: pathlib.Path) -> None: dot_muse = _make_muse_dir(tmp_path) (_stat_cache_path(dot_muse.parent)).write_bytes(b"\xff\x00garbage\xde\xad") cache = StatCache.load(dot_muse) assert len(cache._entries) == 0 def 
test_absent_cache_returns_empty(self, tmp_path: pathlib.Path) -> None: dot_muse = _make_muse_dir(tmp_path) cache = StatCache.load(dot_muse) assert len(cache._entries) == 0 # --------------------------------------------------------------------------- # 2. Inode-based invalidation # --------------------------------------------------------------------------- class TestInodeInvalidation: def test_atomic_replace_invalidates_cache(self, tmp_path: pathlib.Path) -> None: """Atomically replacing a file (same mtime/size) is detected via new inode.""" dot_muse = _make_muse_dir(tmp_path) f = tmp_path / "data.bin" content_a = b"A" * 64 content_b = b"B" * 64 # same size as content_a f.write_bytes(content_a) cache = StatCache.load(dot_muse) hash_a = cache.get_object_hash(tmp_path, f) cache.save() # Atomically replace with different content (same size). # Force mtime to be identical (same second) by setting it manually # after the write — this simulates the NFS/tmpfs scenario. with tempfile.NamedTemporaryFile(dir=tmp_path, delete=False) as tmp: tmp.write(content_b) tmp_name = tmp.name orig_stat = f.stat() os.replace(tmp_name, str(f)) # Restore original mtime to simulate a racy replacement os.utime(str(f), (orig_stat.st_atime, orig_stat.st_mtime)) # The new file has a different inode — cache must miss cache2 = StatCache.load(dot_muse) hash_b = cache2.get_object_hash(tmp_path, f) assert hash_a != hash_b, ( "Cache returned stale hash after atomic replacement — " "inode invalidation is not working" ) def test_mtime_size_same_but_ino_changed_is_miss( self, tmp_path: pathlib.Path ) -> None: """If ino changes, cache must invalidate even with same mtime+size.""" dot_muse = _make_muse_dir(tmp_path) f = tmp_path / "tricky.py" f.write_bytes(b"old_content_pad") # 15 bytes cache = StatCache.load(dot_muse) st = f.stat() old_hash = cache.get_cached("tricky.py", str(f), st.st_mtime, st.st_size, st.st_ino) cache.save() # Simulate a different inode by using a fresh inode value fake_ino = st.st_ino 
+ 999_999 cache2 = StatCache.load(dot_muse) # Should miss because ino doesn't match — forces re-hash entry = cache2._entries.get("tricky.py") assert entry is not None assert entry["ino"] != fake_ino # Direct get_cached with wrong ino triggers miss new_hash = cache2.get_cached("tricky.py", str(f), st.st_mtime, st.st_size, fake_ino) # The content hash is still the same (same file contents) assert new_hash == old_hash # same file content assert cache2._dirty # but it was a miss (dirty) def test_unchanged_file_is_cache_hit(self, tmp_path: pathlib.Path) -> None: dot_muse = _make_muse_dir(tmp_path) f = _write(tmp_path / "stable.py", "unchanged") cache = StatCache.load(dot_muse) h1 = cache.get_object_hash(tmp_path, f) cache.save() cache2 = StatCache.load(dot_muse) cache2._dirty = False h2 = cache2.get_object_hash(tmp_path, f) assert h1 == h2 assert not cache2._dirty # genuine cache hit # --------------------------------------------------------------------------- # 3. Concurrent write safety (mkstemp — no .tmp collision) # --------------------------------------------------------------------------- class TestConcurrentSaveSafety: def test_concurrent_saves_no_collision(self, tmp_path: pathlib.Path) -> None: """Two threads saving simultaneously must not corrupt each other.""" dot_muse = _make_muse_dir(tmp_path) errors: list[Exception] = [] def save_cache(i: int) -> None: f = _write(tmp_path / f"thread_{i}.py", f"content {i}") cache = StatCache.load(dot_muse) cache.get_object_hash(tmp_path, f) try: cache.save() except Exception as exc: errors.append(exc) threads = [threading.Thread(target=save_cache, args=(i,)) for i in range(20)] for t in threads: t.start() for t in threads: t.join() assert not errors, f"Concurrent saves failed: {errors[:3]}" # No stray .tmp files left behind (mkstemp writes into cache/) tmp_files = list(_stat_cache_path(dot_muse.parent).parent.glob(".stat_cache_*.tmp")) assert not tmp_files, f"Stray temp files: {tmp_files}" def test_no_fixed_tmp_suffix(self, 
tmp_path: pathlib.Path) -> None: """save() must NOT create a file named stat.json.tmp.""" dot_muse = _make_muse_dir(tmp_path) f = _write(tmp_path / "x.py", "hi") cache = StatCache.load(dot_muse) cache.get_object_hash(tmp_path, f) cache.save() fixed_tmp = _stat_cache_path(dot_muse.parent).with_suffix(".json.tmp") assert not fixed_tmp.exists(), "Fixed .tmp name — concurrent save race possible" # --------------------------------------------------------------------------- # 4. Cache size limit (MAX_CACHE_BYTES) # --------------------------------------------------------------------------- class TestCacheSizeLimit: def test_max_cache_bytes_is_exported(self) -> None: assert isinstance(MAX_CACHE_BYTES, int) assert MAX_CACHE_BYTES >= 64 * 1024 * 1024 def test_oversized_cache_returns_empty( self, tmp_path: pathlib.Path, caplog: pytest.LogCaptureFixture ) -> None: import logging dot_muse = _make_muse_dir(tmp_path) # Write a valid cache but patch the size limit to reject it f = _write(tmp_path / "z.py", "z" * 1000) cache = StatCache.load(dot_muse) cache.get_object_hash(tmp_path, f) cache.save() real_size = (_stat_cache_path(dot_muse.parent)).stat().st_size import unittest.mock as _mock with _mock.patch("muse.core.stat_cache.MAX_CACHE_BYTES", real_size - 1): with caplog.at_level(logging.CRITICAL, logger="muse.core.stat_cache"): loaded = StatCache.load(dot_muse) assert len(loaded._entries) == 0 assert any(r.levelno >= logging.CRITICAL for r in caplog.records) def test_exactly_at_limit_is_accepted(self, tmp_path: pathlib.Path) -> None: dot_muse = _make_muse_dir(tmp_path) f = _write(tmp_path / "y.py", "y") cache = StatCache.load(dot_muse) cache.get_object_hash(tmp_path, f) cache.save() real_size = (_stat_cache_path(dot_muse.parent)).stat().st_size import unittest.mock as _mock with _mock.patch("muse.core.stat_cache.MAX_CACHE_BYTES", real_size): loaded = StatCache.load(dot_muse) assert len(loaded._entries) == 1 # 
# ---------------------------------------------------------------------------
# 5. Prune: deleted files evicted from cache
# ---------------------------------------------------------------------------


class TestPruneAfterWalk:
    """Cache entries for files that no longer exist are dropped by a walk."""

    def test_deleted_file_pruned_on_walk(self, tmp_path: pathlib.Path) -> None:
        """Delete one of two cached files, re-walk, and expect only the survivor."""
        cache_home = _make_muse_dir(tmp_path)
        _write(tmp_path / "keep.py", "keep")
        doomed = _write(tmp_path / "delete_me.py", "bye")

        # First walk — both files cached
        walk_workdir(tmp_path)
        entries_before = StatCache.load(cache_home)._entries
        assert "keep.py" in entries_before
        assert "delete_me.py" in entries_before

        # Delete one file then re-walk
        doomed.unlink()
        walk_workdir(tmp_path)
        entries_after = StatCache.load(cache_home)._entries
        assert "keep.py" in entries_after
        assert "delete_me.py" not in entries_after, "Stale cache entry not pruned"


# ---------------------------------------------------------------------------
# 6. Path safety
# ---------------------------------------------------------------------------


class TestPathSafety:
    """Only regular files outside the muse directory appear in the manifest."""

    def test_symlinks_excluded_from_manifest(self, tmp_path: pathlib.Path) -> None:
        """A symlink to a tracked file is left out; its target is kept."""
        _make_muse_dir(tmp_path)
        target = _write(tmp_path / "real.py", "real")
        (tmp_path / "link.py").symlink_to(target)

        snapshot = build_snapshot_manifest(tmp_path)

        assert "real.py" in snapshot
        assert "link.py" not in snapshot, "Symlinks must be excluded"

    def test_non_regular_files_excluded(self, tmp_path: pathlib.Path) -> None:
        """A FIFO next to a normal file is left out of the manifest."""
        _make_muse_dir(tmp_path)
        _write(tmp_path / "normal.py", "ok")
        # Create a FIFO (named pipe) — non-regular file
        os.mkfifo(str(tmp_path / "pipe.fifo"))

        snapshot = build_snapshot_manifest(tmp_path)

        assert "normal.py" in snapshot
        assert "pipe.fifo" not in snapshot

    def test_muse_dir_excluded_from_manifest(self, tmp_path: pathlib.Path) -> None:
        """No manifest key lives under ``.muse/``."""
        _make_muse_dir(tmp_path)
        _write(tmp_path / "real.py", "code")

        snapshot = build_snapshot_manifest(tmp_path)

        assert all(not key.startswith(".muse/") for key in snapshot)


#
# ---------------------------------------------------------------------------
# 7. Warm-walk no-op: second commit on unchanged tree is fast
# ---------------------------------------------------------------------------


class TestWarmWalkPerformance:
    """A second walk over an unchanged tree reuses the persisted cache."""

    def test_warm_walk_uses_cache_no_rehash(self, tmp_path: pathlib.Path) -> None:
        """Unchanged files must not be re-hashed on the second walk."""
        _make_muse_dir(tmp_path)
        for idx in range(100):
            _write(tmp_path / f"f{idx}.py", f"content {idx}")

        # Cold walk — populates cache
        walk_workdir(tmp_path)

        # Warm walk — should be all cache hits
        entries_after_cold = len(StatCache.load(muse_dir(tmp_path))._entries)
        walk_workdir(tmp_path)

        # After warm walk, cache entries are unchanged (same count, not dirty)
        entries_after_warm = len(StatCache.load(muse_dir(tmp_path))._entries)
        assert entries_after_warm == entries_after_cold

    @pytest.mark.slow
    def test_warm_walk_500ms_on_1000_files(self, tmp_path: pathlib.Path) -> None:
        """1000-file warm walk must complete in < 500 ms."""
        _make_muse_dir(tmp_path)
        for idx in range(1000):
            _write(tmp_path / f"warm_{idx:04d}.py", f"content {idx} " * 10)

        walk_workdir(tmp_path)  # cold

        started = time.perf_counter()
        walk_workdir(tmp_path)  # warm
        elapsed = time.perf_counter() - started

        assert elapsed < 0.5, (
            f"Warm walk over 1000 files took {elapsed:.3f}s — must be < 500 ms. "
            "Cache is not being consulted."
        )


# ---------------------------------------------------------------------------
# 8.
# Deep directory nesting — no recursion errors
# ---------------------------------------------------------------------------


class TestDeepNesting:
    """Deeply nested directory trees are walked completely and keyed correctly."""

    def test_100_level_nesting_no_error(self, tmp_path: pathlib.Path) -> None:
        """os.walk must not hit recursion limit at 100 directory levels."""
        _make_muse_dir(tmp_path)
        _create_files_deep(tmp_path, depth=100, files_per_level=1)
        manifest = build_snapshot_manifest(tmp_path)
        assert len(manifest) == 100, f"Expected 100 files, got {len(manifest)}"

    def test_50_level_nesting_correct_paths(self, tmp_path: pathlib.Path) -> None:
        """Paths at deep levels must be POSIX-relative with correct separators."""
        _make_muse_dir(tmp_path)
        cur = tmp_path
        for i in range(50):
            cur = cur / f"l{i}"
            cur.mkdir()
        leaf = cur / "deep.py"
        leaf.write_bytes(b"deep")
        manifest = build_snapshot_manifest(tmp_path)
        # Path uses forward slashes regardless of OS
        assert any("deep.py" in k and "/" in k for k in manifest)
        assert not any("\\" in k for k in manifest)


# ---------------------------------------------------------------------------
# 9. Large-file streaming — memory stays flat
# ---------------------------------------------------------------------------


class TestLargeFileStreaming:
    """Hashing streams file content instead of loading whole files."""

    @pytest.mark.slow
    def test_large_file_memory_flat(self, tmp_path: pathlib.Path) -> None:
        """Hashing a 50 MiB file must not load it all into memory at once."""
        import sys as _sys

        _make_muse_dir(tmp_path)
        large = tmp_path / "large.bin"
        chunk = b"Z" * 65_536
        with large.open("wb") as fh:
            for _ in range(800):  # 800 × 64 KiB = 50 MiB
                fh.write(chunk)

        rss_before = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        manifest = build_snapshot_manifest(tmp_path)
        rss_after = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

        assert "large.bin" in manifest
        # ru_maxrss is bytes on macOS, KiB on Linux.
        _scale = 1024 * 1024 if _sys.platform == "darwin" else 1024
        rss_delta_mib = (rss_after - rss_before) / _scale
        # RSS growth must be < 10 MiB (file is 50 MiB — proves streaming)
        assert rss_delta_mib < 10, (
            f"RSS grew {rss_delta_mib:.1f} MiB while hashing a 50 MiB file "
            "— file is being loaded entirely into memory."
        )

    def test_zero_byte_file_hashes_correctly(self, tmp_path: pathlib.Path) -> None:
        """An empty file is recorded with the hash of the empty byte string."""
        _make_muse_dir(tmp_path)
        empty = tmp_path / "empty.py"
        empty.write_bytes(b"")
        manifest = build_snapshot_manifest(tmp_path)
        expected = blob_id(b"")
        assert manifest["empty.py"] == expected


# ---------------------------------------------------------------------------
# 10. 75 000-file scale — cold walk correctness + warm walk timing
# ---------------------------------------------------------------------------


def _make_sharded_tree(
    root: pathlib.Path, n: int, content: bytes, suffix: str
) -> list[pathlib.Path]:
    """Write *n* files of *content* under *root*, 1000 per ``dNNN`` directory.

    File ``i`` is stored as ``d{i // 1000:03d}/f{i:06d}{suffix}``.  Each shard
    directory is created once rather than once per file.  Returns the created
    file paths in index order.
    """
    created: list[pathlib.Path] = []
    for shard_start in range(0, n, 1000):
        shard = root / f"d{shard_start // 1000:03d}"
        shard.mkdir(exist_ok=True)
        for i in range(shard_start, min(shard_start + 1000, n)):
            p = shard / f"f{i:06d}{suffix}"
            p.write_bytes(content)
            created.append(p)
    return created


@pytest.mark.slow
class TestLinuxKernelScale:
    """Correctness and cache effectiveness on a 75 000-file working tree."""

    def test_75k_cold_walk_correct_and_bounded(self, tmp_path: pathlib.Path) -> None:
        """75k files: cold walk must be correct and complete in < 30 s."""
        _make_muse_dir(tmp_path)
        n = 75_000
        content = b"k" * 1024  # 1 KiB per file
        _make_sharded_tree(tmp_path, n, content, ".c")

        # Only the walk is timed; fixture creation is excluded from the budget.
        t_walk = time.perf_counter()
        manifest = build_snapshot_manifest(tmp_path)
        walk_seconds = time.perf_counter() - t_walk

        assert len(manifest) == n, f"Expected {n} files, got {len(manifest)}"
        # All hashes must be the 1 KiB content hash (same content → same hash)
        expected_hash = blob_id(content)
        assert all(v == expected_hash for v in manifest.values()), (
            "Hash mismatch — some files were hashed incorrectly"
        )
        assert walk_seconds < 30, (
            f"75k cold walk took {walk_seconds:.1f}s — must be < 30 s"
        )

    def test_75k_warm_walk_zero_misses(self, tmp_path: pathlib.Path) -> None:
        """75k files: warm walk must produce zero cache misses.

        Uses ``_dirty`` as the oracle.  The cache persisted by the walks is
        loaded, its ``_dirty`` flag is cleared, and every file is looked up
        through that same instance.  Any miss re-hashes the file and sets
        ``_dirty``, so the flag staying False shows every lookup was served
        from the cache.  This is platform-independent: it proves correctness
        regardless of whether lstat or file-read latency dominates.
        """
        _make_muse_dir(tmp_path)
        n = 75_000
        content = b"w" * 512
        paths = _make_sharded_tree(tmp_path, n, content, ".go")

        build_snapshot_manifest(tmp_path)  # cold — populates cache

        manifest2 = build_snapshot_manifest(tmp_path)  # warm
        assert len(manifest2) == n

        # The lookups must go through the instance whose flag is inspected;
        # a walk builds its own cache object, so checking the flag of a
        # separately loaded instance after a walk would prove nothing.
        cache = StatCache.load(muse_dir(tmp_path))
        cache._dirty = False  # reset to ensure we detect any miss
        for p in paths:
            cache.get_object_hash(tmp_path, p)

        # (same inode, mtime, size → all cache hits → not dirty)
        assert not cache._dirty, (
            "Warm walk marked cache dirty — at least one file was re-hashed. "
            "The stat cache is not being consulted correctly at 75k-file scale."
        )

    def test_cache_speedup_with_large_files(self, tmp_path: pathlib.Path) -> None:
        """Cache delivers ≥ 5× speedup when file-content I/O dominates.

        Uses 100 × 1 MiB files so cold-walk hashing clearly dominates lstat
        overhead.  Warm walk skips all file reads → dramatic speedup.
        Platform-independent: cold is I/O bound; warm is stat-bound.
        """
        _make_muse_dir(tmp_path)
        content = b"L" * (1024 * 1024)  # 1 MiB per file
        for i in range(100):
            (tmp_path / f"large_{i:03d}.bin").write_bytes(content)

        t_cold0 = time.perf_counter()
        build_snapshot_manifest(tmp_path)  # cold
        t_cold = time.perf_counter() - t_cold0

        t_warm0 = time.perf_counter()
        build_snapshot_manifest(tmp_path)  # warm
        t_warm = time.perf_counter() - t_warm0

        speedup = t_cold / t_warm if t_warm > 0 else float("inf")
        assert speedup >= 5, (
            f"Warm walk ({t_warm:.3f}s) is only {speedup:.1f}× faster than "
            f"cold ({t_cold:.3f}s) — expected ≥ 5×. "
            "Cache must skip all file-content reads on warm walk."
        )

    def test_75k_partial_change_only_rehashes_changed(
        self, tmp_path: pathlib.Path
    ) -> None:
        """After changing 10 files, only those 10 should trigger a cache miss."""
        _make_muse_dir(tmp_path)
        n = 75_000
        content = b"p" * 256
        paths = _make_sharded_tree(tmp_path, n, content, ".rs")

        build_snapshot_manifest(tmp_path)  # cold

        # Modify 10 files
        changed = paths[:10]
        new_content = b"CHANGED" * 37  # different size ensures definite miss
        for p in changed:
            p.write_bytes(new_content)

        manifest2 = build_snapshot_manifest(tmp_path)
        new_hash = blob_id(new_content)
        old_hash = blob_id(content)

        changed_rels = {p.relative_to(tmp_path).as_posix() for p in changed}

        # Without these two checks an empty or truncated manifest would pass
        # the per-entry loop below without asserting anything.
        assert len(manifest2) == n, f"Expected {n} files, got {len(manifest2)}"
        missing = changed_rels.difference(manifest2)
        assert not missing, f"Changed files missing from manifest: {sorted(missing)}"

        for rel, h in manifest2.items():
            if rel in changed_rels:
                assert h == new_hash, f"{rel} should have new hash"
            else:
                assert h == old_hash, f"{rel} should still have old hash"