"""Tests for ``muse rm``.

Covers:

- --cached: stage deletion without touching disk
- no --cached: stage deletion AND delete from disk
- -r / --recursive: required for directories
- -f / --force: bypass safety checks
- -n / --dry-run: preview without side effects
- --json: machine-readable output (always valid, including error paths)
- File not tracked → exit 1
- Directory without -r → exit 1
- Modified file without --force → exit 1
- Staged-addition without --force → exit 1
- Multiple paths in one invocation
- Idempotency: rm already-staged-for-deletion file
- JSON includes duration_ms (float ≥ 0) and exit_code (int)
- JSON on error: --json emits structured output even on user errors
- Path traversal rejected: paths outside the repo root are rejected
- File already gone from disk: tracked file missing on disk is handled gracefully
- Staged-mode-D idempotent with --json
- --dry-run still runs safety checks
- --dry-run --force bypasses safety checks and emits dry_run status
- Symlink in working tree: symlink to a committed path is handled correctly
- Unicode filenames
- Stress: 200 files, remove half
- Stress with timing: duration_ms is present and non-negative
- Data integrity: stage is valid after removal
"""
# NOTE(review): the coverage list above mentions a symlink case, but no
# symlink test is visible in this module — confirm and update the list.

from __future__ import annotations

import datetime
import json
import pathlib

import pytest

from tests.cli_test_helper import CliRunner
from muse.core.types import blob_id
from muse.core.object_store import write_object
from muse.core.ids import hash_commit, hash_snapshot
from muse.core.commits import (
    CommitRecord,
    write_commit,
)
from muse.core.snapshots import (
    SnapshotRecord,
    write_snapshot,
)
from muse.core.types import Manifest
from muse.plugins.code.stage import StagedFileMap, make_entry, read_stage, write_stage
from muse.core.paths import heads_dir, muse_dir

# Environment-variable overrides passed to ``runner.invoke(..., env=...)``.
type _EnvDict = dict[str, str]
# Maps a repo-relative path to the bytes committed at that path.
type _FileBytes = dict[str, bytes]

cli = None  # argparse migration — CliRunner ignores this arg
# Single runner instance shared by every test in this module.
runner = CliRunner()

# Value written to the ``repo_id`` field of each test repo's ``repo.json``.
_REPO_ID = "rm-test"
# ---------------------------------------------------------------------------
# Test-repo bootstrap helpers
# ---------------------------------------------------------------------------


def _init_repo(path: pathlib.Path) -> pathlib.Path:
    """Create the repository skeleton for *path* and return *path*.

    Builds the ``commits``, ``snapshots``, ``objects`` and ``refs/heads``
    directories under the directory reported by ``muse_dir``, points ``HEAD``
    at ``refs/heads/main`` and writes a minimal ``repo.json``.
    """
    meta_root = muse_dir(path)
    for subdir in ("commits", "snapshots", "objects", "refs/heads"):
        target = meta_root / subdir
        target.mkdir(parents=True, exist_ok=True)

    head_file = meta_root / "HEAD"
    head_file.write_text("ref: refs/heads/main", encoding="utf-8")

    repo_config = json.dumps({"repo_id": _REPO_ID, "domain": "code"})
    config_file = meta_root / "repo.json"
    config_file.write_text(repo_config, encoding="utf-8")
    return path


def _env(repo: pathlib.Path) -> _EnvDict:
    """Return the environment override that carries *repo* as the repo root."""
    repo_root = str(repo)
    return {"MUSE_REPO_ROOT": repo_root}


# Incremented once per ``_commit_files`` call; only used to vary the message.
_counter = 0


def _commit_files(root: pathlib.Path, files: _FileBytes) -> str:
    """Materialise *files* on disk and in the object store, then commit them.

    Each entry of *files* is stored as an object and written to the working
    tree. A snapshot and a parentless commit on ``main`` are then recorded,
    and the ``main`` head is pointed at the new commit.

    Returns the id of the commit that was written.
    """
    global _counter
    _counter += 1
    message = f"commit {_counter}"

    manifest: Manifest = {}
    for rel_path, content in files.items():
        object_id = blob_id(content)
        write_object(root, object_id, content)
        manifest[rel_path] = object_id

        disk_path = root / rel_path
        disk_path.parent.mkdir(parents=True, exist_ok=True)
        disk_path.write_bytes(content)

    snapshot_id = hash_snapshot(manifest)
    snapshot = SnapshotRecord(snapshot_id=snapshot_id, manifest=manifest)
    write_snapshot(root, snapshot)

    now = datetime.datetime.now(datetime.timezone.utc)
    commit_id = hash_commit(
        parent_ids=[],
        snapshot_id=snapshot_id,
        message=message,
        committed_at_iso=now.isoformat(),
    )
    record = CommitRecord(
        commit_id=commit_id,
        branch="main",
        snapshot_id=snapshot_id,
        message=message,
        committed_at=now,
    )
    write_commit(root, record)

    branch_ref = heads_dir(root) / "main"
    branch_ref.write_text(commit_id, encoding="utf-8")
    return commit_id


# ---------------------------------------------------------------------------
# help
# ---------------------------------------------------------------------------


def test_rm_help() -> None:
    """``muse rm --help`` exits with 0 and mentions ``--cached``."""
    help_result = runner.invoke(cli, ["rm", "--help"])
    assert help_result.exit_code == 0
    assert "--cached" in help_result.output


# ---------------------------------------------------------------------------
# --cached: stage deletion, keep file on disk
# ---------------------------------------------------------------------------


def test_rm_cached_stages_deletion(tmp_path: pathlib.Path) -> None:
    """``muse rm --cached`` writes mode D to stage, leaves file on disk."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"song.txt": b"verse\n"})
    result = runner.invoke(
        cli, ["rm", "--cached", "song.txt"], env=_env(tmp_path)
    )
    assert result.exit_code == 0
    assert (tmp_path / "song.txt").exists()
    stage = read_stage(tmp_path)
    assert "song.txt" in stage
    assert stage["song.txt"]["mode"] == "D"


def test_rm_cached_json_output(tmp_path: pathlib.Path) -> None:
    """``muse rm --cached --json`` emits valid JSON with expected fields."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"notes.txt": b"A\n"})
    result = runner.invoke(
        cli, ["rm", "--cached", "--json", "notes.txt"], env=_env(tmp_path)
    )
    assert result.exit_code == 0
    data = json.loads(result.output)
    assert data["status"] == "removed"
    assert "notes.txt" in data["removed"]
    assert data["cached"] is True
    assert data["dry_run"] is False
    assert data["count"] == 1


# ---------------------------------------------------------------------------
# Without --cached: stage deletion AND delete from disk
# ---------------------------------------------------------------------------


def test_rm_deletes_file_from_disk(tmp_path: pathlib.Path) -> None:
    """``muse rm`` without --cached removes the file from disk."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"beat.mid": b"\x00\x01\x02"})
    result = runner.invoke(cli, ["rm", "beat.mid"], env=_env(tmp_path))
    assert result.exit_code == 0
    assert not (tmp_path / "beat.mid").exists()
    stage = read_stage(tmp_path)
    assert stage["beat.mid"]["mode"] == "D"


def test_rm_json_no_cached(tmp_path: pathlib.Path) -> None:
    """``muse rm --json`` emits cached=false."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"f.txt": b"x\n"})
    result = runner.invoke(cli, ["rm", "--json", "f.txt"], env=_env(tmp_path))
    assert result.exit_code == 0
    data = json.loads(result.output)
    assert data["cached"] is False
    assert data["count"] == 1


# ---------------------------------------------------------------------------
# File not tracked → exit 1
# ---------------------------------------------------------------------------


def test_rm_untracked_file_exits_1(tmp_path: pathlib.Path) -> None:
    """Removing an untracked file must exit non-zero."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"existing.txt": b"x\n"})
    result = runner.invoke(
        cli, ["rm", "does_not_exist.txt"], env=_env(tmp_path)
    )
    assert result.exit_code != 0


def test_rm_untracked_does_not_affect_stage(tmp_path: pathlib.Path) -> None:
    """Attempting to remove an untracked file must not mutate the stage."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"a.txt": b"a\n"})
    result = runner.invoke(cli, ["rm", "ghost.txt"], env=_env(tmp_path))
    # Make the premise explicit: the command must have been rejected.
    assert result.exit_code != 0
    stage = read_stage(tmp_path)
    # Neither the rejected path nor the unrelated committed file may be staged.
    assert "ghost.txt" not in stage
    assert "a.txt" not in stage


def test_rm_error_json_untracked(tmp_path: pathlib.Path) -> None:
    """``muse rm --json`` on an untracked file emits JSON with exit_code=1."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"existing.txt": b"x\n"})
    result = runner.invoke(
        cli, ["rm", "--json", "ghost.txt"], env=_env(tmp_path)
    )
    assert result.exit_code != 0
    # First line is JSON; remaining lines are stderr error messages.
    data = json.loads(result.output.splitlines()[0])
    assert data["exit_code"] == 1
    assert data["status"] == "error"
    assert data["count"] == 0
    assert isinstance(data["duration_ms"], float)


# ---------------------------------------------------------------------------
# Directory without -r → exit 1
# ---------------------------------------------------------------------------


def test_rm_directory_without_recursive_exits_1(tmp_path: pathlib.Path) -> None:
    """Removing a directory path without -r must exit non-zero."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"src/main.py": b"pass\n"})
    result = runner.invoke(cli, ["rm", "--cached", "src"], env=_env(tmp_path))
    assert result.exit_code != 0


def test_rm_error_json_directory_without_r(tmp_path: pathlib.Path) -> None:
    """``muse rm --json <dir>`` without -r emits JSON with exit_code=1."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"src/main.py": b"pass\n"})
    result = runner.invoke(
        cli, ["rm", "--cached", "--json", "src"], env=_env(tmp_path)
    )
    assert result.exit_code != 0
    # Only the first output line is parsed as the JSON payload.
    data = json.loads(result.output.splitlines()[0])
    assert data["exit_code"] == 1
    assert data["status"] == "error"


def test_rm_directory_with_recursive_stages_all(tmp_path: pathlib.Path) -> None:
    """``muse rm -r --cached <dir>`` stages deletion for every file under dir."""
    _init_repo(tmp_path)
    _commit_files(
        tmp_path,
        {
            "src/a.py": b"a\n",
            "src/b.py": b"b\n",
            "other.txt": b"c\n",
        },
    )
    result = runner.invoke(
        cli, ["rm", "-r", "--cached", "src"], env=_env(tmp_path)
    )
    assert result.exit_code == 0
    stage = read_stage(tmp_path)
    assert stage["src/a.py"]["mode"] == "D"
    assert stage["src/b.py"]["mode"] == "D"
    assert "other.txt" not in stage


# ---------------------------------------------------------------------------
# Modified file without --force → exit 1
# ---------------------------------------------------------------------------


def test_rm_modified_file_without_force_exits_1(tmp_path: pathlib.Path) -> None:
    """Removing a locally-modified file without --force must exit non-zero."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"track.txt": b"original\n"})
    (tmp_path / "track.txt").write_bytes(b"modified\n")
    result = runner.invoke(cli, ["rm", "track.txt"], env=_env(tmp_path))
    assert result.exit_code != 0
    assert (tmp_path / "track.txt").exists()


def test_rm_error_json_modified_without_force(tmp_path: pathlib.Path) -> None:
    """``muse rm --json`` on a modified file emits JSON with exit_code=1."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"track.txt": b"original\n"})
    (tmp_path / "track.txt").write_bytes(b"modified\n")
    result = runner.invoke(cli, ["rm", "--json", "track.txt"], env=_env(tmp_path))
    assert result.exit_code != 0
    # Only the first output line is parsed as the JSON payload.
    data = json.loads(result.output.splitlines()[0])
    assert data["exit_code"] == 1
    assert data["status"] == "error"
    assert isinstance(data["duration_ms"], float)


def test_rm_modified_file_with_force_succeeds(tmp_path: pathlib.Path) -> None:
    """``muse rm --force`` removes a locally-modified file."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"track.txt": b"original\n"})
    (tmp_path / "track.txt").write_bytes(b"modified\n")
    result = runner.invoke(cli, ["rm", "--force", "track.txt"], env=_env(tmp_path))
    assert result.exit_code == 0
    assert not (tmp_path / "track.txt").exists()
    assert read_stage(tmp_path)["track.txt"]["mode"] == "D"


def test_rm_cached_modified_no_force_ok(tmp_path: pathlib.Path) -> None:
    """``muse rm --cached`` on a modified file is always safe (no disk delete)."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"track.txt": b"original\n"})
    (tmp_path / "track.txt").write_bytes(b"modified\n")
    result = runner.invoke(
        cli, ["rm", "--cached", "track.txt"], env=_env(tmp_path)
    )
    assert result.exit_code == 0
    # The local modification must survive untouched.
    assert (tmp_path / "track.txt").read_bytes() == b"modified\n"
    assert read_stage(tmp_path)["track.txt"]["mode"] == "D"


# ---------------------------------------------------------------------------
# Staged-addition without --force → exit 1
# ---------------------------------------------------------------------------


def test_rm_staged_addition_without_force_exits_1(tmp_path: pathlib.Path) -> None:
    """Removing a staged-but-never-committed file without --force exits non-zero."""
    _init_repo(tmp_path)
    (tmp_path / "new.py").write_bytes(b"print('hi')\n")
    stage: StagedFileMap = {
        "new.py": make_entry(object_id=blob_id(b"print('hi')\n"), mode="A"),
    }
    write_stage(tmp_path, stage)
    result = runner.invoke(cli, ["rm", "--cached", "new.py"], env=_env(tmp_path))
    assert result.exit_code != 0


def test_rm_staged_addition_with_force_removes_from_stage(
    tmp_path: pathlib.Path,
) -> None:
    """``muse rm --force --cached`` removes a staged-addition entry from stage."""
    _init_repo(tmp_path)
    (tmp_path / "new.py").write_bytes(b"print('hi')\n")
    stage: StagedFileMap = {
        "new.py": make_entry(object_id=blob_id(b"print('hi')\n"), mode="A"),
    }
    write_stage(tmp_path, stage)
    result = runner.invoke(
        cli, ["rm", "--force", "--cached", "new.py"], env=_env(tmp_path)
    )
    assert result.exit_code == 0
    assert "new.py" not in read_stage(tmp_path)


# ---------------------------------------------------------------------------
# --dry-run: no side effects
# ---------------------------------------------------------------------------


def test_rm_dry_run_does_not_delete(tmp_path: pathlib.Path) -> None:
    """``muse rm -n`` must not delete the file or write to the stage."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"chord.txt": b"Am\n"})
    result = runner.invoke(cli, ["rm", "-n", "chord.txt"], env=_env(tmp_path))
    assert result.exit_code == 0
    assert (tmp_path / "chord.txt").exists()
    assert "chord.txt" not in read_stage(tmp_path)


def test_rm_dry_run_json(tmp_path: pathlib.Path) -> None:
    """``muse rm -n --json`` emits valid JSON with status=dry_run."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"chord.txt": b"Am\n"})
    result = runner.invoke(
        cli, ["rm", "-n", "--json", "chord.txt"], env=_env(tmp_path)
    )
    assert result.exit_code == 0
    data = json.loads(result.output)
    assert data["status"] == "dry_run"
    assert "chord.txt" in data["removed"]
    assert data["dry_run"] is True
    assert data["count"] == 1


def test_rm_dry_run_cached(tmp_path: pathlib.Path) -> None:
    """``muse rm -n --cached`` previews correctly, writes nothing."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"f.txt": b"x\n"})
    result = runner.invoke(
        cli, ["rm", "-n", "--cached", "--json", "f.txt"], env=_env(tmp_path)
    )
    assert result.exit_code == 0
    data = json.loads(result.output)
    assert data["status"] == "dry_run"
    assert data["cached"] is True


def test_rm_dry_run_still_checks_safety(tmp_path: pathlib.Path) -> None:
    """``--dry-run`` without ``--force`` still rejects modified files."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"track.txt": b"original\n"})
    (tmp_path / "track.txt").write_bytes(b"modified\n")
    result = runner.invoke(cli, ["rm", "-n", "track.txt"], env=_env(tmp_path))
    assert result.exit_code != 0
    # File must still be on disk and stage untouched.
    assert (tmp_path / "track.txt").exists()
    assert "track.txt" not in read_stage(tmp_path)


def test_rm_dry_run_force_bypasses_safety(tmp_path: pathlib.Path) -> None:
    """``--dry-run --force`` previews removal of a modified file without touching anything."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"track.txt": b"original\n"})
    (tmp_path / "track.txt").write_bytes(b"modified\n")
    result = runner.invoke(
        cli, ["rm", "-n", "-f", "--json", "track.txt"], env=_env(tmp_path)
    )
    assert result.exit_code == 0
    data = json.loads(result.output)
    assert data["status"] == "dry_run"
    assert "track.txt" in data["removed"]
    # Nothing written.
    assert (tmp_path / "track.txt").exists()
    assert "track.txt" not in read_stage(tmp_path)


# ---------------------------------------------------------------------------
# Multiple paths in one invocation
# ---------------------------------------------------------------------------


def test_rm_multiple_files(tmp_path: pathlib.Path) -> None:
    """Multiple paths in one ``muse rm`` invocation are all removed."""
    _init_repo(tmp_path)
    _commit_files(
        tmp_path,
        {"a.txt": b"a\n", "b.txt": b"b\n", "c.txt": b"c\n"},
    )
    result = runner.invoke(
        cli, ["rm", "--cached", "--json", "a.txt", "b.txt"], env=_env(tmp_path)
    )
    assert result.exit_code == 0
    data = json.loads(result.output)
    assert data["count"] == 2
    assert "a.txt" in data["removed"]
    assert "b.txt" in data["removed"]
    assert "c.txt" not in data["removed"]
    stage = read_stage(tmp_path)
    assert stage["a.txt"]["mode"] == "D"
    assert stage["b.txt"]["mode"] == "D"
    assert "c.txt" not in stage


# ---------------------------------------------------------------------------
# Idempotency: remove already-staged-for-deletion
# ---------------------------------------------------------------------------


def test_rm_already_staged_deletion_is_idempotent(tmp_path: pathlib.Path) -> None:
    """Calling ``muse rm --cached`` twice on the same file is idempotent."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"x.txt": b"x\n"})
    first = runner.invoke(cli, ["rm", "--cached", "x.txt"], env=_env(tmp_path))
    # The second call is only a repeat if the first one actually succeeded.
    assert first.exit_code == 0
    result = runner.invoke(cli, ["rm", "--cached", "x.txt"], env=_env(tmp_path))
    assert result.exit_code == 0
    assert read_stage(tmp_path)["x.txt"]["mode"] == "D"


def test_rm_idempotent_json_still_valid(tmp_path: pathlib.Path) -> None:
    """Re-removing an already-staged-D file with --json still emits valid JSON."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"x.txt": b"x\n"})
    first = runner.invoke(cli, ["rm", "--cached", "x.txt"], env=_env(tmp_path))
    # The second call is only a repeat if the first one actually succeeded.
    assert first.exit_code == 0
    result = runner.invoke(
        cli, ["rm", "--cached", "--json", "x.txt"], env=_env(tmp_path)
    )
    assert result.exit_code == 0
    data = json.loads(result.output)
    assert data["status"] == "removed"
    assert "x.txt" in data["removed"]
    assert data["exit_code"] == 0


# ---------------------------------------------------------------------------
# duration_ms and exit_code in JSON
# ---------------------------------------------------------------------------


def test_rm_json_has_duration_ms(tmp_path: pathlib.Path) -> None:
    """``muse rm --json`` includes duration_ms as a non-negative float."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"t.txt": b"t\n"})
    result = runner.invoke(
        cli, ["rm", "--cached", "--json", "t.txt"], env=_env(tmp_path)
    )
    assert result.exit_code == 0
    data = json.loads(result.output)
    assert "duration_ms" in data, "Missing duration_ms field"
    assert isinstance(data["duration_ms"], float)
    assert data["duration_ms"] >= 0.0


def test_rm_json_has_exit_code_zero_on_success(tmp_path: pathlib.Path) -> None:
    """``muse rm --json`` includes exit_code=0 on success."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"t.txt": b"t\n"})
    result = runner.invoke(
        cli, ["rm", "--cached", "--json", "t.txt"], env=_env(tmp_path)
    )
    assert result.exit_code == 0
    data = json.loads(result.output)
    assert "exit_code" in data, "Missing exit_code field"
    assert data["exit_code"] == 0


def test_rm_json_dry_run_has_duration_ms_and_exit_code(tmp_path: pathlib.Path) -> None:
    """``muse rm -n --json`` also includes duration_ms and exit_code."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"t.txt": b"t\n"})
    result = runner.invoke(cli, ["rm", "-n", "--json", "t.txt"], env=_env(tmp_path))
    assert result.exit_code == 0
    data = json.loads(result.output)
    assert data["duration_ms"] >= 0.0
    assert data["exit_code"] == 0


# ---------------------------------------------------------------------------
# JSON schema completeness
# ---------------------------------------------------------------------------


def test_rm_json_schema_all_fields(tmp_path: pathlib.Path) -> None:
    """JSON output always contains all required fields."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"z.txt": b"z\n"})
    result = runner.invoke(
        cli, ["rm", "--json", "z.txt"], env=_env(tmp_path)
    )
    assert result.exit_code == 0
    data = json.loads(result.output)
    for key in (
        "status",
        "removed",
        "cached",
        "dry_run",
        "count",
        "duration_ms",
        "exit_code",
    ):
        assert key in data, f"Missing key: {key}"


# ---------------------------------------------------------------------------
# Security: path traversal and outside-repo paths
# ---------------------------------------------------------------------------


def test_rm_path_traversal_rejected(tmp_path: pathlib.Path) -> None:
    """A path that escapes the repo root via ``..`` must be rejected."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"a.txt": b"a\n"})
    result = runner.invoke(cli, ["rm", "../escape.txt"], env=_env(tmp_path))
    assert result.exit_code != 0


def test_rm_absolute_path_outside_repo_rejected(tmp_path: pathlib.Path) -> None:
    """An absolute path outside the repo must be rejected with exit 1."""
    # Keep the repo in a subdirectory so the "outside" file still lives inside
    # this test's private tmp_path instead of the parent directory, which is
    # shared with other tests and is not cleaned up per test.
    repo = tmp_path / "repo"
    repo.mkdir()
    _init_repo(repo)
    _commit_files(repo, {"a.txt": b"a\n"})
    outside = tmp_path / "outside.txt"
    outside.write_text("should not be removed", encoding="utf-8")
    result = runner.invoke(cli, ["rm", str(outside)], env=_env(repo))
    assert result.exit_code != 0
    assert outside.exists(), "File outside repo must not be deleted"


# ---------------------------------------------------------------------------
# Edge case: file already gone from disk
# ---------------------------------------------------------------------------


def test_rm_file_already_deleted_from_disk(tmp_path: pathlib.Path) -> None:
    """``muse rm`` on a tracked file that's already missing from disk stages deletion."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"gone.txt": b"was here\n"})
    # Manually remove from disk without going through muse rm.
    (tmp_path / "gone.txt").unlink()
    result = runner.invoke(cli, ["rm", "--cached", "gone.txt"], env=_env(tmp_path))
    assert result.exit_code == 0
    assert read_stage(tmp_path)["gone.txt"]["mode"] == "D"


def test_rm_disk_missing_no_cached_succeeds(tmp_path: pathlib.Path) -> None:
    """``muse rm`` (no --cached) on a file already gone from disk just stages deletion."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"gone.txt": b"was here\n"})
    (tmp_path / "gone.txt").unlink()
    result = runner.invoke(cli, ["rm", "gone.txt"], env=_env(tmp_path))
    assert result.exit_code == 0
    assert read_stage(tmp_path)["gone.txt"]["mode"] == "D"


# ---------------------------------------------------------------------------
# Recursive removal with disk delete
# ---------------------------------------------------------------------------


def test_rm_recursive_deletes_from_disk(tmp_path: pathlib.Path) -> None:
    """``muse rm -r <dir>`` removes all tracked files under dir from disk."""
    _init_repo(tmp_path)
    _commit_files(
        tmp_path,
        {
            "static/app.css": b"body{}\n",
            "static/app.js": b"console.log(1)\n",
            "src/main.py": b"pass\n",
        },
    )
    result = runner.invoke(
        cli, ["rm", "-r", "--json", "static"], env=_env(tmp_path)
    )
    assert result.exit_code == 0
    data = json.loads(result.output)
    assert data["count"] == 2
    assert not (tmp_path / "static" / "app.css").exists()
    assert not (tmp_path / "static" / "app.js").exists()
    assert (tmp_path / "src" / "main.py").exists()
    stage = read_stage(tmp_path)
    assert stage["static/app.css"]["mode"] == "D"
    assert stage["static/app.js"]["mode"] == "D"
    assert "src/main.py" not in stage


# ---------------------------------------------------------------------------
# Unicode filenames
# ---------------------------------------------------------------------------


def test_rm_unicode_filename(tmp_path: pathlib.Path) -> None:
    """``muse rm`` handles filenames with non-ASCII characters."""
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"café.txt": "café content\n".encode("utf-8")})
    result = runner.invoke(
        cli, ["rm", "--cached", "--json", "café.txt"], env=_env(tmp_path)
    )
    assert result.exit_code == 0
    data = json.loads(result.output)
    assert data["count"] == 1
    assert read_stage(tmp_path)["café.txt"]["mode"] == "D"


# ---------------------------------------------------------------------------
# Data integrity: stage is valid JSON after removal
# ---------------------------------------------------------------------------


def test_rm_stage_integrity_after_removal(tmp_path: pathlib.Path) -> None:
    """The stage remains readable after ``muse rm`` and holds only the removal."""
    _init_repo(tmp_path)
    _commit_files(
        tmp_path,
        {"keep.txt": b"keep\n", "remove.txt": b"remove\n"},
    )
    result = runner.invoke(
        cli, ["rm", "--cached", "remove.txt"], env=_env(tmp_path)
    )
    assert result.exit_code == 0
    # Stage must be readable and contain exactly the expected entry.
    stage = read_stage(tmp_path)
    assert "remove.txt" in stage
    assert stage["remove.txt"]["mode"] == "D"
    assert "keep.txt" not in stage


def test_rm_stage_cleared_when_last_staged_entry_removed(tmp_path: pathlib.Path) -> None:
    """Removing the only committed file leaves it staged with mode D.

    The test name is kept for history. What is asserted here is that the
    entry stays in the stage as a deletion, not that the stage is emptied:
    the file is in HEAD, so ``muse rm --cached`` records it as mode D.
    """
    _init_repo(tmp_path)
    _commit_files(tmp_path, {"only.txt": b"only\n"})
    # Committing the deletion so the entry could leave the stage is not easy
    # to do from here, so this only verifies that staging works cleanly.
    result = runner.invoke(cli, ["rm", "--cached", "only.txt"], env=_env(tmp_path))
    assert result.exit_code == 0
    stage = read_stage(tmp_path)
    # The file is in HEAD so it becomes mode D (not removed from stage entirely).
    assert stage["only.txt"]["mode"] == "D"


# ---------------------------------------------------------------------------
# Stress: 200 files, remove half
# ---------------------------------------------------------------------------


def test_rm_stress_200_files(tmp_path: pathlib.Path) -> None:
    """Remove 100 of 200 committed files; verify stage and disk state."""
    _init_repo(tmp_path)
    files = {
        f"file_{i:04d}.txt": f"content {i}\n".encode() for i in range(200)
    }
    _commit_files(tmp_path, files)
    # Every even-numbered file is removed; odd-numbered ones are left alone.
    to_remove = [f"file_{i:04d}.txt" for i in range(200) if i % 2 == 0]
    result = runner.invoke(
        cli,
        ["rm", "--cached", "--json", *to_remove],
        env=_env(tmp_path),
    )
    assert result.exit_code == 0
    data = json.loads(result.output)
    assert data["count"] == 100
    stage = read_stage(tmp_path)
    for i in range(200):
        name = f"file_{i:04d}.txt"
        if i % 2 == 0:
            assert stage[name]["mode"] == "D"
        else:
            assert name not in stage
    # --cached was used, so every file must still be on disk.
    for name in files:
        assert (tmp_path / name).exists()


def test_rm_stress_timing(tmp_path: pathlib.Path) -> None:
    """Stress remove 50 files with --json; duration_ms is present and non-negative."""
    _init_repo(tmp_path)
    files = {f"file_{i:03d}.txt": f"x{i}\n".encode() for i in range(50)}
    _commit_files(tmp_path, files)
    result = runner.invoke(
        cli,
        ["rm", "--cached", "--json", *files],
        env=_env(tmp_path),
    )
    assert result.exit_code == 0
    data = json.loads(result.output)
    assert data["count"] == 50
    assert isinstance(data["duration_ms"], float)
    assert data["duration_ms"] >= 0.0


class TestRegisterFlags:
    """Argparse wiring of the ``--json`` / ``-j`` flag registered for ``rm``."""

    @staticmethod
    def _json_out(argv: list[str]) -> bool:
        """Parse *argv* with a fresh parser and return the ``json_out`` value.

        The imports stay function-scoped, as in the original tests, so the
        module can be collected without importing the ``rm`` command.
        """
        import argparse

        from muse.cli.commands.rm import register

        parser = argparse.ArgumentParser()
        subparsers = parser.add_subparsers()
        register(subparsers)
        return parser.parse_args(argv).json_out

    def test_default_json_out_is_false(self) -> None:
        """Without ``--json`` the parsed ``json_out`` is ``False``."""
        assert self._json_out(["rm", "src/billing.py"]) is False

    def test_json_flag_sets_json_out(self) -> None:
        """``--json`` sets ``json_out`` to ``True``."""
        assert self._json_out(["rm", "src/billing.py", "--json"]) is True

    def test_j_shorthand_sets_json_out(self) -> None:
        """``-j`` sets ``json_out`` to ``True``."""
        assert self._json_out(["rm", "src/billing.py", "-j"]) is True