"""Hardening test suite for ``muse clean``. Coverage: - Unit: _is_ignored, _safe_to_delete, _safe_to_rmdir helpers - Security: path-traversal guard, .muse/ protection, symlink skipping - Error routing: all user errors go to stderr - JSON schema: _CleanResultJson shape for all outcomes - --dry-run: no side effects with and without --json - --include-ignored: respects .museignore patterns - --directories: empty-dir removal, .muse/ immune - Integration: clean lifecycle (commit → add untracked → clean) - E2E: help output, combined flags - Stress: 1 000 untracked files, concurrent reads, 50-pattern ignore list """ from __future__ import annotations import datetime import json import os import pathlib import threading from unittest.mock import patch import pytest from tests.cli_test_helper import CliRunner, InvokeResult from typing import TypedDict from muse.cli.commands.clean import _is_ignored, _safe_to_delete, _safe_to_rmdir from muse.core.object_store import write_object from muse.core.ids import hash_commit, hash_snapshot from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import Manifest, blob_id runner = CliRunner() # --------------------------------------------------------------------------- # Typed output shape (mirrors _CleanResultJson in clean.py) # --------------------------------------------------------------------------- class _CleanOut(TypedDict, total=False): status: str removed: list[str] dirs_removed: list[str] count: int dry_run: bool duration_ms: float exit_code: int # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _init_repo(path: pathlib.Path, *, domain: str = "midi") -> pathlib.Path: muse = muse_dir(path) for sub in ("commits", "snapshots", "objects", "refs/heads"): (muse / sub).mkdir(parents=True, exist_ok=True) (muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (muse / "repo.json").write_text( json.dumps({"repo_id": "clean-hard-test", "domain": domain}), encoding="utf-8", ) return path def _commit_file(root: pathlib.Path, rel_path: str, content: bytes) -> str: """Write *content* to *rel_path*, store the object and commit it.""" obj_id = blob_id(content) write_object(root, obj_id, content) (root / rel_path).write_bytes(content) manifest = {rel_path: obj_id} snap_id = hash_snapshot(manifest) snap = SnapshotRecord(snapshot_id=snap_id, manifest=manifest) write_snapshot(root, snap) committed_at = datetime.datetime.now(datetime.timezone.utc) commit_id = hash_commit( parent_ids=[], snapshot_id=snap_id, message="initial", committed_at_iso=committed_at.isoformat(), ) write_commit( root, CommitRecord( commit_id=commit_id, branch="main", snapshot_id=snap_id, message="initial", committed_at=committed_at, ), ) (heads_dir(root) / "main").write_text( commit_id, encoding="utf-8" ) return commit_id def _env(repo: pathlib.Path) -> Manifest: return {"MUSE_REPO_ROOT": str(repo)} def _invoke(args: list[str], env: Manifest) -> InvokeResult: return runner.invoke(None, args, env=env) def _parse_json(result: InvokeResult) -> _CleanOut: for line in result.output.splitlines(): line = line.strip() if line.startswith("{"): raw = json.loads(line) out = _CleanOut( status=raw["status"], removed=raw["removed"], dirs_removed=raw["dirs_removed"], count=raw["count"], dry_run=raw["dry_run"], ) if "duration_ms" in raw: out["duration_ms"] = raw["duration_ms"] if "exit_code" in raw: out["exit_code"] = raw["exit_code"] return out raise AssertionError(f"No JSON line found in output:\n{result.output}") # --------------------------------------------------------------------------- # Unit: _is_ignored # --------------------------------------------------------------------------- def test_is_ignored_exact_match() -> None: assert _is_ignored("build/out.o", ["build/*"]) is True def test_is_ignored_basename_match() -> None: assert _is_ignored("deep/nested/file.pyc", ["*.pyc"]) is True def test_is_ignored_no_match() -> None: assert _is_ignored("src/main.py", ["*.pyc", "build/*"]) is False def test_is_ignored_negation_unignores() -> None: # First pattern ignores all .log, second un-ignores keep.log. assert _is_ignored("keep.log", ["*.log", "!keep.log"]) is False def test_is_ignored_negation_last_match_wins() -> None: # !keep.log then *.log — last match re-ignores. assert _is_ignored("keep.log", ["!keep.log", "*.log"]) is True def test_is_ignored_empty_patterns() -> None: assert _is_ignored("anything.txt", []) is False def test_is_ignored_deep_path() -> None: assert _is_ignored("a/b/c/d.tmp", ["*.tmp"]) is True # --------------------------------------------------------------------------- # Unit: _safe_to_delete # --------------------------------------------------------------------------- def test_safe_to_delete_normal_file(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) target = tmp_path / "file.txt" target.write_text("x", encoding="utf-8") assert _safe_to_delete(tmp_path, target) is True def test_safe_to_delete_blocks_muse_dir(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) target = head_path(tmp_path) assert _safe_to_delete(tmp_path, target) is False def test_safe_to_delete_blocks_deep_muse(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) target = heads_dir(tmp_path) / "main" assert _safe_to_delete(tmp_path, target) is False # --------------------------------------------------------------------------- # Unit: _safe_to_rmdir # --------------------------------------------------------------------------- def test_safe_to_rmdir_normal_dir(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) d = tmp_path / "empty_dir" d.mkdir() assert _safe_to_rmdir(tmp_path, d) is True def test_safe_to_rmdir_blocks_root(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) assert _safe_to_rmdir(tmp_path, tmp_path) is False def test_safe_to_rmdir_blocks_muse(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) assert _safe_to_rmdir(tmp_path, muse_dir(tmp_path)) is False def test_safe_to_rmdir_blocks_muse_subtree(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) assert _safe_to_rmdir(tmp_path, muse_dir(tmp_path) / "refs") is False # --------------------------------------------------------------------------- # Security: path traversal guard # --------------------------------------------------------------------------- def test_path_traversal_skipped(tmp_path: pathlib.Path) -> None: """walk_workdir returning a path that resolves outside root is skipped.""" _init_repo(tmp_path) outside = tmp_path.parent / "outside_target.txt" outside.write_text("secret", encoding="utf-8") fake_workdir: Manifest = {"../outside_target.txt": "deadbeef"} with patch("muse.cli.commands.clean.walk_workdir", return_value=fake_workdir): result = _invoke(["clean", "-f"], _env(tmp_path)) # Exit 0 — skipped file is not treated as an error. assert result.exit_code == 0 # The outside file must still exist. assert outside.exists() def test_muse_dir_protected_even_if_listed(tmp_path: pathlib.Path) -> None: """Even if walk_workdir incorrectly lists .muse/HEAD, it must not be deleted.""" _init_repo(tmp_path) fake_workdir: Manifest = {".muse/HEAD": "deadbeef"} with patch("muse.cli.commands.clean.walk_workdir", return_value=fake_workdir): result = _invoke(["clean", "-f"], _env(tmp_path)) assert (head_path(tmp_path)).exists() # --------------------------------------------------------------------------- # Error routing: all user errors go to stderr # --------------------------------------------------------------------------- def test_no_flags_error_on_stderr(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) (tmp_path / "junk.txt").write_text("junk", encoding="utf-8") result = _invoke(["clean"], _env(tmp_path)) assert result.exit_code != 0 assert "force" in result.stderr.lower() or "force" in result.output.lower() def test_ignore_load_failure_logs_warning_not_crash(tmp_path: pathlib.Path) -> None: """OSError from load_ignore_config must not abort the command.""" _init_repo(tmp_path) (tmp_path / "junk.txt").write_text("junk", encoding="utf-8") with patch( "muse.cli.commands.clean.load_ignore_config", side_effect=OSError("disk full"), ): result = _invoke(["clean", "-n"], _env(tmp_path)) assert result.exit_code == 0 assert "junk.txt" in result.output # --------------------------------------------------------------------------- # JSON schema: _CleanResultJson # --------------------------------------------------------------------------- def test_json_nothing_to_clean(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _commit_file(tmp_path, "tracked.txt", b"tracked") result = _invoke(["clean", "-f", "--json"], _env(tmp_path)) assert result.exit_code == 0 data = _parse_json(result) assert data["status"] == "clean" assert data["removed"] == [] assert data["dirs_removed"] == [] assert data["count"] == 0 assert data["dry_run"] is False def test_json_dry_run_shows_files(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) (tmp_path / "ghost.txt").write_text("ghost", encoding="utf-8") result = _invoke(["clean", "-n", "--json"], _env(tmp_path)) assert result.exit_code == 0 data = _parse_json(result) assert data["status"] == "would_remove" assert "ghost.txt" in data["removed"] assert data["count"] == 1 assert data["dry_run"] is True assert (tmp_path / "ghost.txt").exists() # not deleted def test_json_removed_files(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _commit_file(tmp_path, "kept.txt", b"kept") (tmp_path / "remove_me.txt").write_text("bye", encoding="utf-8") result = _invoke(["clean", "-f", "--json"], _env(tmp_path)) assert result.exit_code == 0 data = _parse_json(result) assert data["status"] == "removed" assert "remove_me.txt" in data["removed"] assert data["count"] == 1 assert data["dry_run"] is False def test_json_dirs_removed(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _commit_file(tmp_path, "kept.txt", b"kept") d = tmp_path / "empty_subdir" d.mkdir() (d / "junk.txt").write_text("junk", encoding="utf-8") result = _invoke(["clean", "-f", "-d", "--json"], _env(tmp_path)) assert result.exit_code == 0 data = _parse_json(result) assert "empty_subdir/junk.txt" in data["removed"] assert "empty_subdir" in data["dirs_removed"] def test_json_schema_fields_present(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) result = _invoke(["clean", "-n", "--json"], _env(tmp_path)) assert result.exit_code == 0 data = _parse_json(result) for key in ("status", "removed", "dirs_removed", "count", "dry_run"): assert key in data, f"Missing key: {key}" # --------------------------------------------------------------------------- # --dry-run: no side effects # --------------------------------------------------------------------------- def test_dry_run_no_deletion(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) (tmp_path / "ephemeral.txt").write_text("keep me", encoding="utf-8") result = _invoke(["clean", "-n"], _env(tmp_path)) assert result.exit_code == 0 assert (tmp_path / "ephemeral.txt").exists() def test_dry_run_shows_count(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) for i in range(5): (tmp_path / f"file_{i}.txt").write_text(str(i), encoding="utf-8") result = _invoke(["clean", "-n"], _env(tmp_path)) assert result.exit_code == 0 assert "5" in result.output def test_dry_run_json_reports_all(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) for i in range(3): (tmp_path / f"tmp_{i}.txt").write_text(str(i), encoding="utf-8") result = _invoke(["clean", "-n", "--json"], _env(tmp_path)) assert result.exit_code == 0 data = _parse_json(result) assert data["count"] == 3 assert len(data["removed"]) == 3 assert data["dry_run"] is True # --------------------------------------------------------------------------- # --include-ignored: respects and overrides .museignore # --------------------------------------------------------------------------- def test_include_ignored_deletes_ignored_files(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _commit_file(tmp_path, "tracked.txt", b"tracked") (tmp_path / "debug.log").write_text("log", encoding="utf-8") fake_patterns = ["*.log"] with patch("muse.cli.commands.clean.resolve_patterns", return_value=fake_patterns): # Without -x, the .log file is excluded from cleaning. result_no_x = _invoke(["clean", "-n"], _env(tmp_path)) assert "debug.log" not in result_no_x.output # With -x, the file is included. result_x = _invoke(["clean", "-n", "-x"], _env(tmp_path)) assert "debug.log" in result_x.output # --------------------------------------------------------------------------- # --directories: empty-dir removal # --------------------------------------------------------------------------- def test_directories_removes_empty_dir_after_file_deletion( tmp_path: pathlib.Path, ) -> None: _init_repo(tmp_path) _commit_file(tmp_path, "kept.txt", b"kept") subdir = tmp_path / "subdir" subdir.mkdir() (subdir / "junk.txt").write_text("junk", encoding="utf-8") result = _invoke(["clean", "-f", "-d"], _env(tmp_path)) assert result.exit_code == 0 assert not subdir.exists() def test_directories_leaves_non_empty_dir(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) subdir = tmp_path / "mixed" subdir.mkdir() (subdir / "untracked.txt").write_text("bye", encoding="utf-8") (subdir / "kept.txt").write_bytes(b"keep me") _commit_file(tmp_path, "mixed/kept.txt", b"keep me") result = _invoke(["clean", "-f", "-d"], _env(tmp_path)) assert result.exit_code == 0 # Directory still exists (kept.txt is inside it and tracked). assert subdir.is_dir() def test_directories_dry_run_does_not_remove_dir(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) subdir = tmp_path / "dry_subdir" subdir.mkdir() (subdir / "junk.txt").write_text("junk", encoding="utf-8") result = _invoke(["clean", "-n", "-d"], _env(tmp_path)) assert result.exit_code == 0 assert subdir.is_dir() # --------------------------------------------------------------------------- # Integration: full lifecycle # --------------------------------------------------------------------------- def test_integration_commit_then_clean(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _commit_file(tmp_path, "tracked.txt", b"tracked") (tmp_path / "untracked.txt").write_text("bye", encoding="utf-8") result = _invoke(["clean", "-f"], _env(tmp_path)) assert result.exit_code == 0 assert not (tmp_path / "untracked.txt").exists() assert (tmp_path / "tracked.txt").exists() def test_integration_already_clean(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _commit_file(tmp_path, "everything.txt", b"all tracked") result = _invoke(["clean", "-f"], _env(tmp_path)) assert result.exit_code == 0 assert "nothing" in result.output.lower() def test_integration_no_commits_cleans_all(tmp_path: pathlib.Path) -> None: """With no HEAD commit every file is untracked.""" _init_repo(tmp_path) (tmp_path / "orphan.txt").write_text("orphan", encoding="utf-8") result = _invoke(["clean", "-f"], _env(tmp_path)) assert result.exit_code == 0 assert not (tmp_path / "orphan.txt").exists() def test_integration_json_full_cycle(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _commit_file(tmp_path, "a.txt", b"a") (tmp_path / "b.txt").write_text("b", encoding="utf-8") (tmp_path / "c.txt").write_text("c", encoding="utf-8") result = _invoke(["clean", "-f", "--json"], _env(tmp_path)) assert result.exit_code == 0 data = _parse_json(result) assert data["count"] == 2 assert set(data["removed"]) == {"b.txt", "c.txt"} assert not (tmp_path / "b.txt").exists() assert not (tmp_path / "c.txt").exists() assert (tmp_path / "a.txt").exists() # --------------------------------------------------------------------------- # E2E: help output # --------------------------------------------------------------------------- def test_help_output() -> None: result = _invoke(["clean", "--help"], {}) assert result.exit_code == 0 for flag in ("-f", "--force", "-n", "--dry-run", "--json"): assert flag in result.output def test_help_describes_json_flag() -> None: result = _invoke(["clean", "--help"], {}) assert "json" in result.output.lower() # --------------------------------------------------------------------------- # Stress: 1 000 untracked files # --------------------------------------------------------------------------- def test_stress_1000_untracked(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) for i in range(1_000): (tmp_path / f"stress_{i:04d}.dat").write_bytes(b"x" * 64) result = _invoke(["clean", "-f", "--json"], _env(tmp_path)) assert result.exit_code == 0 data = _parse_json(result) assert data["count"] == 1_000 remaining = list(tmp_path.glob("stress_*.dat")) assert len(remaining) == 0 def test_stress_1000_dry_run(tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) for i in range(1_000): (tmp_path / f"dry_{i:04d}.dat").write_bytes(b"y" * 64) result = _invoke(["clean", "-n", "--json"], _env(tmp_path)) assert result.exit_code == 0 data = _parse_json(result) assert data["count"] == 1_000 assert data["dry_run"] is True # Nothing deleted. remaining = list(tmp_path.glob("dry_*.dat")) assert len(remaining) == 1_000 def test_stress_50_ignore_patterns(tmp_path: pathlib.Path) -> None: """_is_ignored with 50 patterns must not crash and must filter correctly.""" patterns = [f"*.ext{i}" for i in range(50)] assert _is_ignored("file.ext25", patterns) is True assert _is_ignored("file.py", patterns) is False def test_stress_concurrent_json_reads(tmp_path: pathlib.Path) -> None: """Concurrent dry-run invocations must all exit 0 without data races. CliRunner serialises stdout capture per invocation, so we guard each call with a lock and check only the exit code and JSON parse-ability rather than racing on the shared capture buffer. """ _init_repo(tmp_path) _commit_file(tmp_path, "tracked.txt", b"tracked") for i in range(20): (tmp_path / f"concurrent_{i}.txt").write_text(str(i), encoding="utf-8") invoke_lock = threading.Lock() errors: list[str] = [] def _worker() -> None: with invoke_lock: r = _invoke(["clean", "-n", "--json"], _env(tmp_path)) try: assert r.exit_code == 0 data = _parse_json(r) assert data["count"] == 20 except Exception as exc: errors.append(str(exc)) threads = [threading.Thread(target=_worker) for _ in range(8)] for t in threads: t.start() for t in threads: t.join() assert errors == [], f"Concurrent read failures: {errors}" # --------------------------------------------------------------------------- # Edge cases # --------------------------------------------------------------------------- def test_force_and_dry_run_together_dry_wins(tmp_path: pathlib.Path) -> None: """When both -f and -n are given, -n wins (no deletion).""" _init_repo(tmp_path) (tmp_path / "both_flags.txt").write_text("keep", encoding="utf-8") result = _invoke(["clean", "-f", "-n"], _env(tmp_path)) assert result.exit_code == 0 assert (tmp_path / "both_flags.txt").exists() def test_ansi_in_filename_sanitized(tmp_path: pathlib.Path) -> None: """ANSI escape codes embedded in filenames must not leak to output.""" _init_repo(tmp_path) # Use a filename that contains ANSI escape chars encoded in the name. malicious_name = "malicious\x1b[31mred\x1b[0m.txt" try: (tmp_path / malicious_name).write_text("malicious", encoding="utf-8") except (OSError, ValueError): pytest.skip("filesystem does not support ANSI chars in filenames") result = _invoke(["clean", "-n"], _env(tmp_path)) assert "\x1b[31m" not in result.output def test_clean_respects_muse_dir_immune(tmp_path: pathlib.Path) -> None: """Under no circumstances should clean delete anything inside .muse/.""" _init_repo(tmp_path) head_before = (head_path(tmp_path)).read_text() with patch( "muse.cli.commands.clean.walk_workdir", return_value={ ".muse/HEAD": "abc", ".muse/repo.json": "def", }, ): result = _invoke(["clean", "-f"], _env(tmp_path)) assert result.exit_code == 0 assert (head_path(tmp_path)).read_text() == head_before # --------------------------------------------------------------------------- # Agent supercharge — duration_ms and exit_code in every JSON output # --------------------------------------------------------------------------- class TestElapsed: """Every JSON output path must include ``duration_ms`` as a float.""" def test_nothing_to_clean_has_elapsed(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _commit_file(tmp_path, "tracked.txt", b"tracked") result = _invoke(["clean", "-n", "--json"], _env(tmp_path)) assert result.exit_code == 0 data = _parse_json(result) assert "duration_ms" in data assert isinstance(data["duration_ms"], float) def test_dry_run_with_files_has_elapsed(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _commit_file(tmp_path, "tracked.txt", b"tracked") (tmp_path / "untracked.txt").write_text("x") result = _invoke(["clean", "-n", "--json"], _env(tmp_path)) assert result.exit_code == 0 data = _parse_json(result) assert "duration_ms" in data assert isinstance(data["duration_ms"], float) def test_force_clean_has_elapsed(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _commit_file(tmp_path, "tracked.txt", b"tracked") (tmp_path / "untracked.txt").write_text("x") result = _invoke(["clean", "-f", "--json"], _env(tmp_path)) assert result.exit_code == 0 data = _parse_json(result) assert "duration_ms" in data assert isinstance(data["duration_ms"], float) class TestExitCode: """Every JSON output path must include ``exit_code`` mirroring process exit.""" def test_nothing_to_clean_exit_code_0(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _commit_file(tmp_path, "tracked.txt", b"tracked") result = _invoke(["clean", "-n", "--json"], _env(tmp_path)) data = _parse_json(result) assert data["exit_code"] == 0 def test_dry_run_with_files_exit_code_0(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _commit_file(tmp_path, "tracked.txt", b"tracked") (tmp_path / "untracked.txt").write_text("x") result = _invoke(["clean", "-n", "--json"], _env(tmp_path)) data = _parse_json(result) assert data["exit_code"] == 0 def test_force_clean_exit_code_0(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _commit_file(tmp_path, "tracked.txt", b"tracked") (tmp_path / "untracked.txt").write_text("x") result = _invoke(["clean", "-f", "--json"], _env(tmp_path)) data = _parse_json(result) assert data["exit_code"] == 0 class TestDryRunStatus: """Dry-run with files to remove must report status ``would_remove``, not ``clean``.""" def test_dry_run_with_files_status_is_would_remove(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _commit_file(tmp_path, "tracked.txt", b"tracked") (tmp_path / "untracked.txt").write_text("x") result = _invoke(["clean", "-n", "--json"], _env(tmp_path)) data = _parse_json(result) assert data["status"] == "would_remove", ( f"dry-run with files should be 'would_remove', got {data['status']!r}" ) def test_dry_run_no_files_status_is_clean(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _commit_file(tmp_path, "tracked.txt", b"tracked") result = _invoke(["clean", "-n", "--json"], _env(tmp_path)) data = _parse_json(result) assert data["status"] == "clean" def test_force_with_files_status_is_removed(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _commit_file(tmp_path, "tracked.txt", b"tracked") (tmp_path / "untracked.txt").write_text("x") result = _invoke(["clean", "-f", "--json"], _env(tmp_path)) data = _parse_json(result) assert data["status"] == "removed" class TestJsonSchemaComplete: """Full schema must include all fields including duration_ms and exit_code.""" _FULL_KEYS = {"status", "removed", "dirs_removed", "count", "dry_run", "duration_ms", "exit_code"} def test_nothing_to_clean_schema_complete(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _commit_file(tmp_path, "tracked.txt", b"tracked") result = _invoke(["clean", "-n", "--json"], _env(tmp_path)) data = _parse_json(result) missing = self._FULL_KEYS - data.keys() assert not missing, f"Missing keys in clean JSON: {missing}" def test_dry_run_with_files_schema_complete(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _commit_file(tmp_path, "tracked.txt", b"tracked") (tmp_path / "untracked.txt").write_text("x") result = _invoke(["clean", "-n", "--json"], _env(tmp_path)) data = _parse_json(result) missing = self._FULL_KEYS - data.keys() assert not missing, f"Missing keys in dry-run JSON: {missing}" def test_force_clean_schema_complete(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _commit_file(tmp_path, "tracked.txt", b"tracked") (tmp_path / "untracked.txt").write_text("x") result = _invoke(["clean", "-f", "--json"], _env(tmp_path)) data = _parse_json(result) missing = self._FULL_KEYS - data.keys() assert not missing, f"Missing keys in force JSON: {missing}" # --------------------------------------------------------------------------- # Flag registration tests # --------------------------------------------------------------------------- import argparse as _argparse from muse.cli.commands.clean import register as _register_clean from muse.core.paths import head_path, heads_dir, muse_dir def _parse_clean(*args: str) -> _argparse.Namespace: root_p = _argparse.ArgumentParser() subs = root_p.add_subparsers(dest="cmd") _register_clean(subs) return root_p.parse_args(["clean", *args]) class TestRegisterFlags: def test_default_json_out_is_false(self) -> None: ns = _parse_clean() assert ns.json_out is False def test_json_flag_sets_json_out(self) -> None: ns = _parse_clean("--json") assert ns.json_out is True def test_j_shorthand_sets_json_out(self) -> None: ns = _parse_clean("-j") assert ns.json_out is True def test_force_flag(self) -> None: ns = _parse_clean("--force") assert ns.force is True def test_dry_run_n_shorthand(self) -> None: ns = _parse_clean("-n") assert ns.dry_run is True