gabriel / muse public

test_cmd_clones.py file-level

at sha256:2 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:4 Merge branch 'dev' into main · gabriel · Jun 17, 2026
1 """Tests for ``muse code clones``.
2
3 Coverage layers
4 ---------------
5 Unit
6 find_clones — exact tier, near tier, both, kind_filter, language_filter,
7 file_filter, exclude_same_file, min_cluster, empty manifest.
8 _all_same_file — single file, multi-file.
9 _file_hotspots — ranking, top-N cap, empty input.
10 _CloneCluster — to_dict count is int (not str), member fields present.
11
12 Integration (live repo via CliRunner)
13 Exits zero for all valid tier values.
14 JSON schema: all required top-level keys, correct types.
15 JSON: count field is int (type-regression guard).
16 JSON: branch field present and non-empty.
17 JSON: total_symbols_involved matches sum of cluster member counts.
18 JSON: file_hotspots is a ranked list of dicts.
19 --tier exact, near, both.
20 --kind restricts output symbols.
21 --language restricts to that language.
22 --file restricts to path prefix.
23 --exclude-same-file removes same-file clusters.
24 --min-cluster < 2 rejected.
25 --min-cluster 3 raises minimum size.
26 --commit HEAD analyses specific snapshot.
27 --commit invalid ref exits non-zero.
28 Text output contains all section headers.
29 No-repo exits non-zero.
30 Empty repo (no commits) exits non-zero.
31
32 E2E (real duplicate symbols in a live repo)
33 Exact clone detected when two files contain identical function bodies.
34 Near-clone detected when two files share a signature but differ in body.
35 No false-positive clones in a repo with unique symbols only.
36 --exclude-same-file removes a same-file cluster but keeps cross-file ones.
37 file_hotspots ranks the file with the most clones first.
38
39 Stress
40 1 000 symbols, 500 exact-clone pairs: correct count, fast.
41 Large near-clone group: all members present, no duplicates.
42 Repeated runs: identical deterministic output.
43 """
44
45 from __future__ import annotations
46
47 import json
48 import pathlib
49 import textwrap
50 import time
51 from typing import TypedDict
52
53 import pytest
54
55 from tests.cli_test_helper import CliRunner
56
57 from muse.cli.commands.clones import (
58 CloneTier,
59 _CloneCluster,
60 _all_same_file,
61 _file_hotspots,
62 find_clones,
63 )
64 from muse.plugins.code.ast_parser import SymbolKind, SymbolRecord, SymbolTree
65 from muse.core.paths import indices_dir
66
67 cli = None # argparse migration β€” CliRunner ignores this arg
68 runner = CliRunner()
69
70 type _SymMap = dict[str, SymbolTree]
71 type _SymMapInput = dict[str, list[tuple[str, SymbolRecord]]]
72
73
74 # ---------------------------------------------------------------------------
75 # Typed payload for JSON assertions
76 # ---------------------------------------------------------------------------
77
78
79 class _MemberEntry(TypedDict):
80 address: str
81 kind: str
82 language: str
83 body_hash: str
84 signature_id: str
85 content_id: str
86
87
88 class _ClusterEntry(TypedDict):
89 tier: str
90 hash: str
91 count: int
92 members: list[_MemberEntry]
93
94
95 class _HotspotEntry(TypedDict):
96 file: str
97 clone_symbols: int
98
99
100 class _ClonesPayload(TypedDict):
101 schema_version: str
102 commit: str
103 branch: str
104 tier: str
105 min_cluster: int
106 kind_filter: str | None
107 language_filter: str | None
108 file_filter: str | None
109 exclude_same_file: bool
110 exact_clone_clusters: int
111 near_clone_clusters: int
112 total_symbols_involved: int
113 file_hotspots: list[_HotspotEntry]
114 clusters: list[_ClusterEntry]
115
116
117 # ---------------------------------------------------------------------------
118 # Test helpers
119 # ---------------------------------------------------------------------------
120
121
def _make_record(
    kind: SymbolKind = "function",
    body_hash: str = "aabbccdd",
    sig_id: str = "11223344",
    content_id: str = "deadbeef",
) -> SymbolRecord:
    """Build a minimal ``SymbolRecord`` named ``fn`` for unit tests.

    Each id seed is repeated eight times, so the 8-character defaults
    expand to 64-character hex strings.
    """
    full_content_id = content_id * 8
    full_body_hash = body_hash * 8
    full_signature_id = sig_id * 8
    return SymbolRecord(
        kind=kind,
        name="fn",
        qualified_name="fn",
        lineno=1,
        end_lineno=5,
        content_id=full_content_id,
        body_hash=full_body_hash,
        signature_id=full_signature_id,
        metadata_id="",
        canonical_key="",
    )
140
141
142 def _make_sym_map(
143 files: _SymMapInput,
144 ) -> _SymMap:
145 """Build a sym_map from a {file_path: [(addr, record), ...]} dict."""
146 result: _SymMap = {}
147 for fp, entries in files.items():
148 tree: SymbolTree = {addr: rec for addr, rec in entries}
149 result[fp] = tree
150 return result
151
152
def _clones_json(args: list[str] | None = None) -> _ClonesPayload:
    """Run ``code clones --json`` with optional extra *args* and decode the output.

    Fails the calling test (showing the command output) if the exit code
    is not zero.
    """
    extra = args if args else []
    outcome = runner.invoke(cli, ["code", "clones", "--json", *extra])
    assert outcome.exit_code == 0, outcome.output
    payload: _ClonesPayload = json.loads(outcome.output)
    return payload
159
160
161 # ---------------------------------------------------------------------------
162 # Fixtures
163 # ---------------------------------------------------------------------------
164
165
@pytest.fixture
def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Freshly initialised code-domain repo rooted at ``tmp_path`` (no commits)."""
    # Point both the working directory and MUSE_REPO_ROOT at the temp dir
    # before running ``init``.
    monkeypatch.chdir(tmp_path)
    monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
    init = runner.invoke(cli, ["init", "--domain", "code"])
    assert init.exit_code == 0, init.output
    return tmp_path
173
174
@pytest.fixture
def code_repo(repo: pathlib.Path) -> pathlib.Path:
    """Repo with a single committed Python file — no duplicates."""
    billing_source = textwrap.dedent(
        """\
        def compute_total(items):
            return sum(items)

        def apply_discount(total, pct):
            return total * (1 - pct)
        """
    )
    (repo / "billing.py").write_text(billing_source)
    committed = runner.invoke(cli, ["commit", "-m", "Initial"])
    assert committed.exit_code == 0, committed.output
    return repo
188
189
@pytest.fixture
def exact_clone_repo(repo: pathlib.Path) -> pathlib.Path:
    """Two files with identical content — exact clone.

    The files are genuinely byte-for-byte identical so that the
    SymbolCache re-key path (_rekey_tree), which was fixed to handle
    same-SHA-256 files without conflating their addresses, is exercised.
    """
    shared_source = textwrap.dedent(
        """\
        def helper(x):
            return x * 2
        """
    )
    for filename in ("a.py", "b.py"):
        (repo / filename).write_text(shared_source)
    committed = runner.invoke(cli, ["commit", "-m", "Exact clone"])
    assert committed.exit_code == 0, committed.output
    return repo
207
208
@pytest.fixture
def near_clone_repo(repo: pathlib.Path) -> pathlib.Path:
    """Two files with the same function signature but different bodies — near-clone."""
    sources = {
        "a.py": textwrap.dedent(
            """\
            def transform(x: int) -> int:
                return x * 2
            """
        ),
        "b.py": textwrap.dedent(
            """\
            def transform(x: int) -> int:
                return x + 10
            """
        ),
    }
    for filename, text in sources.items():
        (repo / filename).write_text(text)
    committed = runner.invoke(cli, ["commit", "-m", "Near clone"])
    assert committed.exit_code == 0, committed.output
    return repo
223
224
@pytest.fixture
def mixed_clone_repo(repo: pathlib.Path) -> pathlib.Path:
    """Repo with both exact and near clones plus an isolated file."""
    identical_source = textwrap.dedent(
        """\
        def shared(x):
            return x
        """
    )
    sources = {
        # alpha.py and beta.py are byte-identical.
        "alpha.py": identical_source,
        "beta.py": identical_source,
        # gamma.py keeps the signature but changes the body.
        "gamma.py": textwrap.dedent(
            """\
            def shared(x):
                return x + 1
            """
        ),
        # unique.py shares nothing with the other files.
        "unique.py": textwrap.dedent(
            """\
            def one_of_a_kind():
                return 42
            """
        ),
    }
    for filename, text in sources.items():
        (repo / filename).write_text(text)
    committed = runner.invoke(cli, ["commit", "-m", "Mixed clones"])
    assert committed.exit_code == 0, committed.output
    return repo
245
246
@pytest.fixture
def same_file_clone_repo(repo: pathlib.Path) -> pathlib.Path:
    """Two identical helpers in one file plus a third copy in another file.

    utils.py: _helper_a and _helper_b are same-file clones of each other,
              AND of _helper_c in other.py.
    other.py: _helper_c is a cross-file clone of utils.py's helpers.
    """
    utils_source = textwrap.dedent(
        """\
        def _helper_a(x):
            return x * 2

        def _helper_b(x):
            return x * 2
        """
    )
    other_source = textwrap.dedent(
        """\
        def _helper_c(x):
            return x * 2
        """
    )
    (repo / "utils.py").write_text(utils_source)
    (repo / "other.py").write_text(other_source)
    committed = runner.invoke(cli, ["commit", "-m", "Same-file clone"])
    assert committed.exit_code == 0, committed.output
    return repo
270
271
272 # ---------------------------------------------------------------------------
273 # Unit — _all_same_file
274 # ---------------------------------------------------------------------------
275
276
class TestAllSameFile:
    """``_all_same_file`` is True only when every member address shares one file."""

    @staticmethod
    def _members(*addresses: str) -> list[tuple[str, SymbolRecord]]:
        """Pair each address with one shared default record."""
        shared = _make_record()
        return [(address, shared) for address in addresses]

    def test_single_member_same_file(self) -> None:
        assert _all_same_file(self._members("src/a.py::fn")) is True

    def test_two_members_same_file(self) -> None:
        same_file = self._members("src/a.py::fn1", "src/a.py::fn2")
        assert _all_same_file(same_file) is True

    def test_two_members_different_files(self) -> None:
        cross_file = self._members("src/a.py::fn", "src/b.py::fn")
        assert _all_same_file(cross_file) is False

    def test_three_members_one_different(self) -> None:
        mostly_same = self._members(
            "src/a.py::fn",
            "src/a.py::gn",
            "src/b.py::fn",
        )
        assert _all_same_file(mostly_same) is False
300
301
302 # ---------------------------------------------------------------------------
303 # Unit — _file_hotspots
304 # ---------------------------------------------------------------------------
305
306
class TestFileHotspots:
    """``_file_hotspots`` counts clone symbols per file and ranks the files."""

    def _cluster(self, addresses: list[str]) -> _CloneCluster:
        """Build an exact-tier cluster whose members all share one record."""
        shared = _make_record()
        members = [(address, shared) for address in addresses]
        return _CloneCluster("exact", "aabb", members)

    def test_empty_clusters_returns_empty(self) -> None:
        assert _file_hotspots([]) == []

    def test_single_cluster_single_file(self) -> None:
        hotspots = _file_hotspots([self._cluster(["a.py::fn1", "a.py::fn2"])])
        assert len(hotspots) == 1
        top = hotspots[0]
        assert top["file"] == "a.py"
        assert top["clone_symbols"] == 2

    def test_ranked_descending(self) -> None:
        busy = self._cluster(["a.py::f1", "a.py::f2", "a.py::f3"])
        quiet = self._cluster(["b.py::f1"])
        top = _file_hotspots([busy, quiet])[0]
        assert top["file"] == "a.py"
        assert top["clone_symbols"] == 3

    def test_top_cap_respected(self) -> None:
        # Twenty single-member clusters, each in its own file.
        clusters = [self._cluster([f"file_{i}.py::fn"]) for i in range(20)]
        assert len(_file_hotspots(clusters, top=5)) == 5

    def test_cross_cluster_accumulation(self) -> None:
        first = self._cluster(["shared.py::fn1", "other.py::fn2"])
        second = self._cluster(["shared.py::fn3", "another.py::fn4"])
        hotspots = _file_hotspots([first, second])
        # shared.py appears once in each cluster, so its tally is 2.
        shared = [entry for entry in hotspots if entry["file"] == "shared.py"][0]
        assert shared["clone_symbols"] == 2
340
341
342 # ---------------------------------------------------------------------------
343 # Unit — _CloneCluster.to_dict
344 # ---------------------------------------------------------------------------
345
346
class TestCloneClusterToDict:
    """Checks on the dict produced by ``_CloneCluster.to_dict``."""

    _HEX_DIGITS = frozenset("0123456789abcdef")
    _MEMBER_FIELDS = (
        "address", "kind", "language", "body_hash", "signature_id", "content_id",
    )

    def _cluster(self, n: int = 2) -> _CloneCluster:
        """Build an exact-tier cluster of *n* members with a 64-char hash."""
        rec = _make_record()
        members = [(f"src/file_{i}.py::fn", rec) for i in range(n)]
        return _CloneCluster("exact", "aabbccdd" * 8, members)

    def _assert_full_id(self, value: str) -> None:
        """Assert *value* is a 64-character lowercase hex string."""
        assert len(value) == 64
        assert all(ch in self._HEX_DIGITS for ch in value)

    def test_count_is_int_not_str(self) -> None:
        d = self._cluster(3).to_dict()
        # Type-regression guard: count must stay an int in the payload.
        assert isinstance(d["count"], int), "count must be int — not str"
        assert d["count"] == 3

    def test_tier_field(self) -> None:
        assert self._cluster().to_dict()["tier"] == "exact"

    def test_hash_is_full_id(self) -> None:
        self._assert_full_id(self._cluster().to_dict()["hash"])

    def test_member_has_all_required_fields(self) -> None:
        member = self._cluster().to_dict()["members"][0]
        for field in self._MEMBER_FIELDS:
            assert field in member

    def test_member_hashes_are_full_ids(self) -> None:
        member = self._cluster().to_dict()["members"][0]
        for field in ("body_hash", "signature_id", "content_id"):
            self._assert_full_id(member[field])
378
379
380 # ---------------------------------------------------------------------------
381 # Unit — find_clones (pure logic via sym_map injection)
382 # ---------------------------------------------------------------------------
383
384
class TestFindClonesUnit:
    """Tests that bypass the object store by mocking symbols_for_snapshot."""

    @staticmethod
    def _stub_symbols(monkeypatch: pytest.MonkeyPatch, sym_map: _SymMap) -> None:
        """Replace ``symbols_for_snapshot`` in the clones module with a stub.

        The stub ignores its arguments and always returns *sym_map*.
        """
        from muse.cli.commands import clones as clones_mod

        monkeypatch.setattr(
            clones_mod, "symbols_for_snapshot", lambda *a, **kw: sym_map
        )

    def test_empty_manifest_returns_no_clusters(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        self._stub_symbols(monkeypatch, {})
        result = find_clones(tmp_path, {}, "both", None, 2)
        assert result == []

    def test_exact_clone_detected(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        rec = _make_record(body_hash="deadbeef")
        sym_map = _make_sym_map({
            "a.py": [("a.py::fn", rec)],
            "b.py": [("b.py::fn", rec)],
        })
        self._stub_symbols(monkeypatch, sym_map)
        result = find_clones(tmp_path, {}, "exact", None, 2)
        assert len(result) == 1
        assert result[0].tier == "exact"
        assert len(result[0].members) == 2

    def test_near_clone_detected(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        rec_a = _make_record(body_hash="aaaaaaaa", sig_id="shared123")
        rec_b = _make_record(body_hash="bbbbbbbb", sig_id="shared123")
        sym_map = _make_sym_map({
            "a.py": [("a.py::fn", rec_a)],
            "b.py": [("b.py::fn", rec_b)],
        })
        self._stub_symbols(monkeypatch, sym_map)
        result = find_clones(tmp_path, {}, "near", None, 2)
        assert len(result) == 1
        assert result[0].tier == "near"

    def test_exact_not_reported_in_near_tier(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        rec = _make_record(body_hash="identical", sig_id="same_sig")
        sym_map = _make_sym_map({
            "a.py": [("a.py::fn", rec)],
            "b.py": [("b.py::fn", rec)],
        })
        self._stub_symbols(monkeypatch, sym_map)
        # Same body AND same signature — should not appear in near tier
        # because unique_bodies has only 1 element.
        result = find_clones(tmp_path, {}, "near", None, 2)
        assert result == []

    def test_min_cluster_filters_small_groups(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        rec = _make_record(body_hash="pair")
        sym_map = _make_sym_map({
            "a.py": [("a.py::fn", rec)],
            "b.py": [("b.py::fn", rec)],
        })
        self._stub_symbols(monkeypatch, sym_map)
        # Require at least 3 — pair of 2 should be excluded.
        result = find_clones(tmp_path, {}, "exact", None, 3)
        assert result == []

    def test_exclude_same_file_skips_same_file_cluster(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        rec = _make_record(body_hash="twin")
        sym_map = _make_sym_map({
            "a.py": [("a.py::fn1", rec), ("a.py::fn2", rec)],
        })
        self._stub_symbols(monkeypatch, sym_map)
        result = find_clones(tmp_path, {}, "exact", None, 2, exclude_same_file=True)
        assert result == []

    def test_exclude_same_file_keeps_cross_file_cluster(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        rec = _make_record(body_hash="cross")
        sym_map = _make_sym_map({
            "a.py": [("a.py::fn", rec)],
            "b.py": [("b.py::fn", rec)],
        })
        self._stub_symbols(monkeypatch, sym_map)
        result = find_clones(tmp_path, {}, "exact", None, 2, exclude_same_file=True)
        assert len(result) == 1

    def test_file_filter_restricts_by_prefix(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        rec = _make_record(body_hash="filtered")
        sym_map = _make_sym_map({
            "src/a.py": [("src/a.py::fn", rec)],
            "tests/a.py": [("tests/a.py::fn", rec)],
        })
        self._stub_symbols(monkeypatch, sym_map)
        result = find_clones(tmp_path, {}, "exact", None, 2, file_filter="src/")
        # Only src/ symbols — cluster disappears (only 1 member after filter).
        assert result == []

    def test_clusters_sorted_largest_first(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        rec_big = _make_record(body_hash="bigclone")
        rec_small = _make_record(body_hash="smllone")
        # The smaller cluster is listed first in the input, so the
        # largest-first order below cannot come from insertion order alone.
        sym_map = _make_sym_map({
            "a.py": [("a.py::fn", rec_small)],
            "b.py": [("b.py::fn", rec_small)],
            "c.py": [("c.py::fn", rec_big)],
            "d.py": [("d.py::fn", rec_big)],
            "e.py": [("e.py::fn", rec_big)],
        })
        self._stub_symbols(monkeypatch, sym_map)
        result = find_clones(tmp_path, {}, "exact", None, 2)
        # Pin both clusters and their order; comparing only first vs last
        # would pass trivially if a single cluster came back.
        sizes = [len(cluster.members) for cluster in result]
        assert sizes == [3, 2]
522
523
524 # ---------------------------------------------------------------------------
525 # Integration — basic CLI
526 # ---------------------------------------------------------------------------
527
528
class TestClonesCLIBasic:
    """Exit-code and smoke checks for ``muse code clones``."""

    def test_exits_zero(self, code_repo: pathlib.Path) -> None:
        outcome = runner.invoke(cli, ["code", "clones"])
        assert outcome.exit_code == 0, outcome.output

    def test_tier_exact_exits_zero(self, code_repo: pathlib.Path) -> None:
        argv = ["code", "clones", "--tier", "exact"]
        assert runner.invoke(cli, argv).exit_code == 0

    def test_tier_near_exits_zero(self, code_repo: pathlib.Path) -> None:
        argv = ["code", "clones", "--tier", "near"]
        assert runner.invoke(cli, argv).exit_code == 0

    def test_tier_invalid_exits_nonzero(self, code_repo: pathlib.Path) -> None:
        argv = ["code", "clones", "--tier", "bogus"]
        assert runner.invoke(cli, argv).exit_code != 0

    def test_no_repo_exits_nonzero(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        # Run from a bare temp directory with no repo root configured.
        monkeypatch.chdir(tmp_path)
        monkeypatch.delenv("MUSE_REPO_ROOT", raising=False)
        outcome = runner.invoke(cli, ["code", "clones"])
        assert outcome.exit_code != 0

    def test_text_output_no_crash(self, code_repo: pathlib.Path) -> None:
        outcome = runner.invoke(cli, ["code", "clones"])
        assert outcome.exit_code == 0
        assert "Clone analysis" in outcome.output

    def test_min_cluster_1_exits_nonzero(self, code_repo: pathlib.Path) -> None:
        argv = ["code", "clones", "--min-cluster", "1"]
        assert runner.invoke(cli, argv).exit_code != 0

    def test_empty_repo_exits_nonzero(self, repo: pathlib.Path) -> None:
        # ``repo`` is initialised but has no commits.
        outcome = runner.invoke(cli, ["code", "clones"])
        assert outcome.exit_code != 0
566
567
568 # ---------------------------------------------------------------------------
569 # Integration — JSON schema
570 # ---------------------------------------------------------------------------
571
572
class TestClonesJSONSchema:
    """Shape and type checks on the ``--json`` payload."""

    def test_json_is_valid(self, code_repo: pathlib.Path) -> None:
        assert isinstance(_clones_json(), dict)

    def test_json_required_top_level_keys(self, code_repo: pathlib.Path) -> None:
        payload = _clones_json()
        required = {
            "commit", "branch", "tier", "min_cluster",
            "language_filter", "file_filter", "exclude_same_file",
            "exact_clone_clusters", "near_clone_clusters",
            "total_symbols_involved", "file_hotspots", "clusters",
        }
        missing = required - payload.keys()
        assert not missing

    def test_json_count_is_int(self, exact_clone_repo: pathlib.Path) -> None:
        payload = _clones_json(["--tier", "exact"])
        for cluster in payload["clusters"]:
            count = cluster["count"]
            assert isinstance(count, int), (
                f"count must be int, got {type(count).__name__}"
            )

    def test_json_branch_is_nonempty_string(self, code_repo: pathlib.Path) -> None:
        branch = _clones_json()["branch"]
        assert isinstance(branch, str)
        assert branch

    def test_json_total_symbols_matches_cluster_sums(
        self, exact_clone_repo: pathlib.Path
    ) -> None:
        payload = _clones_json()
        member_total = sum(cluster["count"] for cluster in payload["clusters"])
        assert payload["total_symbols_involved"] == member_total

    def test_json_file_hotspots_is_list(self, code_repo: pathlib.Path) -> None:
        assert isinstance(_clones_json()["file_hotspots"], list)

    def test_json_file_hotspots_entry_fields(
        self, exact_clone_repo: pathlib.Path
    ) -> None:
        for hotspot in _clones_json()["file_hotspots"]:
            assert "file" in hotspot
            assert "clone_symbols" in hotspot
            assert isinstance(hotspot["clone_symbols"], int)

    def test_json_exclude_same_file_flag_reflected(
        self, code_repo: pathlib.Path
    ) -> None:
        payload = _clones_json(["--exclude-same-file"])
        assert payload["exclude_same_file"] is True

    def test_json_language_filter_reflected(self, code_repo: pathlib.Path) -> None:
        payload = _clones_json(["--language", "Python"])
        assert payload["language_filter"] == "Python"

    def test_json_file_filter_reflected(self, code_repo: pathlib.Path) -> None:
        payload = _clones_json(["--file", "src/"])
        assert payload["file_filter"] == "src/"

    def test_json_commit_is_short_id(self, code_repo: pathlib.Path) -> None:
        # short_id() returns "sha256:<12 hex chars>" for sha256-prefixed IDs
        commit = _clones_json()["commit"]
        assert isinstance(commit, str)
        assert commit.startswith("sha256:")
        # The prefix is known to be present here, so removeprefix is a plain slice.
        hex_part = commit.removeprefix("sha256:")
        assert all(ch in "0123456789abcdef" for ch in hex_part)

    def test_json_cluster_member_has_all_fields(
        self, exact_clone_repo: pathlib.Path
    ) -> None:
        expected_fields = (
            "address", "kind", "language", "body_hash",
            "signature_id", "content_id",
        )
        payload = _clones_json(["--tier", "exact"])
        for cluster in payload["clusters"]:
            for member in cluster["members"]:
                for field in expected_fields:
                    assert field in member
651
652
653 # ---------------------------------------------------------------------------
654 # Integration — flags
655 # ---------------------------------------------------------------------------
656
657
class TestClonesFlags:
    """Behaviour of the individual ``muse code clones`` command-line flags."""

    def test_min_cluster_3_excludes_pairs(
        self, exact_clone_repo: pathlib.Path
    ) -> None:
        data_2 = _clones_json(["--tier", "exact"])
        data_3 = _clones_json(["--tier", "exact", "--min-cluster", "3"])
        # The exact_clone_repo has only a 2-member cluster — disappears at min 3.
        assert data_2["exact_clone_clusters"] >= 1
        assert data_3["exact_clone_clusters"] == 0

    def test_language_filter_restricts(self, code_repo: pathlib.Path) -> None:
        data_py = _clones_json(["--language", "Python"])
        data_all = _clones_json()
        # Python-filtered should have ≤ as many clusters as unfiltered.
        total_py = data_py["exact_clone_clusters"] + data_py["near_clone_clusters"]
        total_all = data_all["exact_clone_clusters"] + data_all["near_clone_clusters"]
        assert total_py <= total_all

    def test_file_filter_restricts(self, mixed_clone_repo: pathlib.Path) -> None:
        data_all = _clones_json()
        data_filtered = _clones_json(["--file", "unique.py"])
        # Baseline: the unfiltered repo must contain clusters, otherwise the
        # zero counts below would prove nothing about the filter.
        total_all = data_all["exact_clone_clusters"] + data_all["near_clone_clusters"]
        assert total_all >= 1
        # unique.py has no clones — filtering to it yields 0 clusters.
        assert data_filtered["exact_clone_clusters"] == 0
        assert data_filtered["near_clone_clusters"] == 0

    def test_commit_head_flag(self, code_repo: pathlib.Path) -> None:
        data = _clones_json(["--commit", "HEAD"])
        assert data["commit"]

    def test_commit_invalid_exits_nonzero(self, code_repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "clones", "--commit", "no_such_ref_xyz"])
        assert result.exit_code != 0

    def test_kind_filter_in_json(self, code_repo: pathlib.Path) -> None:
        data = _clones_json(["--kind", "function"])
        assert data["kind_filter"] == "function"
694
695
696 # ---------------------------------------------------------------------------
697 # E2E — real clone detection
698 # ---------------------------------------------------------------------------
699
700
class TestClonesE2E:
    """End-to-end clone detection against repos with real committed files."""

    def test_exact_clone_detected(self, exact_clone_repo: pathlib.Path) -> None:
        data = _clones_json(["--tier", "exact"])
        assert data["exact_clone_clusters"] >= 1
        # Each exact cluster must have ≥ 2 distinct members.
        for cluster in data["clusters"]:
            if cluster["tier"] == "exact":
                assert cluster["count"] >= 2
                addresses = {m["address"] for m in cluster["members"]}
                # Members must live in different files.
                files = {addr.split("::")[0] for addr in addresses}
                assert len(files) >= 2, f"Exact clone cluster should span files, got: {files}"

    def test_exact_clone_count_is_2(self, exact_clone_repo: pathlib.Path) -> None:
        data = _clones_json(["--tier", "exact"])
        # The helper function is the only clone; count = 2.
        clone_clusters = [c for c in data["clusters"] if c["tier"] == "exact"]
        assert any(c["count"] == 2 for c in clone_clusters)

    def test_near_clone_detected(self, near_clone_repo: pathlib.Path) -> None:
        data = _clones_json(["--tier", "near"])
        assert data["near_clone_clusters"] >= 1

    def test_near_clone_members_differ_in_body(
        self, near_clone_repo: pathlib.Path
    ) -> None:
        data = _clones_json(["--tier", "near"])
        for cluster in data["clusters"]:
            if cluster["tier"] == "near":
                bodies = {m["body_hash"] for m in cluster["members"]}
                assert len(bodies) > 1, "near-clone members must have different body hashes"

    def test_no_false_positive_clones(self, code_repo: pathlib.Path) -> None:
        """Unique repo (no real clones) should detect zero cross-file clones."""
        data = _clones_json(["--exclude-same-file"])
        # With --exclude-same-file, all same-file duplicates are removed.
        # The code_repo has only one file with unique functions.
        assert data["exact_clone_clusters"] == 0

    def test_exclude_same_file_removes_same_file_cluster(
        self, same_file_clone_repo: pathlib.Path
    ) -> None:
        data_incl = _clones_json(["--tier", "exact"])
        data_excl = _clones_json(["--tier", "exact", "--exclude-same-file"])
        # The flag can only drop clusters, never add them.
        assert data_excl["exact_clone_clusters"] <= data_incl["exact_clone_clusters"]
        # Every surviving cluster must span more than one file. In this
        # fixture the three helpers share one body, so they are expected to
        # form a cluster covering both utils.py and other.py.
        for cluster in data_excl["clusters"]:
            files = {m["address"].split("::")[0] for m in cluster["members"]}
            assert len(files) >= 2, (
                f"--exclude-same-file kept a single-file cluster: {files}"
            )

    def test_file_hotspots_ranks_busiest_file_first(
        self, mixed_clone_repo: pathlib.Path
    ) -> None:
        data = _clones_json()
        if data["file_hotspots"]:
            counts = [h["clone_symbols"] for h in data["file_hotspots"]]
            assert counts == sorted(counts, reverse=True)

    def test_mixed_repo_has_both_tiers(self, mixed_clone_repo: pathlib.Path) -> None:
        data = _clones_json(["--tier", "both"])
        # alpha.py and beta.py are exact clones; gamma.py is near-clone of both.
        assert data["exact_clone_clusters"] >= 1

    def test_total_symbols_nonzero_when_clones_exist(
        self, exact_clone_repo: pathlib.Path
    ) -> None:
        data = _clones_json()
        assert data["total_symbols_involved"] >= 2

    def test_text_output_exact_section(self, exact_clone_repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "clones", "--tier", "exact"])
        assert result.exit_code == 0
        assert "Exact clones" in result.output

    def test_identical_file_content_reports_distinct_addresses(
        self, exact_clone_repo: pathlib.Path
    ) -> None:
        """Regression: SymbolCache re-key bug.

        When a.py and b.py have byte-for-byte identical content they share the
        same SHA-256 cache key. Before the fix, b.py's tree was served with
        a.py's addresses, collapsing both members into the same address and
        making the cluster look like a same-file duplicate. After the fix,
        each file gets correctly addressed symbols.
        """
        data = _clones_json(["--tier", "exact"])
        for cluster in data["clusters"]:
            if cluster["tier"] == "exact" and cluster["count"] >= 2:
                files = {m["address"].split("::")[0] for m in cluster["members"]}
                assert len(files) >= 2, (
                    f"Cache re-key bug: cluster members collapsed to one file: {files}"
                )

    def test_text_output_no_clones_message(self, code_repo: pathlib.Path) -> None:
        result = runner.invoke(
            cli, ["code", "clones", "--tier", "exact", "--exclude-same-file"]
        )
        assert result.exit_code == 0
        assert "No clones detected" in result.output or "0 clone cluster" in result.output
798
799
800 # ---------------------------------------------------------------------------
801 # Stress — performance and determinism
802 # ---------------------------------------------------------------------------
803
804
805 class TestClonesStress:
806 def test_large_exact_clone_group(
807 self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
808 ) -> None:
809 """1 000 files all containing the same function body β€” one big cluster."""
810 from muse.cli.commands import clones as clones_mod
811
812 rec = _make_record(body_hash="bigclone")
813 sym_map = _make_sym_map(
814 {f"src/file_{i}.py": [(f"src/file_{i}.py::fn", rec)] for i in range(1000)}
815 )
816 monkeypatch.setattr(clones_mod, "symbols_for_snapshot", lambda *a, **kw: sym_map)
817 result = find_clones(tmp_path, {}, "exact", None, 2)
818 assert len(result) == 1
819 assert len(result[0].members) == 1000
820
821 def test_many_distinct_clone_pairs_performance(
822 self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
823 ) -> None:
824 """500 clone pairs (1 000 unique body hashes, 2 files each)."""
825 from muse.cli.commands import clones as clones_mod
826
827 sym_map: _SymMap = {}
828 for i in range(500):
829 rec = _make_record(body_hash=f"hash_{i:04d}")
830 sym_map[f"a_{i}.py"] = {f"a_{i}.py::fn": rec}
831 sym_map[f"b_{i}.py"] = {f"b_{i}.py::fn": rec}
832
833 monkeypatch.setattr(clones_mod, "symbols_for_snapshot", lambda *a, **kw: sym_map)
834 start = time.monotonic()
835 result = find_clones(tmp_path, {}, "exact", None, 2)
836 elapsed = time.monotonic() - start
837 assert len(result) == 500
838 assert elapsed < 5.0, f"find_clones took {elapsed:.1f}s on 1000 symbols β€” too slow"
839
840 def test_near_clone_large_group(
841 self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
842 ) -> None:
843 """200 symbols sharing the same signature but each with a unique body."""
844 from muse.cli.commands import clones as clones_mod
845
846 sym_map: _SymMap = {}
847 for i in range(200):
848 rec = _make_record(body_hash=f"body_{i:04d}", sig_id="shared_sig")
849 sym_map[f"f_{i}.py"] = {f"f_{i}.py::fn": rec}
850
851 monkeypatch.setattr(clones_mod, "symbols_for_snapshot", lambda *a, **kw: sym_map)
852 result = find_clones(tmp_path, {}, "near", None, 2)
853 assert len(result) == 1
854 assert len(result[0].members) == 200
855
def test_repeated_runs_deterministic(self, exact_clone_repo: pathlib.Path) -> None:
    """Two back-to-back ``--json`` runs agree once two fields are dropped.

    ``duration_ms`` and ``timestamp`` are removed from both payloads (if
    present) before the equality check; everything else must match exactly.
    """
    command = ["code", "clones", "--json"]
    first_run = runner.invoke(cli, command)
    second_run = runner.invoke(cli, command)
    assert first_run.exit_code == 0
    assert second_run.exit_code == 0

    first_payload = json.loads(first_run.output)
    second_payload = json.loads(second_run.output)
    for ignored_key in ("duration_ms", "timestamp"):
        first_payload.pop(ignored_key, None)
        second_payload.pop(ignored_key, None)

    assert first_payload == second_payload
868
def test_clones_completes_within_time_bound(
    self, exact_clone_repo: pathlib.Path
) -> None:
    """``muse code clones --json`` exits zero and finishes in under 10 seconds.

    The ``exact_clone_repo`` fixture value is never read in the body; it is
    presumably requested for the repository it sets up (confirm against the
    fixture definition).
    """
    start = time.monotonic()
    result = runner.invoke(cli, ["code", "clones", "--json"])
    elapsed = time.monotonic() - start
    assert result.exit_code == 0
    assert elapsed < 10.0, f"clones took {elapsed:.1f}s — too slow"
877
878
879 # ---------------------------------------------------------------------------
880 # Flag tests
881 # ---------------------------------------------------------------------------
882
883
884 import argparse as _argparse
885
886
887 # ---------------------------------------------------------------------------
# Index acceleration — clones uses hash_occurrence index as fast path
889 # ---------------------------------------------------------------------------
890
891
class TestClonesIndexAcceleration:
    """Exact-tier ``muse code clones`` with and without the hash_occurrence index."""

    def _index_path(self, repo: pathlib.Path) -> pathlib.Path:
        """Return the path of ``hash_occurrence.json`` inside *repo*'s indices dir."""
        index_root = indices_dir(repo)
        return index_root / "hash_occurrence.json"

    def _rebuild_hash_index(self) -> None:
        """Run ``code index rebuild --index hash_occurrence``; the result is not checked."""
        runner.invoke(cli, ["code", "index", "rebuild", "--index", "hash_occurrence"])

    def test_exact_clones_with_index_match_live_scan(
        self, exact_clone_repo: pathlib.Path
    ) -> None:
        """Exact-tier counts are equal with the index file present and after deleting it."""
        # Step 1: build the index and make sure the file really landed on disk.
        self._rebuild_hash_index()
        index_file = self._index_path(exact_clone_repo)
        assert index_file.exists()

        indexed = _clones_json(["--tier", "exact"])

        # Step 2: delete the index → the command has to manage without it.
        index_file.unlink()
        scanned = _clones_json(["--tier", "exact"])

        for key in ("exact_clone_clusters", "total_symbols_involved"):
            assert indexed[key] == scanned[key]

    def test_no_index_falls_back_to_full_scan(
        self, exact_clone_repo: pathlib.Path
    ) -> None:
        """With no index file on disk, at least one exact cluster is still reported."""
        index_file = self._index_path(exact_clone_repo)
        index_file.unlink(missing_ok=True)
        report = _clones_json(["--tier", "exact"])
        assert report["exact_clone_clusters"] >= 1

    def test_commit_flag_bypasses_index(
        self, exact_clone_repo: pathlib.Path
    ) -> None:
        """``--commit HEAD`` with an index present still reports an exact cluster.

        The test name says ``--commit`` bypasses the index; the assertion here
        only checks that at least one exact cluster comes back.
        """
        self._rebuild_hash_index()
        report = _clones_json(["--tier", "exact", "--commit", "HEAD"])
        assert report["exact_clone_clusters"] >= 1

    def test_index_accelerated_respects_file_filter(
        self, mixed_clone_repo: pathlib.Path
    ) -> None:
        """``--file unique.py`` yields zero exact clusters; the unfiltered run yields some."""
        self._rebuild_hash_index()
        unfiltered = _clones_json(["--tier", "exact"])
        only_unique = _clones_json(["--tier", "exact", "--file", "unique.py"])
        assert only_unique["exact_clone_clusters"] == 0
        assert unfiltered["exact_clone_clusters"] >= 1

    def test_index_accelerated_respects_min_cluster(
        self, exact_clone_repo: pathlib.Path
    ) -> None:
        """``--min-cluster 3`` removes every cluster that ``--min-cluster 2`` reports."""
        self._rebuild_hash_index()
        at_least_two = _clones_json(["--tier", "exact", "--min-cluster", "2"])
        at_least_three = _clones_json(["--tier", "exact", "--min-cluster", "3"])
        # exact_clone_repo has only 2-member clusters → none survive a minimum of 3.
        assert at_least_two["exact_clone_clusters"] >= 1
        assert at_least_three["exact_clone_clusters"] == 0

    def test_near_clones_with_index_still_found(
        self, near_clone_repo: pathlib.Path
    ) -> None:
        """The near tier still reports a cluster after the index has been rebuilt."""
        self._rebuild_hash_index()
        report = _clones_json(["--tier", "near"])
        assert report["near_clone_clusters"] >= 1

    def test_both_tiers_with_index(self, mixed_clone_repo: pathlib.Path) -> None:
        """``--tier both`` with an index: ``exit_code`` is 0 and the exact count is an int."""
        self._rebuild_hash_index()
        report = _clones_json(["--tier", "both"])
        assert report["exit_code"] == 0
        assert isinstance(report["exact_clone_clusters"], int)
966
967
class TestRegisterFlags:
    """Parser-level checks for the ``--json`` / ``-j`` flag registered by ``clones``."""

    def _parse(self, *args: str) -> _argparse.Namespace:
        """Register the clones subcommand on a fresh parser and parse ``clones *args``."""
        from muse.cli.commands.clones import register

        parser = _argparse.ArgumentParser()
        register(parser.add_subparsers())
        argv = ["clones", *args]
        return parser.parse_args(argv)

    def test_default_json_out_is_false(self) -> None:
        """With no flags, ``json_out`` is exactly ``False``."""
        parsed = self._parse()
        assert parsed.json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        """The long ``--json`` flag sets ``json_out`` to ``True``."""
        parsed = self._parse("--json")
        assert parsed.json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        """The short ``-j`` flag sets ``json_out`` to ``True``."""
        parsed = self._parse("-j")
        assert parsed.json_out is True