gabriel / muse public
test_cmd_codemap.py python
898 lines 32.3 KB
Raw
1 """Tests for ``muse code codemap``.
2
3 Coverage layers
4 ---------------
5 Unit
6 _build_import_graph — edge deduplication, self-loop exclusion, empty maps.
7 _find_cycles — empty graph, linear chain, simple cycle, multi-cycle,
8 overlapping cycles, self-loop, deep chain (no stack
9 overflow), disconnected components, O(1) index lookup.
10
11 Integration (live repo via CliRunner)
12 Exits zero for valid invocations.
13 JSON schema: all required top-level keys present.
14 JSON schema: new fields — ``branch``, ``agent_safe_zones``.
15 ``--top`` flag limits each ranked section.
16 ``--top 0`` and ``--top -1`` are rejected with a non-zero exit.
17 ``--min-importers`` filters ranked module list correctly.
18 ``--language`` filter restricts analysis to the named language.
19 ``--language`` with no match exits zero but emits empty modules list.
20 ``--commit REF`` analyses a historical snapshot.
21 Text output contains all expected section headers.
22 No-repo invocation exits non-zero.
23 Empty repo (no commits yet) exits non-zero or returns gracefully.
24
25 E2E (real cross-file import cycles in a live repo)
26 Cycle is detected when two Python files import each other.
27 No false-positive cycle when imports are acyclic.
28 Agent-safe zone reported for a fully isolated file.
29
30 Stress
31 1 000-node linear chain: completes without RecursionError.
32 500-node graph with 50 embedded 3-cycles: all cycles found, no crash.
33 Repeated runs produce identical output (determinism).
34 Large sym_map with duplicate import records: edges deduplicated.
35 """
36
37 from __future__ import annotations
38
39 import json
40 import pathlib
41 import textwrap
42 import time
43 from typing import TypedDict
44
45 import pytest
46
47 from tests.cli_test_helper import CliRunner
48
49 from muse.cli.commands.codemap import _build_import_graph, _find_cycles
50 from muse.plugins.code.ast_parser import SymbolTree, SymbolRecord
51
52 type _SymbolMap = dict[str, SymbolTree]
53 type _AdjacencyMap = dict[str, list[str]]
54
55 cli = None # argparse migration — CliRunner ignores this arg
56
57 runner = CliRunner()
58
59
60 # ---------------------------------------------------------------------------
61 # Typed payload for JSON assertions — mirrors the codemap JSON schema.
62 # ---------------------------------------------------------------------------
63
64
65 class _ModuleEntry(TypedDict):
66 file: str
67 symbol_count: int
68 importers: int
69 imports: int
70
71
72 class _CentralityEntry(TypedDict):
73 name: str
74 callers: int
75
76
77 class _BoundaryEntry(TypedDict):
78 file: str
79 fan_out: int
80 fan_in: int
81
82
83 class _CodemapPayload(TypedDict):
84 schema_version: str
85 commit: str
86 branch: str
87 language_filter: str | None
88 modules: list[_ModuleEntry]
89 import_cycles: list[list[str]]
90 high_centrality: list[_CentralityEntry]
91 boundary_files: list[_BoundaryEntry]
92 agent_safe_zones: list[str]
93
94
95 # ---------------------------------------------------------------------------
96 # Fixtures
97 # ---------------------------------------------------------------------------
98
99
100 @pytest.fixture
101 def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
102 """Fresh code-domain Muse repo, no commits."""
103 monkeypatch.chdir(tmp_path)
104 monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
105 result = runner.invoke(cli, ["init", "--domain", "code"])
106 assert result.exit_code == 0, result.output
107 return tmp_path
108
109
110 @pytest.fixture
111 def code_repo(repo: pathlib.Path) -> pathlib.Path:
112 """Repo with two Python commits — same fixture shape as other code tests."""
113 work = repo
114 (work / "billing.py").write_text(textwrap.dedent("""\
115 class Invoice:
116 def compute_total(self, items):
117 return sum(items)
118
119 def apply_discount(self, total, pct):
120 return total * (1 - pct)
121
122 def process_order(invoice, items):
123 return invoice.compute_total(items)
124 """))
125 r = runner.invoke(cli, ["commit", "-m", "Initial billing module"])
126 assert r.exit_code == 0, r.output
127
128 (work / "billing.py").write_text(textwrap.dedent("""\
129 class Invoice:
130 def compute_invoice_total(self, items):
131 return sum(items)
132
133 def apply_discount(self, total, pct):
134 return total * (1 - pct)
135
136 def generate_pdf(self):
137 return b"pdf"
138
139 def process_order(invoice, items):
140 return invoice.compute_invoice_total(items)
141
142 def send_email(address):
143 pass
144 """))
145 r = runner.invoke(cli, ["commit", "-m", "Rename + add generate_pdf, send_email"])
146 assert r.exit_code == 0, r.output
147 return repo
148
149
150 @pytest.fixture
151 def multi_file_repo(repo: pathlib.Path) -> pathlib.Path:
152 """Repo with multiple files to exercise import-graph and cycle detection."""
153 work = repo
154
155 (work / "utils.py").write_text(textwrap.dedent("""\
156 def helper():
157 pass
158 """))
159 (work / "models.py").write_text(textwrap.dedent("""\
160 import utils
161
162 class User:
163 pass
164 """))
165 (work / "api.py").write_text(textwrap.dedent("""\
166 import models
167 import utils
168
169 def handle():
170 pass
171 """))
172 (work / "standalone.py").write_text(textwrap.dedent("""\
173 def isolated():
174 pass
175 """))
176 r = runner.invoke(cli, ["commit", "-m", "Multi-file layout"])
177 assert r.exit_code == 0, r.output
178 return repo
179
180
181 @pytest.fixture
182 def cycle_repo(repo: pathlib.Path) -> pathlib.Path:
183 """Repo with a deliberate circular import: alpha ↔ beta."""
184 work = repo
185
186 (work / "alpha.py").write_text(textwrap.dedent("""\
187 import beta
188
189 def do_alpha():
190 pass
191 """))
192 (work / "beta.py").write_text(textwrap.dedent("""\
193 import alpha
194
195 def do_beta():
196 pass
197 """))
198 r = runner.invoke(cli, ["commit", "-m", "Introduce alpha-beta cycle"])
199 assert r.exit_code == 0, r.output
200 return repo
201
202
203 # ---------------------------------------------------------------------------
204 # Helpers
205 # ---------------------------------------------------------------------------
206
207
208 def _make_sym_tree(*import_names: str) -> SymbolTree:
209 """Build a minimal SymbolTree matching the real parse_symbols format.
210
211 ``name`` holds the bare module name (e.g. ``"utils"``).
212 ``qualified_name`` uses the ``import::NAME`` format that ``parse_symbols``
213 actually produces.
214 """
215 tree: SymbolTree = {}
216 for name in import_names:
217 rec: SymbolRecord = {
218 "kind": "import",
219 "name": name,
220 "qualified_name": f"import::{name}",
221 "lineno": 1,
222 "end_lineno": 1,
223 "content_id": "",
224 "body_hash": "",
225 "signature_id": "",
226 "metadata_id": "",
227 "canonical_key": "",
228 }
229 tree[f"import::{name}"] = rec
230 return tree
231
232
233 def _codemap_json(args: list[str] | None = None) -> _CodemapPayload:
234 """Invoke codemap --json and return the typed payload."""
235 cmd = ["code", "codemap", "--json"] + (args or [])
236 result = runner.invoke(cli, cmd)
237 assert result.exit_code == 0, result.output
238 raw: _CodemapPayload = json.loads(result.output)
239 return raw
240
241
242 # ---------------------------------------------------------------------------
243 # Unit — _build_import_graph
244 # ---------------------------------------------------------------------------
245
246
247 class TestBuildImportGraph:
248 def test_empty_sym_map_returns_empty_dicts(self) -> None:
249 sym_map: _SymbolMap = {}
250 imports_out, in_degree = _build_import_graph(sym_map)
251 assert imports_out == {}
252 assert in_degree == {}
253
254 def test_single_file_no_imports(self) -> None:
255 sym_map: _SymbolMap = {"src/a.py": {}}
256 imports_out, in_degree = _build_import_graph(sym_map)
257 assert imports_out == {"src/a.py": []}
258 assert in_degree == {"src/a.py": 0}
259
260 def test_simple_import_edge(self) -> None:
261 sym_map: _SymbolMap = {
262 "src/a.py": _make_sym_tree("b"),
263 "src/b.py": {},
264 }
265 imports_out, in_degree = _build_import_graph(sym_map)
266 assert "src/b.py" in imports_out["src/a.py"]
267 assert in_degree["src/b.py"] == 1
268 assert in_degree["src/a.py"] == 0
269
270 def test_self_loop_excluded(self) -> None:
271 """A file importing its own stem must not create a self-edge."""
272 sym_map: _SymbolMap = {
273 "src/a.py": _make_sym_tree("a"),
274 }
275 imports_out, in_degree = _build_import_graph(sym_map)
276 assert imports_out["src/a.py"] == []
277
278 def test_duplicate_import_records_produce_single_edge(self) -> None:
279 """Multiple import records for the same target count as one edge."""
280 tree: SymbolTree = {}
281 for i in range(5):
282 rec: SymbolRecord = {
283 "kind": "import",
284 "name": "b", # same bare module name — all edges to src/b.py
285 "qualified_name": f"import::b_alias_{i}",
286 "lineno": i + 1,
287 "end_lineno": i + 1,
288 "content_id": "",
289 "body_hash": "",
290 "signature_id": "",
291 "metadata_id": "",
292 "canonical_key": "",
293 }
294 tree[f"import::b_alias_{i}"] = rec
295
296 sym_map: _SymbolMap = {
297 "src/a.py": tree,
298 "src/b.py": {},
299 }
300 imports_out, in_degree = _build_import_graph(sym_map)
301 assert imports_out["src/a.py"].count("src/b.py") == 1
302 assert in_degree["src/b.py"] == 1
303
304 def test_unknown_import_ignored(self) -> None:
305 """Imports with no matching stem in the map are silently skipped."""
306 sym_map: _SymbolMap = {
307 "src/a.py": _make_sym_tree("nonexistent_module"),
308 }
309 imports_out, in_degree = _build_import_graph(sym_map)
310 assert imports_out["src/a.py"] == []
311
312 def test_non_import_records_ignored(self) -> None:
313 tree: SymbolTree = {}
314 fn_rec: SymbolRecord = {
315 "kind": "function",
316 "name": "b",
317 "qualified_name": "b",
318 "lineno": 1,
319 "end_lineno": 3,
320 "content_id": "",
321 "body_hash": "",
322 "signature_id": "",
323 "metadata_id": "",
324 "canonical_key": "",
325 }
326 tree["function::b"] = fn_rec
327
328 sym_map: _SymbolMap = {
329 "src/a.py": tree,
330 "src/b.py": {},
331 }
332 imports_out, _ = _build_import_graph(sym_map)
333 assert imports_out["src/a.py"] == []
334
335 def test_fan_out_multiple_targets(self) -> None:
336 sym_map: _SymbolMap = {
337 "src/a.py": _make_sym_tree("b", "c", "d"),
338 "src/b.py": {},
339 "src/c.py": {},
340 "src/d.py": {},
341 }
342 imports_out, in_degree = _build_import_graph(sym_map)
343 assert len(imports_out["src/a.py"]) == 3
344 assert in_degree["src/b.py"] == 1
345 assert in_degree["src/c.py"] == 1
346 assert in_degree["src/d.py"] == 1
347
348 def test_fan_in_multiple_importers(self) -> None:
349 sym_map: _SymbolMap = {
350 "src/a.py": _make_sym_tree("c"),
351 "src/b.py": _make_sym_tree("c"),
352 "src/c.py": {},
353 }
354 imports_out, in_degree = _build_import_graph(sym_map)
355 assert in_degree["src/c.py"] == 2
356
357
358 # ---------------------------------------------------------------------------
359 # Unit — _find_cycles
360 # ---------------------------------------------------------------------------
361
362
363 class TestFindCycles:
364 def test_empty_graph(self) -> None:
365 assert _find_cycles({}) == []
366
367 def test_single_node_no_edges(self) -> None:
368 assert _find_cycles({"A": []}) == []
369
370 def test_linear_chain_no_cycle(self) -> None:
371 g: _AdjacencyMap = {"A": ["B"], "B": ["C"], "C": []}
372 assert _find_cycles(g) == []
373
374 def test_self_loop(self) -> None:
375 g: _AdjacencyMap = {"A": ["A"]}
376 cycles = _find_cycles(g)
377 assert len(cycles) == 1
378 assert cycles[0][0] == cycles[0][-1] == "A"
379
380 def test_simple_two_node_cycle(self) -> None:
381 g: _AdjacencyMap = {"A": ["B"], "B": ["A"]}
382 cycles = _find_cycles(g)
383 assert any("A" in c and "B" in c for c in cycles)
384
385 def test_three_node_cycle(self) -> None:
386 g: _AdjacencyMap = {"A": ["B"], "B": ["C"], "C": ["A"]}
387 cycles = _find_cycles(g)
388 assert len(cycles) >= 1
389 cycle = cycles[0]
390 assert cycle[0] == cycle[-1]
391 assert len(cycle) == 4 # A→B→C→A
392
393 def test_two_independent_cycles(self) -> None:
394 g: _AdjacencyMap = {
395 "A": ["B"], "B": ["A"], # cycle 1
396 "C": ["D"], "D": ["C"], # cycle 2
397 }
398 cycles = _find_cycles(g)
399 assert len(cycles) == 2
400
401 def test_overlapping_cycles_shared_node(self) -> None:
402 """Node B participates in both A→B→A and B→C→B."""
403 g: _AdjacencyMap = {
404 "A": ["B"],
405 "B": ["A", "C"],
406 "C": ["B"],
407 }
408 cycles = _find_cycles(g)
409 assert len(cycles) >= 2
410
411 def test_disconnected_graph_with_cycle_in_one_component(self) -> None:
412 g: _AdjacencyMap = {
413 "X": ["Y"], "Y": [], # acyclic component
414 "A": ["B"], "B": ["A"], # cyclic component
415 }
416 cycles = _find_cycles(g)
417 assert len(cycles) == 1
418
419 def test_cycle_path_forms_closed_ring(self) -> None:
420 g: _AdjacencyMap = {"A": ["B"], "B": ["C"], "C": ["A"]}
421 cycles = _find_cycles(g)
422 for cycle in cycles:
423 assert cycle[0] == cycle[-1], "cycle path must start and end at the same node"
424
425 def test_deep_linear_chain_no_recursion_error(self) -> None:
426 depth = 1_000 # well beyond Python's default recursion limit
427 nodes = [f"mod_{i}" for i in range(depth)]
428 g: _AdjacencyMap = {nodes[i]: [nodes[i + 1]] for i in range(depth - 1)}
429 g[nodes[-1]] = []
430 cycles = _find_cycles(g)
431 assert cycles == []
432
433 def test_deep_cycle_at_end_of_long_chain(self) -> None:
434 depth = 500
435 nodes = [f"mod_{i}" for i in range(depth)]
436 g: _AdjacencyMap = {nodes[i]: [nodes[i + 1]] for i in range(depth - 1)}
437 g[nodes[-1]] = [nodes[-2]] # last two form a cycle
438 cycles = _find_cycles(g)
439 assert any(nodes[-1] in c or nodes[-2] in c for c in cycles)
440
441 def test_no_duplicate_cycles_for_same_back_edge(self) -> None:
442 g: _AdjacencyMap = {"A": ["B"], "B": ["A"]}
443 cycles = _find_cycles(g)
444 # There should be exactly one detected cycle for a simple two-node ring.
445 assert len(cycles) == 1
446
447 def test_fully_connected_triangle(self) -> None:
448 g: _AdjacencyMap = {"A": ["B", "C"], "B": ["A", "C"], "C": ["A", "B"]}
449 cycles = _find_cycles(g)
450 assert len(cycles) >= 1
451
452 def test_index_extraction_is_correct(self) -> None:
453 """Cycle slice starts at the actual back-edge target, not index 0."""
454 g: _AdjacencyMap = {"A": ["B"], "B": ["C"], "C": ["B"]}
455 cycles = _find_cycles(g)
456 # The cycle involves B and C, not A.
457 for cycle in cycles:
458 assert "A" not in cycle, "A is not part of any cycle in this graph"
459
460
461 # ---------------------------------------------------------------------------
462 # Integration — basic CLI invocations
463 # ---------------------------------------------------------------------------
464
465
466 class TestCodemapCLIBasic:
467 def test_exits_zero(self, code_repo: pathlib.Path) -> None:
468 result = runner.invoke(cli, ["code", "codemap"])
469 assert result.exit_code == 0, result.output
470
471 def test_text_output_has_all_sections(self, code_repo: pathlib.Path) -> None:
472 result = runner.invoke(cli, ["code", "codemap"])
473 assert result.exit_code == 0
474 out = result.output
475 assert "Semantic codemap" in out
476 assert "Top modules by size" in out
477 assert "Import cycles" in out
478 assert "High-centrality" in out
479 assert "Boundary files" in out
480 assert "Agent-safe zones" in out
481
482 def test_text_output_contains_commit_hash(self, code_repo: pathlib.Path) -> None:
483 result = runner.invoke(cli, ["code", "codemap"])
484 assert result.exit_code == 0
485 import re
486 assert re.search(r"commit sha256:[0-9a-f]{64}", result.output)
487
488 def test_no_repo_exits_nonzero(
489 self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
490 ) -> None:
491 monkeypatch.chdir(tmp_path)
492 monkeypatch.delenv("MUSE_REPO_ROOT", raising=False)
493 result = runner.invoke(cli, ["code", "codemap"])
494 assert result.exit_code != 0
495
496
497 # ---------------------------------------------------------------------------
498 # Integration — JSON schema
499 # ---------------------------------------------------------------------------
500
501
502 class TestCodemapJSONSchema:
503 def test_json_exits_zero(self, code_repo: pathlib.Path) -> None:
504 result = runner.invoke(cli, ["code", "codemap", "--json"])
505 assert result.exit_code == 0, result.output
506
507 def test_json_is_valid(self, code_repo: pathlib.Path) -> None:
508 result = runner.invoke(cli, ["code", "codemap", "--json"])
509 assert result.exit_code == 0
510 data = json.loads(result.output)
511 assert isinstance(data, dict)
512
513 def test_json_required_top_level_keys(self, code_repo: pathlib.Path) -> None:
514 data = _codemap_json()
515 required = {
516 "schema",
517 "commit",
518 "branch",
519 "language_filter",
520 "modules",
521 "import_cycles",
522 "high_centrality",
523 "boundary_files",
524 "agent_safe_zones",
525 }
526 assert required <= data.keys()
527
528 def test_json_modules_is_list(self, code_repo: pathlib.Path) -> None:
529 data = _codemap_json()
530 assert isinstance(data["modules"], list)
531
532 def test_json_import_cycles_is_list(self, code_repo: pathlib.Path) -> None:
533 data = _codemap_json()
534 assert isinstance(data["import_cycles"], list)
535
536 def test_json_high_centrality_is_list(self, code_repo: pathlib.Path) -> None:
537 data = _codemap_json()
538 assert isinstance(data["high_centrality"], list)
539
540 def test_json_boundary_files_is_list(self, code_repo: pathlib.Path) -> None:
541 data = _codemap_json()
542 assert isinstance(data["boundary_files"], list)
543
544 def test_json_agent_safe_zones_is_list(self, code_repo: pathlib.Path) -> None:
545 data = _codemap_json()
546 assert isinstance(data["agent_safe_zones"], list)
547
548 def test_json_branch_field_is_string(self, code_repo: pathlib.Path) -> None:
549 data = _codemap_json()
550 assert isinstance(data["branch"], str)
551 assert data["branch"] # non-empty
552
553 def test_json_commit_is_full_id(self, code_repo: pathlib.Path) -> None:
554 data = _codemap_json()
555 commit = data["commit"]
556 assert isinstance(commit, str)
557 assert commit.startswith("sha256:")
558 hex_part = commit[len("sha256:"):]
559 assert len(hex_part) == 64
560 assert all(c in "0123456789abcdef" for c in hex_part)
561
562 def test_json_language_filter_none_when_unset(self, code_repo: pathlib.Path) -> None:
563 data = _codemap_json()
564 assert data["language_filter"] is None
565
566 def test_json_module_entry_has_required_fields(self, code_repo: pathlib.Path) -> None:
567 data = _codemap_json()
568 modules: list[_ModuleEntry] = data["modules"]
569 if modules:
570 entry = modules[0]
571 assert "file" in entry
572 assert "symbol_count" in entry
573 assert "importers" in entry
574 assert "imports" in entry
575
576 def test_json_schema_version_matches_package(self, code_repo: pathlib.Path) -> None:
577 data = _codemap_json()
578 assert isinstance(data["schema"], int)
579 assert data["schema"] > 0
580
581
582 # ---------------------------------------------------------------------------
583 # Integration — --top flag
584 # ---------------------------------------------------------------------------
585
586
587 class TestCodemapTopFlag:
588 def test_top_1_limits_modules_to_1(self, code_repo: pathlib.Path) -> None:
589 data = _codemap_json(["--top", "1"])
590 assert len(data["modules"]) <= 1
591
592 def test_top_3_limits_sections(self, code_repo: pathlib.Path) -> None:
593 data = _codemap_json(["--top", "3"])
594 assert len(data["modules"]) <= 3
595 assert len(data["high_centrality"]) <= 3
596 assert len(data["boundary_files"]) <= 3
597 assert len(data["agent_safe_zones"]) <= 3
598
599 def test_top_zero_exits_nonzero(self, code_repo: pathlib.Path) -> None:
600 result = runner.invoke(cli, ["code", "codemap", "--top", "0"])
601 assert result.exit_code != 0
602
603 def test_top_negative_exits_nonzero(self, code_repo: pathlib.Path) -> None:
604 result = runner.invoke(cli, ["code", "codemap", "--top", "-1"])
605 assert result.exit_code != 0
606
607 def test_top_text_mode_respected(self, code_repo: pathlib.Path) -> None:
608 result = runner.invoke(cli, ["code", "codemap", "--top", "1"])
609 assert result.exit_code == 0
610
611
612 # ---------------------------------------------------------------------------
613 # Integration — --min-importers flag
614 # ---------------------------------------------------------------------------
615
616
617 class TestCodemapMinImporters:
618 def test_min_importers_zero_includes_all(self, multi_file_repo: pathlib.Path) -> None:
619 data_all = _codemap_json(["--min-importers", "0"])
620 data_zero = _codemap_json() # default = 0
621 assert len(data_all["modules"]) == len(data_zero["modules"])
622
623 def test_min_importers_1_excludes_unimported(self, multi_file_repo: pathlib.Path) -> None:
624 data_all = _codemap_json()
625 data_filtered = _codemap_json(["--min-importers", "1"])
626 modules_all: list[_ModuleEntry] = data_all["modules"]
627 modules_filtered: list[_ModuleEntry] = data_filtered["modules"]
628 # Every module in the filtered list must have importers >= 1.
629 for mod in modules_filtered:
630 assert mod["importers"] >= 1
631 # Filtered list cannot be larger than unfiltered.
632 assert len(modules_filtered) <= len(modules_all)
633
634 def test_min_importers_very_large_returns_empty_modules(
635 self, code_repo: pathlib.Path
636 ) -> None:
637 data = _codemap_json(["--min-importers", "9999"])
638 assert data["modules"] == []
639
640 def test_min_importers_negative_exits_nonzero(self, code_repo: pathlib.Path) -> None:
641 result = runner.invoke(cli, ["code", "codemap", "--min-importers", "-1"])
642 assert result.exit_code != 0
643
644 def test_min_importers_label_in_text_output(self, code_repo: pathlib.Path) -> None:
645 result = runner.invoke(cli, ["code", "codemap", "--min-importers", "2"])
646 assert result.exit_code == 0
647 assert "min-importers" in result.output
648
649
650 # ---------------------------------------------------------------------------
651 # Integration — --language flag
652 # ---------------------------------------------------------------------------
653
654
655 class TestCodemapLanguageFlag:
656 def test_language_python_exits_zero(self, code_repo: pathlib.Path) -> None:
657 result = runner.invoke(cli, ["code", "codemap", "--language", "Python"])
658 assert result.exit_code == 0
659
660 def test_language_filter_in_json(self, code_repo: pathlib.Path) -> None:
661 data = _codemap_json(["--language", "Python"])
662 assert data["language_filter"] == "Python"
663
664 def test_language_no_match_exits_zero_empty_modules(
665 self, code_repo: pathlib.Path
666 ) -> None:
667 data = _codemap_json(["--language", "COBOL"])
668 assert data["modules"] == []
669
670 def test_language_text_header_shown(self, code_repo: pathlib.Path) -> None:
671 result = runner.invoke(cli, ["code", "codemap", "--language", "Python"])
672 assert result.exit_code == 0
673 assert "language: Python" in result.output
674
675
676 # ---------------------------------------------------------------------------
677 # Integration — --commit flag
678 # ---------------------------------------------------------------------------
679
680
681 class TestCodemapCommitFlag:
682 def test_commit_head_is_default(self, code_repo: pathlib.Path) -> None:
683 data_head = _codemap_json(["--commit", "HEAD"])
684 data_default = _codemap_json()
685 assert data_head["commit"] == data_default["commit"]
686
687 def test_commit_head_minus_1(self, code_repo: pathlib.Path) -> None:
688 result = runner.invoke(cli, ["code", "codemap", "--commit", "HEAD~1", "--json"])
689 assert result.exit_code == 0
690 data = json.loads(result.output)
691 data_head = _codemap_json()
692 # Historical snapshot commit id must differ from HEAD.
693 assert data["commit"] != data_head["commit"]
694
695 def test_commit_invalid_ref_exits_nonzero(self, code_repo: pathlib.Path) -> None:
696 result = runner.invoke(cli, ["code", "codemap", "--commit", "totally_bogus_ref_xyz"])
697 assert result.exit_code != 0
698
699
700 # ---------------------------------------------------------------------------
701 # Integration — empty repo
702 # ---------------------------------------------------------------------------
703
704
705 class TestCodemapEmptyRepo:
706 def test_no_commits_exits_nonzero(self, repo: pathlib.Path) -> None:
707 """Repo with no commits: HEAD does not resolve — must exit non-zero."""
708 result = runner.invoke(cli, ["code", "codemap"])
709 assert result.exit_code != 0
710
711
712 # ---------------------------------------------------------------------------
713 # E2E — cycle detection
714 # ---------------------------------------------------------------------------
715
716
717 class TestCodemapCycleE2E:
718 def test_circular_import_detected(self, cycle_repo: pathlib.Path) -> None:
719 data = _codemap_json()
720 cycles: list[list[str]] = data["import_cycles"]
721 assert len(cycles) >= 1
722 involved = {node for cycle in cycles for node in cycle}
723 # alpha.py and beta.py should both appear in the cycle paths.
724 assert any("alpha" in node for node in involved)
725 assert any("beta" in node for node in involved)
726
727 def test_acyclic_repo_has_no_cycles(self, multi_file_repo: pathlib.Path) -> None:
728 data = _codemap_json()
729 assert data["import_cycles"] == []
730
731 def test_cycle_paths_are_closed_rings(self, cycle_repo: pathlib.Path) -> None:
732 data = _codemap_json()
733 for cycle in data["import_cycles"]:
734 assert isinstance(cycle, list)
735 assert len(cycle) >= 2
736 assert cycle[0] == cycle[-1], "cycle path must start and end at the same node"
737
738
739 # ---------------------------------------------------------------------------
740 # E2E — agent-safe zones
741 # ---------------------------------------------------------------------------
742
743
744 class TestCodemapAgentSafeZones:
745 def test_isolated_file_appears_in_agent_safe_zones(
746 self, multi_file_repo: pathlib.Path
747 ) -> None:
748 """standalone.py imports nothing and is imported by nothing."""
749 data = _codemap_json()
750 safe: list[str] = data["agent_safe_zones"]
751 assert any("standalone" in fp for fp in safe)
752
753 def test_imported_file_not_in_agent_safe_zones(
754 self, multi_file_repo: pathlib.Path
755 ) -> None:
756 """utils.py is imported by models.py and api.py — not isolated."""
757 data = _codemap_json()
758 safe: list[str] = data["agent_safe_zones"]
759 assert not any("utils" in fp for fp in safe)
760
761 def test_agent_safe_zones_are_sorted(self, multi_file_repo: pathlib.Path) -> None:
762 data = _codemap_json()
763 safe: list[str] = data["agent_safe_zones"]
764 assert safe == sorted(safe)
765
766
767 # ---------------------------------------------------------------------------
768 # E2E — boundary files
769 # ---------------------------------------------------------------------------
770
771
772 class TestCodemapBoundaryFiles:
773 def test_boundary_entry_has_required_fields(self, multi_file_repo: pathlib.Path) -> None:
774 data = _codemap_json()
775 for boundary in data["boundary_files"]:
776 assert "file" in boundary
777 assert "fan_out" in boundary
778 assert "fan_in" in boundary
779
780 def test_boundary_file_fan_in_is_zero(self, multi_file_repo: pathlib.Path) -> None:
781 data = _codemap_json()
782 for boundary in data["boundary_files"]:
783 assert boundary["fan_in"] == 0
784
785 def test_boundary_file_fan_out_at_least_3(self, multi_file_repo: pathlib.Path) -> None:
786 data = _codemap_json()
787 for boundary in data["boundary_files"]:
788 assert boundary["fan_out"] >= 3
789
790
791 # ---------------------------------------------------------------------------
792 # Stress — performance and determinism
793 # ---------------------------------------------------------------------------
794
795
796 class TestCodemapStress:
797 def test_find_cycles_1000_node_linear_chain_no_recursion_error(self) -> None:
798 depth = 1_000
799 nodes = [f"file_{i}.py" for i in range(depth)]
800 g: _AdjacencyMap = {nodes[i]: [nodes[i + 1]] for i in range(depth - 1)}
801 g[nodes[-1]] = []
802 cycles = _find_cycles(g)
803 assert cycles == []
804
805 def test_find_cycles_500_nodes_50_embedded_3_cycles(self) -> None:
806 """500-node graph with 50 explicit A→B→C→A triangles plus 350 isolated nodes."""
807 g: _AdjacencyMap = {}
808 expected_min = 50
809 for i in range(50):
810 a, b, c = f"a_{i}", f"b_{i}", f"c_{i}"
811 g[a] = [b]
812 g[b] = [c]
813 g[c] = [a]
814 for i in range(350):
815 g[f"iso_{i}"] = []
816 cycles = _find_cycles(g)
817 assert len(cycles) >= expected_min
818
819 def test_build_import_graph_large_sym_map_with_duplicates(self) -> None:
820 """1 000 files each importing the same hub file via 10 duplicate records."""
821 hub = "src/hub.py"
822 sym_map: _SymbolMap = {hub: {}}
823 for i in range(999):
824 fp = f"src/module_{i}.py"
825 tree: SymbolTree = {}
826 for j in range(10):
827 rec: SymbolRecord = {
828 "kind": "import",
829 "name": "hub", # same bare module — all edges to hub
830 "qualified_name": f"import::hub_alias_{j}",
831 "lineno": j + 1,
832 "end_lineno": j + 1,
833 "content_id": "",
834 "body_hash": "",
835 "signature_id": "",
836 "metadata_id": "",
837 "canonical_key": "",
838 }
839 tree[f"import::hub_alias_{j}"] = rec
840 sym_map[fp] = tree
841
842 imports_out, in_degree = _build_import_graph(sym_map)
843
844 assert in_degree[hub] == 999
845 for fp in sym_map:
846 if fp == hub:
847 continue
848 assert imports_out[fp].count(hub) == 1, "each file must have exactly one edge to hub"
849
850 def test_repeated_runs_produce_identical_json(self, code_repo: pathlib.Path) -> None:
851 """Two back-to-back invocations must produce the exact same JSON."""
852 result_a = runner.invoke(cli, ["code", "codemap", "--json"])
853 result_b = runner.invoke(cli, ["code", "codemap", "--json"])
854 assert result_a.exit_code == 0
855 assert result_b.exit_code == 0
856 _volatile = {"duration_ms", "timestamp"}
857 da = {k: v for k, v in json.loads(result_a.output).items() if k not in _volatile}
858 db = {k: v for k, v in json.loads(result_b.output).items() if k not in _volatile}
859 assert da == db
860
861 def test_codemap_completes_within_reasonable_time(
862 self, code_repo: pathlib.Path
863 ) -> None:
864 """Codemap on a small repo must finish within 10 seconds."""
865 start = time.monotonic()
866 result = runner.invoke(cli, ["code", "codemap", "--json"])
867 elapsed = time.monotonic() - start
868 assert result.exit_code == 0
869 assert elapsed < 10.0, f"codemap took {elapsed:.1f}s — too slow"
870
871
872 # ---------------------------------------------------------------------------
873 # Flag registration tests
874 # ---------------------------------------------------------------------------
875
876 import argparse as _argparse
877 from muse.cli.commands.codemap import register as _register_codemap
878
879
880 def _parse_codemap(*args: str) -> _argparse.Namespace:
881 root_p = _argparse.ArgumentParser()
882 subs = root_p.add_subparsers(dest="cmd")
883 _register_codemap(subs)
884 return root_p.parse_args(["codemap", *args])
885
886
887 class TestRegisterFlags:
888 def test_default_json_out_is_false(self) -> None:
889 ns = _parse_codemap()
890 assert ns.json_out is False
891
892 def test_json_flag_sets_json_out(self) -> None:
893 ns = _parse_codemap("--json")
894 assert ns.json_out is True
895
896 def test_j_shorthand_sets_json_out(self) -> None:
897 ns = _parse_codemap("-j")
898 assert ns.json_out is True
File History 1 commit