gabriel / muse public
test_cmd_codemap.py python
899 lines 32.4 KB
Raw
1 """Tests for ``muse code codemap``.
2
3 Coverage layers
4 ---------------
5 Unit
6 _build_import_graph — edge deduplication, self-loop exclusion, empty maps.
7 _find_cycles — empty graph, linear chain, simple cycle, multi-cycle,
8 overlapping cycles, self-loop, deep chain (no stack
9 overflow), disconnected components, O(1) index lookup.
10
11 Integration (live repo via CliRunner)
12 Exits zero for valid invocations.
13 JSON schema: all required top-level keys present.
14 JSON schema: new fields — ``branch``, ``agent_safe_zones``.
15 ``--top`` flag limits each ranked section.
16 ``--top 0`` and ``--top -1`` are rejected with a non-zero exit.
17 ``--min-importers`` filters ranked module list correctly.
18 ``--language`` filter restricts analysis to the named language.
19 ``--language`` with no match exits zero but emits empty modules list.
20 ``--commit REF`` analyses a historical snapshot.
21 Text output contains all expected section headers.
22 No-repo invocation exits non-zero.
23 Empty repo (no commits yet) exits non-zero.
24
25 E2E (real cross-file import cycles in a live repo)
26 Cycle is detected when two Python files import each other.
27 No false-positive cycle when imports are acyclic.
28 Agent-safe zone reported for a fully isolated file.
29
30 Stress
31 1 000-node linear chain: completes without RecursionError.
32 500-node graph with 50 embedded 3-cycles: all cycles found, no crash.
33 Repeated runs produce identical output (determinism).
34 Large sym_map with duplicate import records: edges deduplicated.
35 """
36
37 from __future__ import annotations
38
39 import json
40 import pathlib
41 import textwrap
42 import time
43 from typing import TypedDict
44
45 import pytest
46
47 from tests.cli_test_helper import CliRunner
48
49 from muse.cli.commands.codemap import _build_import_graph, _find_cycles
50 from muse.plugins.code.ast_parser import SymbolTree, SymbolRecord
51
52 type _SymbolMap = dict[str, SymbolTree]
53 type _AdjacencyMap = dict[str, list[str]]
54
55 cli = None # argparse migration — CliRunner ignores this arg
56
57 runner = CliRunner()
58
59
60 # ---------------------------------------------------------------------------
61 # Typed payload for JSON assertions — mirrors the codemap JSON schema.
62 # ---------------------------------------------------------------------------
63
64
65 class _ModuleEntry(TypedDict):
66 file: str
67 symbol_count: int
68 importers: int
69 imports: int
70
71
72 class _CentralityEntry(TypedDict):
73 name: str
74 callers: int
75
76
77 class _BoundaryEntry(TypedDict):
78 file: str
79 fan_out: int
80 fan_in: int
81
82
83 class _CodemapPayload(TypedDict):
84 schema_version: str
85 commit: str
86 branch: str
87 language_filter: str | None
88 modules: list[_ModuleEntry]
89 import_cycles: list[list[str]]
90 high_centrality: list[_CentralityEntry]
91 boundary_files: list[_BoundaryEntry]
92 agent_safe_zones: list[str]
93
94
95 # ---------------------------------------------------------------------------
96 # Fixtures
97 # ---------------------------------------------------------------------------
98
99
@pytest.fixture
def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Create an empty code-domain Muse repo in ``tmp_path`` (no commits).

    The process working directory and ``MUSE_REPO_ROOT`` are both pointed at
    ``tmp_path`` before ``init`` runs; the fixture returns that same path.
    """
    root = str(tmp_path)
    monkeypatch.chdir(tmp_path)
    monkeypatch.setenv("MUSE_REPO_ROOT", root)

    init = runner.invoke(cli, ["init", "--domain", "code"])
    assert init.exit_code == 0, init.output
    return tmp_path
108
109
@pytest.fixture
def code_repo(repo: pathlib.Path) -> pathlib.Path:
    """Repo with two commits of ``billing.py`` — same shape as other code tests.

    The second commit renames ``compute_total`` to ``compute_invoice_total``
    and adds ``generate_pdf`` and ``send_email``.
    """
    billing = repo / "billing.py"

    initial_source = textwrap.dedent("""\
        class Invoice:
            def compute_total(self, items):
                return sum(items)

            def apply_discount(self, total, pct):
                return total * (1 - pct)

        def process_order(invoice, items):
            return invoice.compute_total(items)
    """)
    billing.write_text(initial_source)
    first = runner.invoke(cli, ["commit", "-m", "Initial billing module"])
    assert first.exit_code == 0, first.output

    revised_source = textwrap.dedent("""\
        class Invoice:
            def compute_invoice_total(self, items):
                return sum(items)

            def apply_discount(self, total, pct):
                return total * (1 - pct)

            def generate_pdf(self):
                return b"pdf"

        def process_order(invoice, items):
            return invoice.compute_invoice_total(items)

        def send_email(address):
            pass
    """)
    billing.write_text(revised_source)
    # The exit status of the staging step is not asserted; only the commit
    # that follows is checked.
    runner.invoke(cli, ["code", "add", "."])
    second = runner.invoke(cli, ["commit", "-m", "Rename + add generate_pdf, send_email"])
    assert second.exit_code == 0, second.output
    return repo
149
150
@pytest.fixture
def multi_file_repo(repo: pathlib.Path) -> pathlib.Path:
    """Repo with four Python files for import-graph and cycle-detection tests.

    ``models.py`` imports ``utils``; ``api.py`` imports ``models`` and
    ``utils``; ``standalone.py`` imports nothing. All are committed in one
    commit.
    """
    # Insertion order is the write order: utils, models, api, standalone.
    sources = {
        "utils.py": """\
            def helper():
                pass
        """,
        "models.py": """\
            import utils

            class User:
                pass
        """,
        "api.py": """\
            import models
            import utils

            def handle():
                pass
        """,
        "standalone.py": """\
            def isolated():
                pass
        """,
    }
    for filename, body in sources.items():
        (repo / filename).write_text(textwrap.dedent(body))

    committed = runner.invoke(cli, ["commit", "-m", "Multi-file layout"])
    assert committed.exit_code == 0, committed.output
    return repo
180
181
@pytest.fixture
def cycle_repo(repo: pathlib.Path) -> pathlib.Path:
    """Repo with a deliberate circular import: alpha ↔ beta."""
    alpha_source = textwrap.dedent("""\
        import beta

        def do_alpha():
            pass
    """)
    beta_source = textwrap.dedent("""\
        import alpha

        def do_beta():
            pass
    """)
    (repo / "alpha.py").write_text(alpha_source)
    (repo / "beta.py").write_text(beta_source)

    committed = runner.invoke(cli, ["commit", "-m", "Introduce alpha-beta cycle"])
    assert committed.exit_code == 0, committed.output
    return repo
202
203
204 # ---------------------------------------------------------------------------
205 # Helpers
206 # ---------------------------------------------------------------------------
207
208
209 def _make_sym_tree(*import_names: str) -> SymbolTree:
210 """Build a minimal SymbolTree matching the real parse_symbols format.
211
212 ``name`` holds the bare module name (e.g. ``"utils"``).
213 ``qualified_name`` uses the ``import::NAME`` format that ``parse_symbols``
214 actually produces.
215 """
216 tree: SymbolTree = {}
217 for name in import_names:
218 rec: SymbolRecord = {
219 "kind": "import",
220 "name": name,
221 "qualified_name": f"import::{name}",
222 "lineno": 1,
223 "end_lineno": 1,
224 "content_id": "",
225 "body_hash": "",
226 "signature_id": "",
227 "metadata_id": "",
228 "canonical_key": "",
229 }
230 tree[f"import::{name}"] = rec
231 return tree
232
233
def _codemap_json(args: list[str] | None = None) -> _CodemapPayload:
    """Run ``code codemap --json`` with optional extra args; return the parsed JSON.

    Asserts a zero exit code, reporting the command output on failure.
    """
    extra = args if args else []
    outcome = runner.invoke(cli, ["code", "codemap", "--json", *extra])
    assert outcome.exit_code == 0, outcome.output
    payload: _CodemapPayload = json.loads(outcome.output)
    return payload
241
242
243 # ---------------------------------------------------------------------------
244 # Unit — _build_import_graph
245 # ---------------------------------------------------------------------------
246
247
class TestBuildImportGraph:
    """Unit tests for ``_build_import_graph`` on hand-built symbol maps."""

    def test_empty_sym_map_returns_empty_dicts(self) -> None:
        """An empty map yields an empty edge map and an empty in-degree map."""
        nothing: _SymbolMap = {}
        edges, fan_in = _build_import_graph(nothing)
        assert edges == {}
        assert fan_in == {}

    def test_single_file_no_imports(self) -> None:
        """A lone file with no symbols appears with no edges and in-degree 0."""
        files: _SymbolMap = {"src/a.py": {}}
        edges, fan_in = _build_import_graph(files)
        assert edges == {"src/a.py": []}
        assert fan_in == {"src/a.py": 0}

    def test_simple_import_edge(self) -> None:
        """``a`` importing ``b`` gives one a→b edge and in-degree 1 for ``b``."""
        files: _SymbolMap = {
            "src/a.py": _make_sym_tree("b"),
            "src/b.py": {},
        }
        edges, fan_in = _build_import_graph(files)
        assert "src/b.py" in edges["src/a.py"]
        assert fan_in["src/b.py"] == 1
        assert fan_in["src/a.py"] == 0

    def test_self_loop_excluded(self) -> None:
        """A file importing its own stem must not create a self-edge."""
        files: _SymbolMap = {"src/a.py": _make_sym_tree("a")}
        edges, _ = _build_import_graph(files)
        assert edges["src/a.py"] == []

    def test_duplicate_import_records_produce_single_edge(self) -> None:
        """Multiple import records for the same target count as one edge."""

        def alias_record(i: int) -> SymbolRecord:
            # Same bare module name ("b") each time — all edges go to src/b.py.
            return {
                "kind": "import",
                "name": "b",
                "qualified_name": f"import::b_alias_{i}",
                "lineno": i + 1,
                "end_lineno": i + 1,
                "content_id": "",
                "body_hash": "",
                "signature_id": "",
                "metadata_id": "",
                "canonical_key": "",
            }

        duplicated: SymbolTree = {
            f"import::b_alias_{i}": alias_record(i) for i in range(5)
        }
        files: _SymbolMap = {
            "src/a.py": duplicated,
            "src/b.py": {},
        }
        edges, fan_in = _build_import_graph(files)
        assert edges["src/a.py"].count("src/b.py") == 1
        assert fan_in["src/b.py"] == 1

    def test_unknown_import_ignored(self) -> None:
        """Imports with no matching stem in the map are silently skipped."""
        files: _SymbolMap = {"src/a.py": _make_sym_tree("nonexistent_module")}
        edges, _ = _build_import_graph(files)
        assert edges["src/a.py"] == []

    def test_non_import_records_ignored(self) -> None:
        """A ``function`` record named like another file's stem adds no edge."""
        function_record: SymbolRecord = {
            "kind": "function",
            "name": "b",
            "qualified_name": "b",
            "lineno": 1,
            "end_lineno": 3,
            "content_id": "",
            "body_hash": "",
            "signature_id": "",
            "metadata_id": "",
            "canonical_key": "",
        }
        only_function: SymbolTree = {"function::b": function_record}
        files: _SymbolMap = {
            "src/a.py": only_function,
            "src/b.py": {},
        }
        edges, _ = _build_import_graph(files)
        assert edges["src/a.py"] == []

    def test_fan_out_multiple_targets(self) -> None:
        """One importer with three targets: three edges, each target in-degree 1."""
        targets = ("src/b.py", "src/c.py", "src/d.py")
        files: _SymbolMap = {"src/a.py": _make_sym_tree("b", "c", "d")}
        for target in targets:
            files[target] = {}
        edges, fan_in = _build_import_graph(files)
        assert len(edges["src/a.py"]) == 3
        for target in targets:
            assert fan_in[target] == 1

    def test_fan_in_multiple_importers(self) -> None:
        """Two files importing the same target give it in-degree 2."""
        files: _SymbolMap = {
            "src/a.py": _make_sym_tree("c"),
            "src/b.py": _make_sym_tree("c"),
            "src/c.py": {},
        }
        _, fan_in = _build_import_graph(files)
        assert fan_in["src/c.py"] == 2
357
358
359 # ---------------------------------------------------------------------------
360 # Unit — _find_cycles
361 # ---------------------------------------------------------------------------
362
363
class TestFindCycles:
    """Unit tests for ``_find_cycles`` on hand-built adjacency maps.

    These tests expect each reported cycle to be a closed path: a list of
    nodes whose first and last elements are the same node.
    """

    def test_empty_graph(self) -> None:
        """An empty graph has no cycles."""
        assert _find_cycles({}) == []

    def test_single_node_no_edges(self) -> None:
        """A single node without edges has no cycles."""
        assert _find_cycles({"A": []}) == []

    def test_linear_chain_no_cycle(self) -> None:
        """A→B→C with no back edge has no cycles."""
        g: _AdjacencyMap = {"A": ["B"], "B": ["C"], "C": []}
        assert _find_cycles(g) == []

    def test_self_loop(self) -> None:
        """A→A is reported once, as a path starting and ending at A."""
        g: _AdjacencyMap = {"A": ["A"]}
        cycles = _find_cycles(g)
        assert len(cycles) == 1
        assert cycles[0][0] == cycles[0][-1] == "A"

    def test_simple_two_node_cycle(self) -> None:
        """A↔B yields a cycle containing both nodes."""
        g: _AdjacencyMap = {"A": ["B"], "B": ["A"]}
        cycles = _find_cycles(g)
        assert any("A" in c and "B" in c for c in cycles)

    def test_three_node_cycle(self) -> None:
        """A→B→C→A yields a closed path of four entries."""
        g: _AdjacencyMap = {"A": ["B"], "B": ["C"], "C": ["A"]}
        cycles = _find_cycles(g)
        assert len(cycles) >= 1
        cycle = cycles[0]
        assert cycle[0] == cycle[-1]
        assert len(cycle) == 4  # A→B→C→A

    def test_two_independent_cycles(self) -> None:
        """Two disjoint two-node rings are reported as exactly two cycles."""
        g: _AdjacencyMap = {
            "A": ["B"], "B": ["A"],  # cycle 1
            "C": ["D"], "D": ["C"],  # cycle 2
        }
        cycles = _find_cycles(g)
        assert len(cycles) == 2

    def test_overlapping_cycles_shared_node(self) -> None:
        """Node B participates in both A→B→A and B→C→B."""
        g: _AdjacencyMap = {
            "A": ["B"],
            "B": ["A", "C"],
            "C": ["B"],
        }
        cycles = _find_cycles(g)
        assert len(cycles) >= 2

    def test_disconnected_graph_with_cycle_in_one_component(self) -> None:
        """Only the cyclic component contributes a cycle."""
        g: _AdjacencyMap = {
            "X": ["Y"], "Y": [],     # acyclic component
            "A": ["B"], "B": ["A"],  # cyclic component
        }
        cycles = _find_cycles(g)
        assert len(cycles) == 1

    def test_cycle_path_forms_closed_ring(self) -> None:
        """Every reported cycle starts and ends at the same node."""
        g: _AdjacencyMap = {"A": ["B"], "B": ["C"], "C": ["A"]}
        cycles = _find_cycles(g)
        for cycle in cycles:
            assert cycle[0] == cycle[-1], "cycle path must start and end at the same node"

    def test_deep_linear_chain_no_recursion_error(self) -> None:
        """A chain deeper than the recursion limit is handled without error."""
        import sys

        # Derive the depth from the interpreter's actual limit. The default
        # limit is itself 1000, so a fixed 1_000-node chain is only at the
        # limit, and would fall below it if the limit were raised.
        depth = max(1_000, 2 * sys.getrecursionlimit())
        nodes = [f"mod_{i}" for i in range(depth)]
        g: _AdjacencyMap = {nodes[i]: [nodes[i + 1]] for i in range(depth - 1)}
        g[nodes[-1]] = []
        cycles = _find_cycles(g)
        assert cycles == []

    def test_deep_cycle_at_end_of_long_chain(self) -> None:
        """A two-node cycle at the tail of a 500-node chain is found."""
        depth = 500
        nodes = [f"mod_{i}" for i in range(depth)]
        g: _AdjacencyMap = {nodes[i]: [nodes[i + 1]] for i in range(depth - 1)}
        g[nodes[-1]] = [nodes[-2]]  # last two form a cycle
        cycles = _find_cycles(g)
        # Both tail nodes belong to the only cycle in this graph, so require
        # both (``and``), matching test_simple_two_node_cycle.
        assert any(nodes[-1] in c and nodes[-2] in c for c in cycles)

    def test_no_duplicate_cycles_for_same_back_edge(self) -> None:
        """A simple two-node ring is reported exactly once."""
        g: _AdjacencyMap = {"A": ["B"], "B": ["A"]}
        cycles = _find_cycles(g)
        # There should be exactly one detected cycle for a simple two-node ring.
        assert len(cycles) == 1

    def test_fully_connected_triangle(self) -> None:
        """A fully connected triangle yields at least one cycle."""
        g: _AdjacencyMap = {"A": ["B", "C"], "B": ["A", "C"], "C": ["A", "B"]}
        cycles = _find_cycles(g)
        assert len(cycles) >= 1

    def test_index_extraction_is_correct(self) -> None:
        """Cycle slice starts at the actual back-edge target, not index 0."""
        g: _AdjacencyMap = {"A": ["B"], "B": ["C"], "C": ["B"]}
        cycles = _find_cycles(g)
        # The cycle involves B and C, not A.
        for cycle in cycles:
            assert "A" not in cycle, "A is not part of any cycle in this graph"
460
461
462 # ---------------------------------------------------------------------------
463 # Integration — basic CLI invocations
464 # ---------------------------------------------------------------------------
465
466
class TestCodemapCLIBasic:
    """Smoke tests for the plain-text ``code codemap`` command."""

    def test_exits_zero(self, code_repo: pathlib.Path) -> None:
        """A bare invocation on a committed repo succeeds."""
        outcome = runner.invoke(cli, ["code", "codemap"])
        assert outcome.exit_code == 0, outcome.output

    def test_text_output_has_all_sections(self, code_repo: pathlib.Path) -> None:
        """Every expected section header appears in the text report."""
        outcome = runner.invoke(cli, ["code", "codemap"])
        assert outcome.exit_code == 0
        report = outcome.output
        expected_headers = (
            "Semantic codemap",
            "Top modules by size",
            "Import cycles",
            "High-centrality",
            "Boundary files",
            "Agent-safe zones",
        )
        for header in expected_headers:
            assert header in report

    def test_text_output_contains_commit_hash(self, code_repo: pathlib.Path) -> None:
        """The report shows the commit id as ``sha256:`` plus 64 hex digits."""
        import re

        outcome = runner.invoke(cli, ["code", "codemap"])
        assert outcome.exit_code == 0
        assert re.search(r"commit sha256:[0-9a-f]{64}", outcome.output)

    def test_no_repo_exits_nonzero(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Outside any repo (no ``MUSE_REPO_ROOT``) the command fails."""
        monkeypatch.chdir(tmp_path)
        monkeypatch.delenv("MUSE_REPO_ROOT", raising=False)
        outcome = runner.invoke(cli, ["code", "codemap"])
        assert outcome.exit_code != 0
496
497
498 # ---------------------------------------------------------------------------
499 # Integration — JSON schema
500 # ---------------------------------------------------------------------------
501
502
class TestCodemapJSONSchema:
    """Checks on the structure of the ``--json`` payload."""

    def test_json_exits_zero(self, code_repo: pathlib.Path) -> None:
        """``--json`` succeeds on a committed repo."""
        outcome = runner.invoke(cli, ["code", "codemap", "--json"])
        assert outcome.exit_code == 0, outcome.output

    def test_json_is_valid(self, code_repo: pathlib.Path) -> None:
        """The output parses as JSON and the top level is an object."""
        outcome = runner.invoke(cli, ["code", "codemap", "--json"])
        assert outcome.exit_code == 0
        parsed = json.loads(outcome.output)
        assert isinstance(parsed, dict)

    def test_json_required_top_level_keys(self, code_repo: pathlib.Path) -> None:
        """All required top-level keys are present."""
        payload = _codemap_json()
        required = {
            "schema",
            "commit",
            "branch",
            "language_filter",
            "modules",
            "import_cycles",
            "high_centrality",
            "boundary_files",
            "agent_safe_zones",
        }
        missing = required - payload.keys()
        assert not missing

    def test_json_modules_is_list(self, code_repo: pathlib.Path) -> None:
        """``modules`` is a list."""
        assert isinstance(_codemap_json()["modules"], list)

    def test_json_import_cycles_is_list(self, code_repo: pathlib.Path) -> None:
        """``import_cycles`` is a list."""
        assert isinstance(_codemap_json()["import_cycles"], list)

    def test_json_high_centrality_is_list(self, code_repo: pathlib.Path) -> None:
        """``high_centrality`` is a list."""
        assert isinstance(_codemap_json()["high_centrality"], list)

    def test_json_boundary_files_is_list(self, code_repo: pathlib.Path) -> None:
        """``boundary_files`` is a list."""
        assert isinstance(_codemap_json()["boundary_files"], list)

    def test_json_agent_safe_zones_is_list(self, code_repo: pathlib.Path) -> None:
        """``agent_safe_zones`` is a list."""
        assert isinstance(_codemap_json()["agent_safe_zones"], list)

    def test_json_branch_field_is_string(self, code_repo: pathlib.Path) -> None:
        """``branch`` is a non-empty string."""
        branch = _codemap_json()["branch"]
        assert isinstance(branch, str)
        assert branch  # non-empty

    def test_json_commit_is_full_id(self, code_repo: pathlib.Path) -> None:
        """``commit`` is ``sha256:`` followed by 64 lowercase hex digits."""
        commit_id = _codemap_json()["commit"]
        assert isinstance(commit_id, str)
        assert commit_id.startswith("sha256:")
        digest = commit_id.removeprefix("sha256:")
        assert len(digest) == 64
        assert set(digest) <= set("0123456789abcdef")

    def test_json_language_filter_none_when_unset(self, code_repo: pathlib.Path) -> None:
        """Without ``--language`` the filter field is ``None``."""
        assert _codemap_json()["language_filter"] is None

    def test_json_module_entry_has_required_fields(self, code_repo: pathlib.Path) -> None:
        """The first module entry, if any, carries all four expected fields."""
        modules: list[_ModuleEntry] = _codemap_json()["modules"]
        # NOTE(review): this passes vacuously when ``modules`` is empty —
        # confirm whether a non-empty list should be required here.
        if modules:
            first = modules[0]
            for field in ("file", "symbol_count", "importers", "imports"):
                assert field in first

    def test_json_schema_version_matches_package(self, code_repo: pathlib.Path) -> None:
        """``schema`` is a positive integer."""
        schema = _codemap_json()["schema"]
        assert isinstance(schema, int)
        assert schema > 0
581
582
583 # ---------------------------------------------------------------------------
584 # Integration — --top flag
585 # ---------------------------------------------------------------------------
586
587
class TestCodemapTopFlag:
    """Behaviour of the ``--top`` flag."""

    def test_top_1_limits_modules_to_1(self, code_repo: pathlib.Path) -> None:
        """``--top 1`` returns at most one module."""
        payload = _codemap_json(["--top", "1"])
        assert len(payload["modules"]) <= 1

    def test_top_3_limits_sections(self, code_repo: pathlib.Path) -> None:
        """``--top 3`` caps every ranked section at three entries."""
        payload = _codemap_json(["--top", "3"])
        assert len(payload["modules"]) <= 3
        assert len(payload["high_centrality"]) <= 3
        assert len(payload["boundary_files"]) <= 3
        assert len(payload["agent_safe_zones"]) <= 3

    def test_top_zero_exits_nonzero(self, code_repo: pathlib.Path) -> None:
        """``--top 0`` is rejected."""
        outcome = runner.invoke(cli, ["code", "codemap", "--top", "0"])
        assert outcome.exit_code != 0

    def test_top_negative_exits_nonzero(self, code_repo: pathlib.Path) -> None:
        """``--top -1`` is rejected."""
        outcome = runner.invoke(cli, ["code", "codemap", "--top", "-1"])
        assert outcome.exit_code != 0

    def test_top_text_mode_respected(self, code_repo: pathlib.Path) -> None:
        """``--top`` is accepted in text mode as well."""
        outcome = runner.invoke(cli, ["code", "codemap", "--top", "1"])
        assert outcome.exit_code == 0
611
612
613 # ---------------------------------------------------------------------------
614 # Integration — --min-importers flag
615 # ---------------------------------------------------------------------------
616
617
class TestCodemapMinImporters:
    """Behaviour of the ``--min-importers`` flag."""

    def test_min_importers_zero_includes_all(self, multi_file_repo: pathlib.Path) -> None:
        """An explicit ``--min-importers 0`` matches the default run."""
        explicit_zero = _codemap_json(["--min-importers", "0"])
        default_run = _codemap_json()  # default = 0
        assert len(explicit_zero["modules"]) == len(default_run["modules"])

    def test_min_importers_1_excludes_unimported(self, multi_file_repo: pathlib.Path) -> None:
        """With a threshold of 1, only modules with at least one importer remain."""
        unfiltered: list[_ModuleEntry] = _codemap_json()["modules"]
        filtered: list[_ModuleEntry] = _codemap_json(["--min-importers", "1"])["modules"]
        # Every module in the filtered list must have importers >= 1.
        assert all(module["importers"] >= 1 for module in filtered)
        # Filtered list cannot be larger than unfiltered.
        assert len(filtered) <= len(unfiltered)

    def test_min_importers_very_large_returns_empty_modules(
        self, code_repo: pathlib.Path
    ) -> None:
        """A threshold of 9999 leaves the module list empty."""
        payload = _codemap_json(["--min-importers", "9999"])
        assert payload["modules"] == []

    def test_min_importers_negative_exits_nonzero(self, code_repo: pathlib.Path) -> None:
        """A negative threshold is rejected."""
        outcome = runner.invoke(cli, ["code", "codemap", "--min-importers", "-1"])
        assert outcome.exit_code != 0

    def test_min_importers_label_in_text_output(self, code_repo: pathlib.Path) -> None:
        """The text report mentions ``min-importers`` when the flag is passed."""
        outcome = runner.invoke(cli, ["code", "codemap", "--min-importers", "2"])
        assert outcome.exit_code == 0
        assert "min-importers" in outcome.output
649
650
651 # ---------------------------------------------------------------------------
652 # Integration — --language flag
653 # ---------------------------------------------------------------------------
654
655
class TestCodemapLanguageFlag:
    """Behaviour of the ``--language`` flag."""

    def test_language_python_exits_zero(self, code_repo: pathlib.Path) -> None:
        """``--language Python`` succeeds."""
        outcome = runner.invoke(cli, ["code", "codemap", "--language", "Python"])
        assert outcome.exit_code == 0

    def test_language_filter_in_json(self, code_repo: pathlib.Path) -> None:
        """The JSON payload reports the requested language."""
        payload = _codemap_json(["--language", "Python"])
        assert payload["language_filter"] == "Python"

    def test_language_no_match_exits_zero_empty_modules(
        self, code_repo: pathlib.Path
    ) -> None:
        """A language with no files still exits zero, with an empty module list."""
        payload = _codemap_json(["--language", "COBOL"])
        assert payload["modules"] == []

    def test_language_text_header_shown(self, code_repo: pathlib.Path) -> None:
        """The text report includes a ``language: Python`` label."""
        outcome = runner.invoke(cli, ["code", "codemap", "--language", "Python"])
        assert outcome.exit_code == 0
        assert "language: Python" in outcome.output
675
676
677 # ---------------------------------------------------------------------------
678 # Integration — --commit flag
679 # ---------------------------------------------------------------------------
680
681
class TestCodemapCommitFlag:
    """Behaviour of the ``--commit REF`` flag."""

    def test_commit_head_is_default(self, code_repo: pathlib.Path) -> None:
        """``--commit HEAD`` reports the same commit as no flag at all."""
        explicit_head = _codemap_json(["--commit", "HEAD"])
        implicit_head = _codemap_json()
        assert explicit_head["commit"] == implicit_head["commit"]

    def test_commit_head_minus_1(self, code_repo: pathlib.Path) -> None:
        """``HEAD~1`` resolves to a commit other than HEAD."""
        outcome = runner.invoke(cli, ["code", "codemap", "--commit", "HEAD~1", "--json"])
        assert outcome.exit_code == 0
        previous = json.loads(outcome.output)
        current = _codemap_json()
        # Historical snapshot commit id must differ from HEAD.
        assert previous["commit"] != current["commit"]

    def test_commit_invalid_ref_exits_nonzero(self, code_repo: pathlib.Path) -> None:
        """An unresolvable ref is rejected."""
        outcome = runner.invoke(cli, ["code", "codemap", "--commit", "totally_bogus_ref_xyz"])
        assert outcome.exit_code != 0
699
700
701 # ---------------------------------------------------------------------------
702 # Integration — empty repo
703 # ---------------------------------------------------------------------------
704
705
class TestCodemapEmptyRepo:
    """Codemap against a repo that has been initialised but never committed to."""

    def test_no_commits_exits_nonzero(self, repo: pathlib.Path) -> None:
        """Repo with no commits: HEAD does not resolve — must exit non-zero."""
        outcome = runner.invoke(cli, ["code", "codemap"])
        assert outcome.exit_code != 0
711
712
713 # ---------------------------------------------------------------------------
714 # E2E — cycle detection
715 # ---------------------------------------------------------------------------
716
717
class TestCodemapCycleE2E:
    """End-to-end cycle detection on real files committed to a repo."""

    def test_circular_import_detected(self, cycle_repo: pathlib.Path) -> None:
        """alpha ↔ beta is reported, and both files appear in the cycle paths."""
        reported: list[list[str]] = _codemap_json()["import_cycles"]
        assert len(reported) >= 1
        members: set[str] = set()
        for path in reported:
            members.update(path)
        # alpha.py and beta.py should both appear in the cycle paths.
        assert any("alpha" in node for node in members)
        assert any("beta" in node for node in members)

    def test_acyclic_repo_has_no_cycles(self, multi_file_repo: pathlib.Path) -> None:
        """An acyclic import layout yields no cycles."""
        assert _codemap_json()["import_cycles"] == []

    def test_cycle_paths_are_closed_rings(self, cycle_repo: pathlib.Path) -> None:
        """Each reported cycle is a list that begins and ends at the same node."""
        for path in _codemap_json()["import_cycles"]:
            assert isinstance(path, list)
            assert len(path) >= 2
            assert path[0] == path[-1], "cycle path must start and end at the same node"
738
739
740 # ---------------------------------------------------------------------------
741 # E2E — agent-safe zones
742 # ---------------------------------------------------------------------------
743
744
class TestCodemapAgentSafeZones:
    """End-to-end checks on the ``agent_safe_zones`` list."""

    def test_isolated_file_appears_in_agent_safe_zones(
        self, multi_file_repo: pathlib.Path
    ) -> None:
        """standalone.py imports nothing and is imported by nothing."""
        zones: list[str] = _codemap_json()["agent_safe_zones"]
        assert any("standalone" in path for path in zones)

    def test_imported_file_not_in_agent_safe_zones(
        self, multi_file_repo: pathlib.Path
    ) -> None:
        """utils.py is imported by models.py and api.py — not isolated."""
        zones: list[str] = _codemap_json()["agent_safe_zones"]
        assert not any("utils" in path for path in zones)

    def test_agent_safe_zones_are_sorted(self, multi_file_repo: pathlib.Path) -> None:
        """The list is emitted in sorted order."""
        zones: list[str] = _codemap_json()["agent_safe_zones"]
        assert zones == sorted(zones)
766
767
768 # ---------------------------------------------------------------------------
769 # E2E — boundary files
770 # ---------------------------------------------------------------------------
771
772
class TestCodemapBoundaryFiles:
    """Per-entry invariants of the ``boundary_files`` list.

    NOTE(review): every test loops over the reported entries, so each passes
    vacuously if the list is empty. The largest fan-out in the
    ``multi_file_repo`` fixture is two imports (api.py) while these tests
    require ``fan_out >= 3`` — confirm the fixture ever yields an entry.
    """

    def test_boundary_entry_has_required_fields(self, multi_file_repo: pathlib.Path) -> None:
        """Each entry carries ``file``, ``fan_out`` and ``fan_in``."""
        for entry in _codemap_json()["boundary_files"]:
            assert "file" in entry
            assert "fan_out" in entry
            assert "fan_in" in entry

    def test_boundary_file_fan_in_is_zero(self, multi_file_repo: pathlib.Path) -> None:
        """Each entry has a fan-in of zero."""
        for entry in _codemap_json()["boundary_files"]:
            assert entry["fan_in"] == 0

    def test_boundary_file_fan_out_at_least_3(self, multi_file_repo: pathlib.Path) -> None:
        """Each entry has a fan-out of at least three."""
        for entry in _codemap_json()["boundary_files"]:
            assert entry["fan_out"] >= 3
790
791
792 # ---------------------------------------------------------------------------
793 # Stress — performance and determinism
794 # ---------------------------------------------------------------------------
795
796
class TestCodemapStress:
    """Larger inputs, determinism and a coarse time budget."""

    def test_find_cycles_1000_node_linear_chain_no_recursion_error(self) -> None:
        """A 1 000-node acyclic chain is processed and yields no cycles."""
        depth = 1_000
        names = [f"file_{i}.py" for i in range(depth)]
        chain: _AdjacencyMap = {src: [dst] for src, dst in zip(names, names[1:])}
        chain[names[-1]] = []
        assert _find_cycles(chain) == []

    def test_find_cycles_500_nodes_50_embedded_3_cycles(self) -> None:
        """500-node graph with 50 explicit A→B→C→A triangles plus 350 isolated nodes."""
        expected_min = 50
        graph: _AdjacencyMap = {}
        for i in range(50):
            a, b, c = f"a_{i}", f"b_{i}", f"c_{i}"
            graph.update({a: [b], b: [c], c: [a]})
        graph.update({f"iso_{i}": [] for i in range(350)})
        found = _find_cycles(graph)
        assert len(found) >= expected_min

    def test_build_import_graph_large_sym_map_with_duplicates(self) -> None:
        """1 000 files each importing the same hub file via 10 duplicate records."""
        hub = "src/hub.py"

        def hub_alias(j: int) -> SymbolRecord:
            # Same bare module ("hub") in every record — all edges go to hub.
            return {
                "kind": "import",
                "name": "hub",
                "qualified_name": f"import::hub_alias_{j}",
                "lineno": j + 1,
                "end_lineno": j + 1,
                "content_id": "",
                "body_hash": "",
                "signature_id": "",
                "metadata_id": "",
                "canonical_key": "",
            }

        files: _SymbolMap = {hub: {}}
        for i in range(999):
            # A fresh tree per file, as the records are not shared between files.
            per_file: SymbolTree = {
                f"import::hub_alias_{j}": hub_alias(j) for j in range(10)
            }
            files[f"src/module_{i}.py"] = per_file

        edges, fan_in = _build_import_graph(files)

        assert fan_in[hub] == 999
        for path in (p for p in files if p != hub):
            assert edges[path].count(hub) == 1, "each file must have exactly one edge to hub"

    def test_repeated_runs_produce_identical_json(self, code_repo: pathlib.Path) -> None:
        """Two back-to-back invocations must produce the exact same JSON."""
        first = runner.invoke(cli, ["code", "codemap", "--json"])
        second = runner.invoke(cli, ["code", "codemap", "--json"])
        assert first.exit_code == 0
        assert second.exit_code == 0

        # Keys listed here are excluded from the comparison.
        volatile = {"duration_ms", "timestamp"}

        def stable_part(raw: str) -> dict[str, object]:
            return {k: v for k, v in json.loads(raw).items() if k not in volatile}

        assert stable_part(first.output) == stable_part(second.output)

    def test_codemap_completes_within_reasonable_time(
        self, code_repo: pathlib.Path
    ) -> None:
        """Codemap on a small repo must finish within 10 seconds."""
        began = time.monotonic()
        outcome = runner.invoke(cli, ["code", "codemap", "--json"])
        elapsed = time.monotonic() - began
        assert outcome.exit_code == 0
        assert elapsed < 10.0, f"codemap took {elapsed:.1f}s — too slow"
871
872
873 # ---------------------------------------------------------------------------
874 # Flag registration tests
875 # ---------------------------------------------------------------------------
876
877 import argparse as _argparse
878 from muse.cli.commands.codemap import register as _register_codemap
879
880
def _parse_codemap(*args: str) -> _argparse.Namespace:
    """Parse ``codemap`` plus ``args`` with a fresh parser; return the namespace.

    A new root parser is built on each call and the codemap subcommand is
    registered on it through ``register``.
    """
    parser = _argparse.ArgumentParser()
    _register_codemap(parser.add_subparsers(dest="cmd"))
    argv = ["codemap"]
    argv.extend(args)
    return parser.parse_args(argv)
886
887
class TestRegisterFlags:
    """The ``--json`` / ``-j`` flag as registered on the codemap subparser."""

    def test_default_json_out_is_false(self) -> None:
        """Without the flag, ``json_out`` is ``False``."""
        parsed = _parse_codemap()
        assert parsed.json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        """``--json`` sets ``json_out`` to ``True``."""
        parsed = _parse_codemap("--json")
        assert parsed.json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        """``-j`` sets ``json_out`` to ``True``."""
        parsed = _parse_codemap("-j")
        assert parsed.json_out is True
File History 1 commit