"""muse code breakage — detect symbol-level breakage in the working tree. Checks the current working tree against a committed snapshot for structural breakage that would fail at runtime or import time: 1. **stale_import** — a working-tree file imports a name that exists nowhere in the HEAD snapshot (and is also not defined locally). Severity: warning. 2. **removed_public_method** — a class that appears in both HEAD and the working tree is missing a public method it had in HEAD. This catches public-API regressions before they break callers. Severity: error. Analysis is purely structural — no code is executed, no type checker is invoked. It operates on the committed symbol graph plus a live working-tree parse (results are served from the persistent symbol cache when available, so repeated runs on a warm cache are fast). Usage:: muse code breakage muse code breakage --language Python muse code breakage --path "muse/core/*.py" muse code breakage --commit HEAD~3 muse code breakage --strict muse code breakage --json Flags: ``--language LANG`` Restrict analysis to files of this language (e.g. ``Python``). ``--path PATTERN`` Only check files whose path matches this glob pattern (e.g. ``"muse/core/*.py"``). ``--commit REF`` Diff against this commit instead of HEAD (branch name, commit ID, or tag). Useful for checking "does my working tree still build cleanly against an older baseline?" ``--strict`` Treat warnings as errors: exit non-zero if any warning-level issues are found, not just error-level ones. ``--json`` Emit a machine-readable JSON object. Consumers should check ``$.errors`` and ``$.warnings`` (and respect ``strict``) rather than the exit code alone. """ import argparse import fnmatch import json import logging import pathlib import sys from typing import TypedDict from muse.core.envelope import EnvelopeJson, make_envelope from muse.core.timing import start_timer from muse.core.repo import require_repo from muse.core.types import Manifest from muse.core.refs import ( get_head_commit_id, read_current_branch, ) from muse.core.commits import resolve_commit_ref from muse.core.snapshots import get_commit_snapshot_manifest from muse.core.symbol_cache import load_symbol_cache from muse.plugins.code._query import is_semantic, language_of, symbols_for_snapshot from muse.plugins.code.ast_parser import SymbolTree from muse.core.validation import sanitize_display type _SymbolTreeMap = dict[str, SymbolTree] type _MethodMap = dict[str, set[str]] logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Data types # --------------------------------------------------------------------------- class _BreakageIssue(TypedDict): """One breakage finding, serialisable to JSON.""" issue_type: str path: str description: str severity: str # "error" | "warning" class _BreakageOutputJson(EnvelopeJson): """Top-level JSON payload emitted by ``muse code breakage --json``. Fields ------ commit Short commit ID checked against. branch Current branch name. language_filter Language filter passed via ``--language``, or ``None``. path_filter Glob filter passed via ``--path``, or ``None``. strict Whether ``--strict`` was set. file_count Number of files analysed. issues List of :class:`_BreakageIssue` dicts. total Total issue count (errors + warning_count). errors Number of error-severity issues. warning_count Number of warning-severity issues. """ commit: str branch: str language_filter: str | None path_filter: str | None strict: bool file_count: int issues: list[_BreakageIssue] total: int errors: int warning_count: int # --------------------------------------------------------------------------- # Index helpers # --------------------------------------------------------------------------- def _build_head_names_set(head_sym_map: _SymbolTreeMap) -> set[str]: """Return the set of all non-import symbol *names* across HEAD. Used for O(1) stale-import lookup: a working-tree import is stale if and only if the imported name is absent from this set (and also not defined locally in the working-tree file). """ names: set[str] = set() for tree in head_sym_map.values(): for rec in tree.values(): if rec["kind"] != "import": names.add(rec["name"]) return names def _build_head_class_methods( head_sym_map: _SymbolTreeMap, ) -> _MethodMap: """Return a map of ``"file_path::ClassName"`` → ``{method_name, ...}`` from HEAD. Used for Check 2: a class that drops a public method it had in HEAD is flagged as a ``removed_public_method`` breakage. """ class_methods: _MethodMap = {} for fp, tree in head_sym_map.items(): for rec in tree.values(): if rec["kind"] != "method": continue qn: str = rec["qualified_name"] if "." not in qn: continue class_part, _ = qn.rsplit(".", 1) key = f"{fp}::{class_part}" class_methods.setdefault(key, set()).add(rec["name"]) return class_methods # --------------------------------------------------------------------------- # Per-file analysis # --------------------------------------------------------------------------- def _check_file( file_path: str, working_tree: SymbolTree, head_tree: SymbolTree, head_names_set: set[str], head_class_methods: _MethodMap, head_file_paths: frozenset[str], ) -> list[_BreakageIssue]: """Return all breakage issues for one file. Args: file_path: Workspace-relative POSIX path. working_tree: Symbols parsed from the working-tree version of the file (may be empty if the file is new or not semantic). head_tree: Symbols parsed from the HEAD-committed version of the file (empty if the file is new). head_names_set: O(1)-lookup set of all non-import symbol names in the entire HEAD snapshot. head_class_methods: ``"file::Class"`` → public method names in HEAD, used to detect removed methods. head_file_paths: O(1)-lookup set of all file paths in the HEAD snapshot; used to distinguish module imports (e.g. ``from muse.cli.commands import breakage``) from symbol imports so they are not falsely flagged as stale. """ if not working_tree: return [] issues: list[_BreakageIssue] = [] # Names defined locally in the working-tree file (non-import symbols). local_names: set[str] = { rec["name"] for rec in working_tree.values() if rec["kind"] != "import" } # ----------------------------------------------------------------------- # Check 1: stale imports # ----------------------------------------------------------------------- # A working-tree import is stale when its name exists neither in the HEAD # snapshot (anywhere — we use a codebase-wide set for speed) nor is it # defined locally in the same file, AND it does not resolve to a known # module file in the HEAD snapshot. # # Only muse-internal imports are checked. Stdlib, third-party, __future__, # and typing imports are deliberately excluded — they live outside the # Muse symbol graph and can never go "stale" by definition. # # The qualified_name written by the AST parser is: # "import::::" — from import # "import::" — import (module IS the name) # A muse-internal import is one where starts with "muse." or # equals "muse", or (for bare `import muse.X`) the name starts with "muse.". # # Module-import disambiguation: ``from muse.cli.commands import breakage`` # records name="breakage" and source_module="muse.cli.commands". The name # "breakage" will never appear in ``head_names_set`` (which contains symbol # names, not module names), so without a module check it would be a false # positive. We convert source_module to a filesystem path and check # whether ``{path}/{name}.py`` or ``{path}/{name}/__init__.py`` is a known # file in the HEAD snapshot — if so, the import targets a module, not a # symbol, and is valid. # # Complexity: O(1) per import — all lookups hit frozenset/set. for rec in working_tree.values(): if rec["kind"] != "import": continue name: str = rec["name"] if name.startswith("*:"): continue # wildcard imports — cannot check statically # Determine source module from the qualified_name written by ast_parser. # Format: "import::::" or "import::". qn: str = rec.get("qualified_name", "") parts = qn.split("::") if len(parts) == 3: # from import source_module = parts[1] else: # bare `import ` — the name IS the module source_module = name # Skip anything that is not a muse-internal import. if source_module != "muse" and not source_module.startswith("muse."): continue # ``parts[2]`` is the *original* pre-alias name stored in qualified_name # by the AST parser. For ``from muse.core.store import CommitRecord as # MuseCliCommit``, parts[2]="CommitRecord" and name="MuseCliCommit". # We must look up the original in the HEAD snapshot — the alias will # never appear as a top-level symbol anywhere. original = parts[2] if len(parts) == 3 else name if original not in head_names_set and name not in local_names: # Check whether the original name resolves to a submodule file. # # Two cases: # # 1. ``from muse.cli.commands import age`` (len==3): # source_module="muse.cli.commands", original="age" # → check "muse/cli/commands/age.py" # # 2. ``import muse.core.rebase`` (len!=3, bare import): # source_module=name=original="muse.core.rebase" # → the dotted name IS the full module path # → check "muse/core/rebase.py" directly (not appending again) module_dir = source_module.replace(".", "/") if len(parts) == 3: is_module = ( f"{module_dir}/{original}.py" in head_file_paths or f"{module_dir}/{original}/__init__.py" in head_file_paths ) else: # bare import: module_dir already encodes the full path is_module = ( f"{module_dir}.py" in head_file_paths or f"{module_dir}/__init__.py" in head_file_paths ) if is_module: continue # valid module import — not stale issues.append( _BreakageIssue( issue_type="stale_import", path=file_path, description=( f"imports '{original}'" f"{f' (as {name!r})' if name != original else ''}" " but no symbol or module with that name " "exists in the HEAD snapshot" ), severity="warning", ) ) # ----------------------------------------------------------------------- # Check 2: removed public methods # ----------------------------------------------------------------------- # For each class that appears in BOTH HEAD and the working tree, flag any # public method that existed in HEAD but is missing from the working-tree # class body. Private methods (``_``-prefixed) are intentionally excluded # — they are implementation detail, not public API. # # Only applies to Python / Python-stub files; other adapters may not # produce reliable method records. suffix = pathlib.PurePosixPath(file_path).suffix.lower() if suffix in {".py", ".pyi"} and head_tree: # Build working-tree class → methods map for this file. working_class_methods: _MethodMap = {} for rec in working_tree.values(): if rec["kind"] != "method": continue qn = rec["qualified_name"] if "." not in qn: continue class_part, _ = qn.rsplit(".", 1) working_class_methods.setdefault(class_part, set()).add(rec["name"]) for rec in head_tree.values(): if rec["kind"] != "class": continue class_name: str = rec["name"] head_key = f"{file_path}::{class_name}" expected = head_class_methods.get(head_key, set()) actual = working_class_methods.get(class_name, set()) for method in sorted(expected - actual): if not method.startswith("_"): issues.append( _BreakageIssue( issue_type="removed_public_method", path=file_path, description=( f"class '{class_name}' is missing public method " f"'{method}' that existed in HEAD" ), severity="error", ) ) return issues # --------------------------------------------------------------------------- # CLI registration # --------------------------------------------------------------------------- def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None: """Register the ``breakage`` subcommand.""" parser = subparsers.add_parser( "breakage", help="Detect symbol-level breakage in the working tree vs HEAD snapshot.", description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( "--language", "-l", default=None, metavar="LANG", dest="language", help="Restrict to files of this language (e.g. Python).", ) parser.add_argument( "--commit", "-c", default=None, metavar="REF", dest="commit_ref", help="Check against this commit/branch/tag instead of HEAD.", ) parser.add_argument( "--path", "-p", default=None, metavar="PATTERN", dest="path_filter", help="Only check files matching this glob pattern (e.g. 'muse/core/*.py').", ) parser.add_argument( "--strict", action="store_true", dest="strict", help="Exit non-zero if any warnings are found (not just errors).", ) parser.add_argument( "--json", "-j", action="store_true", dest="json_out", help="Emit results as JSON.", ) parser.set_defaults(func=run) def run(args: argparse.Namespace) -> None: """Detect symbol-level breakage in the working tree vs the HEAD snapshot. Compares the working tree against the committed HEAD snapshot (or a named ref) for two classes of structural breakage: stale imports (a file imports a name that no longer exists in the snapshot) and removed public methods (a class drops a method that callers may depend on). No code is executed — analysis is purely structural. Agent quickstart ---------------- :: muse code breakage --json muse code breakage --language Python --json muse code breakage --path "muse/core/*.py" --json muse code breakage --strict --json JSON fields ----------- commit Short commit ID checked against. branch Current branch name. language_filter Language filter passed via ``--language``; ``null`` if none. path_filter Glob filter passed via ``--path``; ``null`` if none. strict ``true`` if ``--strict`` was set. file_count Number of files analysed. issues List of issue objects; each has ``issue_type``, ``file_path``, ``description``, and ``severity`` (``"error"`` or ``"warning"``). total Total issue count (``errors + warning_count``). errors Number of error-severity issues. warning_count Number of warning-severity issues. exit_code 0 = clean; 1 = errors found (or warnings under ``--strict``). Exit codes ---------- 0 No errors (warnings tolerated unless ``--strict``). 1 Errors found; or warnings found when ``--strict`` is active. 2 Not inside a Muse repository. """ elapsed = start_timer() language: str | None = args.language json_out: bool = args.json_out commit_ref: str | None = getattr(args, "commit_ref", None) path_filter: str | None = getattr(args, "path_filter", None) strict: bool = getattr(args, "strict", False) root = require_repo() branch = read_current_branch(root) # Resolve branch names before calling resolve_commit_ref, which only # handles commit SHAs and HEAD~N notation. resolved_ref: str | None = commit_ref if commit_ref is not None: branch_head = get_head_commit_id(root, commit_ref) if branch_head is not None: resolved_ref = branch_head # promote to full commit ID commit = resolve_commit_ref(root, branch, resolved_ref) if commit is None: ref_label = commit_ref or "HEAD" print(f"❌ No commit found for ref '{ref_label}'.", file=sys.stderr) raise SystemExit(1) manifest = get_commit_snapshot_manifest(root, commit.commit_id) if manifest is None: print( f"❌ Cannot read snapshot for commit {commit.commit_id} — " "repository may be corrupt.", file=sys.stderr, ) raise SystemExit(1) # Apply glob path filter before loading symbols (avoids parsing unused files). filtered_manifest: Manifest = ( {fp: oid for fp, oid in manifest.items() if fnmatch.fnmatch(fp, path_filter)} if path_filter is not None else dict(manifest) ) # Load HEAD symbols (committed, from object store) and working-tree symbols # (from disk) in a single shared cache cycle to minimise I/O. shared_cache = load_symbol_cache(root) head_sym_map = symbols_for_snapshot( root, filtered_manifest, language_filter=language, cache=shared_cache, ) working_sym_map = symbols_for_snapshot( root, filtered_manifest, workdir=root, language_filter=language, cache=shared_cache, ) shared_cache.save() # Build O(1) indexes once; _check_file uses them per-file. head_names_set = _build_head_names_set(head_sym_map) head_class_methods = _build_head_class_methods(head_sym_map) head_file_paths = frozenset(head_sym_map.keys()) all_issues: list[_BreakageIssue] = [] for file_path in sorted(filtered_manifest.keys()): if not is_semantic(file_path): continue if language and language_of(file_path) != language: continue working_tree = working_sym_map.get(file_path, {}) head_tree = head_sym_map.get(file_path, {}) issues = _check_file( file_path, working_tree, head_tree, head_names_set, head_class_methods, head_file_paths, ) all_issues.extend(issues) errors = sum(1 for i in all_issues if i["severity"] == "error") warnings = sum(1 for i in all_issues if i["severity"] == "warning") exit_code = 1 if (errors > 0 or (strict and warnings > 0)) else 0 if json_out: print(json.dumps(_BreakageOutputJson( **make_envelope(elapsed, exit_code=exit_code), commit=commit.commit_id, branch=branch, language_filter=language, path_filter=path_filter, strict=strict, file_count=len(filtered_manifest), issues=list(all_issues), total=len(all_issues), errors=errors, warning_count=warnings, ))) raise SystemExit(exit_code) ref_label = commit.commit_id if commit_ref: ref_label = f"{commit_ref} ({commit.commit_id})" print(f"\nBreakage check — working tree vs {ref_label}") if language: print(f" (language: {language})") if path_filter: print(f" (path: {sanitize_display(path_filter)})") print("─" * 62) if not all_issues: print("\n ✅ No structural breakage detected.") raise SystemExit(0) for issue in all_issues: icon = "🔴" if issue["severity"] == "error" else "⚠️ " print(f"\n{icon} {sanitize_display(issue['issue_type'])}") print(f" {sanitize_display(issue['path'])}") print(f" {issue['description']}") print(f"\n {errors} error(s), {warnings} warning(s)") raise SystemExit(exit_code)