"""Tests for ``muse code index`` (status / rebuild / purge). Coverage layers --------------- Unit _build_symbol_history — empty repo, single commit with ops, manifest cache (blob fetched once per obj_id), missing manifest logged+skipped, no-op commits skipped, SymbolCache consulted before read_object, SymbolCache populated on miss. _build_hash_occurrence — HEAD present, no HEAD (graceful empty), missing manifest logged+empty, imports excluded, trivial (size-1) entries excluded. index_info — present, absent, corrupt states; entries is int. purge_index — deletes existing file, returns False when absent, raises ValueError for unknown name. Integration (live repo, CliRunner) status: exit-0, JSON keys + types, absent text, present text. rebuild: exit-0, JSON schema, all counts, text output. rebuild --dry-run: no files written, JSON dry_run=true, counts correct. rebuild --index symbol_history: only that index rebuilt. rebuild --index hash_occurrence: only that index rebuilt. purge: exit-0, JSON schema, present deleted, absent skipped. purge --index : only named index deleted. Invalid --index rejected by argparse (exit non-zero). Missing repo exits non-zero. E2E (real symbol changes across commits) After rebuild, status shows both indexes as present with non-zero entries. symbol_history entries reflect commit history (insert recorded). hash_occurrence clusters > 0 when duplicate bodies exist. Rebuild is idempotent: two consecutive rebuilds yield identical JSON. Dry-run counts match a subsequent real rebuild. Purge then status shows absent; rebuild restores present. Purge --index only removes targeted index. Stress 50-commit repo: rebuild completes, all symbol_history addresses > 0. Manifest cache: blob fetched at most once per unique obj_id during rebuild. Large flat file (200 functions): hash_occurrence correct after rebuild. """ from __future__ import annotations type _CountMap = dict[str, int] import json import pathlib import textwrap import time from typing import TypedDict from unittest import mock import pytest from tests.cli_test_helper import CliRunner from muse.cli.commands.index_rebuild import _build_hash_occurrence, _build_symbol_history from muse.core.indices import ( KNOWN_INDEX_NAMES, IndexInfoEntry, SymbolHistoryEntry, index_info, purge_index, ) from muse.core.symbol_cache import SymbolCache from muse.core.paths import indices_dir # --------------------------------------------------------------------------- # Runner # --------------------------------------------------------------------------- runner = CliRunner() cli = None # CliRunner always targets muse.cli.app.main # --------------------------------------------------------------------------- # TypedDicts for JSON schema validation # --------------------------------------------------------------------------- class _StatusEntry(TypedDict): name: str status: str entries: int updated_at: str | None class _RebuildPayload(TypedDict, total=False): schema_version: str dry_run: bool rebuilt: list[str] symbol_history_addresses: int symbol_history_events: int hash_occurrence_clusters: int hash_occurrence_addresses: int class _PurgePayload(TypedDict): schema_version: str purged: list[str] skipped: list[str] # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _index_path(root: pathlib.Path, name: str) -> pathlib.Path: return indices_dir(root) / f"{name}.json" def _index_exists(root: pathlib.Path, name: str) -> bool: return _index_path(root, name).exists() def _invoke_rebuild_json(extra: list[str] | None = None) -> _RebuildPayload: args = ["code", "index", "rebuild", "--json"] + (extra or []) result = runner.invoke(cli, args) assert result.exit_code == 0, result.output out: _RebuildPayload = json.loads(result.output) return out def _invoke_status_json() -> list[_StatusEntry]: result = runner.invoke(cli, ["code", "index", "status", "--json"]) assert result.exit_code == 0, result.output payload = json.loads(result.output) out: list[_StatusEntry] = payload["indexes"] return out # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) result = runner.invoke(cli, ["init", "--domain", "code"]) assert result.exit_code == 0, result.output return tmp_path @pytest.fixture def two_commit_repo(repo: pathlib.Path) -> pathlib.Path: """Repo with two commits: v1 has one function, v2 replaces it.""" (repo / "billing.py").write_text(textwrap.dedent("""\ def compute(items): return sum(items) """)) runner.invoke(cli, ["code", "add", "."]) r1 = runner.invoke(cli, ["commit", "-m", "v1"]) assert r1.exit_code == 0, r1.output (repo / "billing.py").write_text(textwrap.dedent("""\ def compute(items): return sum(items) * 2 """)) runner.invoke(cli, ["code", "add", "."]) r2 = runner.invoke(cli, ["commit", "-m", "v2"]) assert r2.exit_code == 0, r2.output return repo @pytest.fixture def clone_repo(repo: pathlib.Path) -> pathlib.Path: """Repo with two files containing identical body → one hash_occurrence cluster.""" body = "def helper():\n return True\n" (repo / "a.py").write_text(body) (repo / "b.py").write_text(f"{body}\ndef other():\n pass\n") runner.invoke(cli, ["code", "add", "."]) runner.invoke(cli, ["commit", "-m", "clones"]) return repo # --------------------------------------------------------------------------- # Unit — _build_symbol_history # --------------------------------------------------------------------------- class TestBuildSymbolHistory: def test_empty_repo_returns_empty(self, repo: pathlib.Path) -> None: idx = _build_symbol_history(repo) assert isinstance(idx, dict) assert len(idx) == 0 def test_after_commit_has_entries(self, two_commit_repo: pathlib.Path) -> None: idx = _build_symbol_history(two_commit_repo) assert len(idx) > 0 def test_address_contains_double_colon(self, two_commit_repo: pathlib.Path) -> None: idx = _build_symbol_history(two_commit_repo) assert all("::" in addr for addr in idx) def test_entries_are_symbol_history_entry(self, two_commit_repo: pathlib.Path) -> None: idx = _build_symbol_history(two_commit_repo) for entries in idx.values(): for e in entries: assert isinstance(e, SymbolHistoryEntry) def test_missing_manifest_skipped_with_log( self, two_commit_repo: pathlib.Path, caplog: pytest.LogCaptureFixture ) -> None: """Commits with missing snapshot manifests are logged and skipped.""" import logging with caplog.at_level(logging.DEBUG, logger="muse.cli.commands.index_rebuild"): with mock.patch( "muse.cli.commands.index_rebuild.get_commit_snapshot_manifest", return_value=None, ): idx = _build_symbol_history(two_commit_repo) # All commits skipped → empty index assert len(idx) == 0 assert any("Missing snapshot manifest" in r.message for r in caplog.records) def test_manifest_cache_prevents_double_fetch(self, two_commit_repo: pathlib.Path) -> None: """Each unique manifest (commit) is fetched at most once.""" original = __import__( "muse.core.snapshots", fromlist=["get_commit_snapshot_manifest"] ).get_commit_snapshot_manifest call_counts: _CountMap = {} def counting_fetch(root: pathlib.Path, commit_id: str) -> Manifest | None: call_counts[commit_id] = call_counts.get(commit_id, 0) + 1 result: Manifest | None = original(root, commit_id) return result with mock.patch( "muse.cli.commands.index_rebuild.get_commit_snapshot_manifest", side_effect=counting_fetch, ): _build_symbol_history(two_commit_repo) for commit_id, count in call_counts.items(): assert count == 1, ( f"Manifest for commit {commit_id[:8]} fetched {count} times — expected 1" ) def test_blob_cache_prevents_double_parse(self, two_commit_repo: pathlib.Path) -> None: """Each unique blob (obj_id) is read at most once within a single run.""" original_read = __import__( "muse.core.object_store", fromlist=["read_object"] ).read_object obj_fetch_count: _CountMap = {} def counting_read(root: pathlib.Path, obj_id: str) -> bytes | None: obj_fetch_count[obj_id] = obj_fetch_count.get(obj_id, 0) + 1 result: bytes | None = original_read(root, obj_id) return result with mock.patch( "muse.cli.commands.index_rebuild.read_object", side_effect=counting_read, ): _build_symbol_history(two_commit_repo) duplicates = {oid: n for oid, n in obj_fetch_count.items() if n > 1} assert not duplicates, ( f"Blobs fetched more than once: {duplicates} — blob_cache not working" ) def test_symbol_cache_consulted_before_read_object( self, two_commit_repo: pathlib.Path ) -> None: """When SymbolCache has a hit, read_object is never called for that obj_id.""" from muse.core.object_store import read_object as real_read from muse.core.snapshots import get_commit_snapshot_manifest # Pre-populate a SymbolCache with every blob in every commit's manifest. warm_cache = SymbolCache.empty() from muse.core.commits import get_all_commits from muse.plugins.code.ast_parser import parse_symbols as real_parse from muse.plugins.code._query import is_semantic for commit in get_all_commits(two_commit_repo): manifest = get_commit_snapshot_manifest(two_commit_repo, commit.commit_id) or {} for fp, oid in manifest.items(): if is_semantic(fp) and warm_cache.get(oid) is None: raw = real_read(two_commit_repo, oid) if raw is not None: warm_cache.put(oid, real_parse(raw, fp)) read_calls: list[str] = [] def spy_read(root: pathlib.Path, obj_id: str) -> bytes | None: read_calls.append(obj_id) result: bytes | None = real_read(root, obj_id) return result with mock.patch("muse.cli.commands.index_rebuild.read_object", side_effect=spy_read): _build_symbol_history(two_commit_repo, symbol_cache=warm_cache) assert read_calls == [], ( f"read_object called {len(read_calls)} times despite warm SymbolCache" ) def test_symbol_cache_populated_on_miss(self, two_commit_repo: pathlib.Path) -> None: """A cold SymbolCache is populated during _build_symbol_history.""" cold_cache = SymbolCache.empty() assert cold_cache.size == 0 _build_symbol_history(two_commit_repo, symbol_cache=cold_cache) # Cache should have been populated with at least one entry. assert cold_cache.size > 0 # --------------------------------------------------------------------------- # Unit — _build_hash_occurrence # --------------------------------------------------------------------------- class TestBuildHashOccurrence: def test_no_head_returns_empty(self, repo: pathlib.Path) -> None: """No commits → no HEAD ref → gracefully returns empty dict.""" idx = _build_hash_occurrence(repo) assert idx == {} def test_single_function_not_a_clone(self, repo: pathlib.Path) -> None: (repo / "solo.py").write_text("def unique():\n return 42\n") runner.invoke(cli, ["code", "add", "."]) runner.invoke(cli, ["commit", "-m", "solo"]) idx = _build_hash_occurrence(repo) # unique function appears only once → filtered out assert all(len(addrs) > 1 for addrs in idx.values()) def test_identical_bodies_form_cluster(self, clone_repo: pathlib.Path) -> None: idx = _build_hash_occurrence(clone_repo) assert len(idx) > 0 # every cluster has ≥ 2 members assert all(len(addrs) >= 2 for addrs in idx.values()) def test_imports_excluded(self, repo: pathlib.Path) -> None: (repo / "mod.py").write_text("import os\nimport sys\ndef fn():\n return 1\n") runner.invoke(cli, ["code", "add", "."]) runner.invoke(cli, ["commit", "-m", "imports"]) idx = _build_hash_occurrence(repo) for addrs in idx.values(): for addr in addrs: assert "::import::" not in addr def test_missing_manifest_returns_empty(self, two_commit_repo: pathlib.Path) -> None: with mock.patch( "muse.cli.commands.index_rebuild.get_commit_snapshot_manifest", return_value=None, ): idx = _build_hash_occurrence(two_commit_repo) assert idx == {} # --------------------------------------------------------------------------- # Unit — index_info and purge_index # --------------------------------------------------------------------------- class TestIndexInfo: def test_absent_before_rebuild(self, repo: pathlib.Path) -> None: infos = index_info(repo) assert len(infos) == len(KNOWN_INDEX_NAMES) for info in infos: assert info["status"] == "absent" def test_entries_is_int_not_str(self, repo: pathlib.Path) -> None: infos = index_info(repo) for info in infos: assert isinstance(info["entries"], int), ( f"{info['name']}.entries should be int, got {type(info['entries'])}" ) def test_present_after_rebuild(self, two_commit_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) infos = index_info(two_commit_repo) for info in infos: assert info["status"] == "present" def test_corrupt_index_reported(self, repo: pathlib.Path) -> None: (indices_dir(repo)).mkdir(parents=True, exist_ok=True) _index_path(repo, "symbol_history").write_bytes(b"\xff\xfe corrupt garbage") infos = index_info(repo) sym = next(i for i in infos if i["name"] == "symbol_history") assert sym["status"] == "corrupt" def test_updated_at_is_none_when_absent(self, repo: pathlib.Path) -> None: infos = index_info(repo) for info in infos: assert info["updated_at"] is None class TestPurgeIndex: def test_purge_existing_returns_true(self, two_commit_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) assert _index_exists(two_commit_repo, "symbol_history") result = purge_index(two_commit_repo, "symbol_history") assert result is True assert not _index_exists(two_commit_repo, "symbol_history") def test_purge_absent_returns_false(self, repo: pathlib.Path) -> None: result = purge_index(repo, "hash_occurrence") assert result is False def test_purge_unknown_name_raises(self, repo: pathlib.Path) -> None: with pytest.raises(ValueError, match="Unknown index name"): purge_index(repo, "nonexistent_index") # --------------------------------------------------------------------------- # Integration — CLI runner tests # --------------------------------------------------------------------------- class TestIndexStatusCLI: def test_exit_zero(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "status"]) assert result.exit_code == 0 def test_json_is_list(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "status", "--json"]) assert result.exit_code == 0 payload = json.loads(result.output) data = payload["indexes"] assert isinstance(data, list) assert len(data) == len(KNOWN_INDEX_NAMES) def test_json_entry_keys(self, repo: pathlib.Path) -> None: data = _invoke_status_json() for entry in data: for key in ("name", "status", "entries", "updated_at"): assert key in entry, f"Missing key {key!r} in status entry" def test_json_entries_is_int(self, repo: pathlib.Path) -> None: data = _invoke_status_json() for entry in data: assert isinstance(entry["entries"], int) def test_json_absent_status_before_rebuild(self, repo: pathlib.Path) -> None: data = _invoke_status_json() assert all(e["status"] == "absent" for e in data) def test_text_contains_hint_to_rebuild(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "status"]) assert "muse code index rebuild" in result.output def test_missing_repo_exits_nonzero(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.chdir(tmp_path) result = runner.invoke(cli, ["code", "index", "status"]) assert result.exit_code != 0 class TestIndexRebuildCLI: def test_exit_zero(self, two_commit_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild"]) assert result.exit_code == 0 def test_json_top_level_keys(self, two_commit_repo: pathlib.Path) -> None: data = _invoke_rebuild_json() for key in ("schema", "dry_run", "rebuilt", "symbol_history_addresses", "symbol_history_events", "hash_occurrence_clusters", "hash_occurrence_addresses"): assert key in data, f"Missing key {key!r}" def test_json_dry_run_false_by_default(self, two_commit_repo: pathlib.Path) -> None: data = _invoke_rebuild_json() assert data["dry_run"] is False def test_json_rebuilt_contains_both(self, two_commit_repo: pathlib.Path) -> None: data = _invoke_rebuild_json() assert set(data["rebuilt"]) == set(KNOWN_INDEX_NAMES) def test_rebuild_writes_files(self, two_commit_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) assert _index_exists(two_commit_repo, "symbol_history") assert _index_exists(two_commit_repo, "hash_occurrence") def test_dry_run_no_files_written(self, two_commit_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild", "--dry-run"]) assert result.exit_code == 0 assert not _index_exists(two_commit_repo, "symbol_history") assert not _index_exists(two_commit_repo, "hash_occurrence") def test_dry_run_json_flag(self, two_commit_repo: pathlib.Path) -> None: data = _invoke_rebuild_json(["--dry-run"]) assert data["dry_run"] is True def test_dry_run_counts_match_real_rebuild(self, two_commit_repo: pathlib.Path) -> None: dry = _invoke_rebuild_json(["--dry-run"]) real = _invoke_rebuild_json() assert dry["symbol_history_addresses"] == real["symbol_history_addresses"] assert dry["symbol_history_events"] == real["symbol_history_events"] assert dry["hash_occurrence_clusters"] == real["hash_occurrence_clusters"] def test_index_symbol_history_only(self, two_commit_repo: pathlib.Path) -> None: data = _invoke_rebuild_json(["--index", "symbol_history"]) assert data["rebuilt"] == ["symbol_history"] assert _index_exists(two_commit_repo, "symbol_history") assert not _index_exists(two_commit_repo, "hash_occurrence") def test_index_hash_occurrence_only(self, two_commit_repo: pathlib.Path) -> None: data = _invoke_rebuild_json(["--index", "hash_occurrence"]) assert data["rebuilt"] == ["hash_occurrence"] assert not _index_exists(two_commit_repo, "symbol_history") assert _index_exists(two_commit_repo, "hash_occurrence") def test_text_output_no_files_on_dry_run(self, two_commit_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild", "--dry-run"]) assert "dry run" in result.output.lower() def test_text_output_rebuild_references_status(self, two_commit_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild"]) assert "muse code index status" in result.output def test_missing_repo_exits_nonzero(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.chdir(tmp_path) result = runner.invoke(cli, ["code", "index", "rebuild"]) assert result.exit_code != 0 class TestIndexPurgeCLI: def test_exit_zero(self, two_commit_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) result = runner.invoke(cli, ["code", "index", "purge"]) assert result.exit_code == 0 def test_json_schema(self, two_commit_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) result = runner.invoke(cli, ["code", "index", "purge", "--json"]) assert result.exit_code == 0 data: _PurgePayload = json.loads(result.output) assert "schema" in data assert "purged" in data assert "skipped" in data def test_purge_all_deletes_files(self, two_commit_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) runner.invoke(cli, ["code", "index", "purge"]) assert not _index_exists(two_commit_repo, "symbol_history") assert not _index_exists(two_commit_repo, "hash_occurrence") def test_purge_specific_index(self, two_commit_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) result = runner.invoke( cli, ["code", "index", "purge", "--index", "symbol_history", "--json"] ) data: _PurgePayload = json.loads(result.output) assert "symbol_history" in data["purged"] assert not _index_exists(two_commit_repo, "symbol_history") assert _index_exists(two_commit_repo, "hash_occurrence") def test_purge_absent_shows_skipped(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "purge", "--json"]) data: _PurgePayload = json.loads(result.output) assert data["purged"] == [] assert set(data["skipped"]) == set(KNOWN_INDEX_NAMES) def test_purge_then_status_absent(self, two_commit_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) runner.invoke(cli, ["code", "index", "purge"]) data = _invoke_status_json() assert all(e["status"] == "absent" for e in data) def test_missing_repo_exits_nonzero(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.chdir(tmp_path) result = runner.invoke(cli, ["code", "index", "purge"]) assert result.exit_code != 0 # --------------------------------------------------------------------------- # E2E — real commit history interactions # --------------------------------------------------------------------------- class TestIndexE2E: def test_status_shows_present_after_rebuild(self, two_commit_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) data = _invoke_status_json() for entry in data: assert entry["status"] == "present", f"{entry['name']} still absent" def test_status_entries_nonzero_after_rebuild(self, two_commit_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) data = _invoke_status_json() sym = next(e for e in data if e["name"] == "symbol_history") assert sym["entries"] > 0 def test_symbol_history_contains_billing_compute(self, two_commit_repo: pathlib.Path) -> None: idx = _build_symbol_history(two_commit_repo) assert any("billing.py::compute" in addr for addr in idx) def test_hash_occurrence_cluster_for_clones(self, clone_repo: pathlib.Path) -> None: idx = _build_hash_occurrence(clone_repo) assert len(idx) > 0 def test_rebuild_is_idempotent(self, two_commit_repo: pathlib.Path) -> None: d1 = _invoke_rebuild_json() d2 = _invoke_rebuild_json() assert d1["symbol_history_addresses"] == d2["symbol_history_addresses"] assert d1["symbol_history_events"] == d2["symbol_history_events"] assert d1["hash_occurrence_clusters"] == d2["hash_occurrence_clusters"] def test_purge_then_rebuild_restores_present(self, two_commit_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) runner.invoke(cli, ["code", "index", "purge"]) runner.invoke(cli, ["code", "index", "rebuild"]) data = _invoke_status_json() for entry in data: assert entry["status"] == "present" def test_purge_index_only_removes_targeted(self, two_commit_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) runner.invoke(cli, ["code", "index", "purge", "--index", "hash_occurrence"]) data = _invoke_status_json() sym = next(e for e in data if e["name"] == "symbol_history") ho = next(e for e in data if e["name"] == "hash_occurrence") assert sym["status"] == "present" assert ho["status"] == "absent" def test_dry_run_counts_match_real_rebuild(self, two_commit_repo: pathlib.Path) -> None: dry = _invoke_rebuild_json(["--dry-run"]) real = _invoke_rebuild_json() for key in ("symbol_history_addresses", "symbol_history_events", "hash_occurrence_clusters", "hash_occurrence_addresses"): assert dry.get(key) == real.get(key), f"Mismatch on {key}" # --------------------------------------------------------------------------- # Stress # --------------------------------------------------------------------------- class TestIndexStress: def test_50_commit_rebuild_completes(self, repo: pathlib.Path) -> None: """50 commits, each changing one function — rebuild must complete.""" for i in range(50): (repo / "worker.py").write_text(f"def work():\n return {i}\n") runner.invoke(cli, ["code", "add", "."]) r = runner.invoke(cli, ["commit", "-m", f"v{i}"]) assert r.exit_code == 0, r.output result = runner.invoke(cli, ["code", "index", "rebuild", "--json"]) assert result.exit_code == 0 data: _RebuildPayload = json.loads(result.output) assert data.get("symbol_history_addresses", 0) > 0 def test_blob_cache_scales(self, repo: pathlib.Path) -> None: """10 commits on 1 file: blob for each version fetched exactly once.""" for i in range(10): (repo / "target.py").write_text(f"def fn():\n return {i}\n") runner.invoke(cli, ["code", "add", "."]) runner.invoke(cli, ["commit", "-m", f"v{i}"]) original_read = __import__( "muse.core.object_store", fromlist=["read_object"] ).read_object fetch_log: list[str] = [] def tracked_read(root: pathlib.Path, obj_id: str) -> bytes | None: fetch_log.append(obj_id) result: bytes | None = original_read(root, obj_id) return result with mock.patch( "muse.cli.commands.index_rebuild.read_object", side_effect=tracked_read ): _build_symbol_history(repo) unique_ids = set(fetch_log) # Every unique obj_id must appear exactly once for obj_id in unique_ids: assert fetch_log.count(obj_id) == 1, ( f"obj_id {obj_id[:8]}… fetched {fetch_log.count(obj_id)} times" ) def test_large_flat_file_hash_occurrence(self, repo: pathlib.Path) -> None: """200 unique functions: no hash_occurrence clusters (all distinct bodies).""" funcs = "\n\n".join(f"def func_{i}():\n return {i}" for i in range(200)) (repo / "flat.py").write_text(f"{funcs}\n") runner.invoke(cli, ["code", "add", "."]) runner.invoke(cli, ["commit", "-m", "flat"]) idx = _build_hash_occurrence(repo) # All distinct bodies → no clusters assert len(idx) == 0 def test_rebuild_performance(self, repo: pathlib.Path) -> None: """20 commits: rebuild must finish within 30 seconds.""" for i in range(20): (repo / "perf.py").write_text(f"def work():\n return {i}\n") runner.invoke(cli, ["code", "add", "."]) runner.invoke(cli, ["commit", "-m", f"v{i}"]) start = time.monotonic() result = runner.invoke(cli, ["code", "index", "rebuild"]) elapsed = time.monotonic() - start assert result.exit_code == 0 assert elapsed < 30.0, f"rebuild took {elapsed:.1f}s — too slow" class TestRegisterFlags: def test_json_short_flag(self) -> None: import argparse from muse.cli.commands.index_rebuild import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["index", "rebuild", "-j"]) assert args.json_out is True def test_json_long_flag(self) -> None: import argparse from muse.cli.commands.index_rebuild import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["index", "rebuild", "--json"]) assert args.json_out is True def test_default_no_json(self) -> None: import argparse from muse.cli.commands.index_rebuild import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["index", "rebuild"]) assert args.json_out is False