"""Tests for ``muse code clones``. Coverage layers --------------- Unit find_clones — exact tier, near tier, both, kind_filter, language_filter, file_filter, exclude_same_file, min_cluster, empty manifest. _all_same_file — single file, multi-file. _file_hotspots — ranking, top-N cap, empty input. _CloneCluster — to_dict count is int (not str), member fields present. Integration (live repo via CliRunner) Exits zero for all valid tier values. JSON schema: all required top-level keys, correct types. JSON: count field is int (type-regression guard). JSON: branch field present and non-empty. JSON: total_symbols_involved matches sum of cluster member counts. JSON: file_hotspots is a ranked list of dicts. --tier exact, near, both. --kind restricts output symbols. --language restricts to that language. --file restricts to path prefix. --exclude-same-file removes same-file clusters. --min-cluster < 2 rejected. --min-cluster 3 raises minimum size. --commit HEAD analyses specific snapshot. --commit invalid ref exits non-zero. Text output contains all section headers. No-repo exits non-zero. Empty repo (no commits) exits non-zero. E2E (real duplicate symbols in a live repo) Exact clone detected when two files contain identical function bodies. Near-clone detected when two files share a signature but differ in body. No false-positive clones in a repo with unique symbols only. --exclude-same-file removes a same-file cluster but keeps cross-file ones. file_hotspots ranks the file with the most clones first. Stress 10 000 symbols, 1 000 exact-clone pairs: correct count, fast. Large near-clone group: all members present, no duplicates. Repeated runs: identical deterministic output. """ from __future__ import annotations import json import pathlib import textwrap import time from typing import TypedDict import pytest from tests.cli_test_helper import CliRunner from muse.cli.commands.clones import ( CloneTier, _CloneCluster, _all_same_file, _file_hotspots, find_clones, ) from muse.plugins.code.ast_parser import SymbolKind, SymbolRecord, SymbolTree from muse.core.paths import indices_dir cli = None # argparse migration — CliRunner ignores this arg runner = CliRunner() type _SymMap = dict[str, SymbolTree] type _SymMapInput = dict[str, list[tuple[str, SymbolRecord]]] # --------------------------------------------------------------------------- # Typed payload for JSON assertions # --------------------------------------------------------------------------- class _MemberEntry(TypedDict): address: str kind: str language: str body_hash: str signature_id: str content_id: str class _ClusterEntry(TypedDict): tier: str hash: str count: int members: list[_MemberEntry] class _HotspotEntry(TypedDict): file: str clone_symbols: int class _ClonesPayload(TypedDict): schema_version: str commit: str branch: str tier: str min_cluster: int kind_filter: str | None language_filter: str | None file_filter: str | None exclude_same_file: bool exact_clone_clusters: int near_clone_clusters: int total_symbols_involved: int file_hotspots: list[_HotspotEntry] clusters: list[_ClusterEntry] # --------------------------------------------------------------------------- # Test helpers # --------------------------------------------------------------------------- def _make_record( kind: SymbolKind = "function", body_hash: str = "aabbccdd", sig_id: str = "11223344", content_id: str = "deadbeef", ) -> SymbolRecord: return SymbolRecord( kind=kind, name="fn", qualified_name="fn", lineno=1, end_lineno=5, content_id=content_id * 8, body_hash=body_hash * 8, signature_id=sig_id * 8, metadata_id="", canonical_key="", ) def _make_sym_map( files: _SymMapInput, ) -> _SymMap: """Build a sym_map from a {file_path: [(addr, record), ...]} dict.""" result: _SymMap = {} for fp, entries in files.items(): tree: SymbolTree = {addr: rec for addr, rec in entries} result[fp] = tree return result def _clones_json(args: list[str] | None = None) -> _ClonesPayload: cmd = ["code", "clones", "--json"] + (args or []) result = runner.invoke(cli, cmd) assert result.exit_code == 0, result.output raw: _ClonesPayload = json.loads(result.output) return raw # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) result = runner.invoke(cli, ["init", "--domain", "code"]) assert result.exit_code == 0, result.output return tmp_path @pytest.fixture def code_repo(repo: pathlib.Path) -> pathlib.Path: """Repo with a single committed Python file — no duplicates.""" (repo / "billing.py").write_text(textwrap.dedent("""\ def compute_total(items): return sum(items) def apply_discount(total, pct): return total * (1 - pct) """)) r = runner.invoke(cli, ["commit", "-m", "Initial"]) assert r.exit_code == 0, r.output return repo @pytest.fixture def exact_clone_repo(repo: pathlib.Path) -> pathlib.Path: """Two files with identical content — exact clone. Uses genuinely byte-for-byte identical files to exercise the SymbolCache re-key path (_rekey_tree) that was fixed to handle same-SHA-256 files without conflating their addresses. """ body = textwrap.dedent("""\ def helper(x): return x * 2 """) (repo / "a.py").write_text(body) (repo / "b.py").write_text(body) r = runner.invoke(cli, ["commit", "-m", "Exact clone"]) assert r.exit_code == 0, r.output return repo @pytest.fixture def near_clone_repo(repo: pathlib.Path) -> pathlib.Path: """Two files with the same function signature but different bodies — near-clone.""" (repo / "a.py").write_text(textwrap.dedent("""\ def transform(x: int) -> int: return x * 2 """)) (repo / "b.py").write_text(textwrap.dedent("""\ def transform(x: int) -> int: return x + 10 """)) r = runner.invoke(cli, ["commit", "-m", "Near clone"]) assert r.exit_code == 0, r.output return repo @pytest.fixture def mixed_clone_repo(repo: pathlib.Path) -> pathlib.Path: """Repo with both exact and near clones plus an isolated file.""" identical_body = textwrap.dedent("""\ def shared(x): return x """) (repo / "alpha.py").write_text(identical_body) (repo / "beta.py").write_text(identical_body) (repo / "gamma.py").write_text(textwrap.dedent("""\ def shared(x): return x + 1 """)) (repo / "unique.py").write_text(textwrap.dedent("""\ def one_of_a_kind(): return 42 """)) r = runner.invoke(cli, ["commit", "-m", "Mixed clones"]) assert r.exit_code == 0, r.output return repo @pytest.fixture def same_file_clone_repo(repo: pathlib.Path) -> pathlib.Path: """One file with two identical helper functions (same-file clone) plus a second file that also shares the same body (cross-file clone). utils.py: _helper_a and _helper_b are same-file clones of each other, AND of _helper_c in other.py. other.py: _helper_c is a cross-file clone of utils.py's helpers. """ (repo / "utils.py").write_text(textwrap.dedent("""\ def _helper_a(x): return x * 2 def _helper_b(x): return x * 2 """)) (repo / "other.py").write_text(textwrap.dedent("""\ def _helper_c(x): return x * 2 """)) r = runner.invoke(cli, ["commit", "-m", "Same-file clone"]) assert r.exit_code == 0, r.output return repo # --------------------------------------------------------------------------- # Unit — _all_same_file # --------------------------------------------------------------------------- class TestAllSameFile: def test_single_member_same_file(self) -> None: members = [("src/a.py::fn", _make_record())] assert _all_same_file(members) is True def test_two_members_same_file(self) -> None: rec = _make_record() members = [("src/a.py::fn1", rec), ("src/a.py::fn2", rec)] assert _all_same_file(members) is True def test_two_members_different_files(self) -> None: rec = _make_record() members = [("src/a.py::fn", rec), ("src/b.py::fn", rec)] assert _all_same_file(members) is False def test_three_members_one_different(self) -> None: rec = _make_record() members = [ ("src/a.py::fn", rec), ("src/a.py::gn", rec), ("src/b.py::fn", rec), ] assert _all_same_file(members) is False # --------------------------------------------------------------------------- # Unit — _file_hotspots # --------------------------------------------------------------------------- class TestFileHotspots: def _cluster(self, addresses: list[str]) -> _CloneCluster: rec = _make_record() return _CloneCluster("exact", "aabb", [(a, rec) for a in addresses]) def test_empty_clusters_returns_empty(self) -> None: assert _file_hotspots([]) == [] def test_single_cluster_single_file(self) -> None: cluster = self._cluster(["a.py::fn1", "a.py::fn2"]) result = _file_hotspots([cluster]) assert len(result) == 1 assert result[0]["file"] == "a.py" assert result[0]["clone_symbols"] == 2 def test_ranked_descending(self) -> None: c1 = self._cluster(["a.py::f1", "a.py::f2", "a.py::f3"]) c2 = self._cluster(["b.py::f1"]) result = _file_hotspots([c1, c2]) assert result[0]["file"] == "a.py" assert result[0]["clone_symbols"] == 3 def test_top_cap_respected(self) -> None: clusters = [self._cluster([f"file_{i}.py::fn"]) for i in range(20)] result = _file_hotspots(clusters, top=5) assert len(result) == 5 def test_cross_cluster_accumulation(self) -> None: c1 = self._cluster(["shared.py::fn1", "other.py::fn2"]) c2 = self._cluster(["shared.py::fn3", "another.py::fn4"]) result = _file_hotspots([c1, c2]) shared = next(h for h in result if h["file"] == "shared.py") assert shared["clone_symbols"] == 2 # --------------------------------------------------------------------------- # Unit — _CloneCluster.to_dict # --------------------------------------------------------------------------- class TestCloneClusterToDict: def _cluster(self, n: int = 2) -> _CloneCluster: rec = _make_record() members = [(f"src/file_{i}.py::fn", rec) for i in range(n)] return _CloneCluster("exact", "aabbccdd" * 8, members) def test_count_is_int_not_str(self) -> None: d = self._cluster(3).to_dict() assert isinstance(d["count"], int), "count must be int — not str" assert d["count"] == 3 def test_tier_field(self) -> None: assert self._cluster().to_dict()["tier"] == "exact" def test_hash_is_full_id(self) -> None: d = self._cluster().to_dict() assert len(d["hash"]) == 64 assert all(c in "0123456789abcdef" for c in d["hash"]) def test_member_has_all_required_fields(self) -> None: d = self._cluster().to_dict() member = d["members"][0] for field in ("address", "kind", "language", "body_hash", "signature_id", "content_id"): assert field in member def test_member_hashes_are_full_ids(self) -> None: d = self._cluster().to_dict() m = d["members"][0] for field in ("body_hash", "signature_id", "content_id"): assert len(m[field]) == 64 assert all(c in "0123456789abcdef" for c in m[field]) # --------------------------------------------------------------------------- # Unit — find_clones (pure logic via sym_map injection) # --------------------------------------------------------------------------- class TestFindClonesUnit: """Tests that bypass the object store by mocking symbols_for_snapshot.""" def test_empty_manifest_returns_no_clusters( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: from muse.cli.commands import clones as clones_mod monkeypatch.setattr( clones_mod, "symbols_for_snapshot", lambda *a, **kw: {}, ) result = find_clones(tmp_path, {}, "both", None, 2) assert result == [] def test_exact_clone_detected( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: from muse.cli.commands import clones as clones_mod rec = _make_record(body_hash="deadbeef") sym_map = _make_sym_map({ "a.py": [("a.py::fn", rec)], "b.py": [("b.py::fn", rec)], }) monkeypatch.setattr(clones_mod, "symbols_for_snapshot", lambda *a, **kw: sym_map) result = find_clones(tmp_path, {}, "exact", None, 2) assert len(result) == 1 assert result[0].tier == "exact" assert len(result[0].members) == 2 def test_near_clone_detected( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: from muse.cli.commands import clones as clones_mod rec_a = _make_record(body_hash="aaaaaaaa", sig_id="shared123") rec_b = _make_record(body_hash="bbbbbbbb", sig_id="shared123") sym_map = _make_sym_map({ "a.py": [("a.py::fn", rec_a)], "b.py": [("b.py::fn", rec_b)], }) monkeypatch.setattr(clones_mod, "symbols_for_snapshot", lambda *a, **kw: sym_map) result = find_clones(tmp_path, {}, "near", None, 2) assert len(result) == 1 assert result[0].tier == "near" def test_exact_not_reported_in_near_tier( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: from muse.cli.commands import clones as clones_mod rec = _make_record(body_hash="identical", sig_id="same_sig") sym_map = _make_sym_map({ "a.py": [("a.py::fn", rec)], "b.py": [("b.py::fn", rec)], }) monkeypatch.setattr(clones_mod, "symbols_for_snapshot", lambda *a, **kw: sym_map) # Same body AND same signature — should not appear in near tier # because unique_bodies has only 1 element. result = find_clones(tmp_path, {}, "near", None, 2) assert result == [] def test_min_cluster_filters_small_groups( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: from muse.cli.commands import clones as clones_mod rec = _make_record(body_hash="pair") sym_map = _make_sym_map({ "a.py": [("a.py::fn", rec)], "b.py": [("b.py::fn", rec)], }) monkeypatch.setattr(clones_mod, "symbols_for_snapshot", lambda *a, **kw: sym_map) # Require at least 3 — pair of 2 should be excluded. result = find_clones(tmp_path, {}, "exact", None, 3) assert result == [] def test_exclude_same_file_skips_same_file_cluster( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: from muse.cli.commands import clones as clones_mod rec = _make_record(body_hash="twin") sym_map = _make_sym_map({ "a.py": [("a.py::fn1", rec), ("a.py::fn2", rec)], }) monkeypatch.setattr(clones_mod, "symbols_for_snapshot", lambda *a, **kw: sym_map) result = find_clones(tmp_path, {}, "exact", None, 2, exclude_same_file=True) assert result == [] def test_exclude_same_file_keeps_cross_file_cluster( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: from muse.cli.commands import clones as clones_mod rec = _make_record(body_hash="cross") sym_map = _make_sym_map({ "a.py": [("a.py::fn", rec)], "b.py": [("b.py::fn", rec)], }) monkeypatch.setattr(clones_mod, "symbols_for_snapshot", lambda *a, **kw: sym_map) result = find_clones(tmp_path, {}, "exact", None, 2, exclude_same_file=True) assert len(result) == 1 def test_file_filter_restricts_by_prefix( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: from muse.cli.commands import clones as clones_mod rec = _make_record(body_hash="filtered") sym_map = _make_sym_map({ "src/a.py": [("src/a.py::fn", rec)], "tests/a.py": [("tests/a.py::fn", rec)], }) monkeypatch.setattr(clones_mod, "symbols_for_snapshot", lambda *a, **kw: sym_map) result = find_clones(tmp_path, {}, "exact", None, 2, file_filter="src/") # Only src/ symbols — cluster disappears (only 1 member after filter). assert result == [] def test_clusters_sorted_largest_first( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: from muse.cli.commands import clones as clones_mod rec_big = _make_record(body_hash="bigclone") rec_small = _make_record(body_hash="smllone") sym_map = _make_sym_map({ "a.py": [("a.py::fn", rec_small)], "b.py": [("b.py::fn", rec_small)], "c.py": [("c.py::fn", rec_big)], "d.py": [("d.py::fn", rec_big)], "e.py": [("e.py::fn", rec_big)], }) monkeypatch.setattr(clones_mod, "symbols_for_snapshot", lambda *a, **kw: sym_map) result = find_clones(tmp_path, {}, "exact", None, 2) assert len(result[0].members) >= len(result[-1].members) # --------------------------------------------------------------------------- # Integration — basic CLI # --------------------------------------------------------------------------- class TestClonesCLIBasic: def test_exits_zero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "clones"]) assert result.exit_code == 0, result.output def test_tier_exact_exits_zero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "clones", "--tier", "exact"]) assert result.exit_code == 0 def test_tier_near_exits_zero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "clones", "--tier", "near"]) assert result.exit_code == 0 def test_tier_invalid_exits_nonzero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "clones", "--tier", "bogus"]) assert result.exit_code != 0 def test_no_repo_exits_nonzero( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: monkeypatch.chdir(tmp_path) monkeypatch.delenv("MUSE_REPO_ROOT", raising=False) result = runner.invoke(cli, ["code", "clones"]) assert result.exit_code != 0 def test_text_output_no_crash(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "clones"]) assert result.exit_code == 0 assert "Clone analysis" in result.output def test_min_cluster_1_exits_nonzero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "clones", "--min-cluster", "1"]) assert result.exit_code != 0 def test_empty_repo_exits_nonzero(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "clones"]) assert result.exit_code != 0 # --------------------------------------------------------------------------- # Integration — JSON schema # --------------------------------------------------------------------------- class TestClonesJSONSchema: def test_json_is_valid(self, code_repo: pathlib.Path) -> None: data = _clones_json() assert isinstance(data, dict) def test_json_required_top_level_keys(self, code_repo: pathlib.Path) -> None: data = _clones_json() required = { "commit", "branch", "tier", "min_cluster", "language_filter", "file_filter", "exclude_same_file", "exact_clone_clusters", "near_clone_clusters", "total_symbols_involved", "file_hotspots", "clusters", } assert required <= data.keys() def test_json_count_is_int(self, exact_clone_repo: pathlib.Path) -> None: data = _clones_json(["--tier", "exact"]) for cluster in data["clusters"]: assert isinstance(cluster["count"], int), ( f"count must be int, got {type(cluster['count']).__name__}" ) def test_json_branch_is_nonempty_string(self, code_repo: pathlib.Path) -> None: data = _clones_json() assert isinstance(data["branch"], str) assert data["branch"] def test_json_total_symbols_matches_cluster_sums( self, exact_clone_repo: pathlib.Path ) -> None: data = _clones_json() expected = sum(c["count"] for c in data["clusters"]) assert data["total_symbols_involved"] == expected def test_json_file_hotspots_is_list(self, code_repo: pathlib.Path) -> None: data = _clones_json() assert isinstance(data["file_hotspots"], list) def test_json_file_hotspots_entry_fields( self, exact_clone_repo: pathlib.Path ) -> None: data = _clones_json() for h in data["file_hotspots"]: assert "file" in h assert "clone_symbols" in h assert isinstance(h["clone_symbols"], int) def test_json_exclude_same_file_flag_reflected( self, code_repo: pathlib.Path ) -> None: data = _clones_json(["--exclude-same-file"]) assert data["exclude_same_file"] is True def test_json_language_filter_reflected(self, code_repo: pathlib.Path) -> None: data = _clones_json(["--language", "Python"]) assert data["language_filter"] == "Python" def test_json_file_filter_reflected(self, code_repo: pathlib.Path) -> None: data = _clones_json(["--file", "src/"]) assert data["file_filter"] == "src/" def test_json_commit_is_short_id(self, code_repo: pathlib.Path) -> None: # short_id() returns "sha256:<12 hex chars>" for sha256-prefixed IDs data = _clones_json() assert isinstance(data["commit"], str) assert data["commit"].startswith("sha256:") hex_part = data["commit"][len("sha256:"):] assert all(c in "0123456789abcdef" for c in hex_part) def test_json_cluster_member_has_all_fields( self, exact_clone_repo: pathlib.Path ) -> None: data = _clones_json(["--tier", "exact"]) for cluster in data["clusters"]: for member in cluster["members"]: for field in ("address", "kind", "language", "body_hash", "signature_id", "content_id"): assert field in member # --------------------------------------------------------------------------- # Integration — flags # --------------------------------------------------------------------------- class TestClonesFlags: def test_min_cluster_3_excludes_pairs( self, exact_clone_repo: pathlib.Path ) -> None: data_2 = _clones_json(["--tier", "exact"]) data_3 = _clones_json(["--tier", "exact", "--min-cluster", "3"]) # The exact_clone_repo has only a 2-member cluster — disappears at min 3. assert data_2["exact_clone_clusters"] >= 1 assert data_3["exact_clone_clusters"] == 0 def test_language_filter_restricts(self, code_repo: pathlib.Path) -> None: data_py = _clones_json(["--language", "Python"]) data_all = _clones_json() # Python-filtered should have ≤ as many clusters as unfiltered. total_py = data_py["exact_clone_clusters"] + data_py["near_clone_clusters"] total_all = data_all["exact_clone_clusters"] + data_all["near_clone_clusters"] assert total_py <= total_all def test_file_filter_restricts(self, mixed_clone_repo: pathlib.Path) -> None: data_all = _clones_json() data_filtered = _clones_json(["--file", "unique.py"]) # unique.py has no clones — filtering to it yields 0 clusters. assert data_filtered["exact_clone_clusters"] == 0 assert data_filtered["near_clone_clusters"] == 0 def test_commit_head_flag(self, code_repo: pathlib.Path) -> None: data = _clones_json(["--commit", "HEAD"]) assert data["commit"] def test_commit_invalid_exits_nonzero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "clones", "--commit", "no_such_ref_xyz"]) assert result.exit_code != 0 def test_kind_filter_in_json(self, code_repo: pathlib.Path) -> None: data = _clones_json(["--kind", "function"]) assert data["kind_filter"] == "function" # --------------------------------------------------------------------------- # E2E — real clone detection # --------------------------------------------------------------------------- class TestClonesE2E: def test_exact_clone_detected(self, exact_clone_repo: pathlib.Path) -> None: data = _clones_json(["--tier", "exact"]) assert data["exact_clone_clusters"] >= 1 # Each exact cluster must have ≥ 2 distinct members. for cluster in data["clusters"]: if cluster["tier"] == "exact": assert cluster["count"] >= 2 addresses = {m["address"] for m in cluster["members"]} # Members must live in different files. files = {addr.split("::")[0] for addr in addresses} assert len(files) >= 2, f"Exact clone cluster should span files, got: {files}" def test_exact_clone_count_is_2(self, exact_clone_repo: pathlib.Path) -> None: data = _clones_json(["--tier", "exact"]) # The helper function is the only clone; count = 2. clone_clusters = [c for c in data["clusters"] if c["tier"] == "exact"] assert any(c["count"] == 2 for c in clone_clusters) def test_near_clone_detected(self, near_clone_repo: pathlib.Path) -> None: data = _clones_json(["--tier", "near"]) assert data["near_clone_clusters"] >= 1 def test_near_clone_members_differ_in_body( self, near_clone_repo: pathlib.Path ) -> None: data = _clones_json(["--tier", "near"]) for cluster in data["clusters"]: if cluster["tier"] == "near": bodies = {m["body_hash"] for m in cluster["members"]} assert len(bodies) > 1, "near-clone members must have different body hashes" def test_no_false_positive_clones(self, code_repo: pathlib.Path) -> None: """Unique repo (no real clones) should detect zero cross-file clones.""" data = _clones_json(["--exclude-same-file"]) # With --exclude-same-file, all same-file duplicates are removed. # The code_repo has only one file with unique functions. assert data["exact_clone_clusters"] == 0 def test_exclude_same_file_removes_same_file_cluster( self, same_file_clone_repo: pathlib.Path ) -> None: data_incl = _clones_json(["--tier", "exact"]) data_excl = _clones_json(["--tier", "exact", "--exclude-same-file"]) # The same-file cluster (utils.py::_helper_a + utils.py::_helper_b) # should disappear. The cross-file clone (utils.py + other.py) stays. assert data_excl["exact_clone_clusters"] <= data_incl["exact_clone_clusters"] def test_file_hotspots_ranks_busiest_file_first( self, mixed_clone_repo: pathlib.Path ) -> None: data = _clones_json() if data["file_hotspots"]: counts = [h["clone_symbols"] for h in data["file_hotspots"]] assert counts == sorted(counts, reverse=True) def test_mixed_repo_has_both_tiers(self, mixed_clone_repo: pathlib.Path) -> None: data = _clones_json(["--tier", "both"]) # alpha.py and beta.py are exact clones; gamma.py is near-clone of both. assert data["exact_clone_clusters"] >= 1 def test_total_symbols_nonzero_when_clones_exist( self, exact_clone_repo: pathlib.Path ) -> None: data = _clones_json() assert data["total_symbols_involved"] >= 2 def test_text_output_exact_section(self, exact_clone_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "clones", "--tier", "exact"]) assert result.exit_code == 0 assert "Exact clones" in result.output def test_identical_file_content_reports_distinct_addresses( self, exact_clone_repo: pathlib.Path ) -> None: """Regression: SymbolCache re-key bug. When a.py and b.py have byte-for-byte identical content they share the same SHA-256 cache key. Before the fix, b.py's tree was served with a.py's addresses, collapsing both members into the same address and making the cluster look like a same-file duplicate. After the fix, each file gets correctly addressed symbols. """ data = _clones_json(["--tier", "exact"]) for cluster in data["clusters"]: if cluster["tier"] == "exact" and cluster["count"] >= 2: files = {m["address"].split("::")[0] for m in cluster["members"]} assert len(files) >= 2, ( f"Cache re-key bug: cluster members collapsed to one file: {files}" ) def test_text_output_no_clones_message(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "clones", "--tier", "exact", "--exclude-same-file"] ) assert result.exit_code == 0 assert "No clones detected" in result.output or "0 clone cluster" in result.output # --------------------------------------------------------------------------- # Stress — performance and determinism # --------------------------------------------------------------------------- class TestClonesStress: def test_large_exact_clone_group( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """1 000 files all containing the same function body — one big cluster.""" from muse.cli.commands import clones as clones_mod rec = _make_record(body_hash="bigclone") sym_map = _make_sym_map( {f"src/file_{i}.py": [(f"src/file_{i}.py::fn", rec)] for i in range(1000)} ) monkeypatch.setattr(clones_mod, "symbols_for_snapshot", lambda *a, **kw: sym_map) result = find_clones(tmp_path, {}, "exact", None, 2) assert len(result) == 1 assert len(result[0].members) == 1000 def test_many_distinct_clone_pairs_performance( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """500 clone pairs (1 000 unique body hashes, 2 files each).""" from muse.cli.commands import clones as clones_mod sym_map: _SymMap = {} for i in range(500): rec = _make_record(body_hash=f"hash_{i:04d}") sym_map[f"a_{i}.py"] = {f"a_{i}.py::fn": rec} sym_map[f"b_{i}.py"] = {f"b_{i}.py::fn": rec} monkeypatch.setattr(clones_mod, "symbols_for_snapshot", lambda *a, **kw: sym_map) start = time.monotonic() result = find_clones(tmp_path, {}, "exact", None, 2) elapsed = time.monotonic() - start assert len(result) == 500 assert elapsed < 5.0, f"find_clones took {elapsed:.1f}s on 1000 symbols — too slow" def test_near_clone_large_group( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """200 symbols sharing the same signature but each with a unique body.""" from muse.cli.commands import clones as clones_mod sym_map: _SymMap = {} for i in range(200): rec = _make_record(body_hash=f"body_{i:04d}", sig_id="shared_sig") sym_map[f"f_{i}.py"] = {f"f_{i}.py::fn": rec} monkeypatch.setattr(clones_mod, "symbols_for_snapshot", lambda *a, **kw: sym_map) result = find_clones(tmp_path, {}, "near", None, 2) assert len(result) == 1 assert len(result[0].members) == 200 def test_repeated_runs_deterministic(self, exact_clone_repo: pathlib.Path) -> None: result_a = runner.invoke(cli, ["code", "clones", "--json"]) result_b = runner.invoke(cli, ["code", "clones", "--json"]) assert result_a.exit_code == 0 assert result_b.exit_code == 0 da = json.loads(result_a.output) db = json.loads(result_b.output) da.pop("duration_ms", None) db.pop("duration_ms", None) da.pop("timestamp", None) db.pop("timestamp", None) assert da == db def test_clones_completes_within_time_bound( self, exact_clone_repo: pathlib.Path ) -> None: start = time.monotonic() result = runner.invoke(cli, ["code", "clones", "--json"]) elapsed = time.monotonic() - start assert result.exit_code == 0 assert elapsed < 10.0, f"clones took {elapsed:.1f}s — too slow" # --------------------------------------------------------------------------- # Flag tests # --------------------------------------------------------------------------- import argparse as _argparse # --------------------------------------------------------------------------- # Index acceleration — clones uses hash_occurrence index as fast path # --------------------------------------------------------------------------- class TestClonesIndexAcceleration: """muse code clones uses hash_occurrence as a fast path for exact tier.""" def _index_path(self, repo: pathlib.Path) -> pathlib.Path: return indices_dir(repo) / "hash_occurrence.json" def test_exact_clones_with_index_match_live_scan( self, exact_clone_repo: pathlib.Path ) -> None: """Index-accelerated results equal full-scan results.""" # Build the index. runner.invoke(cli, ["code", "index", "rebuild", "--index", "hash_occurrence"]) assert self._index_path(exact_clone_repo).exists() data_with = _clones_json(["--tier", "exact"]) # Remove index → force full scan. self._index_path(exact_clone_repo).unlink() data_without = _clones_json(["--tier", "exact"]) assert data_with["exact_clone_clusters"] == data_without["exact_clone_clusters"] assert data_with["total_symbols_involved"] == data_without["total_symbols_involved"] def test_no_index_falls_back_to_full_scan( self, exact_clone_repo: pathlib.Path ) -> None: """Without index, clones still finds exact clones via snapshot scan.""" self._index_path(exact_clone_repo).unlink(missing_ok=True) data = _clones_json(["--tier", "exact"]) assert data["exact_clone_clusters"] >= 1 def test_commit_flag_bypasses_index( self, exact_clone_repo: pathlib.Path ) -> None: """--commit always uses a live snapshot scan regardless of index.""" runner.invoke(cli, ["code", "index", "rebuild", "--index", "hash_occurrence"]) data = _clones_json(["--tier", "exact", "--commit", "HEAD"]) assert data["exact_clone_clusters"] >= 1 def test_index_accelerated_respects_file_filter( self, mixed_clone_repo: pathlib.Path ) -> None: """--file filter applied correctly when using index fast path.""" runner.invoke(cli, ["code", "index", "rebuild", "--index", "hash_occurrence"]) data_all = _clones_json(["--tier", "exact"]) data_unique = _clones_json(["--tier", "exact", "--file", "unique.py"]) assert data_unique["exact_clone_clusters"] == 0 assert data_all["exact_clone_clusters"] >= 1 def test_index_accelerated_respects_min_cluster( self, exact_clone_repo: pathlib.Path ) -> None: """--min-cluster applied correctly when using index fast path.""" runner.invoke(cli, ["code", "index", "rebuild", "--index", "hash_occurrence"]) data_2 = _clones_json(["--tier", "exact", "--min-cluster", "2"]) data_3 = _clones_json(["--tier", "exact", "--min-cluster", "3"]) # exact_clone_repo has only 2-member clusters → disappear at min 3 assert data_2["exact_clone_clusters"] >= 1 assert data_3["exact_clone_clusters"] == 0 def test_near_clones_with_index_still_found( self, near_clone_repo: pathlib.Path ) -> None: """Near clones are always found (index doesn't cover near tier).""" runner.invoke(cli, ["code", "index", "rebuild", "--index", "hash_occurrence"]) data = _clones_json(["--tier", "near"]) assert data["near_clone_clusters"] >= 1 def test_both_tiers_with_index(self, mixed_clone_repo: pathlib.Path) -> None: """--tier both works correctly when index is present.""" runner.invoke(cli, ["code", "index", "rebuild", "--index", "hash_occurrence"]) data = _clones_json(["--tier", "both"]) assert data["exit_code"] == 0 assert isinstance(data["exact_clone_clusters"], int) class TestRegisterFlags: def _parse(self, *args: str) -> _argparse.Namespace: from muse.cli.commands.clones import register p = _argparse.ArgumentParser() sub = p.add_subparsers() register(sub) return p.parse_args(["clones", *args]) def test_default_json_out_is_false(self) -> None: ns = self._parse() assert ns.json_out is False def test_json_flag_sets_json_out(self) -> None: ns = self._parse("--json") assert ns.json_out is True def test_j_shorthand_sets_json_out(self) -> None: ns = self._parse("-j") assert ns.json_out is True