"""Phase 3 — Performance regression tests. Target metrics (measured on a 2024 MacBook Pro M4, macOS 15): Phase 3.1 — Linux-kernel commit throughput write_commit: ≥ 1 000 commits/sec write_object: ≥ 2 000 objects/sec build_snapshot_manifest: ≥ 10 000 files/sec muse commit (e2e, 1 000-file workdir): < 5 000 ms Phase 3.2 — Concurrent agent write storm 200 threads × write_object: all objects readable, no corruption 200 threads × write_commit: all commits readable, no corruption 100 threads × write_head_commit: last-write-wins, valid ID written Phase 3.3 — Memory ceiling write_commit (10 000 commits): peak RSS < 512 MiB build_snapshot_manifest (5 000 files): peak RSS < 128 MiB Phase 3.3 extended — Linux-scale memory ceiling (100k / 75k) get_all_commits (100 000 commits): peak RSS < 2 GiB [@slow] get_commits_for_branch walk-cap: max_walk_commits bounds RSS muse log --json pseudo-streaming: no double-buffer build_snapshot_manifest (75 000 files): peak RSS < 512 MiB [@slow] find_merge_base (deep chain): cap fires, no OOM walk_commits_between: silent truncation at cap, not OOM All slow tests are decorated with ``@pytest.mark.slow`` and are skipped by default. Run the full suite with ``pytest tests/test_perf_phase3.py -v``. """ from __future__ import annotations type _FileStore = dict[str, bytes] import datetime import os import pathlib import resource import sys import threading import time import tracemalloc import pytest from unittest.mock import patch from muse.core.object_store import ( _created_object_shards, has_object, object_path, read_object, write_object, ) from muse.core.merge_engine import find_merge_base from muse.core.snapshot import build_snapshot_manifest from muse.core.ids import hash_commit as compute_commit_id from muse.core.refs import ( write_branch_ref, write_head_commit, ) from muse.core.commits import ( CommitRecord, get_all_commits, get_commits_for_branch, read_commit, walk_commits_between_result, write_commit, ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- from muse.core.types import blob_id, fake_id, split_id from muse.core.paths import config_toml_path, heads_dir, muse_dir, repo_json_path def _repo(tmp_path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(tmp_path) dot_muse.mkdir() (dot_muse / "repo.json").write_text('{"repo_id": "bench", "owner": "bench"}') (dot_muse / "commits").mkdir() (dot_muse / "snapshots").mkdir() (dot_muse / "refs" / "heads").mkdir(parents=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") return tmp_path def _write_chain( repo: pathlib.Path, branch: str, n: int, snap_id: str = fake_id("chain-snap"), start: int = 0, ) -> str: """Write a linear commit chain of length *n* and return the tip commit ID. Sets the branch ref to the tip so ``get_commits_for_branch`` can walk it. """ parent: str | None = None tip = "" for i in range(start, start + n): msg = f"chain-{i:07d}" ts = datetime.datetime(2026, 1, 1, i % 3600 // 3600, i % 3600 % 60, tzinfo=datetime.timezone.utc) cid = compute_commit_id( parent_ids=[parent] if parent else [], snapshot_id=snap_id, message=msg, committed_at_iso=ts.isoformat(), author="chain-agent",) rec = CommitRecord( commit_id=cid, branch=branch, snapshot_id=snap_id, message=msg, committed_at=ts, parent_commit_id=parent, parent2_commit_id=None, author="chain-agent", metadata={}, structured_delta=None, sem_ver_bump="none", breaking_changes=[], agent_id="", model_id="", toolchain_id="", prompt_hash="", signature="", signer_key_id="", ) write_commit(repo, rec) parent = cid tip = cid write_branch_ref(repo, branch, tip) return tip def _make_commit(index: int, snap_id: str, parent: str | None = None) -> CommitRecord: """Build a CommitRecord whose ``commit_id`` passes content-hash verification.""" ts = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) msg = f"commit-{index:07d}" cid = compute_commit_id( parent_ids=[parent] if parent else [], snapshot_id=snap_id, message=msg, committed_at_iso=ts.isoformat(), author="perf-agent",) return CommitRecord( commit_id=cid, branch="main", snapshot_id=snap_id, message=msg, committed_at=ts, parent_commit_id=parent, parent2_commit_id=None, author="perf-agent", metadata={}, structured_delta=None, sem_ver_bump="none", breaking_changes=[], agent_id="", model_id="", toolchain_id="", prompt_hash="", signature="", signer_key_id="", ) # --------------------------------------------------------------------------- # Phase 3.1 — Linux-kernel commit throughput # --------------------------------------------------------------------------- class TestWriteCommitThroughput: """write_commit must sustain ≥ 1 000 commits/sec in isolation. The Linux-kernel migration target is 472 commits/sec over 850 000 commits (< 30 min). A 1 000 commits/sec floor gives comfortable headroom for real workload overhead (snapshot building, object writes, disk pressure). fsync is mocked: these tests measure msgpack serialisation + filesystem metadata throughput, not OS I/O durability. Durability ordering is verified by test_integrity_I2_fsync.py. """ _MIN_COMMITS_PER_SEC = 1_000 @pytest.fixture(autouse=True) def no_fsync(self) -> None: """Mock out all fsync calls so the test measures algorithmic throughput.""" with patch("muse.core.commits.os.fsync", return_value=None): yield @pytest.mark.slow def test_write_commit_throughput_10k(self, tmp_path: pathlib.Path) -> None: """Write 10 000 commits and assert throughput ≥ 1 000 commits/sec.""" repo = _repo(tmp_path) snap_id = fake_id("snap-a") N = 10_000 commits = [_make_commit(i, snap_id) for i in range(N)] t0 = time.perf_counter() for rec in commits: write_commit(repo, rec) elapsed = time.perf_counter() - t0 rate = N / elapsed assert rate >= self._MIN_COMMITS_PER_SEC, ( f"write_commit throughput {rate:.0f} commits/sec is below the " f"minimum {self._MIN_COMMITS_PER_SEC} commits/sec. " f"(10k commits took {elapsed:.2f}s. " f"Linux-kernel migration target: 472 commits/sec.)" ) def test_write_commit_throughput_1k_fast(self, tmp_path: pathlib.Path) -> None: """Smoke-speed: 1 000 commits must complete within 5 seconds.""" repo = _repo(tmp_path) snap_id = fake_id("snap-b") N = 1_000 commits = [_make_commit(i, snap_id) for i in range(N)] t0 = time.perf_counter() for rec in commits: write_commit(repo, rec) elapsed = time.perf_counter() - t0 assert elapsed <= 15.0, ( f"1 000 write_commit calls took {elapsed:.2f}s — expected ≤ 15.0s." ) class TestWriteObjectThroughput: """write_object must sustain ≥ 1 500 objects/sec in isolation. fsync is mocked: the test measures mkstemp + hash-verify + fchmod + os.replace throughput without OS I/O latency. Durability ordering is verified by test_integrity_I2_fsync.py. """ _MIN_OBJECTS_PER_SEC = 1_500 @pytest.fixture(autouse=True) def no_fsync(self) -> None: """Mock out all fsync calls so the test measures algorithmic throughput.""" with patch("muse.core.object_store._fsync_fd", return_value=None): yield @pytest.mark.slow def test_write_object_throughput_10k(self, tmp_path: pathlib.Path) -> None: """Write 10 000 4-KiB objects and assert throughput ≥ 2 000 objects/sec.""" repo = _repo(tmp_path) N = 10_000 items = [ ( blob_id(f"perf-obj-{i:08d}".encode() * 16), f"perf-obj-{i:08d}".encode() * 16, ) for i in range(N) ] t0 = time.perf_counter() for oid, content in items: write_object(repo, oid, content) elapsed = time.perf_counter() - t0 rate = N / elapsed assert rate >= self._MIN_OBJECTS_PER_SEC, ( f"write_object throughput {rate:.0f} objects/sec is below the " f"minimum {self._MIN_OBJECTS_PER_SEC} objects/sec. " f"(10k objects took {elapsed:.2f}s.)" ) @pytest.mark.perf def test_write_object_throughput_1k_fast(self, tmp_path: pathlib.Path) -> None: """Smoke-speed: 1 000 objects must complete within 2 seconds.""" repo = _repo(tmp_path) N = 1_000 items = [ ( blob_id(f"fast-obj-{i:08d}".encode() * 8), f"fast-obj-{i:08d}".encode() * 8, ) for i in range(N) ] t0 = time.perf_counter() for oid, content in items: write_object(repo, oid, content) elapsed = time.perf_counter() - t0 assert elapsed <= 6.0, ( f"1 000 write_object calls took {elapsed:.2f}s — expected ≤ 6.0s." ) class TestSnapshotManifestThroughput: """build_snapshot_manifest must sustain ≥ 10 000 files/sec.""" _MIN_FILES_PER_SEC = 10_000 @pytest.mark.slow def test_snapshot_5k_files(self, tmp_path: pathlib.Path) -> None: """Build manifest of 5 000 files; assert ≥ 10 000 files/sec.""" root = tmp_path / "workdir" root.mkdir() muse_dir(root).mkdir() (repo_json_path(root)).write_text('{"repo_id": "bench"}') # 50 dirs × 100 files = 5 000 files for d in range(50): dp = root / f"pkg_{d:03d}" dp.mkdir() for f in range(100): (dp / f"file_{f:03d}.py").write_bytes( f"# content-{d}-{f}\n".encode() * 50 ) t0 = time.perf_counter() manifest = build_snapshot_manifest(root) elapsed = time.perf_counter() - t0 rate = len(manifest) / elapsed assert len(manifest) == 5_000 assert rate >= self._MIN_FILES_PER_SEC, ( f"build_snapshot_manifest throughput {rate:.0f} files/sec is below " f"the minimum {self._MIN_FILES_PER_SEC} files/sec. " f"(5k files took {elapsed:.3f}s.)" ) def test_snapshot_500_files_fast(self, tmp_path: pathlib.Path) -> None: """Smoke-speed: 500-file manifest must complete within 500 ms.""" root = tmp_path / "workdir" root.mkdir() muse_dir(root).mkdir() (repo_json_path(root)).write_text('{"repo_id": "bench"}') for d in range(25): dp = root / f"pkg_{d:02d}" dp.mkdir() for f in range(20): (dp / f"file_{f:02d}.py").write_bytes(b"x" * 200) t0 = time.perf_counter() manifest = build_snapshot_manifest(root) elapsed = time.perf_counter() - t0 assert len(manifest) == 500 assert elapsed <= 0.5, ( f"500-file manifest took {elapsed*1000:.1f} ms — expected ≤ 500 ms." ) # --------------------------------------------------------------------------- # Phase 3.2 — Concurrent agent write storm # --------------------------------------------------------------------------- class TestConcurrentWriteObjectStorm: """200 threads writing distinct objects — no corruption, no data loss.""" def test_200_threads_write_distinct_objects(self, tmp_path: pathlib.Path) -> None: """200 threads each write 50 distinct objects; all must be readable after join.""" repo = _repo(tmp_path) N_THREADS = 200 N_PER_THREAD = 50 written: _FileStore = {} lock = threading.Lock() errors: list[str] = [] # Pre-compute all objects to avoid per-thread hashing noise. all_items: list[list[tuple[str, bytes]]] = [] for t in range(N_THREADS): thread_items: list[tuple[str, bytes]] = [] for i in range(N_PER_THREAD): content = f"thread-{t:03d}-obj-{i:03d}".encode() * 4 oid = blob_id(content) thread_items.append((oid, content)) with lock: written[oid] = content all_items.append(thread_items) def writer(items: list[tuple[str, bytes]]) -> None: try: for oid, content in items: write_object(repo, oid, content) except Exception as exc: with lock: errors.append(str(exc)) threads = [ threading.Thread(target=writer, args=(all_items[t],)) for t in range(N_THREADS) ] for th in threads: th.start() for th in threads: th.join(timeout=30.0) assert not errors, f"Write errors during concurrent storm: {errors[:3]}" # Verify every object is readable and byte-identical. missing: list[str] = [] corrupt: list[str] = [] for oid, expected in written.items(): if not has_object(repo, oid): missing.append(oid[:8]) else: actual = read_object(repo, oid) if actual != expected: corrupt.append(oid[:8]) assert not missing, f"{len(missing)} objects missing after concurrent write storm" assert not corrupt, f"{len(corrupt)} objects corrupted after concurrent write storm" class TestConcurrentWriteCommitStorm: """200 threads writing distinct commits — all must be readable after join.""" def test_200_threads_write_distinct_commits(self, tmp_path: pathlib.Path) -> None: """200 threads each write 25 distinct commits; all must be readable.""" repo = _repo(tmp_path) N_THREADS = 200 N_PER_THREAD = 25 snap_id = fake_id("snap-c") all_records: list[list[CommitRecord]] = [] errors: list[str] = [] lock = threading.Lock() for t in range(N_THREADS): thread_recs: list[CommitRecord] = [] for i in range(N_PER_THREAD): rec = _make_commit(t * N_PER_THREAD + i, snap_id) thread_recs.append(rec) all_records.append(thread_recs) def writer(recs: list[CommitRecord]) -> None: try: for rec in recs: write_commit(repo, rec) except Exception as exc: with lock: errors.append(str(exc)) threads = [ threading.Thread(target=writer, args=(all_records[t],)) for t in range(N_THREADS) ] for th in threads: th.start() for th in threads: th.join(timeout=30.0) assert not errors, f"Write errors during concurrent commit storm: {errors[:3]}" # All commits must be readable. total = N_THREADS * N_PER_THREAD missing: list[str] = [] for recs in all_records: for rec in recs: result = read_commit(repo, rec.commit_id) if result is None: missing.append(rec.commit_id[:8]) assert not missing, ( f"{len(missing)}/{total} commits missing after concurrent write storm" ) class TestConcurrentWriteBranchRef: """100 threads racing on write_branch_ref — last write wins, no corruption.""" def test_100_threads_write_branch_ref_last_write_wins( self, tmp_path: pathlib.Path ) -> None: """100 threads each writing write_branch_ref; result must be a valid 64-char hex ID.""" repo = _repo(tmp_path) refs_dir = heads_dir(repo) # already created by _repo() N = 100 snap_id = fake_id("snap-d") commit_ids: list[str] = [] errors: list[str] = [] lock = threading.Lock() for i in range(N): cid = compute_commit_id(parent_ids=[], snapshot_id=snap_id, message=f"head-{i}", committed_at_iso="2026-01-01T00:00:00+00:00") commit_ids.append(cid) def writer(cid: str) -> None: try: write_branch_ref(repo, "main", cid) except Exception as exc: with lock: errors.append(str(exc)) threads = [threading.Thread(target=writer, args=(commit_ids[i],)) for i in range(N)] for th in threads: th.start() for th in threads: th.join(timeout=10.0) assert not errors, f"Errors during concurrent write_branch_ref: {errors[:3]}" # The branch ref must contain exactly one of the written IDs. ref_file = refs_dir / "main" assert ref_file.exists(), "Branch ref file was lost after concurrent writes" final_cid = ref_file.read_text(encoding="utf-8").strip() # Commit IDs are now "sha256:<64hex>" = 71 chars assert final_cid.startswith("sha256:"), f"Branch ref has unexpected format: {final_cid!r}" _, hex_part = split_id(final_cid) assert len(hex_part) == 64, f"Branch ref hex part has unexpected length: {final_cid!r}" assert all(c in "0123456789abcdef" for c in hex_part), ( f"Branch ref is not a valid hex ID: {final_cid!r}" ) assert final_cid in commit_ids, ( f"Branch ref {final_cid[:19]} is not one of the written IDs" ) # --------------------------------------------------------------------------- # Phase 3.3 — Memory ceiling under load # --------------------------------------------------------------------------- class TestMemoryCeiling: """Peak memory must not grow proportionally to the number of objects or commits.""" _MAX_WRITE_COMMIT_MIB = 512 _MAX_SNAPSHOT_MIB = 128 @pytest.mark.slow def test_write_10k_commits_peak_rss_under_512_mib( self, tmp_path: pathlib.Path ) -> None: """Writing 10 000 commits must not exceed 512 MiB peak RSS. write_commit buffers one commit at a time; it must not accumulate a list of all commits in memory. """ repo = _repo(tmp_path) snap_id = fake_id("snap-e") N = 10_000 tracemalloc.start() tracemalloc.clear_traces() for i in range(N): rec = _make_commit(i, snap_id) write_commit(repo, rec) _, peak_bytes = tracemalloc.get_traced_memory() tracemalloc.stop() peak_mib = peak_bytes / (1024 * 1024) assert peak_mib <= self._MAX_WRITE_COMMIT_MIB, ( f"write_commit peak allocation {peak_mib:.1f} MiB exceeds " f"{self._MAX_WRITE_COMMIT_MIB} MiB for 10k commits. " "write_commit must stream one record at a time." ) def test_snapshot_5k_files_peak_rss_under_128_mib( self, tmp_path: pathlib.Path ) -> None: """build_snapshot_manifest on 5 000 small files stays under 128 MiB.""" root = tmp_path / "workdir" root.mkdir() muse_dir(root).mkdir() (repo_json_path(root)).write_text('{"repo_id": "bench"}') for d in range(50): dp = root / f"d_{d:02d}" dp.mkdir() for f in range(100): (dp / f"f_{f:02d}.txt").write_bytes(b"x" * 512) tracemalloc.start() tracemalloc.clear_traces() build_snapshot_manifest(root) _, peak_bytes = tracemalloc.get_traced_memory() tracemalloc.stop() peak_mib = peak_bytes / (1024 * 1024) assert peak_mib <= self._MAX_SNAPSHOT_MIB, ( f"build_snapshot_manifest peak allocation {peak_mib:.1f} MiB " f"exceeds {self._MAX_SNAPSHOT_MIB} MiB for 5k files. " "The manifest dict must not accumulate large intermediate buffers." ) # --------------------------------------------------------------------------- # Phase 3.1 — Shard-cache amortisation (regression guard) # --------------------------------------------------------------------------- class TestShardCacheAmortisation: """The shard-validation cache eliminates O(objects) resolve() calls. This test is not a timing test — it verifies the structural invariant: after N writes to the same shard, _created_object_shards contains exactly that shard entry and subsequent writes to the same shard do not re-trigger path-resolution (proved by exercising the idempotent write path). """ def test_shard_cache_populated_after_first_write( self, tmp_path: pathlib.Path ) -> None: """After the first write to a shard, _created_object_shards contains its path.""" repo = _repo(tmp_path) content = b"shard-cache-test" oid = blob_id(content) shard_str = str(object_path(repo, oid).parent) # Ensure cache starts without this shard. _created_object_shards.discard(shard_str) write_object(repo, oid, content) assert shard_str in _created_object_shards, ( f"Shard {oid[:2]} not recorded in _created_object_shards after first write. " "The amortisation optimisation is not active." ) def test_repeated_writes_to_same_shard_succeed( self, tmp_path: pathlib.Path ) -> None: """50 distinct writes to the same shard all land correctly.""" repo = _repo(tmp_path) # Force a fixed shard prefix by constructing objects with the same prefix. # We use a known prefix and craft content that happens to hash to it. # Easier: just write 50 objects and verify they all land. N = 50 items = [ ( blob_id(f"repeat-shard-{i:04d}".encode()), f"repeat-shard-{i:04d}".encode(), ) for i in range(N) ] for oid, content in items: write_object(repo, oid, content) missing = [oid[:8] for oid, _ in items if not has_object(repo, oid)] assert not missing, ( f"{len(missing)} objects missing after repeated writes: {missing[:5]}" ) def test_idempotent_write_bypasses_shard_write( self, tmp_path: pathlib.Path ) -> None: """write_object returns False (idempotent) on the second call for the same OID.""" repo = _repo(tmp_path) content = b"idempotent-check" oid = blob_id(content) first = write_object(repo, oid, content) second = write_object(repo, oid, content) assert first is True, "First write should return True" assert second is False, "Second write should return False (already exists)" # --------------------------------------------------------------------------- # Phase 3.3 extended — Linux-scale memory ceiling # --------------------------------------------------------------------------- class TestGetAllCommitsMemory: """get_all_commits must not OOM under Linux-scale commit counts. This is the highest-risk accumulator: it loads *every* CommitRecord in the store into a list simultaneously with no cap. At 100k commits × ~2 KB per serialised record the baseline is ~200 MiB. Commits with large ``structured_delta`` payloads can be 100 KB each (100k × 100 KB = 10 GiB). The 64 MiB per-record msgpack cap (MAX_MSGPACK_BYTES) provides the guard. The @slow variants build real on-disk commit chains; the fast variant uses a small chain to confirm the structural property with tracemalloc. """ def test_get_all_commits_1k_peak_rss_under_128_mib( self, tmp_path: pathlib.Path ) -> None: """get_all_commits on 1 000 commits stays under 128 MiB (fast smoke).""" repo = _repo(tmp_path) N = 1_000 snap_id = fake_id("snap-aa") for i in range(N): write_commit(repo, _make_commit(i, snap_id)) tracemalloc.start() tracemalloc.clear_traces() results = get_all_commits(repo) _, peak_bytes = tracemalloc.get_traced_memory() tracemalloc.stop() assert len(results) == N, f"Expected {N} commits, got {len(results)}" peak_mib = peak_bytes / (1024 * 1024) assert peak_mib <= 128, ( f"get_all_commits({N}) peak {peak_mib:.1f} MiB — expected ≤ 128 MiB. " "CommitRecord size has grown; re-audit the dataclass fields." ) @pytest.mark.slow def test_get_all_commits_100k_under_2_gib( self, tmp_path: pathlib.Path ) -> None: """get_all_commits on 100 000 commits stays under 2 GiB. 100k × minimal CommitRecord ≈ 200 MiB. The 2 GiB ceiling allows a 10× margin for realistic payloads (metadata, structured_delta) while still catching runaway accumulation. """ repo = _repo(tmp_path) N = 100_000 snap_id = fake_id("snap-bb") for i in range(N): write_commit(repo, _make_commit(i, snap_id)) _MAX_MIB = 2_048 # 2 GiB tracemalloc.start() tracemalloc.clear_traces() results = get_all_commits(repo) _, peak_bytes = tracemalloc.get_traced_memory() tracemalloc.stop() assert len(results) == N peak_mib = peak_bytes / (1024 * 1024) assert peak_mib <= _MAX_MIB, ( f"get_all_commits(100k) peak {peak_mib:.1f} MiB exceeds {_MAX_MIB} MiB. " "The function loads all CommitRecords into a list simultaneously — " "consider streaming or paginating for very large repos." ) class TestGetCommitsForBranchWalkCap: """get_commits_for_branch must honour its walk cap. The cap is the primary memory guard for ``muse log --json``. A branch with N commits deeper than the cap must return exactly cap records, not N — even when filters are active and the caller passes max_count=0. """ def test_walk_cap_bounds_returned_records( self, tmp_path: pathlib.Path ) -> None: """Chain of 500 commits with cap=100 returns exactly 100 records.""" repo = _repo(tmp_path) N = 500 CAP = 100 _write_chain(repo, "main", N) results = get_commits_for_branch(repo, "main", max_count=CAP) assert len(results) == CAP, ( f"Expected cap={CAP} records, got {len(results)} — " "walk cap is not being respected." ) def test_walk_cap_memory_bounded_deep_chain( self, tmp_path: pathlib.Path ) -> None: """2 000-commit chain with default cap stays under 64 MiB.""" repo = _repo(tmp_path) N = 2_000 CAP = 500 _write_chain(repo, "main", N) tracemalloc.start() tracemalloc.clear_traces() results = get_commits_for_branch(repo, "main", max_count=CAP) _, peak_bytes = tracemalloc.get_traced_memory() tracemalloc.stop() assert len(results) == CAP peak_mib = peak_bytes / (1024 * 1024) assert peak_mib <= 64, ( f"get_commits_for_branch(cap={CAP}) peak {peak_mib:.1f} MiB — " f"expected ≤ 64 MiB for {CAP} records." ) class TestLogJsonStreaming: """``muse log --json`` must not double-buffer commit JSON strings. The previous implementation accumulated ``commit_jsons: list[str]`` before writing to stdout — doubling peak memory vs. the CommitRecord list alone. The fix uses a first-item flag to write each record inline immediately. This test verifies the fix: tracemalloc peak for a 500-commit log should not contain the signature of a large string buffer. """ def test_log_json_output_is_valid_json( self, tmp_path: pathlib.Path ) -> None: """muse log --json on a 100-commit branch emits valid JSON with all commits.""" import json from tests.cli_test_helper import CliRunner repo = _repo(tmp_path) N = 100 _write_chain(repo, "main", N) (config_toml_path(repo)).write_text("") runner = CliRunner() result = runner.invoke( None, ["log", "--json", "--max-count", str(N)], env={"MUSE_REPO_ROOT": str(repo)}, ) assert result.exit_code == 0, f"muse log --json failed:\n{result.output}" payload = json.loads(result.output) assert "commits" in payload, f"Missing 'commits' key: {payload}" assert len(payload["commits"]) == N, ( f"Expected {N} commits in JSON output, got {len(payload['commits'])}" ) def test_log_json_empty_repo_returns_empty_array( self, tmp_path: pathlib.Path ) -> None: """muse log --json on a branch with no commits returns empty commits array.""" import json from tests.cli_test_helper import CliRunner repo = _repo(tmp_path) (config_toml_path(repo)).write_text("") runner = CliRunner() result = runner.invoke( None, ["log", "--json"], env={"MUSE_REPO_ROOT": str(repo)}, ) # No commits → either exit 0 with empty commits array or a graceful # "(no commits)" message; both are acceptable. if result.exit_code == 0 and result.output.startswith("{"): payload = json.loads(result.output) assert payload.get("commits") == [] class TestFindMergeBaseMemory: """find_merge_base must fire its cap cleanly, not OOM. The default ``max_ancestors`` cap is 50 000. A chain deeper than the cap must raise ``MuseCLIError`` with an actionable message rather than exhausting all available memory. This test confirms the error path, not the OOM path. """ def test_merge_base_cap_raises_gracefully_on_deep_chain( self, tmp_path: pathlib.Path ) -> None: """Chain deeper than max_ancestors raises MuseCLIError, not MemoryError.""" from muse.core.errors import MuseCLIError repo = _repo(tmp_path) (config_toml_path(repo)).write_text( "[limits]\nmax_ancestors = 50\n" ) tip_a = _write_chain(repo, "branchA", 60, snap_id="cc" * 32) # branchB is entirely separate — no common ancestor with branchA. tip_b = _write_chain(repo, "branchB", 60, snap_id="dd" * 32, start=1000) with pytest.raises(MuseCLIError, match="max_ancestors"): find_merge_base(repo, tip_a, tip_b) def test_merge_base_found_within_cap( self, tmp_path: pathlib.Path ) -> None: """find_merge_base finds the base when both branches are within cap.""" repo = _repo(tmp_path) (config_toml_path(repo)).write_text( "[limits]\nmax_ancestors = 500\n" ) # Build a common root commit. snap_id = "ee" * 32 root_cid = compute_commit_id( parent_ids=[], snapshot_id=snap_id, message="root", committed_at_iso="2026-01-01T00:00:00+00:00", author="test",) root_rec = CommitRecord( commit_id=root_cid, branch="main", snapshot_id=snap_id, message="root", committed_at=datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc), parent_commit_id=None, parent2_commit_id=None, author="test", metadata={}, structured_delta=None, sem_ver_bump="none", breaking_changes=[], agent_id="", model_id="", toolchain_id="", prompt_hash="", signature="", signer_key_id="", ) write_commit(repo, root_rec) # Extend branchA and branchB from the same root. parent_a: str | None = root_cid tip_a = root_cid for i in range(20): msg = f"a-{i:04d}" ts = datetime.datetime(2026, 1, 2, tzinfo=datetime.timezone.utc) cid = compute_commit_id( parent_ids=[parent_a] if parent_a else [], snapshot_id=snap_id, message=msg, committed_at_iso=ts.isoformat(), author="test",) rec = CommitRecord( commit_id=cid, branch="branchA", snapshot_id=snap_id, message=msg, committed_at=ts, parent_commit_id=parent_a, parent2_commit_id=None, author="test", metadata={}, structured_delta=None, sem_ver_bump="none", breaking_changes=[], agent_id="", model_id="", toolchain_id="", prompt_hash="", signature="", signer_key_id="", ) write_commit(repo, rec) parent_a = cid tip_a = cid parent_b: str | None = root_cid tip_b = root_cid for i in range(15): msg = f"b-{i:04d}" ts = datetime.datetime(2026, 1, 3, tzinfo=datetime.timezone.utc) cid = compute_commit_id( parent_ids=[parent_b] if parent_b else [], snapshot_id=snap_id, message=msg, committed_at_iso=ts.isoformat(), author="test",) rec = CommitRecord( commit_id=cid, branch="branchB", snapshot_id=snap_id, message=msg, committed_at=ts, parent_commit_id=parent_b, parent2_commit_id=None, author="test", metadata={}, structured_delta=None, sem_ver_bump="none", breaking_changes=[], agent_id="", model_id="", toolchain_id="", prompt_hash="", signature="", signer_key_id="", ) write_commit(repo, rec) parent_b = cid tip_b = cid base = find_merge_base(repo, tip_a, tip_b) assert base == root_cid, ( f"Expected merge base {root_cid[:8]}, got {base[:8] if base else None}" ) def test_merge_base_memory_bounded_within_cap( self, tmp_path: pathlib.Path ) -> None: """find_merge_base BFS uses bounded memory proportional to max_ancestors.""" from muse.core.errors import MuseCLIError repo = _repo(tmp_path) CAP = 200 (config_toml_path(repo)).write_text( f"[limits]\nmax_ancestors = {CAP}\n" ) # Build a shallow divergence: 50 commits each, well inside the cap. snap_id = "ff" * 32 root_cid = compute_commit_id( parent_ids=[], snapshot_id=snap_id, message="base", committed_at_iso="2026-01-01T00:00:00+00:00", author="t",) root_rec = CommitRecord( commit_id=root_cid, branch="main", snapshot_id=snap_id, message="base", committed_at=datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc), parent_commit_id=None, parent2_commit_id=None, author="t", metadata={}, structured_delta=None, sem_ver_bump="none", breaking_changes=[], agent_id="", model_id="", toolchain_id="", prompt_hash="", signature="", signer_key_id="", ) write_commit(repo, root_rec) def _extend(parent: str, prefix: str, n: int) -> str: tip = parent for i in range(n): msg = f"{prefix}-{i:04d}" ts = datetime.datetime(2026, 2, 1, tzinfo=datetime.timezone.utc) cid = compute_commit_id( parent_ids=[tip], snapshot_id=snap_id, message=msg, committed_at_iso=ts.isoformat(), author="t",) rec = CommitRecord( commit_id=cid, branch=prefix, snapshot_id=snap_id, message=msg, committed_at=ts, parent_commit_id=tip, parent2_commit_id=None, author="t", metadata={}, structured_delta=None, sem_ver_bump="none", breaking_changes=[], agent_id="", model_id="", toolchain_id="", prompt_hash="", signature="", signer_key_id="", ) write_commit(repo, rec) tip = cid return tip tip_a = _extend(root_cid, "xa", 50) tip_b = _extend(root_cid, "xb", 50) tracemalloc.start() tracemalloc.clear_traces() try: base = find_merge_base(repo, tip_a, tip_b) except MuseCLIError: base = None # Cap triggered — that's also a valid outcome. _, peak_bytes = tracemalloc.get_traced_memory() tracemalloc.stop() peak_mib = peak_bytes / (1024 * 1024) assert peak_mib <= 64, ( f"find_merge_base peak {peak_mib:.1f} MiB — expected ≤ 64 MiB " f"for two 50-commit branches (cap={CAP})." ) class TestSnapshotManifest75kFiles: """build_snapshot_manifest on 75 000 files must stay under 512 MiB. The manifest dict holds only ``{rel_path: sha256_hex}`` — strings only. 75k × (60-char path + 64-char hash) ≈ 9.3 MiB for the dict alone. The peak should be dominated by the stat-cache msgpack load, not by the manifest itself. 512 MiB is a generous ceiling to catch any accidental full-file-content buffering. """ @pytest.mark.slow def test_75k_files_peak_rss_under_512_mib( self, tmp_path: pathlib.Path ) -> None: """build_snapshot_manifest on 75 000 small files stays under 512 MiB.""" root = tmp_path / "workdir" root.mkdir() muse_dir(root).mkdir() (repo_json_path(root)).write_text('{"repo_id": "bench"}') # 750 dirs × 100 files = 75 000 files, each 128 bytes. for d in range(750): dp = root / f"pkg_{d:04d}" dp.mkdir() for f in range(100): (dp / f"f_{f:03d}.py").write_bytes(b"x" * 128) tracemalloc.start() tracemalloc.clear_traces() manifest = build_snapshot_manifest(root) _, peak_bytes = tracemalloc.get_traced_memory() tracemalloc.stop() assert len(manifest) == 75_000, ( f"Expected 75 000 files in manifest, got {len(manifest)}" ) peak_mib = peak_bytes / (1024 * 1024) assert peak_mib <= 512, ( f"build_snapshot_manifest(75k) peak {peak_mib:.1f} MiB exceeds 512 MiB. " "File content must never be loaded into memory — only stat + SHA-256." ) def test_10k_files_peak_rss_under_64_mib( self, tmp_path: pathlib.Path ) -> None: """10k-file manifest stays under 64 MiB (fast smoke for the ceiling property).""" root = tmp_path / "workdir" root.mkdir() muse_dir(root).mkdir() (repo_json_path(root)).write_text('{"repo_id": "bench"}') for d in range(100): dp = root / f"pkg_{d:03d}" dp.mkdir() for f in range(100): (dp / f"f_{f:03d}.py").write_bytes(b"y" * 64) tracemalloc.start() tracemalloc.clear_traces() manifest = build_snapshot_manifest(root) _, peak_bytes = tracemalloc.get_traced_memory() tracemalloc.stop() assert len(manifest) == 10_000 peak_mib = peak_bytes / (1024 * 1024) assert peak_mib <= 64, ( f"build_snapshot_manifest(10k) peak {peak_mib:.1f} MiB — expected ≤ 64 MiB." ) class TestWalkCommitsBetweenCap: """walk_commits_between must truncate at its cap, never OOM. This function is used by ``muse status --json`` for ahead/behind counts. A branch 100k commits ahead of remote must return at most ``max_commits`` records and set ``truncated=True`` — it must never allocate memory proportional to the full chain depth. """ def test_truncates_at_cap_not_oom( self, tmp_path: pathlib.Path ) -> None: """Chain of 1 000 commits with cap=100 truncates, doesn't exhaust memory.""" repo = _repo(tmp_path) N = 1_000 CAP = 100 tip = _write_chain(repo, "main", N) result = walk_commits_between_result(repo, tip, max_commits=CAP) assert result["truncated"] is True, ( "Expected truncated=True for chain longer than cap" ) assert len(result["commits"]) == CAP, ( f"Expected exactly {CAP} commits, got {len(result['commits'])}" ) def test_truncation_memory_bounded( self, tmp_path: pathlib.Path ) -> None: """walk_commits_between_result peak memory is bounded by cap, not chain depth.""" repo = _repo(tmp_path) N = 2_000 CAP = 200 tip = _write_chain(repo, "main", N) tracemalloc.start() tracemalloc.clear_traces() result = walk_commits_between_result(repo, tip, max_commits=CAP) _, peak_bytes = tracemalloc.get_traced_memory() tracemalloc.stop() assert result["count"] == CAP peak_mib = peak_bytes / (1024 * 1024) assert peak_mib <= 32, ( f"walk_commits_between_result(cap={CAP}) peak {peak_mib:.1f} MiB — " "memory must be proportional to cap, not chain depth." )