"""``muse clean`` — remove untracked files from the working tree. Scans the working tree against HEAD's snapshot and removes files that are not tracked in any commit. By design, ``--force`` is required to actually delete files; without it the command behaves as a dry-run (equivalent to passing ``-n``). Usage:: muse clean -n # preview — show what would be removed muse clean -f # delete untracked files muse clean -f -d # also delete untracked directories muse clean -f -x # also delete .museignore-excluded files muse clean -f -d -x # everything untracked + ignored muse clean -f --json # machine-readable result All subcommands accept ``--json`` for machine-readable output:: { "status": "clean" | "would_remove" | "removed", "removed": ["path/to/file.txt", ...], "dirs_removed": ["path/to/dir", ...], "count": N, "dry_run": true | false, "duration_ms": 0.000123, "exit_code": 0 } ``status`` values: - ``"clean"`` — nothing to remove (dry-run or force, no untracked files) - ``"would_remove"`` — dry-run with untracked files found; nothing deleted - ``"removed"`` — force-clean completed; files were deleted ``duration_ms`` Wall-clock time from argument parsing to output. ``exit_code`` Mirrors the process exit code: ``0`` for success (clean, would_remove, removed) and ``1`` when files exist but neither --force nor --dry-run given. Lets agents evaluate the result without inspecting the process exit code separately. Exit codes:: 0 — nothing to clean, or clean completed successfully 1 — untracked files exist but neither --force nor --dry-run given 2 — not a Muse repository 3 — I/O error during deletion Security model:: Every candidate path returned by ``walk_workdir`` is validated to sit inside the repository root before any deletion is attempted. Paths that resolve outside the root are skipped with a warning; they cannot be produced by ``walk_workdir`` under normal operation but the guard ensures correctness even if the walker is extended in the future. Directory removal only touches directories whose direct children were all removed in the current run. The repository root, ``.muse/``, and any path inside ``.muse/`` are unconditionally protected. """ import argparse import fnmatch import json import logging import pathlib import sys from typing import TypedDict from muse.core.errors import ExitCode from muse.core.ignore import load_ignore_config, resolve_patterns from muse.core.repo import require_repo from muse.core.snapshot import walk_workdir from muse.core.refs import ( get_head_commit_id, read_current_branch, ) from muse.core.commits import read_commit from muse.core.snapshots import read_snapshot from muse.core.validation import sanitize_display from muse.plugins.registry import read_domain from muse.core.types import Manifest from muse.core.paths import muse_dir as _muse_dir from muse.core.timing import start_timer from muse.core.envelope import EnvelopeJson, make_envelope logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # JSON wire format # --------------------------------------------------------------------------- class _CleanResultJson(EnvelopeJson): """JSON output for ``muse clean``.""" status: str # "clean" | "would_remove" | "removed" removed: list[str] dirs_removed: list[str] count: int dry_run: bool # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _is_ignored(path: str, patterns: list[str]) -> bool: """Return ``True`` if *path* matches any ``.museignore`` pattern. Uses last-match-wins semantics so that negation patterns (lines starting with ``!``) can un-ignore previously matched paths. Uses ``fnmatch.fnmatch`` against both the full relative path and the filename component (``path.rsplit("/", 1)[-1]``) to mirror the behaviour of ``.gitignore`` pattern matching. """ result = False basename = path.rsplit("/", 1)[-1] for pat in patterns: negate = pat.startswith("!") effective = pat[1:] if negate else pat if fnmatch.fnmatch(path, effective) or fnmatch.fnmatch(basename, effective): result = not negate return result def _safe_to_delete(root: pathlib.Path, target: pathlib.Path) -> bool: """Return ``True`` if *target* is safe to delete. Guards: - Target must resolve inside *root* (prevents path-traversal). - Target must not be a directory (directories are handled separately). - The ``.muse/`` subtree is unconditionally protected. """ try: target.resolve().relative_to(root.resolve()) except ValueError: logger.warning( "⚠️ Skipping %s — resolves outside repository root", target ) return False muse_dir = _muse_dir(root) try: target.relative_to(muse_dir) logger.warning("⚠️ Skipping %s — inside .muse/", target) return False except ValueError: pass return True def _safe_to_rmdir(root: pathlib.Path, d: pathlib.Path) -> bool: """Return ``True`` if *d* is safe to remove as an empty directory. Protects the repository root, ``.muse/``, and any path inside ``.muse/``. """ if d == root: return False muse_dir = _muse_dir(root) if d == muse_dir: return False try: d.relative_to(muse_dir) return False # inside .muse/ except ValueError: pass try: d.resolve().relative_to(root.resolve()) except ValueError: return False # outside root return True # --------------------------------------------------------------------------- # Command registration # --------------------------------------------------------------------------- def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None: """Register the ``muse clean`` subcommand.""" parser = subparsers.add_parser( "clean", help="Remove untracked files from the working tree.", description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( "-n", "--dry-run", action="store_true", dest="dry_run", help="Preview — show what would be removed without deleting.", ) parser.add_argument( "-f", "--force", action="store_true", help="Delete untracked files (required unless --dry-run is passed).", ) parser.add_argument( "-x", "--include-ignored", action="store_true", dest="include_ignored", help="Also delete .museignore-excluded files.", ) parser.add_argument( "-d", "--directories", action="store_true", help="Also remove empty untracked directories after file deletion.", ) parser.add_argument( "--json", "-j", action="store_true", dest="json_out", help="Emit machine-readable JSON on stdout.", ) parser.set_defaults(func=run) # --------------------------------------------------------------------------- # Main handler # --------------------------------------------------------------------------- def run(args: argparse.Namespace) -> None: """Remove untracked files from the working tree. Files not tracked in the HEAD snapshot are considered untracked. ``--force`` is required to actually delete; without it the command exits with an error unless ``--dry-run`` is given. The ``.muse/`` subtree is unconditionally protected regardless of working-tree content. Agent quickstart ---------------- :: muse clean --dry-run --json muse clean --force --json muse clean --force -d -x --json JSON fields ----------- status Outcome: ``"clean"`` (nothing to remove), ``"would_remove"`` (dry-run with untracked files found), or ``"removed"``. removed List of file paths removed (or that would be removed). dirs_removed List of empty directory paths removed (with ``-d``). count Total number of paths removed. dry_run ``true`` when ``--dry-run`` was passed. Exit codes ---------- 0 Success (or nothing to remove). 1 ``--force`` not given and ``--dry-run`` not given. 2 Not inside a Muse repository. """ elapsed = start_timer() dry_run: bool = args.dry_run force: bool = args.force include_ignored: bool = args.include_ignored directories: bool = args.directories json_out: bool = args.json_out if not force and not dry_run: print( "⚠️ fatal: clean.requireForce is set to true.\n" " Use --force to remove files, or --dry-run / -n to preview.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) root = require_repo() branch = read_current_branch(root) domain = read_domain(root) # Build committed manifest (empty for a branch with no commits yet). committed: Manifest = {} head_commit_id = get_head_commit_id(root, branch) if head_commit_id: commit = read_commit(root, head_commit_id) if commit: snap = read_snapshot(root, commit.snapshot_id) if snap: committed = snap.manifest # Build current workdir manifest. current = walk_workdir(root) # Load ignore patterns; warn on failure but continue. ignored_patterns: list[str] = [] if not include_ignored: try: ignore_cfg = load_ignore_config(root) ignored_patterns = resolve_patterns(ignore_cfg, domain) except OSError as exc: logger.warning("⚠️ Could not load ignore config: %s", exc) # Collect untracked paths. untracked: list[str] = [] for rel_path in sorted(current): if rel_path in committed: continue if not include_ignored and _is_ignored(rel_path, ignored_patterns): continue untracked.append(rel_path) if not untracked: if json_out: print(json.dumps(_CleanResultJson( **make_envelope(elapsed), status="clean", removed=[], dirs_removed=[], count=0, dry_run=dry_run, ))) else: print("Nothing to clean.") return prefix = "[dry-run] " if dry_run else "" verb = "Would remove" if dry_run else "Removing" removed_files: list[str] = [] removed_dirs_list: list[str] = [] candidate_dirs: set[pathlib.Path] = set() for rel_path in untracked: target = root / rel_path if not json_out: print(f"{prefix}{verb}: {sanitize_display(rel_path)}") if not dry_run: if not _safe_to_delete(root, target): continue try: target.unlink(missing_ok=True) removed_files.append(rel_path) if directories: candidate_dirs.add(target.parent) except OSError as exc: print( f"❌ Could not remove {sanitize_display(rel_path)}: {exc}", file=sys.stderr, ) raise SystemExit(ExitCode.INTERNAL_ERROR) from exc else: removed_files.append(rel_path) # Remove empty directories (bottom-up), protected by _safe_to_rmdir. if not dry_run and directories: for d in sorted(candidate_dirs, key=lambda p: len(p.parts), reverse=True): if not _safe_to_rmdir(root, d): continue try: if d.is_dir() and not any(d.iterdir()): d.rmdir() rel_dir = str(d.relative_to(root)) removed_dirs_list.append(rel_dir) if not json_out: print(f"Removing directory: {sanitize_display(rel_dir)}") except OSError: pass count = len(removed_files) if json_out: print(json.dumps(_CleanResultJson( **make_envelope(elapsed), status="would_remove" if dry_run else "removed", removed=removed_files, dirs_removed=removed_dirs_list, count=count, dry_run=dry_run, ))) else: if dry_run: print(f"\n{count} untracked file(s) would be removed.") else: print(f"\n✅ Removed {count} untracked file(s).")