gabriel / muse public
dead.py python
1,201 lines 49.1 KB
Raw
sha256:e6465e8a9b7fa8e6223ed4a3576e96c568c913ae2caeb9c31f15e7a81b250b40 docs: add | jq convention to --json section of agent-guide Sonnet 4.6 1 day ago
1 """muse code dead — dead code detection.
2
3 Finds symbols that are **never referenced** and whose containing module is
4 **never imported** by anything else in the codebase.
5
6 A symbol is a dead-code candidate when two independent conditions hold:
7
8 1. **No reference**: its bare name does not appear as any ``ast.Name`` id
9 or ``ast.Attribute`` attr anywhere in the codebase. This is broader
10 than call-site detection — it catches attribute accesses, keyword
11 argument values, type annotations, ``isinstance`` checks, and every
12 other form of name usage, not just direct calls.
13
14 2. **No import**: its containing file's module name does not appear in
15 any ``import``-kind symbol in any other file.
16
17 Both conditions must hold simultaneously. A function that is never
18 referenced but lives in a module that *is* imported is still reachable —
19 it may be part of an exported API even if it's not used internally.
20
21 Performance
22 -----------
23 All files are processed in a **single parallel pass** and AST-parsed
24 exactly once. Without ``--commit`` the working tree is read from disk,
25 so uncommitted changes are immediately visible. With ``--commit`` the
26 specified historical snapshot is read from the object store.
27 Imports, references, and symbol trees are all extracted in the same
28 pass, then combined. ``--workers`` controls the thread-pool size.
29
30 Security
31 --------
32 ``ast.parse`` never executes code. Files exceeding ``--max-file-bytes``
33 (default 512 KB) are skipped to prevent stalls on generated or minified
34 files. ``--delete`` validates every file path inside the repo root before
35 touching the working tree.
36
37 Known limitations
38 -----------------
39 - Symbols whose names are extremely common (e.g. ``run``, ``name``) may
40 appear as false negatives because a matching name exists somewhere else.
41 - Exported APIs: symbols accessed from outside the repo (library code)
42 appear dead because the callers are not in the snapshot.
- Entry points: ``main()``, CLI callbacks, and test functions appear dead
  by design. Test files are excluded by default; pass ``--include-tests``
  to scan them as well.
45 - tree-sitter languages: reference extraction is Python-only. Symbols in
46 Go/Rust/TypeScript files are checked for import-graph reachability only.
47 - ``--delete`` is Python-only (requires AST line-range information).
48
49 Usage::
50
51 muse code dead
52 muse code dead --kind function
    muse code dead --include-tests
54 muse code dead --exclude-private
55 muse code dead --high-confidence-only
56 muse code dead --path "musehub/services/*"
57 muse code dead --language Python
58 muse code dead --top 50
59 muse code dead --group-by-file
60 muse code dead --commit HEAD~10
61 muse code dead --workers 8
62 muse code dead --json
63 muse code dead --delete
64 muse code dead --delete --yes
65 muse code dead --allowlist .muse/dead-allowlist.json
66
67 Confidence levels::
68
69 HIGH — not referenced AND module not imported → almost certainly dead
70 MEDIUM — not referenced, but module IS imported → may be exported API surface
71
72 Flags:
73
74 ``--kind KIND, -k KIND``
75 Restrict to symbols of a specific kind (function, class, method, …).
76
``--include-tests``
    Include symbols in files whose path contains ``test`` or ``spec``.
    Test files are excluded by default.
79
80 ``--exclude-private``
81 Exclude symbols whose bare name starts with ``_``.
82
83 ``--high-confidence-only``
84 Show only HIGH confidence candidates (module not imported).
85
86 ``--path GLOB, -p GLOB``
87 Restrict to files matching this glob pattern (e.g. ``"musehub/services/*"``).
88
89 ``--language LANG, -l LANG``
90 Restrict to files of a specific language (e.g. ``Python``, ``TypeScript``).
91
92 ``--top N``
93 Show only the top N candidates.
94
95 ``--group-by-file, -g``
96 Group output by file instead of a flat sorted list.
97
``--commit REF, -c REF``
    Analyse a historical committed snapshot instead of the working tree.
100
101 ``--workers N, -w N``
102 Number of parallel worker threads for file parsing (default: 8).
103
104 ``--max-file-bytes N``
105 Skip files larger than N bytes (default: 524288 = 512 KB).
106
107 ``--no-color``
108 Disable ANSI color output.
109
110 ``--json``
111 Emit results as JSON.
112
113 ``--delete``
114 Interactively delete dead symbols from the working tree (Python only).
115 Prompts for each candidate unless ``--yes`` is also given.
116
117 ``--yes, -y``
118 Skip confirmation prompts when used with ``--delete``.
119
120 ``--allowlist FILE``
121 JSON file containing a list of symbol addresses to suppress from output.
122 Addresses are matched as exact strings against the ``address`` field.
123 Example file: ``[\"muse/cli/config.py::MuseConfig\"]``
124 """
125
126 import argparse
127 import ast
128 import fnmatch
129 import json
130 import logging
131 import os
132 import pathlib
133 import sys
134 from concurrent.futures import ThreadPoolExecutor, as_completed
135 from dataclasses import dataclass, field
136 from typing import TypedDict
137
138 from muse.core.errors import ExitCode
139 from muse.core.object_store import read_object
140 from muse.core.repo import require_repo
141 from muse.core.types import Manifest
142 from muse.core.paths import dead_allowlist_path as _dead_allowlist_path
143 from muse.core.refs import read_current_branch
144 from muse.core.commits import (
145 CommitRecord,
146 resolve_commit_ref,
147 )
148 from muse.core.snapshots import get_commit_snapshot_manifest
149 from muse.plugins.code._framework import ImplicitEdgeGraph, build_implicit_edge_graph
150 from muse.plugins.code._query import language_of
151 from muse.plugins.code.ast_parser import SEMANTIC_EXTENSIONS, SymbolTree, parse_symbols
152
# file path → raw file bytes, as built by ``_load_file_bytes``.
type _BlobMap = dict[str, bytes]
# symbol kind → number of dead-code candidates of that kind.
type _KindCountMap = dict[str, int]
# NOTE(review): presumably file path → candidates found in that file; its
# use is not visible in this section — confirm against the printing helpers.
type _DeadByFile = dict[str, list["_DeadCandidate"]]
156 from muse.core.validation import MAX_AST_BYTES, clamp_int, sanitize_display
157 from muse.core.envelope import EnvelopeJson, make_envelope
158 from muse.core.timing import start_timer
159
# Module-level logger; used for allowlist loading warnings.
logger = logging.getLogger(__name__)
161
class _DeadCandidateJson(TypedDict):
    """JSON-serialisable representation of one dead-code candidate.

    Built by ``_DeadCandidate.to_dict``; each key mirrors one attribute or
    property of the candidate.
    """

    # Symbol address (the key of the symbol in its file's symbol tree).
    address: str
    # Path of the file that defines the symbol.
    path: str
    # Symbol kind as reported by the symbol tree (e.g. function, class).
    kind: str
    # Whether the symbol's bare name was found among the collected references.
    referenced: bool
    # Whether the containing module appears among the collected imports.
    module_imported: bool
    # "high" or "medium".
    confidence: str
    # Human-readable explanation of the classification.
    reason: str
172
class _DeadPayload(EnvelopeJson, total=False):
    """JSON output for ``muse code dead``.

    Extends the shared envelope with the scan results.  The ``compare_*``,
    ``new_dead``, ``recovered`` and ``net_change`` keys are only populated
    when ``--compare`` is used.
    """

    # Label of what was scanned: "working tree" or "<commit id> on <branch>".
    source: str
    # Number of files loaded for the scan.
    total_files_scanned: int
    # Number of non-import symbols examined.
    total_symbols_scanned: int
    # Number of candidates with confidence "high".
    high_confidence_count: int
    # Number of candidates with confidence "medium".
    medium_confidence_count: int
    # All reported candidates.
    results: list[_DeadCandidateJson]
    # Commit ID the results were compared against.
    compare_commit_id: str
    # Candidates present now but absent from the compared commit.
    new_dead: list[_DeadCandidateJson]
    # Candidates present in the compared commit but absent now.
    recovered: list[_DeadCandidateJson]
    # len(new_dead) - len(recovered).
    net_change: int
186
class _ScanKwargs(TypedDict):
    """Keyword arguments forwarded to every :func:`_scan_file_bytes` call.

    Collected into a TypedDict so the ``**scan_kwargs`` spread is type-safe
    without a ``# type: ignore`` and the common args are defined once.
    """

    # Restrict to symbols of this kind, or None for all kinds.
    kind_filter: str | None
    # Files larger than this many bytes are skipped.
    max_file_bytes: int
    # Thread-pool size used for parsing.
    workers: int
    # Restrict to files of this language, or None for all languages.
    language_filter: str | None
    # Glob restricting which file paths are reported, or None.
    path_filter: str | None
    # Skip files recognised as test files.
    exclude_tests: bool
    # Skip symbols whose bare name starts with "_".
    exclude_private: bool
    # Report only candidates whose module is not imported.
    high_confidence_only: bool
    # Symbol addresses that are never reported.
    allowlist: frozenset[str]
203
# File suffixes that get the stdlib ``ast`` reference/import pass.
_PY_SUFFIXES: frozenset[str] = frozenset({".py", ".pyi"})
# Upper bound applied to the ``--workers`` value.
_MAX_WORKERS: int = 64
# Lower bound applied to the ``--max-file-bytes`` value.
_MIN_FILE_BYTES: int = 4_096

# Maximum file size we'll parse (512 KB).  Prevents stalling on generated files.
_DEFAULT_MAX_FILE_BYTES: int = 524_288
210
211 # ── ANSI colours ──────────────────────────────────────────────────────────────
212
# ANSI SGR escape sequences.  ``_c`` prepends the requested codes and appends
# ``_RESET`` so that styling never leaks past the wrapped text.
_RESET = "\033[0m"
_BOLD = "\033[1m"
_DIM = "\033[2m"
_RED = "\033[31m"
_YELLOW = "\033[33m"
_CYAN = "\033[36m"
_GREEN = "\033[32m"
_BLUE = "\033[34m"
_MAGENTA = "\033[35m"
_WHITE = "\033[37m"
_GRAY = "\033[90m"
224
225 def _c(text: str, *codes: str, use_color: bool = True) -> str:
226 """Wrap *text* with ANSI escape codes if *use_color* is True."""
227 if not use_color:
228 return text
229 return "".join(codes) + text + _RESET
230
231 # ── Data structures ───────────────────────────────────────────────────────────
232
@dataclass
class _FileAnalysis:
    """Everything extracted from a single file in one pass.

    One instance is produced per file by ``_analyse_file``.
    """
    # Path of the analysed file, exactly as passed to ``_analyse_file``.
    file_path: str
    # Language label returned by ``language_of(file_path)``.
    lang: str
    # Symbols returned by ``parse_symbols`` (narrowed to the requested kind
    # when a kind filter is active).
    symbol_tree: SymbolTree = field(default_factory=dict)
    # Every name referenced anywhere in the file (ast.Name ids + ast.Attribute attrs).
    # This is broader than call-sites: catches attribute access, keyword args,
    # type annotations, isinstance checks, decorator names, etc.
    ref_names: set[str] = field(default_factory=set)
    # Imported module/name strings (from import-kind symbols, plus dotted
    # module paths taken from Python ``import`` / ``from … import`` nodes).
    imported_names: set[str] = field(default_factory=set)
    # True when the file exceeded the size limit and was not parsed at all.
    skipped: bool = False
    # Message of the exception raised by ``parse_symbols``, if it failed.
    error: str | None = None
247
@dataclass
class _DeadCandidate:
    """One symbol flagged as a dead-code candidate."""

    # Symbol address (key of the symbol in its file's symbol tree).
    address: str
    # Path of the file that defines the symbol.
    file_path: str
    # Symbol kind as reported by the symbol tree.
    kind: str
    # Whether the bare name was found among the collected references.
    referenced: bool
    # Whether the containing module was found among the collected imports.
    module_imported: bool

    @property
    def confidence(self) -> str:
        """``"medium"`` when the module is imported somewhere, else ``"high"``."""
        if self.module_imported:
            return "medium"
        return "high"

    @property
    def reason(self) -> str:
        """Human-readable explanation of why the symbol was flagged."""
        if self.referenced or self.module_imported:
            return "not referenced (module imported — may be exported API)"
        return "not referenced, module not imported"

    def to_dict(self) -> _DeadCandidateJson:
        """Return the JSON-serialisable form of this candidate."""
        serialised: _DeadCandidateJson = {
            "address": self.address,
            "path": self.file_path,
            "kind": self.kind,
            "referenced": self.referenced,
            "module_imported": self.module_imported,
            "confidence": self.confidence,
            "reason": self.reason,
        }
        return serialised
276
277 # ── Single-pass file analysis ─────────────────────────────────────────────────
278
def _analyse_file(
    file_path: str,
    raw: bytes,
    kind_filter: str | None,
    max_file_bytes: int,
) -> _FileAnalysis:
    """Parse and extract symbols + references + imports from one file.

    Works only on its arguments and a freshly created result object, so it
    can be run from several worker threads at once.  The caller supplies the
    raw file bytes — either read from disk (working tree) or fetched from the
    object store (historical commit).

    Args:
        file_path: Path of the file; used for language and suffix detection.
        raw: File contents.
        kind_filter: When set, only symbols of this kind are kept in the
            returned symbol tree (imports are still collected).
        max_file_bytes: Files larger than this are marked ``skipped`` and
            not parsed.

    Returns:
        A ``_FileAnalysis``.  ``skipped`` is set for oversized files and
        ``error`` when symbol extraction raised.  A file whose Python source
        cannot be parsed by ``ast`` keeps its symbols but contributes no
        references.
    """
    lang = language_of(file_path)
    result = _FileAnalysis(file_path=file_path, lang=lang)

    if len(raw) > max_file_bytes:
        result.skipped = True
        return result

    suffix = pathlib.PurePosixPath(file_path).suffix.lower()
    if suffix not in SEMANTIC_EXTENSIONS:
        return result

    # ── Symbol extraction (all languages with AST support) ─────────────────
    try:
        tree = parse_symbols(raw, file_path)
    except Exception as exc:  # noqa: BLE001
        result.error = str(exc)
        return result

    # Imports are collected before the kind filter so that module-import
    # matching keeps working when e.g. ``--kind function`` is given.
    for rec in tree.values():
        if rec["kind"] == "import":
            result.imported_names.add(rec["qualified_name"])

    if kind_filter:
        tree = {addr: rec for addr, rec in tree.items() if rec["kind"] == kind_filter}
    result.symbol_tree = tree

    # ── Reference + module-import extraction (Python only via stdlib ast) ─────
    # We walk ALL nodes once:
    #
    #   ast.Name / ast.Attribute — broad reference tracking (fixes logger,
    #   func=run keyword args, property access, isinstance args, annotations)
    #
    #   ast.ImportFrom / ast.Import — extract the actual dotted module paths
    #   ("from muse.core.store import X" → "muse.core.store").  The Muse
    #   symbol tree stores imports as "import::symbolname" with no module
    #   path, so we must supplement it here to make _module_is_imported work.
    if suffix not in _PY_SUFFIXES or len(raw) > MAX_AST_BYTES:
        return result

    try:
        py_tree = ast.parse(raw)
    except (SyntaxError, ValueError, RecursionError, MemoryError):
        # Besides SyntaxError, the parser reports null bytes as ValueError on
        # some versions and overly deep nesting as RecursionError/MemoryError.
        # This runs inside a worker thread, so letting any of them escape
        # would abort the whole scan because of a single pathological file.
        return result

    for node in ast.walk(py_tree):
        if isinstance(node, ast.Name):
            result.ref_names.add(node.id)
        elif isinstance(node, ast.Attribute):
            result.ref_names.add(node.attr)
        elif isinstance(node, ast.ImportFrom):
            # ``from . import x`` has no module name; nothing to record.
            if node.module:
                result.imported_names.add(node.module)
        elif isinstance(node, ast.Import):
            for alias in node.names:
                result.imported_names.add(alias.name)

    return result
347
348 # ── Module-import matching ────────────────────────────────────────────────────
349
350 def _module_is_imported(file_path: str, imported_names: set[str]) -> bool:
351 """Return True if *file_path*'s module name appears anywhere in *imported_names*."""
352 stem = pathlib.PurePosixPath(file_path).stem
353 module = pathlib.PurePosixPath(file_path).with_suffix("").as_posix().replace("/", ".")
354 for imp in imported_names:
355 if (
356 imp == stem
357 or imp == module
358 or imp.endswith(f".{stem}")
359 or imp.endswith(f".{module}")
360 or stem in imp.split(".")
361 ):
362 return True
363 return False
364
365 # ── Path filter ───────────────────────────────────────────────────────────────
366
367 def _matches_path_filter(file_path: str, pattern: str | None) -> bool:
368 if pattern is None:
369 return True
370 return fnmatch.fnmatch(file_path, pattern) or fnmatch.fnmatch(file_path, f"**/{pattern}")
371
372 # ── Symbol deletion (Python only) ─────────────────────────────────────────────
373
def _find_symbol_span(source: bytes, bare_name: str, parent_class: str | None) -> tuple[int, int] | None:
    """Return (start_lineno, end_lineno) 1-indexed for the named symbol.

    Accounts for decorator lines (start is the first decorator's line).

    Args:
        source: Python source of the file that should contain the symbol.
        bare_name: Unqualified name of the function, class or variable.
        parent_class: Name of the class whose body is searched, or ``None``
            to search the module's top-level statements.

    Returns:
        The inclusive line span, or ``None`` if the symbol cannot be located:
        the source is too large or unparsable, *parent_class* does not exist,
        or no matching definition is found in the searched body.
    """
    if len(source) > MAX_AST_BYTES:
        return None
    try:
        tree = ast.parse(source)
    except (SyntaxError, ValueError, RecursionError, MemoryError):
        # Null bytes or pathological nesting surface as non-SyntaxError
        # exceptions; in every case the symbol simply cannot be located.
        return None

    search_body: list[ast.stmt] = tree.body
    if parent_class:
        for node in ast.walk(tree):
            if isinstance(node, ast.ClassDef) and node.name == parent_class:
                search_body = list(node.body)
                break
        else:
            # The class is gone.  Falling back to the module body here could
            # return the span of an unrelated top-level symbol that merely
            # shares the method's name — and the caller would delete it.
            return None

    for node in search_body:
        node_name: str | None = None
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
            node_name = node.name
        elif isinstance(node, ast.Assign):
            # Only simple ``name = value`` assignments are considered.
            if len(node.targets) == 1 and isinstance(node.targets[0], ast.Name):
                node_name = node.targets[0].id
        elif isinstance(node, ast.AnnAssign):
            if isinstance(node.target, ast.Name):
                node_name = node.target.id

        if node_name != bare_name:
            continue

        end = getattr(node, "end_lineno", None)
        if end is None:
            return None
        start: int = node.lineno
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
            # ``lineno`` points at the def/class keyword; decorators sit above it.
            if node.decorator_list:
                start = node.decorator_list[0].lineno
        return (start, end)

    return None
418
419 def _delete_symbol_lines(source_lines: list[str], start: int, end: int) -> list[str]:
420 """Remove lines *start*–*end* (1-indexed, inclusive) and normalise blank lines."""
421 before = list(source_lines[: start - 1])
422 after = list(source_lines[end:])
423
424 # Strip trailing blank lines from the block before the deletion point.
425 while before and not before[-1].strip():
426 before.pop()
427
428 # Add one blank separator line if there is still content below.
429 if after and any(line.strip() for line in after):
430 return before + ["\n"] + after
431 return before + after
432
433 # ── Repo ID ───────────────────────────────────────────────────────────────────
434
435 # ── Allowlist ─────────────────────────────────────────────────────────────────
436
437 def _load_allowlist(path: str | None) -> frozenset[str]:
438 """Load a JSON list of symbol addresses that should be suppressed."""
439 if path is None:
440 return frozenset()
441 try:
442 raw = pathlib.Path(path).read_text(encoding="utf-8")
443 parsed = json.loads(raw)
444 if not isinstance(parsed, list):
445 logger.warning("dead-code allowlist must be a JSON array; ignoring %s", path)
446 return frozenset()
447 return frozenset(str(x) for x in parsed)
448 except (OSError, json.JSONDecodeError) as exc:
449 logger.warning("Could not load allowlist %s: %s", path, exc)
450 return frozenset()
451
452 # ── Shared scan pipeline ──────────────────────────────────────────────────────
453
def _load_file_bytes(
    root: pathlib.Path,
    manifest: Manifest,
    from_disk: bool,
) -> _BlobMap:
    """Collect the raw contents of every manifest entry that can be read.

    With *from_disk* set, each path is read relative to *root* in the working
    tree; a path that cannot be read (for example because the file was
    deleted) is left out, since a missing file has no symbols to report.
    Otherwise each entry's object ID is looked up in the object store and
    entries for which ``read_object`` returns ``None`` are left out.

    Returns:
        Mapping of manifest path → file bytes.
    """
    blobs: _BlobMap = {}

    if from_disk:
        for rel_path, _object_id in manifest.items():
            try:
                blobs[rel_path] = (root / rel_path).read_bytes()
            except OSError:
                # Unreadable in the working tree — exclude from the scan.
                continue
        return blobs

    for rel_path, object_id in manifest.items():
        data = read_object(root, object_id)
        if data is None:
            continue
        blobs[rel_path] = data
    return blobs
478
def _scan_file_bytes(
    file_bytes: _BlobMap,
    kind_filter: str | None,
    max_file_bytes: int,
    workers: int,
    language_filter: str | None,
    path_filter: str | None,
    exclude_tests: bool,
    exclude_private: bool,
    high_confidence_only: bool,
    allowlist: frozenset[str],
    entry_point_addresses: frozenset[str] = frozenset(),
) -> tuple[list[_DeadCandidate], int, float, int, int]:
    """Run the complete dead-code analysis over a set of files.

    Every file is analysed on a thread pool; the reference names and import
    names of *all* files are then merged, and each symbol is reported unless
    one of the filters below suppresses it.  Path, language and test filters
    only limit which files are *reported* — references and imports from
    filtered-out files still count.

    Args:
        file_bytes: Map of ``file_path → raw bytes`` to analyse.
        kind_filter: Restrict to symbols of this kind, or ``None``.
        max_file_bytes: Skip files larger than this many bytes.
        workers: Number of parallel parse threads.
        language_filter: Restrict to this language name, or ``None``.
        path_filter: Glob pattern for file path restriction.
        exclude_tests: When ``True``, skip test files.
        exclude_private: When ``True``, skip ``_private`` symbols.
        high_confidence_only: When ``True``, only return high-confidence hits.
        allowlist: Set of symbol addresses to suppress.
        entry_point_addresses: Addresses of framework-wired entry points;
            these are never reported.

    Returns:
        ``(candidates, scanned_symbols, timer, skipped, errors)`` where
        *candidates* is sorted high-confidence first, then by file path and
        address; *scanned_symbols* counts every non-import symbol examined in
        the reported files (before symbol-level filters); *timer* is the
        handle obtained from ``start_timer()`` when the scan began, returned
        as-is; *skipped* and *errors* count files skipped for size and files
        whose symbol extraction failed.
    """
    timer = start_timer()

    # ── Parse phase: one task per file ───────────────────────────────────────
    with ThreadPoolExecutor(max_workers=workers) as executor:
        pending = [
            executor.submit(_analyse_file, path, blob, kind_filter, max_file_bytes)
            for path, blob in file_bytes.items()
        ]
        analyses: list[_FileAnalysis] = [job.result() for job in as_completed(pending)]

    # ── Merge phase: codebase-wide reference and import sets ─────────────────
    all_ref_names: set[str] = set()
    all_imported_names: set[str] = set()
    for parsed in analyses:
        all_ref_names |= parsed.ref_names
        all_imported_names |= parsed.imported_names

    # ── Classification phase ─────────────────────────────────────────────────
    candidates: list[_DeadCandidate] = []
    scanned_symbols = 0

    for analysis in sorted(analyses, key=lambda item: item.file_path):
        file_excluded = (
            analysis.skipped
            or analysis.error
            or not _matches_path_filter(analysis.file_path, path_filter)
            or (language_filter and analysis.lang != language_filter)
            or (exclude_tests and _is_test_file(analysis.file_path))
        )
        if file_excluded:
            continue

        mod_imported = _module_is_imported(analysis.file_path, all_imported_names)

        for address, rec in sorted(analysis.symbol_tree.items()):
            symbol_kind = rec["kind"]
            if symbol_kind == "import":
                continue
            scanned_symbols += 1

            bare_name = rec["name"].split(".")[-1]
            suppressed = (
                (exclude_private and bare_name.startswith("_"))
                or address in allowlist
                or bare_name in all_ref_names
                or address in entry_point_addresses
            )
            if suppressed:
                continue
            # An imported module means "medium" confidence at best.
            if high_confidence_only and mod_imported:
                continue

            candidates.append(
                _DeadCandidate(
                    address=address,
                    file_path=analysis.file_path,
                    kind=symbol_kind,
                    referenced=False,
                    module_imported=mod_imported,
                )
            )

    candidates.sort(key=lambda cand: (cand.confidence != "high", cand.file_path, cand.address))

    skipped = len([item for item in analyses if item.skipped])
    errors = len([item for item in analyses if item.error])

    return candidates, scanned_symbols, timer, skipped, errors
575
576 # ── CLI registration ──────────────────────────────────────────────────────────
577
578 def _get_code_subs(
579 subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]",
580 ) -> "argparse._SubParsersAction[argparse.ArgumentParser]":
581 """Return the subparsers action for the 'code' sub-parser.
582
583 If ``subparsers`` already contains a ``code`` parser (i.e. this is being
584 called with the top-level subparsers after app setup), reuse its existing
585 subparsers action. If not, create a minimal ``code`` parser so that
586 ``register(top_level_subs)`` produces the path ``code dead``.
587 """
588 choices = getattr(subparsers, "choices", None) or {}
589 if "code" in choices:
590 code_parser = choices["code"]
591 for action in code_parser._actions:
592 if isinstance(action, argparse._SubParsersAction):
593 return action
594 return code_parser.add_subparsers()
595 code_parser = subparsers.add_parser("code")
596 return code_parser.add_subparsers()
597
def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None:
    """Register the dead subcommand under the 'code' namespace.

    Adds a ``dead`` parser beneath ``code`` (creating ``code`` if needed),
    declares every command-line option, and wires the parser to ``run``.
    The module docstring is shown verbatim as the command description.
    """
    subparsers = _get_code_subs(subparsers)
    parser = subparsers.add_parser(
        "dead",
        help="Find symbols with no references and no importers — dead code candidates.",
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # ── Filters ──────────────────────────────────────────────────────────────
    parser.add_argument(
        "--kind", "-k", default=None, metavar="KIND", dest="kind_filter",
        help="Restrict to symbols of this kind (function, class, method, async_function, …).",
    )
    parser.add_argument(
        "--include-tests", action="store_true", dest="include_tests",
        help=(
            "Include test files (paths containing 'test' or 'spec') in the analysis. "
            "Tests are excluded by default because pytest discovers them by naming "
            "convention rather than by reference, which produces thousands of false positives."
        ),
    )
    parser.add_argument(
        "--exclude-private", action="store_true", dest="exclude_private",
        help="Exclude symbols whose name starts with '_'.",
    )
    parser.add_argument(
        "--high-confidence-only", action="store_true", dest="high_confidence_only",
        help="Show only HIGH confidence candidates (module not imported).",
    )
    parser.add_argument(
        "--path", "-p", default=None, metavar="GLOB", dest="path_filter",
        help="Restrict to files matching this glob pattern (e.g. 'musehub/services/*').",
    )
    parser.add_argument(
        "--language", "-l", default="Python", metavar="LANG", dest="language_filter",
        help=(
            "Restrict to files of a specific language (default: Python). "
            "Use --language all to scan every language including Markdown, TOML, etc. "
            "Markdown sections and variables are never Python references, so scanning "
            "them without this filter produces thousands of false positives."
        ),
    )

    # ── Output shaping ───────────────────────────────────────────────────────
    parser.add_argument(
        "--top", "-n", default=None, type=int, metavar="N", dest="top",
        help="Show only the top N candidates.",
    )
    parser.add_argument(
        "--group-by-file", "-g", action="store_true", dest="group_by_file",
        help="Group output by file instead of a flat sorted list.",
    )

    # ── Scan source and limits ───────────────────────────────────────────────
    parser.add_argument(
        "--commit", "-c", default=None, metavar="REF", dest="ref",
        help=(
            "Analyse a historical committed snapshot instead of the working tree. "
            "Accepts a full commit ID, a short prefix, HEAD, or a branch name."
        ),
    )
    parser.add_argument(
        "--workers", "-w", default=8, type=int, metavar="N", dest="workers",
        help="Number of parallel worker threads for parsing (default: 8).",
    )
    parser.add_argument(
        "--max-file-bytes", default=_DEFAULT_MAX_FILE_BYTES, type=int,
        metavar="N", dest="max_file_bytes",
        help="Skip files larger than N bytes (default: 524288).",
    )

    # ── Output format ────────────────────────────────────────────────────────
    parser.add_argument(
        "--no-color", action="store_true", dest="no_color",
        help="Disable ANSI colour output.",
    )
    parser.add_argument(
        "--json", "-j", action="store_true", dest="json_out",
        help="Emit results as JSON.",
    )

    # ── Deletion ─────────────────────────────────────────────────────────────
    parser.add_argument(
        "--delete", action="store_true", dest="delete",
        help=(
            "Interactively delete dead symbols from the working tree (Python only). "
            "Prompts for each candidate unless --yes is also given."
        ),
    )
    parser.add_argument(
        "--yes", "-y", action="store_true", dest="yes",
        help="Skip confirmation prompts when used with --delete.",
    )

    # ── Allowlist, comparison, scripting ─────────────────────────────────────
    parser.add_argument(
        "--allowlist", default=None, metavar="FILE", dest="allowlist",
        help=(
            "JSON file with a list of symbol addresses to suppress. "
            # The example shows the file's *contents* (symbol addresses), not
            # the path of the allowlist file itself.
            "Example file contents: [\"muse/cli/config.py::MuseConfig\"]"
        ),
    )
    parser.add_argument(
        "--compare", default=None, metavar="REF", dest="compare_ref",
        help=(
            "Diff dead-code results against this commit reference. "
            "Shows which symbols newly became dead and which were recovered."
        ),
    )
    parser.add_argument(
        "--count", action="store_true", dest="count_only",
        help="Print only the total count of dead-code candidates (scriptable).",
    )
    parser.add_argument(
        "--save-allowlist", default=None, metavar="FILE", dest="save_allowlist",
        help=(
            "Save all found dead-code candidate addresses to FILE as a JSON list. "
            "Use as input to --allowlist to permanently suppress known false positives."
        ),
    )
    parser.set_defaults(func=run)
709
710 # ── Main logic ────────────────────────────────────────────────────────────────
711
712 def run(args: argparse.Namespace) -> None:
713 """Find symbols with no references and no importers — dead code candidates.
714
715 Scans the working tree (or a historical snapshot with ``--commit``) for
716 symbols that are never called, never accessed, and whose containing module
717 is never imported by anything else in the codebase. Both conditions must
718 hold simultaneously — a symbol in an imported module is still reachable
719 even if unused internally.
720
721 Agent quickstart
722 ----------------
723 ::
724
725 muse code dead --json
726 muse code dead --high-confidence-only --json
727 muse code dead --kind function --exclude-tests --json
728 muse code dead --commit HEAD~10 --json
729
730 JSON fields
731 -----------
732 source Working tree label or commit ref used.
733 total_files_scanned Number of files processed.
734 total_symbols_scanned Number of symbols analysed.
735 high_confidence_count Candidates with high-confidence dead classification.
736 medium_confidence_count Candidates with medium-confidence classification.
737 results List of candidate objects: ``address``, ``kind``,
738 ``confidence``, ``file``, ``line``.
739
740 Exit codes
741 ----------
742 0 Analysis complete (dead candidates may or may not be found).
743 1 Invalid arguments or ref not found.
744 2 Not inside a Muse repository.
745 """
746 kind_filter: str | None = args.kind_filter
747 exclude_tests: bool = not args.include_tests
748 exclude_private: bool = args.exclude_private
749 high_confidence_only: bool = args.high_confidence_only
750 path_filter: str | None = args.path_filter
751 raw_lang: str = args.language_filter
752 language_filter: str | None = None if raw_lang.lower() == "all" else raw_lang
753 top: int | None = (clamp_int(args.top, 1, 100_000, 'top') if args.top is not None else None)
754 group_by_file: bool = args.group_by_file
755 ref: str | None = args.ref
756 compare_ref: str | None = args.compare_ref
757 workers: int = min(max(1, args.workers), _MAX_WORKERS)
758 max_file_bytes: int = max(args.max_file_bytes, _MIN_FILE_BYTES)
759 json_out: bool = args.json_out
760 do_delete: bool = args.delete
761 auto_yes: bool = args.yes
762 allowlist_path: str | None = args.allowlist
763 count_only: bool = args.count_only
764 save_allowlist_path: str | None = args.save_allowlist
765 use_color: bool = not args.no_color and sys.stdout.isatty() and not json_out and not do_delete
766
767 if do_delete and compare_ref:
768 print("❌ --delete and --compare are mutually exclusive.", file=sys.stderr)
769 raise SystemExit(ExitCode.USER_ERROR)
770
771 root = require_repo()
772 branch = read_current_branch(root)
773
774 allowlist = _load_allowlist(allowlist_path)
775 default_allowlist_path = _dead_allowlist_path(root)
776 if default_allowlist_path.exists() and not allowlist_path:
777 allowlist = allowlist | _load_allowlist(str(default_allowlist_path))
778
779 # ── Resolve file bytes ────────────────────────────────────────────────────
780 commit: CommitRecord | None
781 source_label: str
782
783 if ref is None:
784 commit = resolve_commit_ref(root, branch, None)
785 if commit is None:
786 _err("No commits found — repository may be empty.", use_color)
787 raise SystemExit(ExitCode.USER_ERROR)
788 manifest = get_commit_snapshot_manifest(root, commit.commit_id) or {}
789 file_bytes = _load_file_bytes(root, manifest, from_disk=True)
790 source_label = "working tree"
791 if not json_out and not do_delete and not count_only:
792 _print_header_workdir(len(file_bytes), use_color)
793 else:
794 commit = resolve_commit_ref(root, branch, ref)
795 if commit is None:
796 _err(f"Commit '{ref}' not found.", use_color)
797 raise SystemExit(ExitCode.USER_ERROR)
798 manifest = get_commit_snapshot_manifest(root, commit.commit_id) or {}
799 file_bytes = _load_file_bytes(root, manifest, from_disk=False)
800 source_label = f"{commit.commit_id} on {branch}"
801 if not json_out and not do_delete and not count_only:
802 _print_header(commit.commit_id, branch, len(file_bytes), use_color)
803
804 # ── Build implicit entry-point graph ─────────────────────────────────────
805 # Entry-point symbols are framework-wired (e.g. FastAPI route handlers).
806 # They are externally reachable via the runtime even though no user code
807 # calls them directly — they must never be flagged as dead.
808 implicit_graph: ImplicitEdgeGraph = build_implicit_edge_graph(root, manifest)
809 entry_point_addresses: frozenset[str] = frozenset(implicit_graph.keys())
810
811 # ── Common scan args ──────────────────────────────────────────────────────
812 scan_kwargs = _ScanKwargs(
813 kind_filter=kind_filter,
814 max_file_bytes=max_file_bytes,
815 workers=workers,
816 language_filter=language_filter,
817 path_filter=path_filter,
818 exclude_tests=exclude_tests,
819 exclude_private=exclude_private,
820 high_confidence_only=high_confidence_only,
821 allowlist=allowlist,
822 )
823
824 candidates, scanned_symbols, elapsed, skipped_count, error_count = _scan_file_bytes(
825 file_bytes, **scan_kwargs, entry_point_addresses=entry_point_addresses
826 )
827
828 if top is not None:
829 candidates = candidates[:top]
830
831 # ── --save-allowlist ──────────────────────────────────────────────────────
832 if save_allowlist_path:
833 _save_allowlist(save_allowlist_path, candidates)
834
835 # ── --compare diff ────────────────────────────────────────────────────────
836 compare_commit: CommitRecord | None = None
837 new_dead: list[_DeadCandidate] = []
838 recovered: list[_DeadCandidate] = []
839
840 if compare_ref:
841 compare_commit = resolve_commit_ref(root, branch, compare_ref)
842 if compare_commit is None:
843 _err(f"--compare commit '{compare_ref}' not found.", use_color)
844 raise SystemExit(ExitCode.USER_ERROR)
845 compare_manifest = get_commit_snapshot_manifest(root, compare_commit.commit_id) or {}
846 compare_file_bytes = _load_file_bytes(root, compare_manifest, from_disk=False)
847 compare_candidates, _, _, _, _ = _scan_file_bytes(
848 compare_file_bytes, **scan_kwargs
849 )
850 current_addrs = {c.address for c in candidates}
851 compare_addrs = {c.address for c in compare_candidates}
852 new_dead = [c for c in candidates if c.address not in compare_addrs]
853 recovered_addrs = compare_addrs - current_addrs
854 recovered = [c for c in compare_candidates if c.address in recovered_addrs]
855
856 # ── Stats ─────────────────────────────────────────────────────────────────
857 high_count = sum(1 for c in candidates if c.confidence == "high")
858 medium_count = sum(1 for c in candidates if c.confidence == "medium")
859 by_kind: _KindCountMap = {}
860 for c in candidates:
861 by_kind[c.kind] = by_kind.get(c.kind, 0) + 1
862 files_with_dead: set[str] = {c.file_path for c in candidates}
863
864 # ── Output ────────────────────────────────────────────────────────────────
865 if count_only and not json_out:
866 print(len(candidates))
867 return
868
869 if json_out:
870 payload = _DeadPayload(
871 **make_envelope(elapsed),
872 source=source_label,
873 total_files_scanned=len(file_bytes),
874 total_symbols_scanned=scanned_symbols,
875 high_confidence_count=high_count,
876 medium_confidence_count=medium_count,
877 results=[c.to_dict() for c in candidates],
878 )
879 if compare_commit is not None:
880 payload["compare_commit_id"] = compare_commit.commit_id
881 payload["new_dead"] = [c.to_dict() for c in new_dead]
882 payload["recovered"] = [c.to_dict() for c in recovered]
883 payload["net_change"] = len(new_dead) - len(recovered)
884 print(json.dumps(payload))
885 return
886
887 if do_delete:
888 _run_delete_mode(root, candidates, auto_yes)
889 return
890
891 if not candidates:
892 print(f" {_c('✅ No dead code candidates found.', _GREEN, use_color=use_color)}")
893 _print_footer_note(use_color)
894 return
895
896 if group_by_file:
897 _print_grouped(candidates, use_color)
898 else:
899 _print_flat(candidates, use_color)
900
901 _print_summary(
902 candidates=candidates,
903 high_count=high_count,
904 medium_count=medium_count,
905 by_kind=by_kind,
906 files_with_dead=files_with_dead,
907 scanned_symbols=scanned_symbols,
908 total_files=len(file_bytes),
909 skipped_count=skipped_count,
910 error_count=error_count,
911 elapsed=elapsed(),
912 top=top,
913 use_color=use_color,
914 )
915
916 if compare_commit is not None:
917 _print_compare_diff(new_dead, recovered, compare_commit, use_color)
918
919 # ── Delete mode ───────────────────────────────────────────────────────────────
920
def _run_delete_mode(
    root: pathlib.Path,
    candidates: list[_DeadCandidate],
    auto_yes: bool,
) -> None:
    """Interactively delete dead symbols from the working tree.

    Only candidates in ``.py`` / ``.pyi`` files are offered; the rest are
    counted and reported as skipped.  Candidates are grouped per file, each
    one is confirmed by the user (unless *auto_yes*), and every affected file
    is rewritten at most once.

    Args:
        root: Repository root.  A file is only rewritten when its resolved
            path lies inside the resolved root.
        candidates: Dead-code candidates to offer for deletion.
        auto_yes: When true, select every Python candidate without prompting.

    Answering ``q`` (or EOF / Ctrl-C) at a prompt aborts immediately: files
    that were already rewritten stay rewritten, the file currently being
    reviewed is left untouched, and no summary is printed.
    """
    py_candidates = [c for c in candidates if c.file_path.endswith((".py", ".pyi"))]
    skipped_non_py = len(candidates) - len(py_candidates)

    if not py_candidates:
        print(" No Python dead-code candidates to delete.")
        if skipped_non_py:
            print(f" ({skipped_non_py} non-Python candidate(s) skipped — delete is Python-only)")
        return

    print(f"\n{_BOLD}muse code dead --delete{_RESET} — {len(py_candidates)} Python candidate(s)")
    if skipped_non_py:
        print(f" {_GRAY}({skipped_non_py} non-Python candidate(s) not shown){_RESET}")
    print(f"{_GRAY}{'─' * 72}{_RESET}")

    # Group by file so each file is read and rewritten at most once.
    by_file: _DeadByFile = {}
    for c in py_candidates:
        by_file.setdefault(c.file_path, []).append(c)

    repo_root = root.resolve()
    deleted_total = 0
    skipped_total = 0
    failed_total = 0

    for file_path in sorted(by_file):
        file_candidates = by_file[file_path]

        print(f"\n {_CYAN}{_BOLD}{sanitize_display(file_path)}{_RESET} ({len(file_candidates)} candidate(s))")

        # Collect which symbols to delete (after user confirmation).
        to_delete: list[_DeadCandidate] = []
        for c in file_candidates:
            bare = c.address.split("::")[-1]
            conf_label = (
                f"{_RED}HIGH{_RESET}" if c.confidence == "high" else f"{_YELLOW}MED{_RESET}"
            )
            kind_label = _BLUE + _kind_icon(c.kind) + _RESET
            print(f" {_RED}✗{_RESET} {_WHITE}{bare}{_RESET} {kind_label} [{conf_label}]")

            if auto_yes:
                to_delete.append(c)
                continue

            try:
                answer = input(" Delete? [y/N/q] ").strip().lower()
            except (EOFError, KeyboardInterrupt):
                print("\n Aborted.")
                return
            if answer == "q":
                print(" Aborted.")
                return
            if answer == "y":
                to_delete.append(c)
            else:
                skipped_total += 1

        if not to_delete:
            continue

        abs_path = (root / file_path).resolve()
        # Path traversal guard: ensure the resolved path stays within root.
        try:
            abs_path.relative_to(repo_root)
        except ValueError:
            print(f" {_YELLOW}⚠ {sanitize_display(str(file_path))!r} escapes repo root — skipping{_RESET}")
            failed_total += len(to_delete)
            continue

        # Read directly instead of checking exists() first: the file can
        # disappear (or be unreadable) between the check and the read.
        try:
            source = abs_path.read_bytes()
        except FileNotFoundError:
            print(f" {_YELLOW}⚠ file not in working tree — skipping{_RESET}")
            failed_total += len(to_delete)
            continue
        except OSError as exc:
            print(
                f" {_YELLOW}⚠ could not read {sanitize_display(file_path)}: "
                f"{sanitize_display(str(exc))}{_RESET}"
            )
            failed_total += len(to_delete)
            continue

        spans: list[tuple[int, int, _DeadCandidate]] = []
        for c in to_delete:
            parts = c.address.split("::")
            bare = parts[-1]
            parent_class = parts[-2] if len(parts) >= 3 else None
            span = _find_symbol_span(source, bare, parent_class)
            if span is None:
                print(f" {_YELLOW}⚠ could not locate {sanitize_display(bare)} in {sanitize_display(file_path)}{_RESET}")
                failed_total += 1
            else:
                spans.append((*span, c))

        if not spans:
            continue

        # Bottom-to-top deletion only keeps line numbers valid for disjoint
        # spans.  If a selected span sits inside another selected span (a dead
        # method of a dead class, or the same span found twice), removing the
        # inner one first would make the outer span's end line stale and eat
        # unrelated code after it.  Such spans are marked as nested and are
        # removed implicitly together with their enclosing span.
        spans.sort(key=lambda s: (s[0], -s[1]))  # outermost first per start line
        plan: list[tuple[int, int, _DeadCandidate, bool]] = []
        covered_end: int | None = None
        for start, end, c in spans:
            nested = covered_end is not None and end <= covered_end
            if not nested:
                covered_end = end if covered_end is None else max(covered_end, end)
            plan.append((start, end, c, nested))

        # Split on the raw bytes: bytes.splitlines() breaks only on \n, \r and
        # \r\n, while str.splitlines() also breaks on form feeds and several
        # Unicode separators, which would shift later line numbers.
        # NOTE(review): assumes _find_symbol_span counts lines the way the
        # Python parser does — confirm.
        # surrogateescape lets undecodable bytes round-trip unchanged instead
        # of being replaced with U+FFFD and written back corrupted.
        lines = [
            raw.decode("utf-8", errors="surrogateescape")
            for raw in source.splitlines(keepends=True)
        ]
        for start, end, _cand, nested in reversed(plan):  # descending start line
            if not nested:
                lines = _delete_symbol_lines(lines, start, end)

        # write_bytes keeps the file's own line endings; text-mode writing
        # would translate "\n" and turn "\r\n" into "\r\r\n" on Windows.
        try:
            abs_path.write_bytes("".join(lines).encode("utf-8", errors="surrogateescape"))
        except OSError as exc:
            print(
                f" {_YELLOW}⚠ could not write {sanitize_display(file_path)}: "
                f"{sanitize_display(str(exc))}{_RESET}"
            )
            failed_total += len(plan)
            continue

        # Report only after the file was actually rewritten.
        for start, end, c, _nested in reversed(plan):
            bare = c.address.split("::")[-1]
            print(f" {_GREEN}✅ deleted {bare}{_RESET} (lines {start}–{end})")
            deleted_total += 1

    print(f"\n{_GRAY}{'─' * 72}{_RESET}")
    print(f" {_GREEN}Deleted:{_RESET} {deleted_total}")
    if skipped_total:
        print(f" {_GRAY}Skipped:{_RESET} {skipped_total}")
    if failed_total:
        print(f" {_YELLOW}Failed:{_RESET} {failed_total}")
    if deleted_total:
        print(f"\n Run {_CYAN}muse status{_RESET} to review, then {_CYAN}muse commit{_RESET} to record.")
1036
1037 # ── Output helpers ────────────────────────────────────────────────────────────
1038
def _save_allowlist(path: str, candidates: list[_DeadCandidate]) -> None:
    """Persist the addresses of *candidates* to *path* as a sorted JSON array.

    The file is written as UTF-8.  An ``OSError`` raised while writing is
    logged as a warning instead of being propagated.
    """
    try:
        target = pathlib.Path(path)
        addresses = sorted(c.address for c in candidates)
        target.write_text(json.dumps(addresses), encoding="utf-8")
        logger.info("✅ Saved %d address(es) to %s", len(candidates), path)
    except OSError as exc:
        logger.warning("⚠️ Could not write allowlist %s: %s", path, exc)
1049
def _is_test_file(file_path: str) -> bool:
    """Return True when *file_path* looks like a test file.

    This is a case-insensitive substring check for ``test`` or ``spec``
    anywhere in the path, so names such as ``latest.py`` or ``inspect.py``
    match as well.
    """
    lowered = file_path.lower()
    return any(marker in lowered for marker in ("test", "spec"))
1053
def _err(msg: str, use_color: bool) -> None:
    """Write *msg*, prefixed with ❌ and styled red/bold via ``_c``, to stderr."""
    rendered = _c(f"❌ {msg}", _RED, _BOLD, use_color=use_color)
    print(rendered, file=sys.stderr)
1056
def _print_header(commit_id: str, branch: str, total_files: int, use_color: bool) -> None:
    """Print the report banner naming the commit, its branch and the file count.

    The banner is followed by a 72-character horizontal rule.
    """
    commit_label = _c(commit_id, _CYAN, _BOLD, use_color=use_color)
    branch_label = _c(branch, _MAGENTA, use_color=use_color)
    file_count = _c(str(total_files), _BOLD, use_color=use_color)
    title = _c("Dead code candidates", _BOLD, use_color=use_color)
    print(f"\n{title} — commit {commit_label} on {branch_label} — {file_count} files")
    rule = _c("━" * 72, _GRAY, use_color=use_color)
    print(rule)
1063
def _print_header_workdir(total_files: int, use_color: bool) -> None:
    """Print the report banner for a working-tree scan with its file count.

    The banner is followed by a 72-character horizontal rule.
    """
    source_label = _c("working tree", _CYAN, _BOLD, use_color=use_color)
    file_count = _c(str(total_files), _BOLD, use_color=use_color)
    title = _c("Dead code candidates", _BOLD, use_color=use_color)
    print(f"\n{title} — {source_label} — {file_count} files")
    rule = _c("━" * 72, _GRAY, use_color=use_color)
    print(rule)
1069
def _kind_icon(kind: str) -> str:
    """Return the short display label for a symbol *kind*.

    Kinds without a dedicated label are returned unchanged.
    """
    short_labels = {
        "function": "fn",
        "async_function": "async fn",
        "method": "method",
        "async_method": "async method",
        "class": "class",
        "variable": "var",
        "constant": "const",
    }
    if kind in short_labels:
        return short_labels[kind]
    return kind
1080
def _confidence_label(c: _DeadCandidate, use_color: bool) -> str:
    """Return the confidence tag for *c*.

    High-confidence candidates get a bold red ``HIGH``; everything else gets
    a yellow ``MED `` (trailing space keeps both tags four characters wide).
    """
    if c.confidence != "high":
        return _c("MED ", _YELLOW, use_color=use_color)
    return _c("HIGH", _RED, _BOLD, use_color=use_color)
1085
def _print_flat(candidates: list[_DeadCandidate], use_color: bool) -> None:
    """Print *candidates* as one flat list, one line per symbol.

    A section banner is printed every time the confidence value changes
    between consecutive candidates, so the input is expected to arrive
    already ordered by confidence.  Addresses longer than 80 characters are
    truncated to 80.

    Args:
        candidates: Candidates to print, in display order.  An empty list
            prints nothing.
        use_color: Forwarded to ``_c`` to enable or disable styling.
    """
    # max() over an empty sequence raises ValueError; there is nothing to
    # print in that case, so return quietly like _print_grouped does.
    if not candidates:
        return

    max_addr = min(max(len(c.address) for c in candidates), 80)
    max_kind = max(len(_kind_icon(c.kind)) for c in candidates)
    prev_conf = ""
    for c in candidates:
        conf = c.confidence
        if conf != prev_conf:
            prev_conf = conf
            if conf == "high":
                label = _c(" ── HIGH CONFIDENCE — not referenced, module not imported", _RED, use_color=use_color)
            else:
                label = _c(" ── MEDIUM CONFIDENCE — not referenced, module is imported", _YELLOW, use_color=use_color)
            print(f"\n{label}")
            print(_c(f" {'─' * 68}", _GRAY, use_color=use_color))
        addr_str = _c(c.address[:max_addr], _WHITE if conf == "high" else _GRAY, use_color=use_color)
        kind_str = _c(_kind_icon(c.kind).ljust(max_kind), _BLUE, use_color=use_color)
        conf_str = _confidence_label(c, use_color)
        # NOTE(review): the extra 20 columns are presumably slack for the
        # escape sequences _c adds around the address — confirm.
        print(f" {addr_str:<{max_addr + 20}} {kind_str} [{conf_str}]")
1104
def _print_grouped(candidates: list[_DeadCandidate], use_color: bool) -> None:
    """Print *candidates* grouped under their file path, files in sorted order.

    Each file heading shows how many high / medium candidates it holds.
    Within a file, high-confidence symbols come first, then the rest, each
    ordered by address; only the part of the address after the last ``::``
    is shown.
    """
    grouped: _DeadByFile = {}
    for cand in candidates:
        grouped.setdefault(cand.file_path, []).append(cand)

    for path, members in sorted(grouped.items()):
        n_high = sum(1 for m in members if m.confidence == "high")
        n_medium = sum(1 for m in members if m.confidence == "medium")
        tallies = []
        if n_high:
            tallies.append(_c(f"{n_high} high", _RED, use_color=use_color))
        if n_medium:
            tallies.append(_c(f"{n_medium} med", _YELLOW, use_color=use_color))
        heading = _c(path, _CYAN, _BOLD, use_color=use_color)
        print(f"\n {heading} {', '.join(tallies)}")

        # split("::")[-1] already yields the whole address when it has no "::".
        name_width = min(max(len(m.address.split("::")[-1]) for m in members), 60)
        ordered = sorted(members, key=lambda m: (m.confidence != "high", m.address))
        for m in ordered:
            padded_name = m.address.split("::")[-1].ljust(name_width)
            kind_text = _c(_kind_icon(m.kind), _BLUE, use_color=use_color)
            if m.confidence == "high":
                name_text = _c(padded_name, _WHITE, use_color=use_color)
                bullet = _c("✗", _RED, _BOLD, use_color=use_color)
            else:
                name_text = _c(padded_name, _GRAY, use_color=use_color)
                bullet = _c("·", _YELLOW, use_color=use_color)
            print(f" {bullet} {name_text} {kind_text}")
1131
def _print_summary(
    candidates: list[_DeadCandidate],
    high_count: int,
    medium_count: int,
    by_kind: _KindCountMap,
    files_with_dead: set[str],
    scanned_symbols: int,
    total_files: int,
    skipped_count: int,
    error_count: int,
    elapsed: float,
    top: int | None,
    use_color: bool,
) -> None:
    """Print the summary table, the per-kind bar chart and the footer note.

    Args:
        candidates: Candidates that were displayed; only their count is used.
        high_count: Number of high-confidence candidates.
        medium_count: Number of medium-confidence candidates.
        by_kind: Candidate count per symbol kind; drives the bar chart.
        files_with_dead: Paths of files holding at least one candidate.
        scanned_symbols: Total number of symbols examined.
        total_files: Total number of files examined.
        skipped_count: Files skipped for size; the row is hidden when zero.
        error_count: Files with parse errors; the row is hidden when zero.
        elapsed: Elapsed time, printed with an ``s`` suffix.
        top: When not ``None``, a "(showing top N)" line is added.
        use_color: Forwarded to ``_c`` to enable or disable styling.
    """

    def _label(text: str, *styles: str) -> str:
        """Return *text* (optionally styled) dot-padded to 42 visible columns."""
        # Pad on the plain text's length: a format width applied to the
        # already-styled string also counts the escape characters, so the
        # column would drift depending on whether (and how) _c styles it.
        dots = "." * max(0, 42 - len(text))
        shown = _c(text, *styles, use_color=use_color) if styles else text
        return shown + dots

    print(f"\n{_c('━' * 72, _GRAY, use_color=use_color)}")
    print(f"{_c('Summary', _BOLD, use_color=use_color)}")
    print(f" {_label('High confidence', _RED)} {high_count:>6}")
    print(f" {_label('Medium confidence', _YELLOW)} {medium_count:>6}")
    print(f" {_label('Total candidates')} {len(candidates):>6}")
    if top is not None:
        print(f" {_c(f'(showing top {top})', _GRAY, use_color=use_color)}")
    print(f" {_label('Symbols scanned')} {scanned_symbols:>6,}")
    print(f" {_label('Files with dead symbols')} {len(files_with_dead):>6}")
    print(f" {_label('Files scanned')} {total_files:>6,}")
    if skipped_count:
        print(f" {_label('Files skipped (too large)', _GRAY)} {skipped_count:>6}")
    if error_count:
        print(f" {_label('Files with parse errors', _YELLOW)} {error_count:>6}")
    print(f" {_label('Elapsed')} {elapsed:>5.1f}s")

    if by_kind:
        print(f"\n {_c('By kind:', _BOLD, use_color=use_color)}")
        # One bar cell stands for `per_cell` symbols; bars are capped at 20
        # cells.  The scale is the same for every row, so compute it once.
        per_cell = max(1, max(by_kind.values()) // 20)
        for kind, count in sorted(by_kind.items(), key=lambda x: -x[1]):
            bar_len = min(count // per_cell, 20)
            bar = _c("█" * bar_len, _BLUE, use_color=use_color)
            print(f" {_kind_icon(kind):<16} {bar} {count:>5,}")

    _print_footer_note(use_color)
1170
def _print_compare_diff(
    new_dead: list[_DeadCandidate],
    recovered: list[_DeadCandidate],
    compare_commit: CommitRecord,
    use_color: bool,
) -> None:
    """Print the dead-code diff against *compare_commit*.

    Shows the net change (new minus recovered; red when positive, green when
    negative, gray when zero), then the newly dead and the recovered symbols,
    each list only when non-empty.
    """
    rule = _c("━" * 72, _GRAY, use_color=use_color)
    print(f"\n{rule}")
    commit_label = _c(compare_commit.commit_id, _CYAN, _BOLD, use_color=use_color)
    heading = _c("Dead-code diff", _BOLD, use_color=use_color)
    print(f"{heading} vs {commit_label}")

    delta = len(new_dead) - len(recovered)
    prefix = "" if delta < 0 else "+"
    if delta > 0:
        tint = _RED
    elif delta < 0:
        tint = _GREEN
    else:
        tint = _GRAY
    delta_text = _c(f"{prefix}{delta}", tint, use_color=use_color)
    print(f" Net change: {delta_text}")

    if new_dead:
        new_heading = _c(f"New dead ({len(new_dead)}):", _RED, use_color=use_color)
        print(f"\n {new_heading}")
        for cand in new_dead:
            print(f" + {cand.address} [{_kind_icon(cand.kind)}] [{cand.confidence.upper()}]")

    if recovered:
        recovered_heading = _c(f"Recovered ({len(recovered)}):", _GREEN, use_color=use_color)
        print(f"\n {recovered_heading}")
        for cand in recovered:
            print(f" - {cand.address} [{_kind_icon(cand.kind)}]")
1193
def _print_footer_note(use_color: bool) -> None:
    """Print the gray closing note about detection limits and follow-up flags."""
    sentences = [
        "Dynamic dispatch, exported APIs, and entry points are not detected.",
        "Treat results as candidates — verify before deleting.",
        "Use --delete to interactively remove candidates from the working tree.",
        "Use --allowlist to suppress known false positives.",
    ]
    # Every line after the first is indented by one space.
    note = "\n ".join(sentences)
    styled = _c(note, _GRAY, use_color=use_color)
    print(f"\n{styled}")
File History 1 commit
sha256:e6465e8a9b7fa8e6223ed4a3576e96c568c913ae2caeb9c31f15e7a81b250b40 docs: add | jq convention to --json section of agent-guide Sonnet 4.6 1 day ago