dead.py
python
sha256:e6465e8a9b7fa8e6223ed4a3576e96c568c913ae2caeb9c31f15e7a81b250b40
docs: add | jq convention to --json section of agent-guide
Sonnet 4.6
1 day ago
| 1 | """muse code dead — dead code detection. |
| 2 | |
| 3 | Finds symbols that are **never referenced** and whose containing module is |
| 4 | **never imported** by anything else in the codebase. |
| 5 | |
| 6 | A symbol is a dead-code candidate when two independent conditions hold: |
| 7 | |
| 8 | 1. **No reference**: its bare name does not appear as any ``ast.Name`` id |
| 9 | or ``ast.Attribute`` attr anywhere in the codebase. This is broader |
| 10 | than call-site detection — it catches attribute accesses, keyword |
| 11 | argument values, type annotations, ``isinstance`` checks, and every |
| 12 | other form of name usage, not just direct calls. |
| 13 | |
| 14 | 2. **No import**: its containing file's module name does not appear in |
| 15 | any ``import``-kind symbol in any other file. |
| 16 | |
| 17 | Both conditions must hold simultaneously. A function that is never |
| 18 | referenced but lives in a module that *is* imported is still reachable — |
| 19 | it may be part of an exported API even if it's not used internally. |
| 20 | |
| 21 | Performance |
| 22 | ----------- |
| 23 | All files are processed in a **single parallel pass** and AST-parsed |
| 24 | exactly once. Without ``--commit`` the working tree is read from disk, |
| 25 | so uncommitted changes are immediately visible. With ``--commit`` the |
| 26 | specified historical snapshot is read from the object store. |
| 27 | Imports, references, and symbol trees are all extracted in the same |
| 28 | pass, then combined. ``--workers`` controls the thread-pool size. |
| 29 | |
| 30 | Security |
| 31 | -------- |
| 32 | ``ast.parse`` never executes code. Files exceeding ``--max-file-bytes`` |
| 33 | (default 512 KB) are skipped to prevent stalls on generated or minified |
| 34 | files. ``--delete`` validates every file path inside the repo root before |
| 35 | touching the working tree. |
| 36 | |
| 37 | Known limitations |
| 38 | ----------------- |
| 39 | - Symbols whose names are extremely common (e.g. ``run``, ``name``) may |
| 40 | appear as false negatives because a matching name exists somewhere else. |
| 41 | - Exported APIs: symbols accessed from outside the repo (library code) |
| 42 | appear dead because the callers are not in the snapshot. |
| 43 | - Entry points: ``main()``, CLI callbacks, and test functions appear dead |
| 44 | by design. Test files are excluded by default; pass ``--include-tests`` to scan them. |
| 45 | - tree-sitter languages: reference extraction is Python-only. Symbols in |
| 46 | Go/Rust/TypeScript files are checked for import-graph reachability only. |
| 47 | - ``--delete`` is Python-only (requires AST line-range information). |
| 48 | |
| 49 | Usage:: |
| 50 | |
| 51 | muse code dead |
| 52 | muse code dead --kind function |
| 53 | muse code dead --include-tests |
| 54 | muse code dead --exclude-private |
| 55 | muse code dead --high-confidence-only |
| 56 | muse code dead --path "musehub/services/*" |
| 57 | muse code dead --language Python |
| 58 | muse code dead --top 50 |
| 59 | muse code dead --group-by-file |
| 60 | muse code dead --commit HEAD~10 |
| 61 | muse code dead --workers 8 |
| 62 | muse code dead --json |
| 63 | muse code dead --delete |
| 64 | muse code dead --delete --yes |
| 65 | muse code dead --allowlist .muse/dead-allowlist.json |
| 66 | |
| 67 | Confidence levels:: |
| 68 | |
| 69 | HIGH — not referenced AND module not imported → almost certainly dead |
| 70 | MEDIUM — not referenced, but module IS imported → may be exported API surface |
| 71 | |
| 72 | Flags: |
| 73 | |
| 74 | ``--kind KIND, -k KIND`` |
| 75 | Restrict to symbols of a specific kind (function, class, method, …). |
| 76 | |
| 77 | ``--include-tests`` |
| 78 | Include symbols in files whose path contains ``test`` or ``spec`` (such files are excluded by default). |
| 79 | |
| 80 | ``--exclude-private`` |
| 81 | Exclude symbols whose bare name starts with ``_``. |
| 82 | |
| 83 | ``--high-confidence-only`` |
| 84 | Show only HIGH confidence candidates (module not imported). |
| 85 | |
| 86 | ``--path GLOB, -p GLOB`` |
| 87 | Restrict to files matching this glob pattern (e.g. ``"musehub/services/*"``). |
| 88 | |
| 89 | ``--language LANG, -l LANG`` |
| 90 | Restrict to files of a specific language (e.g. ``Python``, ``TypeScript``). |
| 91 | |
| 92 | ``--top N`` |
| 93 | Show only the top N candidates. |
| 94 | |
| 95 | ``--group-by-file, -g`` |
| 96 | Group output by file instead of a flat sorted list. |
| 97 | |
| 98 | ``--commit REF, -c REF`` |
| 99 | Analyse a historical committed snapshot instead of the working tree. |
| 100 | |
| 101 | ``--workers N, -w N`` |
| 102 | Number of parallel worker threads for file parsing (default: 8). |
| 103 | |
| 104 | ``--max-file-bytes N`` |
| 105 | Skip files larger than N bytes (default: 524288 = 512 KB). |
| 106 | |
| 107 | ``--no-color`` |
| 108 | Disable ANSI color output. |
| 109 | |
| 110 | ``--json`` |
| 111 | Emit results as JSON. |
| 112 | |
| 113 | ``--delete`` |
| 114 | Interactively delete dead symbols from the working tree (Python only). |
| 115 | Prompts for each candidate unless ``--yes`` is also given. |
| 116 | |
| 117 | ``--yes, -y`` |
| 118 | Skip confirmation prompts when used with ``--delete``. |
| 119 | |
| 120 | ``--allowlist FILE`` |
| 121 | JSON file containing a list of symbol addresses to suppress from output. |
| 122 | Addresses are matched as exact strings against the ``address`` field. |
| 123 | Example file: ``[\"muse/cli/config.py::MuseConfig\"]`` |
| 124 | """ |
| 125 | |
| 126 | import argparse |
| 127 | import ast |
| 128 | import fnmatch |
| 129 | import json |
| 130 | import logging |
| 131 | import os |
| 132 | import pathlib |
| 133 | import sys |
| 134 | from concurrent.futures import ThreadPoolExecutor, as_completed |
| 135 | from dataclasses import dataclass, field |
| 136 | from typing import TypedDict |
| 137 | |
| 138 | from muse.core.errors import ExitCode |
| 139 | from muse.core.object_store import read_object |
| 140 | from muse.core.repo import require_repo |
| 141 | from muse.core.types import Manifest |
| 142 | from muse.core.paths import dead_allowlist_path as _dead_allowlist_path |
| 143 | from muse.core.refs import read_current_branch |
| 144 | from muse.core.commits import ( |
| 145 | CommitRecord, |
| 146 | resolve_commit_ref, |
| 147 | ) |
| 148 | from muse.core.snapshots import get_commit_snapshot_manifest |
| 149 | from muse.plugins.code._framework import ImplicitEdgeGraph, build_implicit_edge_graph |
| 150 | from muse.plugins.code._query import language_of |
| 151 | from muse.plugins.code.ast_parser import SEMANTIC_EXTENSIONS, SymbolTree, parse_symbols |
| 152 | |
| 153 | type _BlobMap = dict[str, bytes] |
| 154 | type _KindCountMap = dict[str, int] |
| 155 | type _DeadByFile = dict[str, list["_DeadCandidate"]] |
| 156 | from muse.core.validation import MAX_AST_BYTES, clamp_int, sanitize_display |
| 157 | from muse.core.envelope import EnvelopeJson, make_envelope |
| 158 | from muse.core.timing import start_timer |
| 159 | |
| 160 | logger = logging.getLogger(__name__) |
| 161 | |
class _DeadCandidateJson(TypedDict):
    """Dictionary shape of a single dead-code candidate in JSON output.

    Instances are built by ``_DeadCandidate.to_dict``; every key is required.
    """

    # Symbol address, e.g. ``"muse/cli/config.py::MuseConfig"``.
    address: str
    # Path of the file that defines the symbol.
    path: str
    # Symbol kind (function, class, method, ...).
    kind: str
    # Whether the symbol's bare name was referenced anywhere in the scan.
    referenced: bool
    # Whether the containing module is imported by any scanned file.
    module_imported: bool
    # ``"high"`` or ``"medium"``.
    confidence: str
    # Human-readable explanation of the classification.
    reason: str
| 172 | |
class _DeadPayload(EnvelopeJson, total=False):
    """Top-level JSON document emitted by ``muse code dead``.

    Extends the shared envelope type; ``total=False`` makes every key declared
    here optional.
    """

    # Label of what was scanned (e.g. the working tree or a commit ref).
    source: str
    total_files_scanned: int
    total_symbols_scanned: int
    high_confidence_count: int
    medium_confidence_count: int
    # The dead-code candidates themselves.
    results: list[_DeadCandidateJson]
    # Comparison fields — presumably only populated when ``--compare`` is used
    # (the code that fills them is not in this part of the file).
    compare_commit_id: str
    new_dead: list[_DeadCandidateJson]
    recovered: list[_DeadCandidateJson]
    net_change: int
| 186 | |
| 187 | class _ScanKwargs(TypedDict): |
| 188 | """Keyword arguments forwarded to every :func:`_scan_file_bytes` call. |
| 189 | |
| 190 | Collected into a TypedDict so the ``**scan_kwargs`` spread is type-safe |
| 191 | without a ``# type: ignore`` and the common args are defined once. |
| 192 | """ |
| 193 | |
| 194 | kind_filter: str | None |
| 195 | max_file_bytes: int |
| 196 | workers: int |
| 197 | language_filter: str | None |
| 198 | path_filter: str | None |
| 199 | exclude_tests: bool |
| 200 | exclude_private: bool |
| 201 | high_confidence_only: bool |
| 202 | allowlist: frozenset[str] |
| 203 | |
# File suffixes that get stdlib-``ast`` reference extraction.
_PY_SUFFIXES: frozenset[str] = frozenset((".py", ".pyi"))
# Upper bound applied to the ``--workers`` value.
_MAX_WORKERS: int = 64
# Lower bound applied to the ``--max-file-bytes`` value (4 KB).
_MIN_FILE_BYTES: int = 4 * 1024

# Largest file we are willing to parse by default (512 KB); keeps generated
# or minified files from stalling the scan.
_DEFAULT_MAX_FILE_BYTES: int = 512 * 1024
| 210 | |
| 211 | # ── ANSI colours ────────────────────────────────────────────────────────────── |
| 212 | |
# Text attributes.
_RESET = "\033[0m"
_BOLD = "\033[1m"
_DIM = "\033[2m"
# Foreground colours.
_RED = "\033[31m"
_YELLOW = "\033[33m"
_CYAN = "\033[36m"
_GREEN = "\033[32m"
_BLUE = "\033[34m"
_MAGENTA = "\033[35m"
_WHITE = "\033[37m"
_GRAY = "\033[90m"
| 224 | |
def _c(text: str, *codes: str, use_color: bool = True) -> str:
    """Return *text* styled with the given ANSI *codes*.

    With *use_color* false the text comes back untouched.  Otherwise all
    codes are emitted in order, followed by the text and a reset sequence.
    """
    if use_color:
        prefix = "".join(codes)
        return f"{prefix}{text}{_RESET}"
    return text
| 230 | |
| 231 | # ── Data structures ─────────────────────────────────────────────────────────── |
| 232 | |
@dataclass
class _FileAnalysis:
    """Result of analysing one file: symbols, references, and imports."""

    # Path of the analysed file, as supplied by the caller.
    file_path: str
    # Language label computed for the file.
    lang: str
    # Symbols found in the file (after any kind filter has been applied).
    symbol_tree: SymbolTree = field(default_factory=dict)
    # Every name used anywhere in the file: ``ast.Name`` ids plus
    # ``ast.Attribute`` attrs.  Deliberately wider than call sites, so that
    # attribute access, keyword-argument values, annotations, ``isinstance``
    # arguments, decorator names, etc. all count as references.
    ref_names: set[str] = field(default_factory=set)
    # Module / name strings this file imports.
    imported_names: set[str] = field(default_factory=set)
    # True when the file was not parsed because it exceeded the size limit.
    skipped: bool = False
    # Message of the symbol-parsing failure, or None when parsing succeeded.
    error: str | None = None
| 247 | |
@dataclass
class _DeadCandidate:
    """A symbol flagged as a dead-code candidate."""

    address: str
    file_path: str
    kind: str
    referenced: bool
    module_imported: bool

    @property
    def confidence(self) -> str:
        """``"medium"`` when the containing module is imported, else ``"high"``."""
        if self.module_imported:
            return "medium"
        return "high"

    @property
    def reason(self) -> str:
        """Human-readable justification for the classification."""
        if self.referenced or self.module_imported:
            return "not referenced (module imported — may be exported API)"
        return "not referenced, module not imported"

    def to_dict(self) -> _DeadCandidateJson:
        """Serialise the candidate into its JSON dictionary form."""
        payload: _DeadCandidateJson = {
            "address": self.address,
            "path": self.file_path,
            "kind": self.kind,
            "referenced": self.referenced,
            "module_imported": self.module_imported,
            "confidence": self.confidence,
            "reason": self.reason,
        }
        return payload
| 276 | |
| 277 | # ── Single-pass file analysis ───────────────────────────────────────────────── |
| 278 | |
def _analyse_file(
    file_path: str,
    raw: bytes,
    kind_filter: str | None,
    max_file_bytes: int,
) -> _FileAnalysis:
    """Parse one file and extract its symbols, references, and imports.

    The function works only on its arguments and a freshly created result
    object, so it can be run from worker threads (assuming the parsing helpers
    it calls are themselves thread-safe).  The caller supplies the raw bytes,
    whether they come from the working tree or from the object store.

    Args:
        file_path: Path of the file; used for language/suffix detection.
        raw: Raw file contents.
        kind_filter: When set, only symbols of this kind are kept in
            ``symbol_tree`` (imports are still recorded in ``imported_names``).
        max_file_bytes: Files larger than this are marked ``skipped`` and not
            parsed at all.

    Returns:
        A :class:`_FileAnalysis`.  ``error`` is set (to a non-empty string)
        when symbol parsing failed.  When only the Python reference pass fails,
        the symbols are returned without reference/import supplements.
    """
    lang = language_of(file_path)
    result = _FileAnalysis(file_path=file_path, lang=lang)

    if len(raw) > max_file_bytes:
        result.skipped = True
        return result

    suffix = pathlib.PurePosixPath(file_path).suffix.lower()
    if suffix not in SEMANTIC_EXTENSIONS:
        return result

    # ── Symbol extraction (all languages with AST support) ─────────────────
    try:
        tree = parse_symbols(raw, file_path)
    except Exception as exc:  # noqa: BLE001
        # Fall back to the exception class name so the recorded error is never
        # an empty (falsy) string that callers could mistake for "no error".
        result.error = str(exc) or type(exc).__name__
        return result

    for rec in tree.values():
        if rec["kind"] == "import":
            result.imported_names.add(rec["qualified_name"])

    if kind_filter:
        tree = {addr: rec for addr, rec in tree.items() if rec["kind"] == kind_filter}
    result.symbol_tree = tree

    # ── Reference + module-import extraction (Python only via stdlib ast) ─────
    # All nodes are walked once:
    #
    #   ast.Name / ast.Attribute — broad reference tracking (covers e.g.
    #     ``logger``, ``func=run`` keyword values, property access,
    #     ``isinstance`` arguments, annotations).
    #
    #   ast.ImportFrom / ast.Import — the actual dotted module paths
    #     ("from muse.core.store import X" → "muse.core.store").  The Muse
    #     symbol tree stores imports as "import::symbolname" with no module
    #     path, so it is supplemented here to make _module_is_imported work.
    if suffix not in _PY_SUFFIXES or len(raw) > MAX_AST_BYTES:
        return result

    try:
        py_tree = ast.parse(raw)
    except (SyntaxError, ValueError, RecursionError):
        # SyntaxError: invalid source.  ValueError: e.g. null bytes on some
        # Python versions.  RecursionError: pathologically nested source.
        # In every case keep the symbols and simply skip the reference pass
        # instead of letting the exception escape the worker thread.
        return result

    for node in ast.walk(py_tree):
        if isinstance(node, ast.Name):
            result.ref_names.add(node.id)
        elif isinstance(node, ast.Attribute):
            result.ref_names.add(node.attr)
        elif isinstance(node, ast.ImportFrom):
            # ``from . import x`` has no module name; nothing to record.
            if node.module:
                result.imported_names.add(node.module)
        elif isinstance(node, ast.Import):
            for alias in node.names:
                result.imported_names.add(alias.name)

    return result
| 347 | |
| 348 | # ── Module-import matching ──────────────────────────────────────────────────── |
| 349 | |
def _module_is_imported(file_path: str, imported_names: set[str]) -> bool:
    """Return True if *file_path*'s module name appears anywhere in *imported_names*.

    Matching is deliberately generous: an import counts when it equals the
    module's bare name or dotted path, ends with either of them, or contains
    the bare name as any dotted component.

    A package initialiser (``pkg/sub/__init__.py``) is identified by its
    package (``sub`` / ``pkg.sub``) rather than by the literal name
    ``__init__``, because that is how packages are actually imported.

    Args:
        file_path: POSIX-style path of the file being checked.
        imported_names: Import strings collected from the whole scan.
    """
    path = pathlib.PurePosixPath(file_path)
    if path.stem == "__init__" and path.parent.name:
        # ``import pkg.sub`` is what loads ``pkg/sub/__init__.py``.
        module_path = path.parent
    else:
        module_path = path.with_suffix("")

    stem = module_path.name
    module = module_path.as_posix().replace("/", ".")
    stem_suffix = f".{stem}"
    module_suffix = f".{module}"

    return any(
        imp == stem
        or imp == module
        or imp.endswith(stem_suffix)
        or imp.endswith(module_suffix)
        or stem in imp.split(".")
        for imp in imported_names
    )
| 364 | |
| 365 | # ── Path filter ─────────────────────────────────────────────────────────────── |
| 366 | |
| 367 | def _matches_path_filter(file_path: str, pattern: str | None) -> bool: |
| 368 | if pattern is None: |
| 369 | return True |
| 370 | return fnmatch.fnmatch(file_path, pattern) or fnmatch.fnmatch(file_path, f"**/{pattern}") |
| 371 | |
| 372 | # ── Symbol deletion (Python only) ───────────────────────────────────────────── |
| 373 | |
| 374 | def _find_symbol_span(source: bytes, bare_name: str, parent_class: str | None) -> tuple[int, int] | None: |
| 375 | """Return (start_lineno, end_lineno) 1-indexed for the named symbol. |
| 376 | |
| 377 | Accounts for decorator lines (start is the first decorator's line). |
| 378 | Returns None if the symbol cannot be located. |
| 379 | """ |
| 380 | try: |
| 381 | if len(source) > MAX_AST_BYTES: |
| 382 | return None |
| 383 | tree = ast.parse(source) |
| 384 | except SyntaxError: |
| 385 | return None |
| 386 | |
| 387 | search_body: list[ast.stmt] = tree.body |
| 388 | if parent_class: |
| 389 | for node in ast.walk(tree): |
| 390 | if isinstance(node, ast.ClassDef) and node.name == parent_class: |
| 391 | search_body = list(node.body) |
| 392 | break |
| 393 | |
| 394 | for node in search_body: |
| 395 | node_name: str | None = None |
| 396 | if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)): |
| 397 | node_name = node.name |
| 398 | elif isinstance(node, ast.Assign): |
| 399 | if len(node.targets) == 1 and isinstance(node.targets[0], ast.Name): |
| 400 | node_name = node.targets[0].id |
| 401 | elif isinstance(node, ast.AnnAssign): |
| 402 | if isinstance(node.target, ast.Name): |
| 403 | node_name = node.target.id |
| 404 | |
| 405 | if node_name != bare_name: |
| 406 | continue |
| 407 | |
| 408 | if not hasattr(node, "end_lineno") or node.end_lineno is None: |
| 409 | return None |
| 410 | end: int = node.end_lineno |
| 411 | start: int = node.lineno |
| 412 | if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)): |
| 413 | if node.decorator_list: |
| 414 | start = node.decorator_list[0].lineno |
| 415 | return (start, end) |
| 416 | |
| 417 | return None |
| 418 | |
def _delete_symbol_lines(source_lines: list[str], start: int, end: int) -> list[str]:
    """Drop lines *start*..*end* (1-indexed, inclusive) and tidy blank lines.

    Blank lines directly above the removed span are discarded.  If any
    non-blank line remains below it, a single ``"\\n"`` separator is inserted
    between the two halves.  The input list is not modified.
    """
    head = list(source_lines[: start - 1])
    tail = list(source_lines[end:])

    # Trim blank lines that trailed the kept upper half.
    while head and head[-1].strip() == "":
        head.pop()

    has_content_below = any(line.strip() for line in tail)
    separator = ["\n"] if has_content_below else []
    return head + separator + tail
| 432 | |
| 433 | # ── Repo ID ─────────────────────────────────────────────────────────────────── |
| 434 | |
| 435 | # ── Allowlist ───────────────────────────────────────────────────────────────── |
| 436 | |
| 437 | def _load_allowlist(path: str | None) -> frozenset[str]: |
| 438 | """Load a JSON list of symbol addresses that should be suppressed.""" |
| 439 | if path is None: |
| 440 | return frozenset() |
| 441 | try: |
| 442 | raw = pathlib.Path(path).read_text(encoding="utf-8") |
| 443 | parsed = json.loads(raw) |
| 444 | if not isinstance(parsed, list): |
| 445 | logger.warning("dead-code allowlist must be a JSON array; ignoring %s", path) |
| 446 | return frozenset() |
| 447 | return frozenset(str(x) for x in parsed) |
| 448 | except (OSError, json.JSONDecodeError) as exc: |
| 449 | logger.warning("Could not load allowlist %s: %s", path, exc) |
| 450 | return frozenset() |
| 451 | |
| 452 | # ── Shared scan pipeline ────────────────────────────────────────────────────── |
| 453 | |
def _load_file_bytes(
    root: pathlib.Path,
    manifest: Manifest,
    from_disk: bool,
) -> _BlobMap:
    """Collect the ``file_path → bytes`` map that the scan will analyse.

    With *from_disk* set, every manifest path is read from the working tree
    under *root*; paths that cannot be read (e.g. deleted files) are left out,
    since a missing file has no symbols that could be dead.  Otherwise each
    entry is fetched from the object store by its object id, and entries the
    store has no data for are left out.
    """
    blobs: _BlobMap = {}

    if from_disk:
        for rel_path, _object_id in manifest.items():
            try:
                blobs[rel_path] = (root / rel_path).read_bytes()
            except OSError:
                # Not readable in the working tree — exclude from the scan.
                continue
        return blobs

    for rel_path, object_id in manifest.items():
        blob = read_object(root, object_id)
        if blob is not None:
            blobs[rel_path] = blob
    return blobs
| 478 | |
def _scan_file_bytes(
    file_bytes: _BlobMap,
    kind_filter: str | None,
    max_file_bytes: int,
    workers: int,
    language_filter: str | None,
    path_filter: str | None,
    exclude_tests: bool,
    exclude_private: bool,
    high_confidence_only: bool,
    allowlist: frozenset[str],
    entry_point_addresses: frozenset[str] = frozenset(),
) -> tuple[list[_DeadCandidate], int, float, int, int]:
    """Full dead-code analysis pipeline.

    Every file is analysed once in a thread pool; the reference and import
    names of *all* files are then merged, and each symbol whose bare name is
    not in the merged reference set becomes a candidate.

    Args:
        file_bytes:            Map of ``file_path → raw bytes`` to analyse.
        kind_filter:           Restrict to symbols of this kind, or ``None``.
        max_file_bytes:        Skip files larger than this many bytes.
        workers:               Number of parallel parse threads (values below
                               1 are treated as 1).
        language_filter:       Restrict to this language name, or ``None``.
        path_filter:           Glob pattern for file path restriction.
        exclude_tests:         When ``True``, skip test files.
        exclude_private:       When ``True``, skip ``_private`` symbols.
        high_confidence_only:  When ``True``, only return high-confidence hits.
        allowlist:             Set of symbol addresses to suppress.
        entry_point_addresses: Addresses of framework-wired entry points.
                               These are *never* reported as dead code because
                               they are externally reachable via the framework
                               even though no user code calls them explicitly.

    Returns:
        ``(candidates, scanned_symbols, elapsed, skipped, errors)``.
        Candidates are sorted high-confidence first, then by file path and
        address.  ``skipped`` counts files over the size limit and ``errors``
        counts files whose analysis failed.
        NOTE(review): ``elapsed`` is whatever ``start_timer()`` returned at the
        start of the scan, passed through untouched; confirm that this matches
        the ``float`` duration the signature advertises.
    """
    elapsed = start_timer()
    analyses: list[_FileAnalysis] = []

    with ThreadPoolExecutor(max_workers=max(1, workers)) as pool:
        futures = {
            pool.submit(_analyse_file, fp, raw, kind_filter, max_file_bytes): fp
            for fp, raw in file_bytes.items()
        }
        for future in as_completed(futures):
            try:
                analyses.append(future.result())
            except Exception as exc:  # noqa: BLE001
                # One unparseable file must not abort the whole scan: record
                # it as an errored file so it is counted and then ignored.
                failed_path = futures[future]
                logger.warning("dead-code analysis failed for %s: %s", failed_path, exc)
                analyses.append(
                    _FileAnalysis(
                        file_path=failed_path,
                        lang="",  # never read: errored analyses are skipped below
                        error=str(exc) or type(exc).__name__,
                    )
                )

    # References and imports are global: a name used in *any* file keeps a
    # symbol alive, regardless of the per-file filters applied below.
    all_ref_names: set[str] = set()
    all_imported_names: set[str] = set()
    for a in analyses:
        all_ref_names.update(a.ref_names)
        all_imported_names.update(a.imported_names)

    candidates: list[_DeadCandidate] = []
    scanned_symbols = 0

    for analysis in sorted(analyses, key=lambda a: a.file_path):
        # ``is not None`` so that an empty error message still counts.
        if analysis.skipped or analysis.error is not None:
            continue
        if not _matches_path_filter(analysis.file_path, path_filter):
            continue
        if language_filter and analysis.lang != language_filter:
            continue
        if exclude_tests and _is_test_file(analysis.file_path):
            continue

        mod_imported = _module_is_imported(analysis.file_path, all_imported_names)

        for address, rec in sorted(analysis.symbol_tree.items()):
            if rec["kind"] == "import":
                continue
            # Counted before the candidate-level filters on purpose: the
            # total reflects every symbol looked at.
            scanned_symbols += 1
            bare_name = rec["name"].split(".")[-1]
            if exclude_private and bare_name.startswith("_"):
                continue
            if address in allowlist:
                continue
            if bare_name in all_ref_names:
                continue
            if address in entry_point_addresses:
                continue
            cand = _DeadCandidate(
                address=address,
                file_path=analysis.file_path,
                kind=rec["kind"],
                referenced=False,
                module_imported=mod_imported,
            )
            if high_confidence_only and cand.confidence != "high":
                continue
            candidates.append(cand)

    # ``False`` sorts before ``True``, so high-confidence candidates come first.
    candidates.sort(key=lambda c: (c.confidence != "high", c.file_path, c.address))

    skipped = sum(1 for a in analyses if a.skipped)
    errors = sum(1 for a in analyses if a.error is not None)

    return candidates, scanned_symbols, elapsed, skipped, errors
| 575 | |
| 576 | # ── CLI registration ────────────────────────────────────────────────────────── |
| 577 | |
def _get_code_subs(
    subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]",
) -> "argparse._SubParsersAction[argparse.ArgumentParser]":
    """Return the subparsers action that belongs to the ``code`` parser.

    When *subparsers* already has a ``code`` parser registered, that parser's
    existing subparsers action is reused (or created if it has none yet).
    When it does not, a minimal ``code`` parser is added first, so that
    ``register(top_level_subs)`` still produces the command path ``code dead``.
    """
    registered = getattr(subparsers, "choices", None) or {}
    code_parser = registered.get("code")
    if code_parser is None:
        return subparsers.add_parser("code").add_subparsers()

    existing_action = next(
        (
            action
            for action in code_parser._actions
            if isinstance(action, argparse._SubParsersAction)
        ),
        None,
    )
    if existing_action is not None:
        return existing_action
    return code_parser.add_subparsers()
| 597 | |
def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None:
    """Register the ``dead`` subcommand under the ``code`` namespace.

    Adds the ``dead`` parser (using the module docstring as its description),
    declares every command-line option, and wires :func:`run` in as the
    handler via ``set_defaults(func=run)``.

    Args:
        subparsers: Either the top-level subparsers action or one that already
            contains a ``code`` parser; resolved through :func:`_get_code_subs`.
    """
    subparsers = _get_code_subs(subparsers)
    parser = subparsers.add_parser(
        "dead",
        help="Find symbols with no references and no importers — dead code candidates.",
        description=__doc__,
        # Keep the docstring's manual layout in ``--help`` output.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--kind", "-k", default=None, metavar="KIND", dest="kind_filter",
        help="Restrict to symbols of this kind (function, class, method, async_function, …).",
    )
    parser.add_argument(
        "--include-tests", action="store_true", dest="include_tests",
        help=(
            "Include test files (paths containing 'test' or 'spec') in the analysis. "
            "Tests are excluded by default because pytest discovers them by naming "
            "convention rather than by reference, which produces thousands of false positives."
        ),
    )
    parser.add_argument(
        "--exclude-private", action="store_true", dest="exclude_private",
        help="Exclude symbols whose name starts with '_'.",
    )
    parser.add_argument(
        "--high-confidence-only", action="store_true", dest="high_confidence_only",
        help="Show only HIGH confidence candidates (module not imported).",
    )
    parser.add_argument(
        "--path", "-p", default=None, metavar="GLOB", dest="path_filter",
        help="Restrict to files matching this glob pattern (e.g. 'musehub/services/*').",
    )
    parser.add_argument(
        "--language", "-l", default="Python", metavar="LANG", dest="language_filter",
        help=(
            "Restrict to files of a specific language (default: Python). "
            "Use --language all to scan every language including Markdown, TOML, etc. "
            "Markdown sections and variables are never Python references, so scanning "
            "them without this filter produces thousands of false positives."
        ),
    )
    parser.add_argument(
        "--top", "-n", default=None, type=int, metavar="N", dest="top",
        help="Show only the top N candidates.",
    )
    parser.add_argument(
        "--group-by-file", "-g", action="store_true", dest="group_by_file",
        help="Group output by file instead of a flat sorted list.",
    )
    parser.add_argument(
        "--commit", "-c", default=None, metavar="REF", dest="ref",
        help=(
            "Analyse a historical committed snapshot instead of the working tree. "
            "Accepts a full commit ID, a short prefix, HEAD, or a branch name."
        ),
    )
    parser.add_argument(
        "--workers", "-w", default=8, type=int, metavar="N", dest="workers",
        help="Number of parallel worker threads for parsing (default: 8).",
    )
    parser.add_argument(
        "--max-file-bytes", default=_DEFAULT_MAX_FILE_BYTES, type=int,
        metavar="N", dest="max_file_bytes",
        help="Skip files larger than N bytes (default: 524288).",
    )
    parser.add_argument(
        "--no-color", action="store_true", dest="no_color",
        help="Disable ANSI colour output.",
    )
    parser.add_argument(
        "--json", "-j", action="store_true", dest="json_out",
        help="Emit results as JSON.",
    )
    parser.add_argument(
        "--delete", action="store_true", dest="delete",
        help=(
            "Interactively delete dead symbols from the working tree (Python only). "
            "Prompts for each candidate unless --yes is also given."
        ),
    )
    parser.add_argument(
        "--yes", "-y", action="store_true", dest="yes",
        help="Skip confirmation prompts when used with --delete.",
    )
    parser.add_argument(
        "--allowlist", default=None, metavar="FILE", dest="allowlist",
        help=(
            "JSON file with a list of symbol addresses to suppress. "
            # The example shows the file's *contents* (a JSON array of
            # addresses), not the path of the file itself.
            "Example contents: [\"muse/cli/config.py::MuseConfig\"]"
        ),
    )
    parser.add_argument(
        "--compare", default=None, metavar="REF", dest="compare_ref",
        help=(
            "Diff dead-code results against this commit reference. "
            "Shows which symbols newly became dead and which were recovered."
        ),
    )
    parser.add_argument(
        "--count", action="store_true", dest="count_only",
        help="Print only the total count of dead-code candidates (scriptable).",
    )
    parser.add_argument(
        "--save-allowlist", default=None, metavar="FILE", dest="save_allowlist",
        help=(
            "Save all found dead-code candidate addresses to FILE as a JSON list. "
            "Use as input to --allowlist to permanently suppress known false positives."
        ),
    )
    parser.set_defaults(func=run)
| 709 | |
| 710 | # ── Main logic ──────────────────────────────────────────────────────────────── |
| 711 | |
def run(args: argparse.Namespace) -> None:
    """Find symbols with no references and no importers — dead code candidates.

    Scans the working tree (or a historical snapshot with ``--commit``) for
    symbols that are never called, never accessed, and whose containing module
    is never imported by anything else in the codebase. Both conditions must
    hold simultaneously — a symbol in an imported module is still reachable
    even if unused internally.

    ``--top`` limits the candidates that are displayed, counted, saved to an
    allowlist, or offered for deletion.  The ``--compare`` diff is always
    computed from the *complete* candidate lists of both snapshots so that a
    display limit cannot make still-dead symbols look "recovered".

    Agent quickstart
    ----------------
    ::

        muse code dead --json
        muse code dead --high-confidence-only --json
        muse code dead --kind function --exclude-tests --json
        muse code dead --commit HEAD~10 --json

    JSON fields
    -----------
    source                   Working tree label or commit ref used.
    total_files_scanned      Number of files processed.
    total_symbols_scanned    Number of symbols analysed.
    high_confidence_count    Candidates with high-confidence dead classification.
    medium_confidence_count  Candidates with medium-confidence classification.
    results                  List of candidate objects: ``address``, ``kind``,
                             ``confidence``, ``file``, ``line``.
    compare_commit_id        Only with ``--compare``: the commit compared against.
    new_dead                 Only with ``--compare``: candidates absent from the
                             compare snapshot.
    recovered                Only with ``--compare``: compare-snapshot candidates
                             that are no longer candidates.
    net_change               Only with ``--compare``: ``len(new_dead) - len(recovered)``.

    Exit codes
    ----------
    0   Analysis complete (dead candidates may or may not be found).
    1   Invalid arguments or ref not found.
    2   Not inside a Muse repository.
    """
    kind_filter: str | None = args.kind_filter
    exclude_tests: bool = not args.include_tests
    exclude_private: bool = args.exclude_private
    high_confidence_only: bool = args.high_confidence_only
    path_filter: str | None = args.path_filter
    raw_lang: str = args.language_filter
    language_filter: str | None = None if raw_lang.lower() == "all" else raw_lang
    top: int | None = (clamp_int(args.top, 1, 100_000, 'top') if args.top is not None else None)
    group_by_file: bool = args.group_by_file
    ref: str | None = args.ref
    compare_ref: str | None = args.compare_ref
    workers: int = min(max(1, args.workers), _MAX_WORKERS)
    max_file_bytes: int = max(args.max_file_bytes, _MIN_FILE_BYTES)
    json_out: bool = args.json_out
    do_delete: bool = args.delete
    auto_yes: bool = args.yes
    allowlist_path: str | None = args.allowlist
    count_only: bool = args.count_only
    save_allowlist_path: str | None = args.save_allowlist
    use_color: bool = not args.no_color and sys.stdout.isatty() and not json_out and not do_delete

    if do_delete and compare_ref:
        print("❌ --delete and --compare are mutually exclusive.", file=sys.stderr)
        raise SystemExit(ExitCode.USER_ERROR)

    root = require_repo()
    branch = read_current_branch(root)

    # An explicit --allowlist replaces the repo-default allowlist file; the
    # default file is merged in only when no explicit path was given.
    allowlist = _load_allowlist(allowlist_path)
    default_allowlist_path = _dead_allowlist_path(root)
    if default_allowlist_path.exists() and not allowlist_path:
        allowlist = allowlist | _load_allowlist(str(default_allowlist_path))

    # ── Resolve file bytes ────────────────────────────────────────────────────
    commit: CommitRecord | None
    source_label: str

    if ref is None:
        # Working-tree scan: the manifest still comes from the resolved commit,
        # but file contents are read from disk.
        commit = resolve_commit_ref(root, branch, None)
        if commit is None:
            _err("No commits found — repository may be empty.", use_color)
            raise SystemExit(ExitCode.USER_ERROR)
        manifest = get_commit_snapshot_manifest(root, commit.commit_id) or {}
        file_bytes = _load_file_bytes(root, manifest, from_disk=True)
        source_label = "working tree"
        if not json_out and not do_delete and not count_only:
            _print_header_workdir(len(file_bytes), use_color)
    else:
        commit = resolve_commit_ref(root, branch, ref)
        if commit is None:
            _err(f"Commit '{ref}' not found.", use_color)
            raise SystemExit(ExitCode.USER_ERROR)
        manifest = get_commit_snapshot_manifest(root, commit.commit_id) or {}
        file_bytes = _load_file_bytes(root, manifest, from_disk=False)
        source_label = f"{commit.commit_id} on {branch}"
        if not json_out and not do_delete and not count_only:
            _print_header(commit.commit_id, branch, len(file_bytes), use_color)

    # ── Build implicit entry-point graph ─────────────────────────────────────
    # Entry-point symbols are framework-wired (e.g. FastAPI route handlers).
    # They are externally reachable via the runtime even though no user code
    # calls them directly — they must never be flagged as dead.
    implicit_graph: ImplicitEdgeGraph = build_implicit_edge_graph(root, manifest)
    entry_point_addresses: frozenset[str] = frozenset(implicit_graph.keys())

    # ── Common scan args ──────────────────────────────────────────────────────
    scan_kwargs = _ScanKwargs(
        kind_filter=kind_filter,
        max_file_bytes=max_file_bytes,
        workers=workers,
        language_filter=language_filter,
        path_filter=path_filter,
        exclude_tests=exclude_tests,
        exclude_private=exclude_private,
        high_confidence_only=high_confidence_only,
        allowlist=allowlist,
    )

    candidates, scanned_symbols, elapsed, skipped_count, error_count = _scan_file_bytes(
        file_bytes, **scan_kwargs, entry_point_addresses=entry_point_addresses
    )

    # Keep the untruncated list: the --compare diff below must see every
    # current candidate, not only the first ``top`` of them.
    all_candidates = candidates
    if top is not None:
        candidates = candidates[:top]

    # ── --save-allowlist ──────────────────────────────────────────────────────
    if save_allowlist_path:
        _save_allowlist(save_allowlist_path, candidates)

    # ── --compare diff ────────────────────────────────────────────────────────
    compare_commit: CommitRecord | None = None
    new_dead: list[_DeadCandidate] = []
    recovered: list[_DeadCandidate] = []

    if compare_ref:
        compare_commit = resolve_commit_ref(root, branch, compare_ref)
        if compare_commit is None:
            _err(f"--compare commit '{compare_ref}' not found.", use_color)
            raise SystemExit(ExitCode.USER_ERROR)
        compare_manifest = get_commit_snapshot_manifest(root, compare_commit.commit_id) or {}
        compare_file_bytes = _load_file_bytes(root, compare_manifest, from_disk=False)
        # Scan the compare snapshot under the same rules as the current one:
        # without its own entry-point set, framework-wired symbols would be
        # flagged there but not here and show up as bogus "recovered" entries.
        compare_entry_points: frozenset[str] = frozenset(
            build_implicit_edge_graph(root, compare_manifest).keys()
        )
        compare_candidates, _, _, _, _ = _scan_file_bytes(
            compare_file_bytes, **scan_kwargs, entry_point_addresses=compare_entry_points
        )
        current_addrs = {c.address for c in all_candidates}
        compare_addrs = {c.address for c in compare_candidates}
        new_dead = [c for c in all_candidates if c.address not in compare_addrs]
        recovered = [c for c in compare_candidates if c.address not in current_addrs]

    # ── Stats ─────────────────────────────────────────────────────────────────
    high_count = sum(1 for c in candidates if c.confidence == "high")
    medium_count = sum(1 for c in candidates if c.confidence == "medium")
    by_kind: _KindCountMap = {}
    for c in candidates:
        by_kind[c.kind] = by_kind.get(c.kind, 0) + 1
    files_with_dead: set[str] = {c.file_path for c in candidates}

    # ── Output ────────────────────────────────────────────────────────────────
    # --json takes precedence over --count-only when both are given.
    if count_only and not json_out:
        print(len(candidates))
        return

    if json_out:
        payload = _DeadPayload(
            **make_envelope(elapsed),
            source=source_label,
            total_files_scanned=len(file_bytes),
            total_symbols_scanned=scanned_symbols,
            high_confidence_count=high_count,
            medium_confidence_count=medium_count,
            results=[c.to_dict() for c in candidates],
        )
        if compare_commit is not None:
            payload["compare_commit_id"] = compare_commit.commit_id
            payload["new_dead"] = [c.to_dict() for c in new_dead]
            payload["recovered"] = [c.to_dict() for c in recovered]
            payload["net_change"] = len(new_dead) - len(recovered)
        print(json.dumps(payload))
        return

    if do_delete:
        _run_delete_mode(root, candidates, auto_yes)
        return

    if not candidates:
        print(f" {_c('✅ No dead code candidates found.', _GREEN, use_color=use_color)}")
        _print_footer_note(use_color)
        return

    if group_by_file:
        _print_grouped(candidates, use_color)
    else:
        _print_flat(candidates, use_color)

    _print_summary(
        candidates=candidates,
        high_count=high_count,
        medium_count=medium_count,
        by_kind=by_kind,
        files_with_dead=files_with_dead,
        scanned_symbols=scanned_symbols,
        total_files=len(file_bytes),
        skipped_count=skipped_count,
        error_count=error_count,
        elapsed=elapsed(),
        top=top,
        use_color=use_color,
    )

    if compare_commit is not None:
        _print_compare_diff(new_dead, recovered, compare_commit, use_color)
| 918 | |
| 919 | # ── Delete mode ─────────────────────────────────────────────────────────────── |
| 920 | |
def _split_nested_spans(
    spans: list[tuple[int, int, _DeadCandidate]],
) -> tuple[list[tuple[int, int, _DeadCandidate]], list[tuple[int, int, _DeadCandidate]]]:
    """Partition *spans* into outermost spans and spans lying inside one of them."""
    outer: list[tuple[int, int, _DeadCandidate]] = []
    covered: list[tuple[int, int, _DeadCandidate]] = []
    # Widest span first among equal starts, so an enclosing span is always
    # seen before anything it contains.
    for span in sorted(spans, key=lambda s: (s[0], -s[1])):
        if outer and span[1] <= outer[-1][1]:
            covered.append(span)
        else:
            outer.append(span)
    return outer, covered


def _run_delete_mode(
    root: pathlib.Path,
    candidates: list[_DeadCandidate],
    auto_yes: bool,
) -> None:
    """Interactively delete dead symbols from the working tree.

    Only ``.py`` / ``.pyi`` candidates are considered.  Candidates are grouped
    per file; each one is confirmed on stdin unless *auto_yes* is set, and
    answering ``q`` (or EOF / Ctrl-C) aborts immediately without touching the
    file currently being reviewed.  Files already rewritten stay rewritten.

    Args:
        root: Repository root; every target path must resolve inside it.
        candidates: Dead-code candidates to offer for deletion.
        auto_yes: Delete every candidate without prompting.
    """
    py_candidates = [c for c in candidates if c.file_path.endswith((".py", ".pyi"))]
    skipped_non_py = len(candidates) - len(py_candidates)

    if not py_candidates:
        print(" No Python dead-code candidates to delete.")
        if skipped_non_py:
            print(f" ({skipped_non_py} non-Python candidate(s) skipped — delete is Python-only)")
        return

    print(f"\n{_BOLD}muse code dead --delete{_RESET} — {len(py_candidates)} Python candidate(s)")
    if skipped_non_py:
        print(f" {_GRAY}({skipped_non_py} non-Python candidate(s) not shown){_RESET}")
    print(f"{_GRAY}{'─' * 72}{_RESET}")

    # Group by file so we process each file at most once and delete bottom-to-top.
    by_file: _DeadByFile = {}
    for c in py_candidates:
        by_file.setdefault(c.file_path, []).append(c)

    deleted_total = 0
    skipped_total = 0
    failed_total = 0

    for file_path in sorted(by_file):
        file_candidates = by_file[file_path]

        print(f"\n {_CYAN}{_BOLD}{sanitize_display(file_path)}{_RESET} ({len(file_candidates)} candidate(s))")

        # Collect which symbols to delete (after user confirmation).
        to_delete: list[_DeadCandidate] = []
        for c in file_candidates:
            bare = c.address.split("::")[-1]
            conf_label = (
                f"{_RED}HIGH{_RESET}" if c.confidence == "high" else f"{_YELLOW}MED{_RESET}"
            )
            kind_label = _BLUE + _kind_icon(c.kind) + _RESET
            print(f" {_RED}✗{_RESET} {_WHITE}{bare}{_RESET} {kind_label} [{conf_label}]")

            if auto_yes:
                to_delete.append(c)
            else:
                try:
                    answer = input(" Delete? [y/N/q] ").strip().lower()
                except (EOFError, KeyboardInterrupt):
                    print("\n Aborted.")
                    return
                if answer == "q":
                    print(" Aborted.")
                    return
                if answer == "y":
                    to_delete.append(c)
                else:
                    skipped_total += 1

        if not to_delete:
            continue

        # Find spans for all symbols we will delete, then remove bottom-to-top.
        abs_path = (root / file_path).resolve()
        # Path traversal guard: ensure the resolved path stays within root.
        try:
            abs_path.relative_to(root.resolve())
        except ValueError:
            print(f" {_YELLOW}⚠ {sanitize_display(str(file_path))!r} escapes repo root — skipping{_RESET}")
            failed_total += len(to_delete)
            continue
        if not abs_path.exists():
            print(f" {_YELLOW}⚠ file not in working tree — skipping{_RESET}")
            failed_total += len(to_delete)
            continue

        try:
            source = abs_path.read_bytes()
        except OSError as exc:
            print(f" {_YELLOW}⚠ could not read file ({sanitize_display(str(exc))}) — skipping{_RESET}")
            failed_total += len(to_delete)
            continue

        spans: list[tuple[int, int, _DeadCandidate]] = []
        for c in to_delete:
            parts = c.address.split("::")
            bare = parts[-1]
            parent_class = parts[-2] if len(parts) >= 3 else None
            span = _find_symbol_span(source, bare, parent_class)
            if span is None:
                print(f" {_YELLOW}⚠ could not locate {sanitize_display(bare)} in {sanitize_display(file_path)}{_RESET}")
                failed_total += 1
            else:
                spans.append((*span, c))

        if not spans:
            continue

        # A span nested in another selected span (e.g. a method of a class that
        # is itself being deleted) must not be removed separately: doing so
        # first would shrink the file and make the enclosing span's end line
        # stale, deleting unrelated lines that follow it.
        outer, covered = _split_nested_spans(spans)

        # Sort descending by start line so later deletions don't shift earlier lines.
        outer.sort(key=lambda x: -x[0])

        # Split on the byte level: bytes.splitlines breaks only on \n, \r and
        # \r\n, whereas str.splitlines also breaks on form feeds, U+2028, etc.,
        # which would throw the line numbering off.  surrogateescape lets
        # non-UTF-8 bytes survive the decode/encode round trip unchanged.
        lines = [
            raw.decode("utf-8", errors="surrogateescape")
            for raw in source.splitlines(keepends=True)
        ]
        for start, end, _unused in outer:
            lines = _delete_symbol_lines(lines, start, end)

        # Write bytes (not text) so the file's original newline style is kept
        # on every platform.
        try:
            abs_path.write_bytes("".join(lines).encode("utf-8", errors="surrogateescape"))
        except OSError as exc:
            print(f" {_YELLOW}⚠ could not write file ({sanitize_display(str(exc))}) — skipping{_RESET}")
            failed_total += len(spans)
            continue

        for start, end, c in outer:
            bare = c.address.split("::")[-1]
            print(f" {_GREEN}✅ deleted {bare}{_RESET} (lines {start}–{end})")
            deleted_total += 1
        for start, end, c in covered:
            bare = c.address.split("::")[-1]
            print(f" {_GREEN}✅ deleted {bare}{_RESET} (lines {start}–{end}, inside a deleted symbol)")
            deleted_total += 1

    print(f"\n{_GRAY}{'─' * 72}{_RESET}")
    print(f" {_GREEN}Deleted:{_RESET} {deleted_total}")
    if skipped_total:
        print(f" {_GRAY}Skipped:{_RESET} {skipped_total}")
    if failed_total:
        print(f" {_YELLOW}Failed:{_RESET} {failed_total}")
    if deleted_total:
        print(f"\n Run {_CYAN}muse status{_RESET} to review, then {_CYAN}muse commit{_RESET} to record.")
| 1036 | |
| 1037 | # ── Output helpers ──────────────────────────────────────────────────────────── |
| 1038 | |
def _save_allowlist(path: str, candidates: list[_DeadCandidate]) -> None:
    """Persist the addresses of *candidates* to *path* as a sorted JSON array.

    An ``OSError`` while writing is logged as a warning and otherwise ignored.
    """
    addresses = sorted(c.address for c in candidates)
    target = pathlib.Path(path)
    try:
        target.write_text(json.dumps(addresses), encoding="utf-8")
        logger.info("✅ Saved %d address(es) to %s", len(candidates), path)
    except OSError as exc:
        logger.warning("⚠️ Could not write allowlist %s: %s", path, exc)
| 1049 | |
# Words that mark a path as test code when they appear as a whole word.
_TEST_PATH_WORDS = frozenset({
    "test", "tests", "testing", "testcase", "testcases", "testdata",
    "testsuite", "testutil", "testutils", "unittest", "unittests",
    "conftest", "spec", "specs",
})


def _is_test_file(file_path: str) -> bool:
    """Return True when *file_path* looks like test code.

    The path is broken into words at every non-letter character and at
    camelCase boundaries (``FooTest.java`` → ``Foo``, ``Test``, ``java``); the
    path is a test path when any word, lower-cased, is a known test marker.

    Matching whole words instead of raw substrings keeps files such as
    ``latest.py``, ``inspect_utils.py`` or ``contest.py`` from being mistaken
    for tests merely because they contain the letters "test" or "spec".
    """
    import re  # local import: keeps this block self-contained

    words = re.findall(r"[A-Z]+(?![a-z])|[A-Z]?[a-z]+", file_path)
    return any(word.lower() in _TEST_PATH_WORDS for word in words)
| 1053 | |
def _err(msg: str, use_color: bool) -> None:
    """Print *msg*, prefixed with a cross mark, to stderr (styled via ``_c``)."""
    text = f"❌ {msg}"
    styled = _c(text, _RED, _BOLD, use_color=use_color)
    print(styled, file=sys.stderr)
| 1056 | |
def _print_header(commit_id: str, branch: str, total_files: int, use_color: bool) -> None:
    """Print the report banner for a commit scan, followed by a heavy rule."""
    sha = _c(commit_id, _CYAN, _BOLD, use_color=use_color)
    br = _c(branch, _MAGENTA, use_color=use_color)
    n = _c(str(total_files), _BOLD, use_color=use_color)
    title = _c('Dead code candidates', _BOLD, use_color=use_color)
    print(f"\n{title} — commit {sha} on {br} — {n} files")
    rule = "━" * 72
    print(_c(rule, _GRAY, use_color=use_color))
| 1063 | |
def _print_header_workdir(total_files: int, use_color: bool) -> None:
    """Print the report banner for a working-tree scan, followed by a heavy rule."""
    label = _c("working tree", _CYAN, _BOLD, use_color=use_color)
    n = _c(str(total_files), _BOLD, use_color=use_color)
    title = _c('Dead code candidates', _BOLD, use_color=use_color)
    print(f"\n{title} — {label} — {n} files")
    rule = "━" * 72
    print(_c(rule, _GRAY, use_color=use_color))
| 1069 | |
def _kind_icon(kind: str) -> str:
    """Return the short display label for *kind*.

    Kinds without a dedicated label are returned unchanged.
    """
    short_labels = {
        "function": "fn",
        "async_function": "async fn",
        "method": "method",
        "async_method": "async method",
        "class": "class",
        "variable": "var",
        "constant": "const",
    }
    if kind in short_labels:
        return short_labels[kind]
    return kind
| 1080 | |
def _confidence_label(c: _DeadCandidate, use_color: bool) -> str:
    """Return the confidence tag for *c*: ``HIGH`` for high confidence, else ``MED ``."""
    if c.confidence != "high":
        return _c("MED ", _YELLOW, use_color=use_color)
    return _c("HIGH", _RED, _BOLD, use_color=use_color)
| 1085 | |
def _print_flat(candidates: list[_DeadCandidate], use_color: bool) -> None:
    """Print one row per candidate, with a section header per confidence run.

    Rows are printed in the order given; a header is emitted whenever the
    confidence differs from the previous row's.  Addresses are truncated to at
    most 80 characters.  Nothing is printed for an empty list.
    """
    if not candidates:
        return

    max_addr = min(max(len(c.address) for c in candidates), 80)
    max_kind = max(len(_kind_icon(c.kind)) for c in candidates)
    prev_conf = ""
    for c in candidates:
        conf = c.confidence
        if conf != prev_conf:
            prev_conf = conf
            if conf == "high":
                label = _c(" ── HIGH CONFIDENCE — not referenced, module not imported", _RED, use_color=use_color)
            else:
                label = _c(" ── MEDIUM CONFIDENCE — not referenced, module is imported", _YELLOW, use_color=use_color)
            print(f"\n{label}")
            print(_c(f" {'─' * 68}", _GRAY, use_color=use_color))
        # Pad the plain text *before* styling it.  Padding the styled string
        # counts any escape sequences toward the field width, so the column
        # would shift with the colour used and with colour on/off.
        addr_plain = c.address[:max_addr].ljust(max_addr)
        addr_str = _c(addr_plain, _WHITE if conf == "high" else _GRAY, use_color=use_color)
        kind_str = _c(_kind_icon(c.kind).ljust(max_kind), _BLUE, use_color=use_color)
        conf_str = _confidence_label(c, use_color)
        print(f" {addr_str} {kind_str} [{conf_str}]")
| 1104 | |
def _print_grouped(candidates: list[_DeadCandidate], use_color: bool) -> None:
    """Print candidates grouped under their file path, files in sorted order.

    Each file heading shows its high/medium counts; within a file,
    high-confidence symbols come first, then by address.
    """
    grouped: _DeadByFile = {}
    for cand in candidates:
        grouped.setdefault(cand.file_path, []).append(cand)

    for file_path in sorted(grouped):
        members = grouped[file_path]
        high_n = sum(1 for m in members if m.confidence == "high")
        med_n = sum(1 for m in members if m.confidence == "medium")

        badges = []
        if high_n:
            badges.append(_c(f"{high_n} high", _RED, use_color=use_color))
        if med_n:
            badges.append(_c(f"{med_n} med", _YELLOW, use_color=use_color))
        heading = _c(file_path, _CYAN, _BOLD, use_color=use_color)
        print(f"\n {heading} {', '.join(badges)}")

        # Name column width: longest bare symbol name in this file, capped at 60.
        longest = max(len(m.address.split('::')[-1]) for m in members)
        max_name = min(longest, 60)

        ordered = sorted(members, key=lambda m: (m.confidence != "high", m.address))
        for m in ordered:
            # split() yields the whole address when it contains no "::".
            sym_name = m.address.split("::")[-1]
            kind_str = _c(_kind_icon(m.kind), _BLUE, use_color=use_color)
            padded = sym_name.ljust(max_name)
            if m.confidence == "high":
                sym_str = _c(padded, _WHITE, use_color=use_color)
                marker = _c("✗", _RED, _BOLD, use_color=use_color)
            else:
                sym_str = _c(padded, _GRAY, use_color=use_color)
                marker = _c("·", _YELLOW, use_color=use_color)
            print(f" {marker} {sym_str} {kind_str}")
| 1131 | |
def _print_summary(
    candidates: list[_DeadCandidate],
    high_count: int,
    medium_count: int,
    by_kind: _KindCountMap,
    files_with_dead: set[str],
    scanned_symbols: int,
    total_files: int,
    skipped_count: int,
    error_count: int,
    elapsed: float,
    top: int | None,
    use_color: bool,
) -> None:
    """Print the summary table, the per-kind bar chart, and the footer note.

    Args:
        candidates: Candidates being reported (only their count is used).
        high_count: Number of high-confidence candidates.
        medium_count: Number of medium-confidence candidates.
        by_kind: Candidate count per symbol kind; drives the bar chart.
        files_with_dead: Paths containing at least one candidate.
        scanned_symbols: Number of symbols analysed.
        total_files: Number of files scanned.
        skipped_count: Files skipped; the row is omitted when zero.
        error_count: Files with parse errors; the row is omitted when zero.
        elapsed: Scan duration in seconds.
        top: The ``--top`` limit, or None when no limit was requested.
        use_color: Passed through to ``_c`` for styling.
    """
    label_width = 42

    def row(label: str, value: str, colour: str | None = None) -> None:
        # Build the dot leader from the plain label so that styling never
        # affects alignment; only the label itself is styled.
        leader = "." * max(0, label_width - len(label))
        shown = label if colour is None else _c(label, colour, use_color=use_color)
        print(f" {shown}{leader} {value}")

    print(f"\n{_c('━' * 72, _GRAY, use_color=use_color)}")
    print(f"{_c('Summary', _BOLD, use_color=use_color)}")
    row("High confidence", f"{high_count:>6}", _RED)
    row("Medium confidence", f"{medium_count:>6}", _YELLOW)
    row("Total candidates", f"{len(candidates):>6}")
    if top is not None:
        print(f" {_c(f'(showing top {top})', _GRAY, use_color=use_color)}")
    row("Symbols scanned", f"{scanned_symbols:>6,}")
    row("Files with dead symbols", f"{len(files_with_dead):>6}")
    row("Files scanned", f"{total_files:>6,}")
    if skipped_count:
        row("Files skipped (too large)", f"{skipped_count:>6}", _GRAY)
    if error_count:
        row("Files with parse errors", f"{error_count:>6}", _YELLOW)
    row("Elapsed", f"{elapsed:>5.1f}s")

    if by_kind:
        print(f"\n {_c('By kind:', _BOLD, use_color=use_color)}")
        # One block per item while the largest count fits in 20 columns;
        # beyond that, scale proportionally so the largest bar is exactly 20.
        # Ceiling division keeps every non-zero count visible.
        scale = max(max(by_kind.values()), 20)
        for kind, count in sorted(by_kind.items(), key=lambda x: -x[1]):
            bar_len = (count * 20 + scale - 1) // scale
            bar = _c("█" * bar_len, _BLUE, use_color=use_color)
            print(f" {_kind_icon(kind):<16} {bar} {count:>5,}")

    _print_footer_note(use_color)
| 1170 | |
def _print_compare_diff(
    new_dead: list[_DeadCandidate],
    recovered: list[_DeadCandidate],
    compare_commit: CommitRecord,
    use_color: bool,
) -> None:
    """Print the dead-code diff against *compare_commit*.

    Shows the net change (new minus recovered), then the new candidates and
    the recovered ones; an empty list produces no section.
    """
    rule = _c('━' * 72, _GRAY, use_color=use_color)
    print(f"\n{rule}")
    sha = _c(compare_commit.commit_id, _CYAN, _BOLD, use_color=use_color)
    heading = _c('Dead-code diff', _BOLD, use_color=use_color)
    print(f"{heading} vs {sha}")

    net = len(new_dead) - len(recovered)
    if net > 0:
        colour = _RED
    elif net < 0:
        colour = _GREEN
    else:
        colour = _GRAY
    # Negative numbers already carry their sign; zero is shown as "+0".
    sign = "" if net < 0 else "+"
    print(f" Net change: {_c(f'{sign}{net}', colour, use_color=use_color)}")

    if new_dead:
        print(f"\n {_c(f'New dead ({len(new_dead)}):', _RED, use_color=use_color)}")
        for cand in new_dead:
            print(f" + {cand.address} [{_kind_icon(cand.kind)}] [{cand.confidence.upper()}]")
    if recovered:
        print(f"\n {_c(f'Recovered ({len(recovered)}):', _GREEN, use_color=use_color)}")
        for cand in recovered:
            print(f" - {cand.address} [{_kind_icon(cand.kind)}]")
| 1193 | |
def _print_footer_note(use_color: bool) -> None:
    """Print the closing caveat about detection limits and follow-up flags."""
    note_lines = [
        "Dynamic dispatch, exported APIs, and entry points are not detected.",
        " Treat results as candidates — verify before deleting.",
        " Use --delete to interactively remove candidates from the working tree.",
        " Use --allowlist to suppress known false positives.",
    ]
    note = "\n".join(note_lines)
    styled = _c(note, _GRAY, use_color=use_color)
    print(f"\n{styled}")
File History
1 commit
sha256:e6465e8a9b7fa8e6223ed4a3576e96c568c913ae2caeb9c31f15e7a81b250b40
docs: add | jq convention to --json section of agent-guide
Sonnet 4.6
1 day ago