test_cmd_codemap.py
python
sha256:e029dfbf5482b23e127b4793b93953b3e2b5af9ca3aa533e6c6e08a6d1887e17
fixing more failing tests
Human
4 days ago
| 1 | """Tests for ``muse code codemap``. |
| 2 | |
| 3 | Coverage layers |
| 4 | --------------- |
| 5 | Unit |
| 6 | _build_import_graph — edge deduplication, self-loop exclusion, empty maps. |
| 7 | _find_cycles — empty graph, linear chain, simple cycle, multi-cycle, |
| 8 | overlapping cycles, self-loop, deep chain (no stack |
| 9 | overflow), disconnected components, O(1) index lookup. |
| 10 | |
| 11 | Integration (live repo via CliRunner) |
| 12 | Exits zero for valid invocations. |
| 13 | JSON schema: all required top-level keys present. |
| 14 | JSON schema: new fields — ``branch``, ``agent_safe_zones``. |
| 15 | ``--top`` flag limits each ranked section. |
| 16 | ``--top 0`` and ``--top -1`` are rejected with a non-zero exit. |
| 17 | ``--min-importers`` filters ranked module list correctly. |
| 18 | ``--language`` filter restricts analysis to the named language. |
| 19 | ``--language`` with no match exits zero but emits empty modules list. |
| 20 | ``--commit REF`` analyses a historical snapshot. |
| 21 | Text output contains all expected section headers. |
| 22 | No-repo invocation exits non-zero. |
| 23 | Empty repo (no commits yet) exits non-zero. |
| 24 | |
| 25 | E2E (real cross-file import cycles in a live repo) |
| 26 | Cycle is detected when two Python files import each other. |
| 27 | No false-positive cycle when imports are acyclic. |
| 28 | Agent-safe zone reported for a fully isolated file. |
| 29 | |
| 30 | Stress |
| 31 | 1 000-node linear chain: completes without RecursionError. |
| 32 | 500-node graph with 50 embedded 3-cycles: all cycles found, no crash. |
| 33 | Repeated runs produce identical output (determinism). |
| 34 | Large sym_map with duplicate import records: edges deduplicated. |
| 35 | """ |
| 36 | |
| 37 | from __future__ import annotations |
| 38 | |
| 39 | import json |
| 40 | import pathlib |
| 41 | import textwrap |
| 42 | import time |
| 43 | from typing import TypedDict |
| 44 | |
| 45 | import pytest |
| 46 | |
| 47 | from tests.cli_test_helper import CliRunner |
| 48 | |
| 49 | from muse.cli.commands.codemap import _build_import_graph, _find_cycles |
| 50 | from muse.plugins.code.ast_parser import SymbolTree, SymbolRecord |
| 51 | |
# Shape handed to ``_build_import_graph`` in the unit tests: the keys used in
# this file are file paths such as "src/a.py", each mapped to its SymbolTree.
type _SymbolMap = dict[str, SymbolTree]
# Shape handed to ``_find_cycles``: node name -> list of successor node names.
type _AdjacencyMap = dict[str, list[str]]

cli = None  # argparse migration — CliRunner ignores this arg

# One shared runner instance is used by every fixture and CLI test in this module.
runner = CliRunner()
| 58 | |
| 59 | |
| 60 | # --------------------------------------------------------------------------- |
| 61 | # Typed payload for JSON assertions — mirrors the codemap JSON schema. |
| 62 | # --------------------------------------------------------------------------- |
| 63 | |
| 64 | |
class _ModuleEntry(TypedDict):
    """Element type of the ``modules`` list in the codemap JSON payload."""

    file: str
    symbol_count: int
    # The ``--min-importers`` tests filter on this count.
    importers: int
    imports: int
| 70 | |
| 71 | |
class _CentralityEntry(TypedDict):
    """Element type of the ``high_centrality`` list in the codemap JSON payload."""

    name: str
    callers: int
| 75 | |
| 76 | |
class _BoundaryEntry(TypedDict):
    """Element type of the ``boundary_files`` list in the codemap JSON payload."""

    file: str
    # The boundary-file tests expect ``fan_out >= 3`` and ``fan_in == 0``.
    fan_out: int
    fan_in: int
| 81 | |
| 82 | |
class _CodemapPayload(TypedDict):
    """Top-level object produced by ``code codemap --json``.

    The field set mirrors the keys that the JSON-schema tests in this module
    require.  The schema marker is exposed under the key ``"schema"`` and is
    asserted to be a positive ``int``, so it is declared that way here rather
    than as a ``schema_version`` string.
    """

    # Positive integer schema revision (read as ``data["schema"]`` by the tests).
    schema: int
    # Full commit id: "sha256:" followed by 64 lowercase hex digits.
    commit: str
    # Non-empty branch name.
    branch: str
    # ``None`` when ``--language`` is not passed, otherwise the value passed.
    language_filter: str | None
    modules: list[_ModuleEntry]
    # Each cycle is a path whose first and last node are the same.
    import_cycles: list[list[str]]
    high_centrality: list[_CentralityEntry]
    boundary_files: list[_BoundaryEntry]
    # File paths, expected to be in sorted order.
    agent_safe_zones: list[str]
| 93 | |
| 94 | |
| 95 | # --------------------------------------------------------------------------- |
| 96 | # Fixtures |
| 97 | # --------------------------------------------------------------------------- |
| 98 | |
| 99 | |
@pytest.fixture
def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Create an empty code-domain Muse repo rooted at ``tmp_path``.

    The process working directory and ``MUSE_REPO_ROOT`` are both pointed at
    ``tmp_path`` before ``init --domain code`` runs.  No commit is made.

    Returns:
        The repository root (``tmp_path`` itself).
    """
    monkeypatch.chdir(tmp_path)
    monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))

    init_result = runner.invoke(cli, ["init", "--domain", "code"])
    assert init_result.exit_code == 0, init_result.output

    return tmp_path
| 108 | |
| 109 | |
@pytest.fixture
def code_repo(repo: pathlib.Path) -> pathlib.Path:
    """Repo holding two successive commits of a single ``billing.py`` module.

    The second revision renames ``compute_total`` to ``compute_invoice_total``
    and adds ``generate_pdf`` and ``send_email``.

    Returns:
        The repository root.
    """
    billing = repo / "billing.py"

    # --- first revision -----------------------------------------------------
    initial_source = textwrap.dedent("""\
        class Invoice:
            def compute_total(self, items):
                return sum(items)

            def apply_discount(self, total, pct):
                return total * (1 - pct)

        def process_order(invoice, items):
            return invoice.compute_total(items)
    """)
    billing.write_text(initial_source)
    first_commit = runner.invoke(cli, ["commit", "-m", "Initial billing module"])
    assert first_commit.exit_code == 0, first_commit.output

    # --- second revision ----------------------------------------------------
    revised_source = textwrap.dedent("""\
        class Invoice:
            def compute_invoice_total(self, items):
                return sum(items)

            def apply_discount(self, total, pct):
                return total * (1 - pct)

            def generate_pdf(self):
                return b"pdf"

        def process_order(invoice, items):
            return invoice.compute_invoice_total(items)

        def send_email(address):
            pass
    """)
    billing.write_text(revised_source)
    # NOTE(review): the exit code of this staging step is not asserted.
    runner.invoke(cli, ["code", "add", "."])
    second_commit = runner.invoke(
        cli, ["commit", "-m", "Rename + add generate_pdf, send_email"]
    )
    assert second_commit.exit_code == 0, second_commit.output

    return repo
| 149 | |
| 150 | |
@pytest.fixture
def multi_file_repo(repo: pathlib.Path) -> pathlib.Path:
    """Repo with four Python files committed together.

    ``models.py`` imports ``utils``; ``api.py`` imports ``models`` and
    ``utils``; ``standalone.py`` has no import statements and no other file
    imports it.

    Returns:
        The repository root.
    """
    sources = {
        "utils.py": """\
            def helper():
                pass
        """,
        "models.py": """\
            import utils

            class User:
                pass
        """,
        "api.py": """\
            import models
            import utils

            def handle():
                pass
        """,
        "standalone.py": """\
            def isolated():
                pass
        """,
    }
    # Dict order is insertion order, so files are written utils → standalone.
    for filename, body in sources.items():
        (repo / filename).write_text(textwrap.dedent(body))

    commit_result = runner.invoke(cli, ["commit", "-m", "Multi-file layout"])
    assert commit_result.exit_code == 0, commit_result.output
    return repo
| 180 | |
| 181 | |
@pytest.fixture
def cycle_repo(repo: pathlib.Path) -> pathlib.Path:
    """Repo whose two files import each other (``alpha`` ↔ ``beta``).

    Returns:
        The repository root.
    """
    alpha_source = textwrap.dedent("""\
        import beta

        def do_alpha():
            pass
    """)
    beta_source = textwrap.dedent("""\
        import alpha

        def do_beta():
            pass
    """)
    (repo / "alpha.py").write_text(alpha_source)
    (repo / "beta.py").write_text(beta_source)

    commit_result = runner.invoke(cli, ["commit", "-m", "Introduce alpha-beta cycle"])
    assert commit_result.exit_code == 0, commit_result.output
    return repo
| 202 | |
| 203 | |
| 204 | # --------------------------------------------------------------------------- |
| 205 | # Helpers |
| 206 | # --------------------------------------------------------------------------- |
| 207 | |
| 208 | |
def _make_sym_tree(*import_names: str) -> SymbolTree:
    """Return a SymbolTree holding one ``import`` record per given module name.

    Every record stores the bare module name in ``name`` and
    ``import::NAME`` in ``qualified_name``; the same ``import::NAME`` string
    is the key of the record in the returned tree.  Line numbers are fixed
    at 1 and all hash/id fields are empty strings.
    """
    tree: SymbolTree = {
        f"import::{module}": {
            "kind": "import",
            "name": module,
            "qualified_name": f"import::{module}",
            "lineno": 1,
            "end_lineno": 1,
            "content_id": "",
            "body_hash": "",
            "signature_id": "",
            "metadata_id": "",
            "canonical_key": "",
        }
        for module in import_names
    }
    return tree
| 232 | |
| 233 | |
def _codemap_json(args: list[str] | None = None) -> _CodemapPayload:
    """Run ``code codemap --json`` and return the decoded payload.

    Args:
        args: Extra command-line arguments appended after ``--json``.

    Returns:
        The parsed JSON output.

    Raises:
        AssertionError: If the command exits with a non-zero status.
    """
    argv = ["code", "codemap", "--json"]
    if args:
        argv = argv + args
    outcome = runner.invoke(cli, argv)
    assert outcome.exit_code == 0, outcome.output
    payload: _CodemapPayload = json.loads(outcome.output)
    return payload
| 241 | |
| 242 | |
| 243 | # --------------------------------------------------------------------------- |
| 244 | # Unit — _build_import_graph |
| 245 | # --------------------------------------------------------------------------- |
| 246 | |
| 247 | |
class TestBuildImportGraph:
    """Unit tests for ``_build_import_graph``."""

    def test_empty_sym_map_returns_empty_dicts(self) -> None:
        """No files in, nothing out."""
        empty: _SymbolMap = {}
        edges, fan_in = _build_import_graph(empty)
        assert edges == {}
        assert fan_in == {}

    def test_single_file_no_imports(self) -> None:
        """A lone file still gets an (empty) edge list and a zero in-degree."""
        lone: _SymbolMap = {"src/a.py": {}}
        edges, fan_in = _build_import_graph(lone)
        assert edges == {"src/a.py": []}
        assert fan_in == {"src/a.py": 0}

    def test_simple_import_edge(self) -> None:
        """``a`` importing ``b`` produces exactly the edge a → b."""
        files: _SymbolMap = {
            "src/a.py": _make_sym_tree("b"),
            "src/b.py": {},
        }
        edges, fan_in = _build_import_graph(files)
        assert "src/b.py" in edges["src/a.py"]
        assert fan_in["src/b.py"] == 1
        assert fan_in["src/a.py"] == 0

    def test_self_loop_excluded(self) -> None:
        """A file importing its own stem must not create a self-edge."""
        files: _SymbolMap = {"src/a.py": _make_sym_tree("a")}
        edges, _ = _build_import_graph(files)
        assert edges["src/a.py"] == []

    def test_duplicate_import_records_produce_single_edge(self) -> None:
        """Multiple import records for the same target count as one edge."""
        # Five records with distinct keys but the same bare module name "b",
        # so every one of them points at src/b.py.
        aliased: SymbolTree = {
            f"import::b_alias_{i}": {
                "kind": "import",
                "name": "b",
                "qualified_name": f"import::b_alias_{i}",
                "lineno": i + 1,
                "end_lineno": i + 1,
                "content_id": "",
                "body_hash": "",
                "signature_id": "",
                "metadata_id": "",
                "canonical_key": "",
            }
            for i in range(5)
        }
        files: _SymbolMap = {
            "src/a.py": aliased,
            "src/b.py": {},
        }
        edges, fan_in = _build_import_graph(files)
        assert edges["src/a.py"].count("src/b.py") == 1
        assert fan_in["src/b.py"] == 1

    def test_unknown_import_ignored(self) -> None:
        """Imports with no matching stem in the map are silently skipped."""
        files: _SymbolMap = {"src/a.py": _make_sym_tree("nonexistent_module")}
        edges, _ = _build_import_graph(files)
        assert edges["src/a.py"] == []

    def test_non_import_records_ignored(self) -> None:
        """A function named like another file's stem is not an import edge."""
        only_function: SymbolTree = {
            "function::b": {
                "kind": "function",
                "name": "b",
                "qualified_name": "b",
                "lineno": 1,
                "end_lineno": 3,
                "content_id": "",
                "body_hash": "",
                "signature_id": "",
                "metadata_id": "",
                "canonical_key": "",
            },
        }
        files: _SymbolMap = {
            "src/a.py": only_function,
            "src/b.py": {},
        }
        edges, _ = _build_import_graph(files)
        assert edges["src/a.py"] == []

    def test_fan_out_multiple_targets(self) -> None:
        """One importer, three distinct targets."""
        files: _SymbolMap = {
            "src/a.py": _make_sym_tree("b", "c", "d"),
            "src/b.py": {},
            "src/c.py": {},
            "src/d.py": {},
        }
        edges, fan_in = _build_import_graph(files)
        assert len(edges["src/a.py"]) == 3
        assert fan_in["src/b.py"] == 1
        assert fan_in["src/c.py"] == 1
        assert fan_in["src/d.py"] == 1

    def test_fan_in_multiple_importers(self) -> None:
        """Two importers of the same target give it an in-degree of two."""
        files: _SymbolMap = {
            "src/a.py": _make_sym_tree("c"),
            "src/b.py": _make_sym_tree("c"),
            "src/c.py": {},
        }
        _, fan_in = _build_import_graph(files)
        assert fan_in["src/c.py"] == 2
| 357 | |
| 358 | |
| 359 | # --------------------------------------------------------------------------- |
| 360 | # Unit — _find_cycles |
| 361 | # --------------------------------------------------------------------------- |
| 362 | |
| 363 | |
class TestFindCycles:
    """Unit tests for ``_find_cycles`` on hand-built adjacency maps."""

    def test_empty_graph(self) -> None:
        """An empty graph has no cycles."""
        assert _find_cycles({}) == []

    def test_single_node_no_edges(self) -> None:
        """A single node without edges has no cycles."""
        assert _find_cycles({"A": []}) == []

    def test_linear_chain_no_cycle(self) -> None:
        """A → B → C contains no cycle."""
        chain: _AdjacencyMap = {"A": ["B"], "B": ["C"], "C": []}
        assert _find_cycles(chain) == []

    def test_self_loop(self) -> None:
        """A node pointing at itself is reported as one cycle A … A."""
        loop: _AdjacencyMap = {"A": ["A"]}
        found = _find_cycles(loop)
        assert len(found) == 1
        ring = found[0]
        assert ring[0] == ring[-1] == "A"

    def test_simple_two_node_cycle(self) -> None:
        """A ↔ B: some reported cycle mentions both nodes."""
        pair: _AdjacencyMap = {"A": ["B"], "B": ["A"]}
        found = _find_cycles(pair)
        assert any({"A", "B"} <= set(ring) for ring in found)

    def test_three_node_cycle(self) -> None:
        """A → B → C → A is reported as a closed four-element path."""
        triangle: _AdjacencyMap = {"A": ["B"], "B": ["C"], "C": ["A"]}
        found = _find_cycles(triangle)
        assert len(found) >= 1
        ring = found[0]
        assert ring[0] == ring[-1]
        assert len(ring) == 4  # A→B→C→A

    def test_two_independent_cycles(self) -> None:
        """Two disjoint two-node rings yield exactly two cycles."""
        graph: _AdjacencyMap = {
            # cycle 1
            "A": ["B"],
            "B": ["A"],
            # cycle 2
            "C": ["D"],
            "D": ["C"],
        }
        assert len(_find_cycles(graph)) == 2

    def test_overlapping_cycles_shared_node(self) -> None:
        """Node B participates in both A→B→A and B→C→B."""
        graph: _AdjacencyMap = {
            "A": ["B"],
            "B": ["A", "C"],
            "C": ["B"],
        }
        assert len(_find_cycles(graph)) >= 2

    def test_disconnected_graph_with_cycle_in_one_component(self) -> None:
        """Only the cyclic component contributes a cycle."""
        graph: _AdjacencyMap = {
            # acyclic component
            "X": ["Y"],
            "Y": [],
            # cyclic component
            "A": ["B"],
            "B": ["A"],
        }
        assert len(_find_cycles(graph)) == 1

    def test_cycle_path_forms_closed_ring(self) -> None:
        """Every reported path starts and ends on the same node."""
        triangle: _AdjacencyMap = {"A": ["B"], "B": ["C"], "C": ["A"]}
        for ring in _find_cycles(triangle):
            assert ring[0] == ring[-1], "cycle path must start and end at the same node"

    def test_deep_linear_chain_no_recursion_error(self) -> None:
        """A 1 000-node chain must be handled without blowing the stack."""
        # 1 000 equals CPython's default recursion limit, so a DFS that
        # recursed once per node would be expected to fail here.
        depth = 1_000
        names = [f"mod_{i}" for i in range(depth)]
        chain: _AdjacencyMap = {src: [dst] for src, dst in zip(names, names[1:])}
        chain[names[-1]] = []
        assert _find_cycles(chain) == []

    def test_deep_cycle_at_end_of_long_chain(self) -> None:
        """A two-node cycle at the tail of a 500-node chain is still found."""
        depth = 500
        names = [f"mod_{i}" for i in range(depth)]
        chain: _AdjacencyMap = {src: [dst] for src, dst in zip(names, names[1:])}
        last, before_last = names[-1], names[-2]
        chain[last] = [before_last]  # last two form a cycle
        found = _find_cycles(chain)
        assert any(last in ring or before_last in ring for ring in found)

    def test_no_duplicate_cycles_for_same_back_edge(self) -> None:
        """A simple two-node ring is reported exactly once."""
        pair: _AdjacencyMap = {"A": ["B"], "B": ["A"]}
        assert len(_find_cycles(pair)) == 1

    def test_fully_connected_triangle(self) -> None:
        """A complete directed triangle yields at least one cycle."""
        complete: _AdjacencyMap = {
            "A": ["B", "C"],
            "B": ["A", "C"],
            "C": ["A", "B"],
        }
        assert len(_find_cycles(complete)) >= 1

    def test_index_extraction_is_correct(self) -> None:
        """Cycle slice starts at the actual back-edge target, not index 0."""
        # A merely leads into the B ↔ C ring; it must not appear in any path.
        graph: _AdjacencyMap = {"A": ["B"], "B": ["C"], "C": ["B"]}
        for ring in _find_cycles(graph):
            assert "A" not in ring, "A is not part of any cycle in this graph"
| 460 | |
| 461 | |
| 462 | # --------------------------------------------------------------------------- |
| 463 | # Integration — basic CLI invocations |
| 464 | # --------------------------------------------------------------------------- |
| 465 | |
| 466 | |
class TestCodemapCLIBasic:
    """Smoke tests for plain-text ``code codemap`` invocations."""

    def test_exits_zero(self, code_repo: pathlib.Path) -> None:
        """The command succeeds on a repo with commits."""
        outcome = runner.invoke(cli, ["code", "codemap"])
        assert outcome.exit_code == 0, outcome.output

    def test_text_output_has_all_sections(self, code_repo: pathlib.Path) -> None:
        """Every section header appears in the text report."""
        outcome = runner.invoke(cli, ["code", "codemap"])
        assert outcome.exit_code == 0
        expected_headers = (
            "Semantic codemap",
            "Top modules by size",
            "Import cycles",
            "High-centrality",
            "Boundary files",
            "Agent-safe zones",
        )
        for header in expected_headers:
            assert header in outcome.output

    def test_text_output_contains_commit_hash(self, code_repo: pathlib.Path) -> None:
        """The report prints the full ``sha256:`` commit id."""
        import re

        outcome = runner.invoke(cli, ["code", "codemap"])
        assert outcome.exit_code == 0
        assert re.search(r"commit sha256:[0-9a-f]{64}", outcome.output)

    def test_no_repo_exits_nonzero(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Running in a directory that was never initialised fails."""
        monkeypatch.chdir(tmp_path)
        monkeypatch.delenv("MUSE_REPO_ROOT", raising=False)
        outcome = runner.invoke(cli, ["code", "codemap"])
        assert outcome.exit_code != 0
| 496 | |
| 497 | |
| 498 | # --------------------------------------------------------------------------- |
| 499 | # Integration — JSON schema |
| 500 | # --------------------------------------------------------------------------- |
| 501 | |
| 502 | |
class TestCodemapJSONSchema:
    """Shape checks on the ``code codemap --json`` payload."""

    def test_json_exits_zero(self, code_repo: pathlib.Path) -> None:
        """``--json`` mode succeeds."""
        outcome = runner.invoke(cli, ["code", "codemap", "--json"])
        assert outcome.exit_code == 0, outcome.output

    def test_json_is_valid(self, code_repo: pathlib.Path) -> None:
        """The output parses as a JSON object."""
        outcome = runner.invoke(cli, ["code", "codemap", "--json"])
        assert outcome.exit_code == 0
        assert isinstance(json.loads(outcome.output), dict)

    def test_json_required_top_level_keys(self, code_repo: pathlib.Path) -> None:
        """All documented top-level keys are present."""
        payload = _codemap_json()
        required = {
            "schema",
            "commit",
            "branch",
            "language_filter",
            "modules",
            "import_cycles",
            "high_centrality",
            "boundary_files",
            "agent_safe_zones",
        }
        assert required.issubset(payload.keys())

    def test_json_modules_is_list(self, code_repo: pathlib.Path) -> None:
        """``modules`` is a list."""
        assert isinstance(_codemap_json()["modules"], list)

    def test_json_import_cycles_is_list(self, code_repo: pathlib.Path) -> None:
        """``import_cycles`` is a list."""
        assert isinstance(_codemap_json()["import_cycles"], list)

    def test_json_high_centrality_is_list(self, code_repo: pathlib.Path) -> None:
        """``high_centrality`` is a list."""
        assert isinstance(_codemap_json()["high_centrality"], list)

    def test_json_boundary_files_is_list(self, code_repo: pathlib.Path) -> None:
        """``boundary_files`` is a list."""
        assert isinstance(_codemap_json()["boundary_files"], list)

    def test_json_agent_safe_zones_is_list(self, code_repo: pathlib.Path) -> None:
        """``agent_safe_zones`` is a list."""
        assert isinstance(_codemap_json()["agent_safe_zones"], list)

    def test_json_branch_field_is_string(self, code_repo: pathlib.Path) -> None:
        """``branch`` is a non-empty string."""
        branch = _codemap_json()["branch"]
        assert isinstance(branch, str)
        assert branch  # non-empty

    def test_json_commit_is_full_id(self, code_repo: pathlib.Path) -> None:
        """``commit`` is ``sha256:`` plus 64 lowercase hex digits."""
        commit_id = _codemap_json()["commit"]
        assert isinstance(commit_id, str)
        assert commit_id.startswith("sha256:")
        digest = commit_id.removeprefix("sha256:")
        assert len(digest) == 64
        assert set(digest) <= set("0123456789abcdef")

    def test_json_language_filter_none_when_unset(self, code_repo: pathlib.Path) -> None:
        """Without ``--language`` the filter field is ``null``."""
        assert _codemap_json()["language_filter"] is None

    def test_json_module_entry_has_required_fields(self, code_repo: pathlib.Path) -> None:
        """The first module entry, when there is one, carries all four fields."""
        module_entries: list[_ModuleEntry] = _codemap_json()["modules"]
        # Only the first entry is inspected; an empty list passes vacuously.
        for first in module_entries[:1]:
            for field in ("file", "symbol_count", "importers", "imports"):
                assert field in first

    def test_json_schema_version_matches_package(self, code_repo: pathlib.Path) -> None:
        """``schema`` is a positive integer."""
        schema = _codemap_json()["schema"]
        assert isinstance(schema, int)
        assert schema > 0
| 581 | |
| 582 | |
| 583 | # --------------------------------------------------------------------------- |
| 584 | # Integration — --top flag |
| 585 | # --------------------------------------------------------------------------- |
| 586 | |
| 587 | |
class TestCodemapTopFlag:
    """Behaviour of the ``--top`` option."""

    def test_top_1_limits_modules_to_1(self, code_repo: pathlib.Path) -> None:
        """``--top 1`` returns at most one module."""
        payload = _codemap_json(["--top", "1"])
        assert len(payload["modules"]) <= 1

    def test_top_3_limits_sections(self, code_repo: pathlib.Path) -> None:
        """``--top 3`` caps each ranked section at three entries."""
        limit = 3
        payload = _codemap_json(["--top", "3"])
        assert len(payload["modules"]) <= limit
        assert len(payload["high_centrality"]) <= limit
        assert len(payload["boundary_files"]) <= limit
        assert len(payload["agent_safe_zones"]) <= limit

    def test_top_zero_exits_nonzero(self, code_repo: pathlib.Path) -> None:
        """``--top 0`` is rejected."""
        outcome = runner.invoke(cli, ["code", "codemap", "--top", "0"])
        assert outcome.exit_code != 0

    def test_top_negative_exits_nonzero(self, code_repo: pathlib.Path) -> None:
        """``--top -1`` is rejected."""
        outcome = runner.invoke(cli, ["code", "codemap", "--top", "-1"])
        assert outcome.exit_code != 0

    def test_top_text_mode_respected(self, code_repo: pathlib.Path) -> None:
        """``--top`` is accepted in text mode as well."""
        outcome = runner.invoke(cli, ["code", "codemap", "--top", "1"])
        assert outcome.exit_code == 0
| 611 | |
| 612 | |
| 613 | # --------------------------------------------------------------------------- |
| 614 | # Integration — --min-importers flag |
| 615 | # --------------------------------------------------------------------------- |
| 616 | |
| 617 | |
class TestCodemapMinImporters:
    """Behaviour of the ``--min-importers`` option."""

    def test_min_importers_zero_includes_all(self, multi_file_repo: pathlib.Path) -> None:
        """An explicit 0 gives the same module count as omitting the flag."""
        explicit = _codemap_json(["--min-importers", "0"])
        implicit = _codemap_json()  # default = 0
        assert len(explicit["modules"]) == len(implicit["modules"])

    def test_min_importers_1_excludes_unimported(self, multi_file_repo: pathlib.Path) -> None:
        """With a threshold of 1, only imported modules remain."""
        unfiltered: list[_ModuleEntry] = _codemap_json()["modules"]
        filtered: list[_ModuleEntry] = _codemap_json(["--min-importers", "1"])["modules"]
        # Every surviving module must meet the threshold ...
        for entry in filtered:
            assert entry["importers"] >= 1
        # ... and filtering can never add modules.
        assert len(filtered) <= len(unfiltered)

    def test_min_importers_very_large_returns_empty_modules(
        self, code_repo: pathlib.Path
    ) -> None:
        """An unreachable threshold leaves the module list empty."""
        payload = _codemap_json(["--min-importers", "9999"])
        assert payload["modules"] == []

    def test_min_importers_negative_exits_nonzero(self, code_repo: pathlib.Path) -> None:
        """A negative threshold is rejected."""
        outcome = runner.invoke(cli, ["code", "codemap", "--min-importers", "-1"])
        assert outcome.exit_code != 0

    def test_min_importers_label_in_text_output(self, code_repo: pathlib.Path) -> None:
        """The text report mentions the active threshold."""
        outcome = runner.invoke(cli, ["code", "codemap", "--min-importers", "2"])
        assert outcome.exit_code == 0
        assert "min-importers" in outcome.output
| 649 | |
| 650 | |
| 651 | # --------------------------------------------------------------------------- |
| 652 | # Integration — --language flag |
| 653 | # --------------------------------------------------------------------------- |
| 654 | |
| 655 | |
class TestCodemapLanguageFlag:
    """Behaviour of the ``--language`` option."""

    def test_language_python_exits_zero(self, code_repo: pathlib.Path) -> None:
        """Filtering by a present language succeeds."""
        outcome = runner.invoke(cli, ["code", "codemap", "--language", "Python"])
        assert outcome.exit_code == 0

    def test_language_filter_in_json(self, code_repo: pathlib.Path) -> None:
        """The JSON payload echoes the requested language."""
        payload = _codemap_json(["--language", "Python"])
        assert payload["language_filter"] == "Python"

    def test_language_no_match_exits_zero_empty_modules(
        self, code_repo: pathlib.Path
    ) -> None:
        """A language with no files still succeeds, with no modules listed."""
        # ``_codemap_json`` itself asserts the zero exit status.
        payload = _codemap_json(["--language", "COBOL"])
        assert payload["modules"] == []

    def test_language_text_header_shown(self, code_repo: pathlib.Path) -> None:
        """The text report shows the active language filter."""
        outcome = runner.invoke(cli, ["code", "codemap", "--language", "Python"])
        assert outcome.exit_code == 0
        assert "language: Python" in outcome.output
| 675 | |
| 676 | |
| 677 | # --------------------------------------------------------------------------- |
| 678 | # Integration — --commit flag |
| 679 | # --------------------------------------------------------------------------- |
| 680 | |
| 681 | |
class TestCodemapCommitFlag:
    """Behaviour of the ``--commit`` option."""

    def test_commit_head_is_default(self, code_repo: pathlib.Path) -> None:
        """``--commit HEAD`` and no flag resolve to the same commit."""
        explicit_head = _codemap_json(["--commit", "HEAD"])
        implicit_head = _codemap_json()
        assert explicit_head["commit"] == implicit_head["commit"]

    def test_commit_head_minus_1(self, code_repo: pathlib.Path) -> None:
        """``HEAD~1`` resolves to a commit other than HEAD."""
        outcome = runner.invoke(cli, ["code", "codemap", "--commit", "HEAD~1", "--json"])
        assert outcome.exit_code == 0
        parent = json.loads(outcome.output)
        head = _codemap_json()
        # Historical snapshot commit id must differ from HEAD.
        assert parent["commit"] != head["commit"]

    def test_commit_invalid_ref_exits_nonzero(self, code_repo: pathlib.Path) -> None:
        """An unresolvable ref makes the command fail."""
        outcome = runner.invoke(
            cli, ["code", "codemap", "--commit", "totally_bogus_ref_xyz"]
        )
        assert outcome.exit_code != 0
| 699 | |
| 700 | |
| 701 | # --------------------------------------------------------------------------- |
| 702 | # Integration — empty repo |
| 703 | # --------------------------------------------------------------------------- |
| 704 | |
| 705 | |
class TestCodemapEmptyRepo:
    """``code codemap`` on an initialised repo that has no commits."""

    def test_no_commits_exits_nonzero(self, repo: pathlib.Path) -> None:
        """Repo with no commits: HEAD does not resolve — must exit non-zero."""
        outcome = runner.invoke(cli, ["code", "codemap"])
        assert outcome.exit_code != 0
| 711 | |
| 712 | |
| 713 | # --------------------------------------------------------------------------- |
| 714 | # E2E — cycle detection |
| 715 | # --------------------------------------------------------------------------- |
| 716 | |
| 717 | |
class TestCodemapCycleE2E:
    """End-to-end import-cycle detection against real committed files."""

    def test_circular_import_detected(self, cycle_repo: pathlib.Path) -> None:
        """The alpha ↔ beta cycle shows up in ``import_cycles``."""
        reported: list[list[str]] = _codemap_json()["import_cycles"]
        assert len(reported) >= 1
        # Flatten all cycle paths; both files must be mentioned somewhere.
        members = {node for ring in reported for node in ring}
        assert any("alpha" in node for node in members)
        assert any("beta" in node for node in members)

    def test_acyclic_repo_has_no_cycles(self, multi_file_repo: pathlib.Path) -> None:
        """An acyclic import layout reports no cycles."""
        assert _codemap_json()["import_cycles"] == []

    def test_cycle_paths_are_closed_rings(self, cycle_repo: pathlib.Path) -> None:
        """Each reported cycle is a list that begins and ends on the same node."""
        for ring in _codemap_json()["import_cycles"]:
            assert isinstance(ring, list)
            assert len(ring) >= 2
            assert ring[0] == ring[-1], "cycle path must start and end at the same node"
| 738 | |
| 739 | |
| 740 | # --------------------------------------------------------------------------- |
| 741 | # E2E — agent-safe zones |
| 742 | # --------------------------------------------------------------------------- |
| 743 | |
| 744 | |
class TestCodemapAgentSafeZones:
    """End-to-end checks of the ``agent_safe_zones`` section."""

    def test_isolated_file_appears_in_agent_safe_zones(
        self, multi_file_repo: pathlib.Path
    ) -> None:
        """standalone.py imports nothing and is imported by nothing."""
        zones: list[str] = _codemap_json()["agent_safe_zones"]
        assert any("standalone" in path for path in zones)

    def test_imported_file_not_in_agent_safe_zones(
        self, multi_file_repo: pathlib.Path
    ) -> None:
        """utils.py is imported by models.py and api.py — not isolated."""
        zones: list[str] = _codemap_json()["agent_safe_zones"]
        assert all("utils" not in path for path in zones)

    def test_agent_safe_zones_are_sorted(self, multi_file_repo: pathlib.Path) -> None:
        """The zone list comes back in sorted order."""
        zones: list[str] = _codemap_json()["agent_safe_zones"]
        assert zones == sorted(zones)
| 766 | |
| 767 | |
| 768 | # --------------------------------------------------------------------------- |
| 769 | # E2E — boundary files |
| 770 | # --------------------------------------------------------------------------- |
| 771 | |
| 772 | |
class TestCodemapBoundaryFiles:
    """End-to-end checks of the ``boundary_files`` section."""

    def test_boundary_entry_has_required_fields(self, multi_file_repo: pathlib.Path) -> None:
        """Every entry exposes ``file``, ``fan_out`` and ``fan_in``."""
        for entry in _codemap_json()["boundary_files"]:
            for field in ("file", "fan_out", "fan_in"):
                assert field in entry

    def test_boundary_file_fan_in_is_zero(self, multi_file_repo: pathlib.Path) -> None:
        """No listed boundary file has any importer."""
        entries = _codemap_json()["boundary_files"]
        assert all(entry["fan_in"] == 0 for entry in entries)

    def test_boundary_file_fan_out_at_least_3(self, multi_file_repo: pathlib.Path) -> None:
        """Every listed boundary file imports at least three others."""
        entries = _codemap_json()["boundary_files"]
        assert all(entry["fan_out"] >= 3 for entry in entries)
| 790 | |
| 791 | |
| 792 | # --------------------------------------------------------------------------- |
| 793 | # Stress — performance and determinism |
| 794 | # --------------------------------------------------------------------------- |
| 795 | |
| 796 | |
class TestCodemapStress:
    """Stress checks: large synthetic graphs, determinism and a wall-clock bound."""

    def test_find_cycles_1000_node_linear_chain_no_recursion_error(self) -> None:
        """A 1 000-node acyclic chain is processed and reports no cycles."""
        chain = [f"file_{idx}.py" for idx in range(1_000)]
        # Each node points at its successor; the last node has no outgoing edge.
        graph: _AdjacencyMap = {src: [dst] for src, dst in zip(chain, chain[1:])}
        graph[chain[-1]] = []
        assert _find_cycles(graph) == []

    def test_find_cycles_500_nodes_50_embedded_3_cycles(self) -> None:
        """50 disjoint A→B→C→A triangles plus 350 isolated nodes (500 nodes total)."""
        triangle_count = 50
        graph: _AdjacencyMap = {}
        for idx in range(triangle_count):
            first, second, third = f"a_{idx}", f"b_{idx}", f"c_{idx}"
            graph.update({first: [second], second: [third], third: [first]})
        graph.update({f"iso_{idx}": [] for idx in range(350)})

        found = _find_cycles(graph)
        assert len(found) >= triangle_count

    def test_build_import_graph_large_sym_map_with_duplicates(self) -> None:
        """999 modules each import one hub file through 10 duplicate records.

        Together with the hub itself the symbol map holds 1 000 files. The
        duplicates must collapse to a single edge per importing module.
        """
        hub_path = "src/hub.py"
        symbols: _SymbolMap = {hub_path: {}}
        for file_no in range(999):
            records: SymbolTree = {}
            for dup in range(10):
                alias = f"import::hub_alias_{dup}"
                line = dup + 1
                entry: SymbolRecord = {
                    "kind": "import",
                    "name": "hub",  # same bare module — every record targets the hub
                    "qualified_name": alias,
                    "lineno": line,
                    "end_lineno": line,
                    "content_id": "",
                    "body_hash": "",
                    "signature_id": "",
                    "metadata_id": "",
                    "canonical_key": "",
                }
                records[alias] = entry
            symbols[f"src/module_{file_no}.py"] = records

        outgoing, incoming = _build_import_graph(symbols)

        assert incoming[hub_path] == 999
        for path in symbols:
            if path != hub_path:
                assert outgoing[path].count(hub_path) == 1, "each file must have exactly one edge to hub"

    def test_repeated_runs_produce_identical_json(self, code_repo: pathlib.Path) -> None:
        """Two consecutive runs emit equal JSON once volatile keys are dropped."""
        first = runner.invoke(cli, ["code", "codemap", "--json"])
        second = runner.invoke(cli, ["code", "codemap", "--json"])
        assert first.exit_code == 0
        assert second.exit_code == 0

        # Keys excluded from the comparison because they differ between runs.
        volatile = {"duration_ms", "timestamp"}
        stable_first = {
            key: val for key, val in json.loads(first.output).items() if key not in volatile
        }
        stable_second = {
            key: val for key, val in json.loads(second.output).items() if key not in volatile
        }
        assert stable_first == stable_second

    def test_codemap_completes_within_reasonable_time(
        self, code_repo: pathlib.Path
    ) -> None:
        """A single codemap run exits zero in under 10 seconds of wall-clock time."""
        started = time.monotonic()
        outcome = runner.invoke(cli, ["code", "codemap", "--json"])
        elapsed = time.monotonic() - started
        assert outcome.exit_code == 0
        assert elapsed < 10.0, f"codemap took {elapsed:.1f}s — too slow"
| 871 | |
| 872 | |
| 873 | # --------------------------------------------------------------------------- |
| 874 | # Flag registration tests |
| 875 | # --------------------------------------------------------------------------- |
| 876 | |
| 877 | import argparse as _argparse |
| 878 | from muse.cli.commands.codemap import register as _register_codemap |
| 879 | |
| 880 | |
def _parse_codemap(*args: str) -> _argparse.Namespace:
    """Parse *args* as a ``codemap`` sub-command on a throwaway root parser.

    A fresh ``ArgumentParser`` is created on every call, the codemap command
    is registered on its sub-parser collection (stored under ``dest="cmd"``),
    and ``["codemap", *args]`` is parsed.

    Returns:
        The namespace produced by the root parser.
    """
    parser = _argparse.ArgumentParser()
    _register_codemap(parser.add_subparsers(dest="cmd"))
    argv = ["codemap"]
    argv.extend(args)
    return parser.parse_args(argv)
| 886 | |
| 887 | |
class TestRegisterFlags:
    """Argument-parsing checks for the JSON output flag of ``codemap``."""

    def test_default_json_out_is_false(self) -> None:
        """With no flags, ``json_out`` is exactly ``False``."""
        assert _parse_codemap().json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        """``--json`` sets ``json_out`` to ``True``."""
        assert _parse_codemap("--json").json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        """The ``-j`` short form sets ``json_out`` to ``True``."""
        assert _parse_codemap("-j").json_out is True
File History
1 commit
sha256:e029dfbf5482b23e127b4793b93953b3e2b5af9ca3aa533e6c6e08a6d1887e17
fixing more failing tests
Human
4 days ago