"""Comprehensive tests for ``muse code dead``.

dead.py is the highest-churn code porcelain command (7 changes) and contains
the richest set of private helpers — making unit coverage especially valuable
for catch-regression purposes.

Coverage
--------
Unit
    _module_is_imported    — exact stem, dotted module, suffix, no-match
    _matches_path_filter   — None passthrough, fnmatch, ** pattern
    _find_symbol_span      — function, class, method, decorated, variable,
                             missing symbol, syntax error, parent_class
    _delete_symbol_lines   — middle removal, head removal, tail removal,
                             blank-line normalisation
    _analyse_file          — skips over-limit, non-semantic suffix, Python ref
                             extraction, import extraction, kind filter,
                             syntax error graceful return
    _is_test_file          — test/spec patterns
    _DeadCandidate         — confidence, reason, to_dict

Integration (extends mega-suite baseline)
    --json schema          — results, high_confidence_count, …
    --kind filter          — only that kind in output
    --high-confidence-only — only high in output
    --count                — scalar integer
    --language             — restrict to language
    --path                 — restrict to path glob
    --workers              — cap enforced, exits 0
    --compare HEAD         — schema correctness
    --save-allowlist       — writes JSON list
    --allowlist            — allowlisted addresses excluded

Security
    --delete --yes         — smoke test, exits 0 or 1 (the no-confirmation
                             path is not exercised by the current test)
    requires repo          — exits non-zero outside repo

Stress
    200-function Python file — analysed under 1 s
    50-file codebase         — exits 0 under 10 s
"""

from __future__ import annotations

import argparse
import ast
import json
import pathlib
import textwrap
import time

import pytest
from tests.cli_test_helper import CliRunner

from muse.cli.commands.dead import (
    _DeadCandidate,
    _FileAnalysis,
    _analyse_file,
    _delete_symbol_lines,
    _find_symbol_span,
    _is_test_file,
    _matches_path_filter,
    _module_is_imported,
)

# Placeholder passed as the first argument of every ``runner.invoke`` call.
# NOTE(review): presumably the project's CliRunner shim ignores it — confirm
# against tests/cli_test_helper.
cli = None
# Single runner instance shared by every test in this module.
runner = CliRunner()


# ---------------------------------------------------------------------------
# Repo fixture
# ---------------------------------------------------------------------------


def _init_code_repo(root: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """Initialise an empty code-domain repo in *root* and make it the cwd.

    Changes the working directory to *root*, points ``MUSE_REPO_ROOT`` at it
    and runs ``init --domain code``.  Fails immediately if ``init`` does not
    exit 0, so a broken setup is reported at its source instead of surfacing
    later as a confusing assertion failure in the test body.
    """
    monkeypatch.chdir(root)
    monkeypatch.setenv("MUSE_REPO_ROOT", str(root))
    init = runner.invoke(cli, ["init", "--domain", "code"])
    assert init.exit_code == 0, init.output


def _commit(message: str) -> None:
    """Run ``commit -m <message>`` in the current repo and require exit 0."""
    committed = runner.invoke(cli, ["commit", "-m", message])
    assert committed.exit_code == 0, committed.output


@pytest.fixture
def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Fresh code-domain repo with self-contained Python modules."""
    _init_code_repo(tmp_path, monkeypatch)

    # billing.py — imports utils, uses validate_amount.
    (tmp_path / "billing.py").write_text(textwrap.dedent("""\
        from utils import validate_amount

        class Invoice:
            def compute_total(self, items: list[int]) -> int:
                return sum(items)

        def process_order(invoice: Invoice, items: list[int]) -> int:
            if not validate_amount(sum(items)):
                raise ValueError("bad amount")
            return invoice.compute_total(items)
    """))

    # utils.py — used by billing.py; orphaned_helper is not referenced.
    (tmp_path / "utils.py").write_text(textwrap.dedent("""\
        def validate_amount(amount: float) -> bool:
            return amount > 0

        def orphaned_helper(x: int) -> int:
            return x * 2
    """))

    _commit("initial")
    return tmp_path


# ---------------------------------------------------------------------------
# Unit — _module_is_imported
# ---------------------------------------------------------------------------


class TestModuleIsImported:
    """``_module_is_imported(path, imported_names)`` matching rules."""

    def test_stem_match(self) -> None:
        assert _module_is_imported("utils.py", {"utils"})

    def test_dotted_module_match(self) -> None:
        assert _module_is_imported("core/store.py", {"core.store"})

    def test_suffix_match(self) -> None:
        assert _module_is_imported("muse/core/store.py", {"muse.core.store"})

    def test_no_match(self) -> None:
        assert not _module_is_imported("utils.py", {"billing", "invoice"})

    def test_empty_set(self) -> None:
        assert not _module_is_imported("utils.py", set())

    def test_partial_stem_no_match(self) -> None:
        # "til" should not match "utils.py" (stem is "utils", not "til").
        assert not _module_is_imported("utils.py", {"til"})

    def test_stem_inside_dotted_import(self) -> None:
        # "utils" is an element of "muse.utils" → should match.
        assert _module_is_imported("utils.py", {"muse.utils"})

    def test_deep_path_stem(self) -> None:
        assert _module_is_imported("a/b/c/billing.py", {"billing"})


# ---------------------------------------------------------------------------
# Unit — _matches_path_filter
# ---------------------------------------------------------------------------


class TestMatchesPathFilter:
    """``_matches_path_filter(path, pattern)`` with None, exact and glob patterns."""

    def test_none_always_matches(self) -> None:
        assert _matches_path_filter("src/billing.py", None)
        assert _matches_path_filter("any/path/here.py", None)

    def test_exact_match(self) -> None:
        assert _matches_path_filter("src/billing.py", "src/billing.py")

    def test_glob_wildcard(self) -> None:
        assert _matches_path_filter("src/billing.py", "src/*.py")

    def test_no_match(self) -> None:
        assert not _matches_path_filter("src/billing.py", "tests/*.py")

    def test_double_star_glob(self) -> None:
        assert _matches_path_filter("a/b/c/billing.py", "**/billing.py")


# ---------------------------------------------------------------------------
# Unit — _find_symbol_span
# ---------------------------------------------------------------------------


class TestFindSymbolSpan:
    """``_find_symbol_span(src, name, parent_class)`` returns ``(start, end)`` or None.

    The assertions below treat ``start``/``end`` as 1-based line numbers.
    """

    def test_finds_function(self) -> None:
        src = b"def foo(x: int) -> int:\n return x\n"
        result = _find_symbol_span(src, "foo", None)
        assert result is not None
        start, _ = result
        assert start == 1

    def test_finds_class(self) -> None:
        src = b"class Foo:\n x: int = 1\n"
        result = _find_symbol_span(src, "Foo", None)
        assert result is not None

    def test_finds_method_in_class(self) -> None:
        src = textwrap.dedent("""\
            class Invoice:
                def compute_total(self) -> int:
                    return 0
        """).encode()
        result = _find_symbol_span(src, "compute_total", "Invoice")
        assert result is not None
        start, _ = result
        assert start == 2

    def test_returns_none_for_missing_symbol(self) -> None:
        src = b"def foo(): pass\n"
        result = _find_symbol_span(src, "bar", None)
        assert result is None

    def test_returns_none_for_syntax_error(self) -> None:
        src = b"def broken(\n"
        result = _find_symbol_span(src, "broken", None)
        assert result is None

    def test_decorator_lines_included_in_span(self) -> None:
        src = textwrap.dedent("""\
            @staticmethod
            def foo() -> None:
                pass
        """).encode()
        result = _find_symbol_span(src, "foo", None)
        assert result is not None
        start, _ = result
        # Start should be at the decorator line (line 1), not the def (line 2).
        assert start == 1

    def test_multiline_function(self) -> None:
        src = textwrap.dedent("""\
            def big(
                a: int,
                b: int,
            ) -> int:
                return a + b
        """).encode()
        result = _find_symbol_span(src, "big", None)
        assert result is not None
        _, end = result
        assert end >= 5  # spans all 5 lines

    def test_variable_assignment(self) -> None:
        src = b"CONSTANT = 42\n"
        result = _find_symbol_span(src, "CONSTANT", None)
        assert result is not None

    def test_annotated_assignment(self) -> None:
        src = b"count: int = 0\n"
        result = _find_symbol_span(src, "count", None)
        assert result is not None

    def test_wrong_parent_class_returns_none(self) -> None:
        src = textwrap.dedent("""\
            class Foo:
                def bar(self) -> None:
                    pass
        """).encode()
        result = _find_symbol_span(src, "bar", "NonExistent")
        assert result is None


# ---------------------------------------------------------------------------
# Unit — _delete_symbol_lines
# ---------------------------------------------------------------------------


class TestDeleteSymbolLines:
    """``_delete_symbol_lines(lines, start=, end=)`` drops an inclusive line range."""

    def test_removes_middle_function(self) -> None:
        lines = [
            "def alpha(): pass\n",
            "\n",
            "def beta(): pass\n",
            "\n",
            "def gamma(): pass\n",
        ]
        result = _delete_symbol_lines(lines, start=3, end=3)
        joined = "".join(result)
        assert "alpha" in joined
        assert "beta" not in joined
        assert "gamma" in joined

    def test_removes_first_function(self) -> None:
        lines = [
            "def first(): pass\n",
            "\n",
            "def second(): pass\n",
        ]
        result = _delete_symbol_lines(lines, start=1, end=1)
        joined = "".join(result)
        assert "first" not in joined
        assert "second" in joined

    def test_removes_last_function(self) -> None:
        lines = [
            "def first(): pass\n",
            "\n",
            "def last(): pass\n",
        ]
        result = _delete_symbol_lines(lines, start=3, end=3)
        joined = "".join(result)
        assert "last" not in joined

    def test_normalises_trailing_blanks(self) -> None:
        lines = [
            "def foo(): pass\n",
            "\n",
            "\n",
            "def bar(): pass\n",
        ]
        result = _delete_symbol_lines(lines, start=1, end=1)
        # Trailing blanks before the deletion point are stripped.
        assert result[0] == "\n"  # one separator line
        assert "bar" in "".join(result)

    def test_multiline_symbol_removed(self) -> None:
        lines = [
            "def first(): pass\n",
            "def big(\n",
            " x: int,\n",
            ") -> int:\n",
            " return x\n",
            "def last(): pass\n",
        ]
        result = _delete_symbol_lines(lines, start=2, end=5)
        joined = "".join(result)
        assert "big" not in joined
        assert "first" in joined
        assert "last" in joined


# ---------------------------------------------------------------------------
# Unit — _analyse_file
# ---------------------------------------------------------------------------

_MAX_BYTES = 512 * 1024  # 512 KB default


class TestAnalyseFile:
    """``_analyse_file(path, raw, kind_filter, max_bytes)`` result fields."""

    def test_skips_file_over_limit(self) -> None:
        raw = b"x" * (_MAX_BYTES + 1)
        result = _analyse_file("big.py", raw, None, _MAX_BYTES)
        assert result.skipped is True

    def test_non_semantic_suffix_skipped(self) -> None:
        # .log is not in SEMANTIC_EXTENSIONS — no symbols extracted.
        raw = b"just a log entry"
        result = _analyse_file("notes.log", raw, None, _MAX_BYTES)
        assert result.symbol_tree == {}

    def test_python_symbols_extracted(self) -> None:
        raw = b"def foo(): pass\ndef bar(): pass\n"
        result = _analyse_file("mod.py", raw, None, _MAX_BYTES)
        assert any("foo" in addr for addr in result.symbol_tree)
        assert any("bar" in addr for addr in result.symbol_tree)

    def test_python_ref_names_collected(self) -> None:
        raw = b"x = validate_amount(10)\n"
        result = _analyse_file("billing.py", raw, None, _MAX_BYTES)
        assert "validate_amount" in result.ref_names

    def test_python_import_names_collected(self) -> None:
        raw = b"from muse.core import store\nimport os\n"
        result = _analyse_file("mod.py", raw, None, _MAX_BYTES)
        # NOTE(review): deliberately lenient — passes if either import form
        # is recorded.  Tighten once the exact naming of ``from`` imports in
        # ``imported_names`` is confirmed.
        assert "muse.core" in result.imported_names or "os" in result.imported_names

    def test_kind_filter_applied(self) -> None:
        raw = textwrap.dedent("""\
            class Invoice:
                pass

            def validate():
                pass
        """).encode()
        result = _analyse_file("mod.py", raw, "function", _MAX_BYTES)
        for addr in result.symbol_tree:
            assert "Invoice" not in addr

    def test_syntax_error_returns_partial(self) -> None:
        raw = b"def broken(\n"
        result = _analyse_file("broken.py", raw, None, _MAX_BYTES)
        # Should return without raising; symbol_tree may be empty.
        assert result.error is not None or result.symbol_tree == {}

    def test_file_path_stored(self) -> None:
        raw = b"x = 1\n"
        result = _analyse_file("some/path.py", raw, None, _MAX_BYTES)
        assert result.file_path == "some/path.py"

    def test_attribute_access_in_refs(self) -> None:
        # Attribute accesses like "obj.method" should add "method" to ref_names.
        raw = b"result = invoice.compute_total(items)\n"
        result = _analyse_file("billing.py", raw, None, _MAX_BYTES)
        assert "compute_total" in result.ref_names


# ---------------------------------------------------------------------------
# Unit — _is_test_file
# ---------------------------------------------------------------------------


class TestIsTestFile:
    """``_is_test_file(path)`` recognises test/spec naming patterns."""

    def test_test_prefix(self) -> None:
        assert _is_test_file("tests/test_billing.py")

    def test_test_in_path(self) -> None:
        assert _is_test_file("src/test_utils.py")

    def test_spec_in_path(self) -> None:
        assert _is_test_file("spec/billing_spec.py")

    def test_normal_file(self) -> None:
        assert not _is_test_file("src/billing.py")

    def test_deep_test_path(self) -> None:
        assert _is_test_file("a/b/c/test_something.py")


# ---------------------------------------------------------------------------
# Unit — _DeadCandidate
# ---------------------------------------------------------------------------


class TestDeadCandidateUnit:
    """``_DeadCandidate.confidence`` and ``to_dict`` derived from raw attributes."""

    def _make(
        self,
        address: str = "utils.py::orphaned_helper",
        kind: str = "function",
        file_path: str = "utils.py",
        referenced: bool = False,
        module_imported: bool = False,
    ) -> "_DeadCandidate":
        """Build a candidate by setting attributes directly, bypassing ``__init__``."""
        candidate = _DeadCandidate.__new__(_DeadCandidate)
        candidate.address = address
        candidate.kind = kind
        candidate.file_path = file_path
        candidate.referenced = referenced
        candidate.module_imported = module_imported
        return candidate

    def test_high_confidence_not_referenced_not_imported(self) -> None:
        c = self._make(referenced=False, module_imported=False)
        assert c.confidence == "high"

    def test_medium_confidence_module_imported(self) -> None:
        c = self._make(referenced=False, module_imported=True)
        assert c.confidence == "medium"

    def test_to_dict_has_required_keys(self) -> None:
        c = self._make()
        d = c.to_dict()
        for key in ("address", "path", "kind", "confidence", "reason"):
            assert key in d, f"missing key {key!r}"

    def test_to_dict_confidence_consistent(self) -> None:
        c = self._make(referenced=False, module_imported=False)
        assert c.to_dict()["confidence"] == "high"


# ---------------------------------------------------------------------------
# Integration — extends mega-suite baseline
# ---------------------------------------------------------------------------


class TestDeadIntegration:
    """End-to-end ``code dead`` invocations against a small committed repo."""

    def test_json_schema(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "dead", "--json"])
        assert result.exit_code == 0, result.output
        data = json.loads(result.output)
        for key in ("results", "high_confidence_count", "total_files_scanned"):
            assert key in data

    def test_high_confidence_only_filters(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "dead", "--high-confidence-only", "--json"])
        assert result.exit_code == 0
        data = json.loads(result.output)
        for c in data["results"]:
            assert c["confidence"] == "high"

    def test_count_is_integer(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "dead", "--count"])
        assert result.exit_code == 0
        assert result.output.strip().isdigit()

    def test_kind_filter_function(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "dead", "--kind", "function", "--json"])
        assert result.exit_code == 0
        data = json.loads(result.output)
        for c in data["results"]:
            assert c["kind"] == "function"

    def test_exclude_private_removes_underscore_names(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        _init_code_repo(tmp_path, monkeypatch)
        (tmp_path / "mod.py").write_text(
            "def _private(): pass\ndef public(): pass\n"
        )
        _commit("mod")
        result = runner.invoke(cli, ["code", "dead", "--exclude-private", "--json"])
        assert result.exit_code == 0
        data = json.loads(result.output)
        addresses = [c["address"] for c in data["results"]]
        assert not any("_private" in addr for addr in addresses)

    def test_workers_capped(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "dead", "--workers", "512", "--count"])
        assert result.exit_code == 0

    def test_compare_head_schema(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "dead", "--compare", "HEAD", "--json"])
        assert result.exit_code == 0
        data = json.loads(result.output)
        for key in ("compare_commit_id", "new_dead", "recovered", "net_change"):
            assert key in data

    def test_delete_and_compare_mutually_exclusive(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "dead", "--delete", "--compare", "HEAD"])
        assert result.exit_code == 1

    def test_save_allowlist_creates_json_file(
        self, repo: pathlib.Path, tmp_path: pathlib.Path
    ) -> None:
        allow_file = tmp_path / "allow.json"
        result = runner.invoke(cli, ["code", "dead", "--save-allowlist", str(allow_file)])
        assert result.exit_code == 0
        # NOTE(review): the payload is only validated when the file exists, so
        # this test passes vacuously if the command writes nothing.  Confirm
        # whether --save-allowlist always writes and, if so, assert existence.
        if allow_file.exists():
            data = json.loads(allow_file.read_text())
            assert isinstance(data, list)

    def test_allowlist_excludes_addresses(
        self, repo: pathlib.Path, tmp_path: pathlib.Path
    ) -> None:
        allow_file = tmp_path / "allow.json"
        allow_file.write_text('["utils.py::orphaned_helper"]')
        result = runner.invoke(cli, [
            "code", "dead", "--allowlist", str(allow_file), "--json",
        ])
        assert result.exit_code == 0
        data = json.loads(result.output)
        names = [c["address"] for c in data["results"]]
        assert "utils.py::orphaned_helper" not in names

    def test_language_filter_python(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "dead", "--language", "Python", "--json"])
        assert result.exit_code == 0
        data = json.loads(result.output)
        for c in data["results"]:
            assert c["path"].endswith(".py")

    def test_deleted_working_tree_file_excluded_from_dead_scan(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Symbols in a file deleted from the working tree must not appear as dead.

        Regression test for the _load_file_bytes fallback bug: when
        from_disk=True and a file was deleted, the old code fell back to
        reading the committed version from the object store, causing symbols
        in deleted files to be reported as dead code.  The correct behaviour
        is to exclude deleted files entirely — a deleted file has no symbols,
        so none of its symbols can be dead.
        """
        _init_code_repo(tmp_path, monkeypatch)

        # gone.py — contains a function that has no callers (would appear dead).
        (tmp_path / "gone.py").write_text("def vanishing_fn() -> None:\n pass\n")
        _commit("add gone.py")

        # Verify it IS detected as dead before deletion.
        result_before = runner.invoke(cli, ["code", "dead", "--json"])
        assert result_before.exit_code == 0
        addrs_before = [c["address"] for c in json.loads(result_before.output)["results"]]
        assert any("vanishing_fn" in a for a in addrs_before), (
            "vanishing_fn should be reported as dead before the file is deleted"
        )

        # Delete the file without committing.
        (tmp_path / "gone.py").unlink()

        # After deletion, vanishing_fn must NOT appear in the working-tree dead scan.
        result_after = runner.invoke(cli, ["code", "dead", "--json"])
        assert result_after.exit_code == 0
        addrs_after = [c["address"] for c in json.loads(result_after.output)["results"]]
        assert not any("vanishing_fn" in a for a in addrs_after), (
            "vanishing_fn must not be reported as dead after its file is deleted "
            "from the working tree — deleted files have no symbols"
        )


# ---------------------------------------------------------------------------
# Security
# ---------------------------------------------------------------------------


class TestDeadSecurity:
    """Guard rails: repo requirement and the destructive ``--delete`` flag."""

    def test_requires_repo(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        monkeypatch.chdir(tmp_path)
        monkeypatch.delenv("MUSE_REPO_ROOT", raising=False)
        result = runner.invoke(cli, ["code", "dead"])
        assert result.exit_code != 0

    def test_delete_without_yes_does_not_write(
        self, repo: pathlib.Path
    ) -> None:
        # NOTE(review): despite the test name, this invocation passes --yes,
        # and the ``repo`` fixture does contain an unreferenced symbol
        # (utils.py::orphaned_helper).  As written this is only a smoke test
        # that ``--delete --yes`` terminates with exit code 0 or 1; it does
        # not exercise the no-confirmation path.
        # TODO: add a real "--delete without --yes leaves files untouched"
        # check once the runner's stdin behaviour for the prompt is confirmed.
        result = runner.invoke(cli, ["code", "dead", "--delete", "--yes"])
        assert result.exit_code in (0, 1)


# ---------------------------------------------------------------------------
# Stress — large codebase performance
# ---------------------------------------------------------------------------


class TestDeadStress:
    """Wall-clock budgets for ``code dead`` and ``_analyse_file`` on larger inputs."""

    @pytest.fixture
    def large_repo(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> pathlib.Path:
        """50 Python files each with 4 functions, creating ~200 total symbols."""
        _init_code_repo(tmp_path, monkeypatch)

        for file_idx in range(50):
            content = textwrap.dedent(f"""\
                def do_work_{file_idx}(x: int) -> int:
                    return x + {file_idx}

                def helper_{file_idx}(x: int) -> int:
                    return x * {file_idx}

                def unused_a_{file_idx}(x: int) -> int:
                    return x

                def unused_b_{file_idx}(x: int) -> int:
                    return x
            """)
            (tmp_path / f"module_{file_idx:03d}.py").write_text(content)

        _commit("large codebase")
        return tmp_path

    def test_dead_on_large_codebase_under_10s(self, large_repo: pathlib.Path) -> None:
        start = time.monotonic()
        result = runner.invoke(cli, ["code", "dead", "--json"])
        elapsed = time.monotonic() - start
        assert result.exit_code == 0, result.output
        assert elapsed < 10.0, f"dead on 200 symbols took {elapsed:.2f}s"

    def test_dead_count_on_large_codebase_under_10s(self, large_repo: pathlib.Path) -> None:
        start = time.monotonic()
        result = runner.invoke(cli, ["code", "dead", "--count"])
        elapsed = time.monotonic() - start
        assert result.exit_code == 0
        assert elapsed < 10.0
        assert result.output.strip().isdigit()

    def test_dead_json_schema_valid_on_large_codebase(self, large_repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "dead", "--json"])
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert "results" in data
        assert "total_files_scanned" in data
        assert data["total_files_scanned"] >= 50

    def test_dead_kind_filter_on_large_codebase(self, large_repo: pathlib.Path) -> None:
        start = time.monotonic()
        result = runner.invoke(cli, ["code", "dead", "--kind", "function", "--json"])
        elapsed = time.monotonic() - start
        assert result.exit_code == 0
        assert elapsed < 10.0
        data = json.loads(result.output)
        for c in data["results"]:
            assert c["kind"] == "function"

    def test_analyse_file_200_symbols_under_1s(self) -> None:
        """Direct unit stress: analyse a 200-function file in < 1 s."""
        lines: list[str] = []
        for i in range(200):
            lines.append(f"def sym_{i:04d}(x: int) -> int:")
            lines.append(f" return x + {i}")
            lines.append("")
        raw = "\n".join(lines).encode()

        start = time.monotonic()
        # Limit raised ×10 so the size cap cannot cause the file to be skipped.
        result = _analyse_file("big.py", raw, None, _MAX_BYTES * 10)
        elapsed = time.monotonic() - start

        assert elapsed < 1.0, f"_analyse_file on 200 symbols took {elapsed:.3f}s"
        assert len(result.symbol_tree) >= 200


# ---------------------------------------------------------------------------
# TestRegisterFlags — --json / -j normalized at argparse level
# ---------------------------------------------------------------------------


class TestRegisterFlags:
    """register() must expose --json with -j shorthand and dest=json_out."""

    def _make_parser(self) -> "argparse.ArgumentParser":
        """Return a root parser with the ``dead`` command registered on it."""
        # Imported lazily, as in the original, so only these tests depend on
        # ``register`` being importable.
        from muse.cli.commands.dead import register

        root = argparse.ArgumentParser()
        subs = root.add_subparsers()
        register(subs)
        return root

    def test_json_out_default_false(self) -> None:
        p = self._make_parser()
        ns = p.parse_args(['code', 'dead'])
        assert ns.json_out is False

    def test_json_out_true_with_json_flag(self) -> None:
        p = self._make_parser()
        ns = p.parse_args(['code', 'dead', '--json'])
        assert ns.json_out is True

    def test_json_out_true_with_j_flag(self) -> None:
        p = self._make_parser()
        ns = p.parse_args(['code', 'dead', '-j'])
        assert ns.json_out is True