"""``muse diff`` — show what has changed since the last commit. ``muse diff`` always answers: **what has changed since my last commit?** That means HEAD vs the actual working tree, regardless of what is staged. The stage is a commit-preparation tool; it does not change the meaning of diff. Usage ----- Everything changed since last commit (default):: muse diff What *will* be committed (staged changes vs HEAD):: muse diff --staged What is *not yet* staged (working tree vs stage):: muse diff --unstaged Two commits:: muse diff Limit output to specific files or directories:: muse diff -p muse/cli/commands/status.py muse diff -p muse/cli/ -p muse/plugins/ muse diff --staged -p muse/cli/commands/status.py Show shelved changes vs HEAD:: muse diff --shelf # most recent shelf entry muse diff --shelf 1 # shelf entry at index 1 CI / agent pipeline usage:: muse diff --exit-code # exits 1 when changes exist, 0 when clean muse diff --json --exit-code # structured output + exit code for scripting """ import argparse import difflib import json import logging import os import pathlib import sys from collections.abc import Callable from typing import TypedDict from muse.core.types import Manifest from muse.core.envelope import EnvelopeJson, make_envelope from muse.core.errors import ExitCode from muse.core.merge_engine import read_merge_state from muse.core.object_store import read_object from muse.core.repo import require_repo from muse.core.refs import read_current_branch from muse.core.commits import resolve_commit_ref from muse.core.snapshots import ( get_commit_snapshot_manifest, get_head_snapshot_manifest, read_snapshot, ) from muse.core.commits import get_head_snapshot_id from muse.core.validation import sanitize_display from muse.core.snapshot import directories_from_manifest from muse.core.cohen_transform import ( CONFLICT_SEPARATOR, annotate_hunk_action, format_conflict_diff, ) from muse.core.semver_classifier import classify_delta from muse.core.timing import start_timer 
from muse.domain import DomainOp, PatchOp, SnapshotManifest, StagePlugin
from muse.plugins.code._query import flat_directory_ops
from muse.plugins.code.stage import EMPTY_DIR_OID, read_stage_dir_renames
from muse.plugins.code.symbol_diff import delta_summary
from muse.plugins.registry import read_domain, resolve_plugin
from muse.cli.commands.shelf import _load_shelf, _resolve_entry

logger = logging.getLogger(__name__)


class _DiffConflictJson(EnvelopeJson):
    """JSON output for ``muse diff --conflict --json``.

    Extends :class:`~muse.core.envelope.EnvelopeJson` with the fields below.

    Fields
    ------
    status
        ``"conflict"`` — the value emitted by the conflict renderer in this
        module.
    base_commit
        Common ancestor commit ID, or ``None`` when unavailable.
    ours_commit
        The commit ID of the local (ours) side of the merge.
    theirs_commit
        The commit ID of the incoming (theirs) side of the merge.
    ours_label
        Human-readable label for the ours side (usually the branch name).
    theirs_label
        Human-readable label for the theirs side.
    conflicts
        One dict per conflicting path with the keys ``path``, ``ours_diff``
        (unified diff base→ours) and ``theirs_diff`` (unified diff
        base→theirs).
    """

    status: str
    base_commit: str | None
    ours_commit: str | None
    theirs_commit: str | None
    ours_label: str
    theirs_label: str
    conflicts: list[dict]


class _DiffSymbolsJson(TypedDict):
    """Per-file symbol change summary nested inside :class:`_DiffJson`.

    Fields
    ------
    added
        Symbol names added in this file.
    deleted
        Symbol names deleted from this file.
    modified
        Symbol names whose bodies changed in this file.
    """

    added: list[str]
    deleted: list[str]
    modified: list[str]


class _DiffJson(EnvelopeJson):
    """JSON output for ``muse diff --json`` (normal diff mode).

    Extends :class:`~muse.core.envelope.EnvelopeJson` with the fields below.

    Fields
    ------
    from_ref
        Label of the base side: ``"HEAD"``, ``"staged"``, or the ref the
        user supplied.
    to_ref
        Label of the target side: ``"working tree"``, ``"staged"``, a shelf
        label, or the ref the user supplied.
    from_commit_id
        Full commit ID for the from side, or None when it is not a commit.
    to_commit_id
        Full commit ID for the to side, or None when it is not a commit.
    has_changes
        True when at least one op differs between the two sides.
    summary
        Human-readable one-line summary of the change set.
    added
        Paths of files added relative to from_ref.
    deleted
        Paths of files deleted relative to from_ref.
    modified
        Paths of files modified relative to from_ref.
    renamed
        Map of old_path → new_path for renamed files.
    total_changes
        Total count of added + deleted + modified + renamed files.
    symbols
        Per-file symbol diff (path → _DiffSymbolsJson).
    sem_ver_bump
        Semantic-version bump classification ("major", "minor", "patch",
        or None when not determinable).
    breaking_changes
        List of symbol addresses with breaking API changes.
    """

    from_ref: str
    to_ref: str
    from_commit_id: str | None
    to_commit_id: str | None
    has_changes: bool
    summary: str
    added: list[str]
    deleted: list[str]
    modified: list[str]
    renamed: dict[str, str]
    total_changes: int
    symbols: dict[str, _DiffSymbolsJson]
    # Documented above as nullable, so the annotation must admit None.
    sem_ver_bump: str | None
    breaking_changes: list[str]


# Maximum number of symbol-level child ops rendered inline under one file.
_MAX_INLINE_CHILDREN = 12

# Sentinel: the two-space + "L" prefix used by the domain plugin to annotate
# symbol locations inside op summaries (e.g. "added function foo  L4–8").
# Two spaces are deliberate: a single space would also match ordinary words
# starting with "L" (e.g. "added class Loader") and mis-split the summary.
_LOC_SEP = "  L"

# ── Colour helpers ────────────────────────────────────────────────────────────
# Colours are applied only when stdout is a real TTY. When output is piped or
# redirected (e.g. into an agent tool, a file, or `less`) the raw text is
# emitted without escape sequences. Pass NO_COLOR=1 or TERM=dumb to force
# plain output even on a TTY.
def _use_color() -> bool:
    """Return True when ANSI colours should be emitted to stdout.

    Colour is disabled when ``NO_COLOR`` is set to a non-empty value, when
    ``TERM`` is ``dumb``, or when stdout is not a TTY.
    """
    if os.environ.get("NO_COLOR") or os.environ.get("TERM") == "dumb":
        return False
    return sys.stdout.isatty()


def _paint(code: str, text: str) -> str:
    """Wrap *text* in the SGR escape *code* when colour output is enabled."""
    return f"\033[{code}m{text}\033[0m" if _use_color() else text


def _green(text: str) -> str:
    """Return *text* in green (used for additions)."""
    return _paint("32", text)


def _red(text: str) -> str:
    """Return *text* in red (used for deletions)."""
    return _paint("31", text)


def _yellow(text: str) -> str:
    """Return *text* in yellow (used for modifications)."""
    return _paint("33", text)


def _cyan(text: str) -> str:
    """Return *text* in cyan (used for moves, renames and hunk headers)."""
    return _paint("36", text)


def _bold(text: str) -> str:
    """Return *text* in bold (used for diff file headers)."""
    return _paint("1", text)


# ── Display helpers ───────────────────────────────────────────────────────────


def _split_loc(summary: str) -> tuple[str, str]:
    """Split an op summary into ``(label, location)``.

    The location is whatever follows the last ``_LOC_SEP`` in *summary*,
    re-prefixed with ``"L"`` (e.g. ``"L4–8"``).  When the separator is absent
    the summary is returned unchanged with an empty location (e.g. cross-file
    move annotations that carry no line data).
    """
    if _LOC_SEP in summary:
        label, _, loc = summary.rpartition(_LOC_SEP)
        return label, f"L{loc}"
    return summary, ""


def _print_child_ops(child_ops: list[DomainOp]) -> None:
    """Render symbol-level child ops as a coloured tree under a file line.

    Each row is printed as ``<connector> <label> <loc>`` where the label is
    coloured by op type and the location (if any) follows it.  At most
    ``_MAX_INLINE_CHILDREN`` entries are shown; the remainder is summarised
    on a single trailing ``… and N more`` line.
    """
    visible = child_ops[:_MAX_INLINE_CHILDREN]
    overflow = len(child_ops) - len(visible)

    # Each branch tests cop["op"] directly so a type checker can narrow the
    # TypedDict union before the op-specific keys are read.
    rows: list[tuple[str, str, str]] = []
    for cop in visible:
        if cop["op"] == "insert":
            label, loc = _split_loc(cop["content_summary"])
            rows.append(("insert", label, loc))
        elif cop["op"] == "delete":
            label, loc = _split_loc(cop["content_summary"])
            rows.append(("delete", label, loc))
        elif cop["op"] == "replace":
            label, loc = _split_loc(cop["new_summary"])
            rows.append(("replace", label, loc))
        elif cop["op"] == "move":
            label = f"{cop['address']} ({cop['from_position']} → {cop['to_position']})"
            rows.append(("move", label, ""))
        else:
            rows.append(("unknown", "", ""))

    painters = {
        "insert": _green,
        "delete": _red,
        "replace": _yellow,
        "move": _cyan,
    }
    last_index = len(rows) - 1
    for i, (op_type, label, loc) in enumerate(rows):
        # The closing connector is only used when no overflow line follows.
        is_last = i == last_index and overflow == 0
        connector = "└─" if is_last else "├─"
        painter = painters.get(op_type)
        styled = painter(label) if painter is not None else label
        suffix = f" {loc}" if loc else ""
        print(f" {connector} {styled}{suffix}")

    if overflow > 0:
        print(f" └─ … and {overflow} more")


def _print_structured_delta(ops: list[DomainOp]) -> int:
    """Print a colour-coded delta op-by-op and return ``len(ops)``.

    Colour scheme mirrors standard diff conventions:

    - Green  → added (A)
    - Red    → deleted (D)
    - Yellow → modified (M)
    - Cyan   → moved / renamed (R)

    Each branch checks ``op["op"]`` directly so mypy can narrow the TypedDict
    union to the specific subtype before accessing its fields.  Ops of an
    unrecognised type print nothing but are still counted.
    """
    for op in ops:
        if op["op"] == "insert":
            print(_green(f"A {op['address']}"))
        elif op["op"] == "delete":
            print(_red(f"D {op['address']}"))
        elif op["op"] == "replace":
            print(_yellow(f"M {op['address']}"))
        elif op["op"] == "move":
            print(
                _cyan(f"R {op['address']} ({op['from_position']} → {op['to_position']})")
            )
        elif op["op"] == "rename":
            print(_cyan(f"R {op['from_address']} → {op['address']}"))
        elif op["op"] == "patch":
            child_ops = op["child_ops"]
            # Use the authoritative file_change field set by build_diff_ops.
            # Default to "modified" for PatchOps from older callers that
            # predate this field.
            fc = op.get("file_change", "modified")
            if fc == "added":
                print(_green(f"A {op['address']}"))
            elif fc == "deleted":
                print(_red(f"D {op['address']}"))
            else:
                print(_yellow(f"M {op['address']}"))
            _print_child_ops(child_ops)
    return len(ops)


def _print_text_diff(
    base_files: Manifest,
    target_files: Manifest,
    root: pathlib.Path,
    workdir: pathlib.Path | None,
) -> int:
    """Print a coloured unified diff for every changed file.

    Args:
        base_files: Path → object ID manifest of the base side.
        target_files: Path → object ID manifest of the target side.
        root: Repository root passed to ``read_object``.
        workdir: When not ``None``, target content missing from the object
            store is read from ``workdir / path`` instead.

    Returns:
        The number of paths whose object IDs differ (added, removed or
        modified), including any that produced no textual hunks.
    """
    base_paths = set(base_files)
    target_paths = set(target_files)
    changed = (
        sorted(target_paths - base_paths)  # added
        + sorted(base_paths - target_paths)  # removed
        + sorted(  # modified
            p for p in base_paths & target_paths if base_files[p] != target_files[p]
        )
    )

    for path in changed:
        # Sanitize the path before using it in diff headers so that file
        # names containing ANSI escape sequences cannot spoof terminal output.
        safe_path = sanitize_display(path)

        # Read base content.
        if path in base_files:
            raw_base = read_object(root, base_files[path])
            base_lines = (
                raw_base.decode("utf-8", errors="replace").splitlines()
                if raw_base
                else []
            )
            base_label = f"a/{safe_path}"
        else:
            base_lines = []
            base_label = "/dev/null"

        # Read target content (object store first, then disk for working tree).
        if path in target_files:
            raw_target = read_object(root, target_files[path])
            if raw_target is None and workdir is not None:
                disk = workdir / path
                if disk.is_file():
                    raw_target = disk.read_bytes()
            target_lines = (
                raw_target.decode("utf-8", errors="replace").splitlines()
                if raw_target
                else []
            )
            target_label = f"b/{safe_path}"
        else:
            target_lines = []
            target_label = "/dev/null"

        hunks = list(
            difflib.unified_diff(
                base_lines,
                target_lines,
                fromfile=base_label,
                tofile=target_label,
                lineterm="",
            )
        )
        if not hunks:
            continue

        for index, line in enumerate(hunks):
            if index < 2:
                # unified_diff always emits the "---"/"+++" file headers
                # first.  Identifying them by position (not by prefix) keeps
                # removed/added content that itself starts with "--"/"++"
                # coloured as a deletion/addition.
                print(_bold(line))
            elif line.startswith("@@"):
                print(_cyan(line))
            elif line.startswith("+"):
                print(_green(line))
            elif line.startswith("-"):
                print(_red(line))
            else:
                print(line)

    return len(changed)


# ── Path-prefix matching ──────────────────────────────────────────────────────


def _normalise_prefixes(paths: list[str]) -> list[str]:
    """Strip trailing slashes so ``dir/`` and ``dir`` select the same entries."""
    return [p.rstrip("/") for p in paths]


def _matches_any_prefix(rel: str, prefixes: list[str]) -> bool:
    """Return True when *rel* equals, or lives under, one of *prefixes*.

    *prefixes* must already be normalised with :func:`_normalise_prefixes`.
    """
    return any(rel == p or rel.startswith(f"{p}/") for p in prefixes)


# ── Conflict diff ─────────────────────────────────────────────────────────────


def _read_side_lines(
    root: pathlib.Path,
    manifest: Manifest,
    path: str,
    disk_fallback: bool = False,
) -> list[str]:
    """Return the text lines (without line endings) of *path* for one side.

    Content comes from the object store when *manifest* has an entry whose
    object can be read.  Otherwise, when *disk_fallback* is set and
    ``root / path`` is a file, the on-disk content is used.  Returns ``[]``
    when neither source is available.
    """
    oid = manifest.get(path)
    if oid:
        raw = read_object(root, oid)
        if raw is not None:
            return raw.decode("utf-8", errors="replace").splitlines()
    if disk_fallback:
        disk = root / path
        if disk.is_file():
            return disk.read_text(encoding="utf-8", errors="replace").splitlines()
    return []


def _unified_diff_text(
    base_lines: list[str],
    new_lines: list[str],
    fromfile: str,
    tofile: str,
) -> str:
    """Return a unified diff as one string with every line newline-terminated.

    Inputs must not carry line endings.  Headers and content lines are
    produced without terminators and then terminated uniformly, so the
    ``---`` / ``+++`` / ``@@`` headers can never run into each other.
    Returns ``""`` when the inputs are identical.
    """
    return "".join(
        f"{line}\n"
        for line in difflib.unified_diff(
            base_lines,
            new_lines,
            fromfile=fromfile,
            tofile=tofile,
            lineterm="",
        )
    )


def _print_conflict_diff(
    root: pathlib.Path,
    base_commit_id: str | None,
    ours_commit_id: str | None,
    theirs_commit_id: str | None,
    conflict_paths: list[str],
    path_filter: list[str],
    *,
    ours_label: str,
    theirs_label: str,
    json_out: bool,
    elapsed: Callable[[], float],
) -> int:
    """Render a Cohen-transform labeled diff for every conflicting file.

    For each path in *conflict_paths*, computes ``base→ours`` and
    ``base→theirs`` diffs so the user can immediately see *what each side
    did* rather than staring at two opaque blobs.  Text mode delegates the
    per-hunk annotation to ``format_conflict_diff``; JSON mode emits plain
    unified diffs.

    This is the direct implementation of the conflict-presentation insight
    from Bram Cohen's Manyana project.
    Credit: Bram Cohen, https://github.com/bramcohen/manyana.

    Args:
        root: Repository root.
        base_commit_id: Merge-base commit ID (``None`` if unavailable).
        ours_commit_id: Our branch commit ID at merge time.
        theirs_commit_id: Their branch commit ID.
        conflict_paths: Paths with unresolved conflicts (from MERGE_STATE).
        path_filter: If non-empty, only render paths equal to or under one of
            these entries.  A trailing ``/`` on an entry is ignored.
        ours_label: Human-readable name for the ours side (branch name).
        theirs_label: Human-readable name for the theirs side.
        json_out: Emit a single ``_DiffConflictJson`` document instead of text.
        elapsed: Timer callable forwarded to ``make_envelope`` in JSON mode.

    Returns:
        Number of conflicting paths rendered.
    """
    base_manifest = (
        (get_commit_snapshot_manifest(root, base_commit_id) or {})
        if base_commit_id
        else {}
    )
    ours_manifest = (
        (get_commit_snapshot_manifest(root, ours_commit_id) or {})
        if ours_commit_id
        else {}
    )
    theirs_manifest = (
        (get_commit_snapshot_manifest(root, theirs_commit_id) or {})
        if theirs_commit_id
        else {}
    )

    # Normalise like _filter_manifest so "-p dir/" selects files under dir.
    prefixes = _normalise_prefixes(path_filter)
    paths = [
        p
        for p in sorted(conflict_paths)
        if not prefixes or _matches_any_prefix(p, prefixes)
    ]

    if json_out:
        conflicts_out = []
        for path in paths:
            safe = sanitize_display(path)
            base_lines = _read_side_lines(root, base_manifest, path)
            # Only "ours" may fall back to the file on disk.
            ours_lines = _read_side_lines(root, ours_manifest, path, disk_fallback=True)
            theirs_lines = _read_side_lines(root, theirs_manifest, path)
            conflicts_out.append({
                "path": safe,
                "ours_diff": _unified_diff_text(
                    base_lines, ours_lines, f"base/{safe}", f"{ours_label}/{safe}"
                ),
                "theirs_diff": _unified_diff_text(
                    base_lines, theirs_lines, f"base/{safe}", f"{theirs_label}/{safe}"
                ),
            })
        print(json.dumps(_DiffConflictJson(
            **make_envelope(elapsed),
            status="conflict",
            base_commit=base_commit_id,
            ours_commit=ours_commit_id,
            theirs_commit=theirs_commit_id,
            ours_label=ours_label,
            theirs_label=theirs_label,
            conflicts=conflicts_out,
        )))
        return len(paths)

    # Text mode — render with color.
    use_color = _use_color()
    for path in paths:
        lines = format_conflict_diff(
            path,
            root,
            base_manifest,
            ours_manifest,
            theirs_manifest,
            read_object,
            use_color=use_color,
            ours_label=ours_label,
            theirs_label=theirs_label,
        )
        for line in lines:
            print(line)

    if paths:
        print(f"\n{len(paths)} conflicting file(s). "
              f"Run 'muse checkout --ours/--theirs <path>' to resolve.")
    return len(paths)


# ── Registration ──────────────────────────────────────────────────────────────


def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None:
    """Register the ``muse diff`` subcommand and its flags.

    The module docstring is used verbatim as the subcommand description, and
    :func:`run` is installed as the ``func`` default on the parsed namespace.
    """
    parser = subparsers.add_parser(
        "diff",
        help="Compare working tree against HEAD, or compare two commits.",
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "commit_a", nargs="?", default=None,
        help="Base commit ID (default: HEAD).",
    )
    parser.add_argument(
        "commit_b", nargs="?", default=None,
        help="Target commit ID (default: working tree).",
    )
    parser.add_argument(
        "--path", "-p", dest="paths", action="append", default=[], metavar="path",
        help="Limit diff to this file or directory. Repeat for multiple paths.",
    )
    parser.add_argument(
        "--staged", action="store_true",
        help="Show staged changes vs HEAD (what will be committed).",
    )
    parser.add_argument(
        "--unstaged", action="store_true",
        help="Show working-tree changes not yet staged (working tree vs stage).",
    )
    parser.add_argument(
        "--stat", action="store_true",
        help="Show summary statistics only.",
    )
    parser.add_argument(
        "--text", action="store_true",
        help="Show line-level unified diff instead of semantic symbols.",
    )
    parser.add_argument(
        "--exit-code", "-z", action="store_true", dest="exit_code",
        help=(
            "Exit with code 1 when changes are present, 0 when the working "
            "tree is clean. Useful in CI pipelines and agent preflight checks."
        ),
    )
    parser.add_argument(
        "--json", "-j", action="store_true", dest="json_out",
        help="Emit machine-readable JSON instead of human text.",
    )
    parser.add_argument(
        "--shelf", action="store_true", dest="shelf",
        help=(
            "Show the shelved changes vs HEAD. "
            "Pass a positional name or index (default 0) to select an entry: "
            "muse diff --shelf 1"
        ),
    )
    parser.add_argument(
        "--conflict", action="store_true", dest="conflict",
        help=(
            "Show a Cohen-transform labeled diff for every conflicting file "
            "in the current in-progress merge. For each conflict, renders "
            "base→ours and base→theirs diffs side-by-side, with each hunk "
            "annotated by its action ([inserted], [deleted], [modified]). "
            "Exits 1 when no merge is in progress and --conflict is forced."
        ),
    )
    parser.set_defaults(func=run, conflict=False)


# ── Manifest filter ───────────────────────────────────────────────────────────


def _filter_manifest(manifest: Manifest, paths: list[str]) -> Manifest:
    """Return *manifest* restricted to entries matching *paths*.

    Each entry in *paths* is treated as a prefix — it matches both exact file
    paths (``muse/cli/commands/status.py``) and directory prefixes
    (``muse/cli/``).  An empty *paths* list returns the very same manifest
    object, not a copy.
    """
    if not paths:
        return manifest
    prefixes = _normalise_prefixes(paths)
    return {
        rel: oid
        for rel, oid in manifest.items()
        if _matches_any_prefix(rel, prefixes)
    }


# ── Command helpers ───────────────────────────────────────────────────────────


def _reject_exclusive_flags(active: dict[str, bool], json_out: bool) -> None:
    """Exit with ``ExitCode.USER_ERROR`` if two exclusive flags are both set.

    *active* maps ``--shelf``, ``--staged`` and ``--unstaged`` to whether the
    flag was passed.  The first offending pair is reported: always on stderr,
    and additionally as a JSON object on stdout when *json_out* is set.
    """
    exclusive_pairs = (
        ("--shelf", "--staged"),
        ("--shelf", "--unstaged"),
        ("--staged", "--unstaged"),
    )
    for first, second in exclusive_pairs:
        if active[first] and active[second]:
            message = f"{first} and {second} are mutually exclusive"
            if json_out:
                print(json.dumps({
                    "error": "mutually_exclusive",
                    "flags": [first, second],
                    "message": message,
                }))
            print(f"❌ {message}.", file=sys.stderr)
            raise SystemExit(ExitCode.USER_ERROR)


def _categorise_ops(
    ops: list[DomainOp],
) -> tuple[list[str], list[str], list[str], dict[str, str], dict[str, dict[str, list[str]]]]:
    """Bucket delta ops into ``(added, deleted, modified, renamed, symbols)``.

    Op type → file bucket:
      insert                        → added
      delete                        → deleted
      rename                        → renamed {from_address: address}
      patch (file_change="added")   → added
      patch (file_change="deleted") → deleted
      patch (otherwise)             → modified
      anything else                 → modified

    Inserted/deleted directories (address ending in ``/`` or a summary that
    starts with ``directory:``) are reported with exactly one trailing slash.

    Symbol extraction (from a patch op's ``child_ops``):
      child op "insert"       → symbols[file]["added"]
      child op "delete"       → symbols[file]["deleted"]
      child op anything else  → symbols[file]["modified"]

    Lists are returned in op order (unsorted).  Files whose patch op has no
    child ops get no ``symbols`` entry.
    """
    added: list[str] = []
    deleted: list[str] = []
    modified: list[str] = []
    renamed: dict[str, str] = {}  # {old_path: new_path}
    symbols: dict[str, dict[str, list[str]]] = {}

    def _entry_path(op: DomainOp) -> str:
        """Return the op address, normalised to ``dir/`` for directories."""
        address = op["address"]
        summary = op.get("content_summary", "")
        is_dir = address.endswith("/") or (
            isinstance(summary, str) and summary.startswith("directory:")
        )
        return address.rstrip("/") + "/" if is_dir else address

    def _collect_child_symbols(file_path: str, child_ops: list[DomainOp]) -> None:
        """Populate ``symbols[file_path]`` from a patch op's child ops."""
        if not child_ops:
            return
        buckets = symbols.setdefault(
            file_path, {"added": [], "deleted": [], "modified": []}
        )
        for child in child_ops:
            address = child.get("address", "")
            # 'file.py::func_name' → 'func_name'; plain addresses pass through.
            name = address.split("::")[-1] if "::" in address else address
            if not name:
                continue
            child_type = child.get("op", "")
            if child_type == "insert":
                buckets["added"].append(name)
            elif child_type == "delete":
                buckets["deleted"].append(name)
            else:
                buckets["modified"].append(name)

    for op in ops:
        op_type = op["op"]
        address = op["address"]
        if op_type == "rename":
            renamed[op["from_address"]] = address
        elif op_type == "insert":
            added.append(_entry_path(op))
        elif op_type == "delete":
            deleted.append(_entry_path(op))
        elif op_type == "patch":
            fc = op.get("file_change", "modified")
            if fc == "added":
                added.append(address)
            elif fc == "deleted":
                deleted.append(address)
            else:
                modified.append(address)
            # Symbol-level detail from child_ops — always collected.
            _collect_child_symbols(address, op.get("child_ops", []))
        else:
            modified.append(address)

    return added, deleted, modified, renamed, symbols


# ── Command entry point ───────────────────────────────────────────────────────


def run(args: argparse.Namespace) -> None:
    """Show what has changed since the last commit.

    Default: HEAD vs working tree (everything changed, staged or not).
    Use ``--staged`` to see only what will be committed; ``--unstaged`` for
    un-staged edits only.  ``--exit-code`` exits 1 when changes exist (useful
    in CI preflight scripts).  A single positional ``a..b`` is accepted as
    sugar for two positional refs.

    Agent quickstart
    ----------------
    ::

        muse diff --json
        muse diff --staged --json
        muse diff HEAD~3 HEAD --json
        muse diff --exit-code --json

    JSON fields
    -----------
    from_ref         Start ref label (``"HEAD"`` or commit SHA).
    to_ref           End ref label (``"working tree"`` or commit SHA).
    from_commit_id   Full start commit ID, or ``null``.
    to_commit_id     Full end commit ID, or ``null``.
    has_changes      ``true`` when any diff exists.
    summary          Human-readable summary string.
    added            List of added file paths.
    deleted          List of deleted file paths.
    modified         List of modified file paths.
    renamed          Map of old path → new path.
    total_changes    Total number of changed files.
    symbols          Per-file symbol diff: ``added``, ``deleted``, ``modified``.
    sem_ver_bump     Suggested semver bump level or ``null``.
    breaking_changes List of breaking symbol addresses.

    Exit codes
    ----------
    0   No changes (or changes exist but ``--exit-code`` not set).
    1   Changes exist and ``--exit-code`` was passed; or invalid arguments;
        or ``--conflict`` rendered at least one conflicting path.
    2   Not inside a Muse repository.
    """
    elapsed = start_timer()

    commit_a: str | None = args.commit_a
    commit_b: str | None = args.commit_b

    # Support a..b range syntax as sugar for two positional args.
    if commit_a is not None and commit_b is None and ".." in commit_a:
        commit_a, commit_b = commit_a.split("..", 1)

    path_filter: list[str] = args.paths
    staged: bool = args.staged
    unstaged: bool = args.unstaged
    shelf: bool = args.shelf
    stat: bool = args.stat
    text: bool = args.text
    exit_code: bool = args.exit_code
    json_out: bool = args.json_out
    conflict: bool = getattr(args, "conflict", False)

    # Flag validation happens before the repository lookup.
    _reject_exclusive_flags(
        {"--shelf": shelf, "--staged": staged, "--unstaged": unstaged},
        json_out,
    )

    root = require_repo()

    # ── Cohen-transform conflict diff mode ────────────────────────────────────
    # Activated only by an explicit --conflict.  Renders a labeled two-sided
    # diff for each conflicting file (base→ours and base→theirs).  Without
    # --conflict the merge state is not consulted and the normal diff runs,
    # even while a merge is in progress.
    if conflict:
        merge_state = read_merge_state(root)
        if merge_state is None:
            if json_out:
                print(json.dumps({
                    "error": "no_merge_in_progress",
                    "message": "--conflict requires an in-progress merge — no MERGE_STATE.json found",
                }))
            print(
                "❌ --conflict requires an in-progress merge. "
                "No MERGE_STATE.json found.",
                file=sys.stderr,
            )
            raise SystemExit(ExitCode.USER_ERROR)

        branch = read_current_branch(root)
        ours_label = branch or "ours"
        theirs_label = merge_state.other_branch or "theirs"
        count = _print_conflict_diff(
            root,
            merge_state.base_commit,
            merge_state.ours_commit,
            merge_state.theirs_commit,
            merge_state.conflict_paths,
            path_filter,
            ours_label=ours_label,
            theirs_label=theirs_label,
            json_out=json_out,
            elapsed=elapsed,
        )
        raise SystemExit(1 if count > 0 else 0)
    # ── End conflict diff mode ────────────────────────────────────────────────

    branch = read_current_branch(root)
    domain = read_domain(root)
    plugin = resolve_plugin(root)

    # Cached commit ID for each resolved ref — populated alongside manifests so
    # agents can track exactly which commits were compared.
    from_commit_id: str | None = None
    to_commit_id: str | None = None

    def _resolve_manifest(ref: str) -> tuple[Manifest, str | None]:
        """Resolve a ref to (manifest, commit_id). Exits on unknown ref."""
        resolved = resolve_commit_ref(root, branch, ref)
        if resolved is None:
            print(f"⚠️ Commit '{sanitize_display(ref)}' not found.", file=sys.stderr)
            raise SystemExit(ExitCode.USER_ERROR)
        manifest = get_commit_snapshot_manifest(root, resolved.commit_id) or {}
        return manifest, resolved.commit_id

    # Track human-readable ref labels for JSON output so agents know exactly
    # what was compared without having to re-parse positional arguments.
    from_ref: str
    to_ref: str

    if shelf:
        # --shelf: diff HEAD vs the shelved snapshot at name/index N.
        # commit_a is reused as the optional name/index argument (default 0).
        shelf_selector: str | None = commit_a
        entries = _load_shelf(root)
        if not entries:
            msg = "no shelf entries — run `muse shelf save` to save changes first"
            if json_out:
                print(json.dumps({"error": msg, "exit_code": ExitCode.USER_ERROR}))
            else:
                print(f"❌ {msg}", file=sys.stderr)
            raise SystemExit(ExitCode.USER_ERROR)
        try:
            idx, entry = _resolve_entry(entries, shelf_selector)
        except ValueError as exc:
            if json_out:
                print(json.dumps({"error": str(exc), "exit_code": ExitCode.USER_ERROR}))
            else:
                print(f"❌ {exc}", file=sys.stderr)
            raise SystemExit(ExitCode.USER_ERROR)

        label = f"shelf/{idx} {entry['name']}"
        if entry.get("intent"):
            label += f": {entry['intent']}"

        head_files = get_head_snapshot_manifest(root, branch) or {}
        head_ref = resolve_commit_ref(root, branch, None)
        from_commit_id = head_ref.commit_id if head_ref else None

        # Shelf stores the full snapshot directly — no delta reconstruction needed.
        shelf_files: Manifest = dict(entry["snapshot"])
        for deleted_path in entry["deleted"]:
            shelf_files.pop(deleted_path, None)

        base_snap = SnapshotManifest(
            files=head_files,
            domain=domain,
            directories=directories_from_manifest(head_files),
        )
        target_snap = SnapshotManifest(
            files=shelf_files,
            domain=domain,
            directories=directories_from_manifest(shelf_files),
        )
        from_ref, to_ref = "HEAD", label

    elif commit_a is None:
        head_files = get_head_snapshot_manifest(root, branch) or {}
        # Read the full HEAD snapshot so we pick up explicitly tracked empty
        # directories (snapshot.directories) — not just those derivable from file paths.
        head_snap_id = get_head_snapshot_id(root, branch)
        head_snap = read_snapshot(root, head_snap_id) if head_snap_id else None
        head_dirs: list[str] = head_snap.directories if head_snap is not None else []
        head_ref = resolve_commit_ref(root, branch, None)
        from_commit_id = head_ref.commit_id if head_ref else None

        def _head_snapshot() -> SnapshotManifest:
            """Build the HEAD-side snapshot, including tracked empty dirs."""
            return SnapshotManifest(
                files=head_files,
                domain=domain,
                directories=sorted(
                    set(directories_from_manifest(head_files)) | set(head_dirs)
                ),
            )

        if staged and isinstance(plugin, StagePlugin):
            # --staged: what will be committed (stage vs HEAD).
            #
            # Build the staged manifest as HEAD + explicit stage entries.
            # We do NOT delegate to plugin.snapshot() here because when the
            # stage index is empty that method falls through to a full
            # working-tree walk — correct for `muse commit` (no stage = commit
            # everything) but wrong for `muse diff --staged` (no stage =
            # nothing staged = view equals HEAD, so has_changes=False).
            base_snap = _head_snapshot()
            staged_entries = plugin.read_stage(root)
            staged_files: dict[str, str] = dict(head_files)
            staged_dirs: list[str] = []
            for staged_path, staged_entry in staged_entries.items():
                if staged_path.startswith(".muse/"):
                    continue
                if staged_entry["object_id"] == EMPTY_DIR_OID:
                    # Sentinel entry: directory placeholder — keep for
                    # SnapshotManifest.directories but never put in files
                    # (has no content object to read).
                    if staged_entry["mode"] == "A":
                        staged_dirs.append(staged_path)
                    continue
                if staged_entry["mode"] == "D":
                    staged_files.pop(staged_path, None)
                else:
                    staged_files[staged_path] = staged_entry["object_id"]
            target_snap = SnapshotManifest(
                files=staged_files,
                domain=domain,
                directories=sorted(
                    set(directories_from_manifest(staged_files)) | set(staged_dirs)
                ),
            )
            from_ref, to_ref = "HEAD", "staged"
        elif unstaged and isinstance(plugin, StagePlugin):
            # --unstaged: working-tree changes not yet added to the stage.
            base_snap = plugin.snapshot(root)  # staged manifest
            target_snap = plugin.workdir_snapshot(root)
            from_ref, to_ref = "staged", "working tree"
        elif isinstance(plugin, StagePlugin):
            # Default with staging: HEAD vs full working tree.
            base_snap = _head_snapshot()
            target_snap = plugin.workdir_snapshot(root)
            from_ref, to_ref = "HEAD", "working tree"
        else:
            # No staging support: HEAD vs working tree (original behaviour).
            base_snap = _head_snapshot()
            target_snap = plugin.snapshot(root)
            from_ref, to_ref = "HEAD", "working tree"

    elif commit_b is None:
        # Single ref: diff HEAD vs that commit's snapshot.
        head_files = get_head_snapshot_manifest(root, branch) or {}
        head_ref = resolve_commit_ref(root, branch, None)
        from_commit_id = head_ref.commit_id if head_ref else None
        target_manifest, to_commit_id = _resolve_manifest(commit_a)
        base_snap = SnapshotManifest(
            files=head_files,
            domain=domain,
            directories=directories_from_manifest(head_files),
        )
        target_snap = SnapshotManifest(
            files=target_manifest,
            domain=domain,
            directories=directories_from_manifest(target_manifest),
        )
        from_ref, to_ref = "HEAD", commit_a

    else:
        base_manifest, from_commit_id = _resolve_manifest(commit_a)
        target_manifest, to_commit_id = _resolve_manifest(commit_b)
        base_snap = SnapshotManifest(
            files=base_manifest,
            domain=domain,
            directories=directories_from_manifest(base_manifest),
        )
        target_snap = SnapshotManifest(
            files=target_manifest,
            domain=domain,
            directories=directories_from_manifest(target_manifest),
        )
        from_ref, to_ref = commit_a, commit_b

    if path_filter:
        # Directories are re-derived from the surviving files only.
        filtered_base_files = _filter_manifest(base_snap["files"], path_filter)
        filtered_target_files = _filter_manifest(target_snap["files"], path_filter)
        base_snap = SnapshotManifest(
            files=filtered_base_files,
            domain=domain,
            directories=directories_from_manifest(filtered_base_files),
        )
        target_snap = SnapshotManifest(
            files=filtered_target_files,
            domain=domain,
            directories=directories_from_manifest(filtered_target_files),
        )

    if text and not json_out:
        # Disk fallback is only enabled when no positional ref was given.
        workdir = root if commit_a is None else None
        changed = _print_text_diff(
            base_snap["files"], target_snap["files"], root, workdir
        )
        if changed == 0:
            print("No differences.")
        if exit_code:
            raise SystemExit(1 if changed > 0 else 0)
        return

    delta = plugin.diff(base_snap, target_snap, repo_root=root)

    # For live diffs (HEAD vs workdir or HEAD vs staged), inject staged directory
    # renames that the plugin cannot detect via content-hash matching (empty dirs
    # have no content).  Replace matching delete+insert op pairs with a single
    # rename op and recompute the summary.
    if commit_a is None and isinstance(plugin, StagePlugin):
        staged_dir_renames = read_stage_dir_renames(root)
        if staged_dir_renames:
            deleted_dirs = {
                op["address"]
                for op in delta["ops"]
                if op["op"] == "delete" and op["address"].endswith("/")
            }
            inserted_dirs = {
                op["address"]
                for op in delta["ops"]
                if op["op"] == "insert" and op["address"].endswith("/")
            }
            drop: set[str] = set()
            rename_ops: list[DomainOp] = []
            for old_dir, new_dir in sorted(staged_dir_renames.items()):
                old_addr, new_addr = old_dir + "/", new_dir + "/"
                if old_addr in deleted_dirs and new_addr in inserted_dirs:
                    drop.add(old_addr)
                    drop.add(new_addr)
                    rename_ops.append({
                        "op": "rename",
                        "address": new_addr,
                        "from_address": old_addr,
                    })  # type: ignore[arg-type]
            if rename_ops:
                new_ops = [op for op in delta["ops"] if op["address"] not in drop]
                new_ops.extend(rename_ops)
                delta = dict(delta)  # type: ignore[assignment]
                delta["ops"] = new_ops
                delta["summary"] = delta_summary(new_ops)

    if json_out:
        # Muse's delta ops are file-level at the top; each file's symbol changes
        # live in PatchOp.child_ops.  The JSON schema exposes both layers so
        # agents can ask "which files changed?" (added/modified/deleted/renamed)
        # AND "which symbols changed in each file?" (symbols dict).
        added, deleted, modified, renamed, symbols = _categorise_ops(delta["ops"])

        has_changes = bool(delta["ops"])
        classification = classify_delta(delta, repo_root=root)
        bump = classification.bump
        sorted_added = sorted(added)
        sorted_deleted = sorted(deleted)
        sorted_modified = sorted(modified)
        print(json.dumps(_DiffJson(
            **make_envelope(elapsed),
            from_ref=from_ref,
            to_ref=to_ref,
            from_commit_id=from_commit_id,
            to_commit_id=to_commit_id,
            has_changes=has_changes,
            summary=delta["summary"] if has_changes else "",
            added=sorted_added,
            deleted=sorted_deleted,
            modified=sorted_modified,
            renamed=renamed,
            total_changes=(
                len(sorted_added)
                + len(sorted_modified)
                + len(sorted_deleted)
                + len(renamed)
            ),
            symbols=symbols,
            sem_ver_bump=bump,
            breaking_changes=classification.breaking_addresses,
        )))
        if exit_code:
            raise SystemExit(1 if has_changes else 0)
        return

    if stat:
        print(delta["summary"] if delta["ops"] else "No differences.")
        if exit_code:
            raise SystemExit(1 if delta["ops"] else 0)
        return

    changed = _print_structured_delta(delta["ops"])

    if changed == 0:
        print("No differences.")
    else:
        print(f"\n{delta['summary']}")

    if exit_code:
        raise SystemExit(1 if changed > 0 else 0)