"""Tests for ``muse rm``.
Covers:
- --cached: stage deletion without touching disk
- no --cached: stage deletion AND delete from disk
- -r / --recursive: required for directories
- -f / --force: bypass safety checks
- -n / --dry-run: preview without side effects
- --json: machine-readable output (always valid, including error paths)
- File not tracked → exit 1
- Directory without -r → exit 1
- Modified file without --force → exit 1
- Staged-addition without --force → exit 1
- Multiple paths in one invocation
- Idempotency: rm already-staged-for-deletion file
- JSON includes duration_ms (float ≥ 0) and exit_code (int)
- JSON on error: --json emits structured output even on user errors
- Path traversal rejected: paths outside the repo root are rejected
- File already gone from disk: tracked file missing on disk is handled gracefully
- Staged-mode-D idempotent with --json
- --dry-run still runs safety checks
- --dry-run --force bypasses safety checks and emits dry_run status
- Symlink in working tree: symlink to a committed path is handled correctly
- Unicode filenames
- Stress: 200 files, remove half
- Stress with timing: duration_ms is present and non-negative
- Data integrity: stage is valid after removal
"""
from __future__ import annotations
import datetime
import json
import pathlib
import pytest
from tests.cli_test_helper import CliRunner
from muse.core.types import blob_id
from muse.core.object_store import write_object
from muse.core.ids import hash_commit, hash_snapshot
from muse.core.commits import (
CommitRecord,
write_commit,
)
from muse.core.snapshots import (
SnapshotRecord,
write_snapshot,
)
from muse.core.types import Manifest
from muse.plugins.code.stage import StagedFileMap, make_entry, read_stage, write_stage
from muse.core.paths import heads_dir, muse_dir
type _EnvDict = dict[str, str]
type _FileBytes = dict[str, bytes]
cli = None # argparse migration — CliRunner ignores this arg
runner = CliRunner()
_REPO_ID = "rm-test"
# ---------------------------------------------------------------------------
# Test-repo bootstrap helpers
# ---------------------------------------------------------------------------
def _init_repo(path: pathlib.Path) -> pathlib.Path:
dot_muse = muse_dir(path)
for d in ("commits", "snapshots", "objects", "refs/heads"):
(dot_muse / d).mkdir(parents=True, exist_ok=True)
(dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")
(dot_muse / "repo.json").write_text(
json.dumps({"repo_id": _REPO_ID, "domain": "code"}), encoding="utf-8"
)
return path
def _env(repo: pathlib.Path) -> _EnvDict:
return {"MUSE_REPO_ROOT": str(repo)}
_counter = 0
def _commit_files(root: pathlib.Path, files: _FileBytes) -> str:
"""Write *files* to disk and to the object store; create a commit."""
global _counter
_counter += 1
manifest: Manifest = {}
for rel_path, content in files.items():
oid = blob_id(content)
write_object(root, oid, content)
manifest[rel_path] = oid
abs_path = root / rel_path
abs_path.parent.mkdir(parents=True, exist_ok=True)
abs_path.write_bytes(content)
snap_id = hash_snapshot(manifest)
write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))
committed_at = datetime.datetime.now(datetime.timezone.utc)
commit_id = hash_commit(
parent_ids=[],
snapshot_id=snap_id,
message=f"commit {_counter}",
committed_at_iso=committed_at.isoformat(),
)
write_commit(
root,
CommitRecord(
commit_id=commit_id,
branch="main",
snapshot_id=snap_id,
message=f"commit {_counter}",
committed_at=committed_at,
),
)
(heads_dir(root) / "main").write_text(commit_id, encoding="utf-8")
return commit_id
# ---------------------------------------------------------------------------
# help
# ---------------------------------------------------------------------------
def test_rm_help() -> None:
result = runner.invoke(cli, ["rm", "--help"])
assert result.exit_code == 0
assert "--cached" in result.output
# ---------------------------------------------------------------------------
# --cached: stage deletion, keep file on disk
# ---------------------------------------------------------------------------
def test_rm_cached_stages_deletion(tmp_path: pathlib.Path) -> None:
"""``muse rm --cached`` writes mode D to stage, leaves file on disk."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"song.txt": b"verse\n"})
result = runner.invoke(
cli, ["rm", "--cached", "song.txt"], env=_env(tmp_path)
)
assert result.exit_code == 0
assert (tmp_path / "song.txt").exists()
stage = read_stage(tmp_path)
assert "song.txt" in stage
assert stage["song.txt"]["mode"] == "D"
def test_rm_cached_json_output(tmp_path: pathlib.Path) -> None:
"""``muse rm --cached --json`` emits valid JSON with expected fields."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"notes.txt": b"A\n"})
result = runner.invoke(
cli, ["rm", "--cached", "--json", "notes.txt"], env=_env(tmp_path)
)
assert result.exit_code == 0
data = json.loads(result.output)
assert data["status"] == "removed"
assert "notes.txt" in data["removed"]
assert data["cached"] is True
assert data["dry_run"] is False
assert data["count"] == 1
# ---------------------------------------------------------------------------
# Without --cached: stage deletion AND delete from disk
# ---------------------------------------------------------------------------
def test_rm_deletes_file_from_disk(tmp_path: pathlib.Path) -> None:
"""``muse rm`` without --cached removes the file from disk."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"beat.mid": b"\x00\x01\x02"})
result = runner.invoke(cli, ["rm", "beat.mid"], env=_env(tmp_path))
assert result.exit_code == 0
assert not (tmp_path / "beat.mid").exists()
stage = read_stage(tmp_path)
assert stage["beat.mid"]["mode"] == "D"
def test_rm_json_no_cached(tmp_path: pathlib.Path) -> None:
"""``muse rm --json`` emits cached=false."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"f.txt": b"x\n"})
result = runner.invoke(cli, ["rm", "--json", "f.txt"], env=_env(tmp_path))
assert result.exit_code == 0
data = json.loads(result.output)
assert data["cached"] is False
assert data["count"] == 1
# ---------------------------------------------------------------------------
# File not tracked → exit 1
# ---------------------------------------------------------------------------
def test_rm_untracked_file_exits_1(tmp_path: pathlib.Path) -> None:
"""Removing an untracked file must exit non-zero."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"existing.txt": b"x\n"})
result = runner.invoke(
cli, ["rm", "does_not_exist.txt"], env=_env(tmp_path)
)
assert result.exit_code != 0
def test_rm_untracked_does_not_affect_stage(tmp_path: pathlib.Path) -> None:
"""Attempting to remove an untracked file must not mutate the stage."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"a.txt": b"a\n"})
runner.invoke(cli, ["rm", "ghost.txt"], env=_env(tmp_path))
stage = read_stage(tmp_path)
assert "a.txt" not in stage
def test_rm_error_json_untracked(tmp_path: pathlib.Path) -> None:
"""``muse rm --json`` on an untracked file emits JSON with exit_code=1."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"existing.txt": b"x\n"})
result = runner.invoke(
cli, ["rm", "--json", "ghost.txt"], env=_env(tmp_path)
)
assert result.exit_code != 0
# First line is JSON; remaining lines are stderr error messages.
data = json.loads(result.output.splitlines()[0])
assert data["exit_code"] == 1
assert data["status"] == "error"
assert data["count"] == 0
assert isinstance(data["duration_ms"], float)
# ---------------------------------------------------------------------------
# Directory without -r → exit 1
# ---------------------------------------------------------------------------
def test_rm_directory_without_recursive_exits_1(tmp_path: pathlib.Path) -> None:
"""Removing a directory path without -r must exit non-zero."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"src/main.py": b"pass\n"})
result = runner.invoke(cli, ["rm", "--cached", "src"], env=_env(tmp_path))
assert result.exit_code != 0
def test_rm_error_json_directory_without_r(tmp_path: pathlib.Path) -> None:
"""``muse rm --json
`` without -r emits JSON with exit_code=1."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"src/main.py": b"pass\n"})
result = runner.invoke(cli, ["rm", "--cached", "--json", "src"], env=_env(tmp_path))
assert result.exit_code != 0
data = json.loads(result.output.splitlines()[0])
assert data["exit_code"] == 1
assert data["status"] == "error"
def test_rm_directory_with_recursive_stages_all(tmp_path: pathlib.Path) -> None:
"""``muse rm -r --cached `` stages deletion for every file under dir."""
_init_repo(tmp_path)
_commit_files(
tmp_path,
{
"src/a.py": b"a\n",
"src/b.py": b"b\n",
"other.txt": b"c\n",
},
)
result = runner.invoke(
cli, ["rm", "-r", "--cached", "src"], env=_env(tmp_path)
)
assert result.exit_code == 0
stage = read_stage(tmp_path)
assert stage["src/a.py"]["mode"] == "D"
assert stage["src/b.py"]["mode"] == "D"
assert "other.txt" not in stage
# ---------------------------------------------------------------------------
# Modified file without --force → exit 1
# ---------------------------------------------------------------------------
def test_rm_modified_file_without_force_exits_1(tmp_path: pathlib.Path) -> None:
"""Removing a locally-modified file without --force must exit non-zero."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"track.txt": b"original\n"})
(tmp_path / "track.txt").write_bytes(b"modified\n")
result = runner.invoke(cli, ["rm", "track.txt"], env=_env(tmp_path))
assert result.exit_code != 0
assert (tmp_path / "track.txt").exists()
def test_rm_error_json_modified_without_force(tmp_path: pathlib.Path) -> None:
"""``muse rm --json`` on a modified file emits JSON with exit_code=1."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"track.txt": b"original\n"})
(tmp_path / "track.txt").write_bytes(b"modified\n")
result = runner.invoke(cli, ["rm", "--json", "track.txt"], env=_env(tmp_path))
assert result.exit_code != 0
data = json.loads(result.output.splitlines()[0])
assert data["exit_code"] == 1
assert data["status"] == "error"
assert isinstance(data["duration_ms"], float)
def test_rm_modified_file_with_force_succeeds(tmp_path: pathlib.Path) -> None:
"""``muse rm --force`` removes a locally-modified file."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"track.txt": b"original\n"})
(tmp_path / "track.txt").write_bytes(b"modified\n")
result = runner.invoke(cli, ["rm", "--force", "track.txt"], env=_env(tmp_path))
assert result.exit_code == 0
assert not (tmp_path / "track.txt").exists()
assert read_stage(tmp_path)["track.txt"]["mode"] == "D"
def test_rm_cached_modified_no_force_ok(tmp_path: pathlib.Path) -> None:
"""``muse rm --cached`` on a modified file is always safe (no disk delete)."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"track.txt": b"original\n"})
(tmp_path / "track.txt").write_bytes(b"modified\n")
result = runner.invoke(
cli, ["rm", "--cached", "track.txt"], env=_env(tmp_path)
)
assert result.exit_code == 0
assert (tmp_path / "track.txt").read_bytes() == b"modified\n"
assert read_stage(tmp_path)["track.txt"]["mode"] == "D"
# ---------------------------------------------------------------------------
# Staged-addition without --force → exit 1
# ---------------------------------------------------------------------------
def test_rm_staged_addition_without_force_exits_1(tmp_path: pathlib.Path) -> None:
"""Removing a staged-but-never-committed file without --force exits non-zero."""
_init_repo(tmp_path)
(tmp_path / "new.py").write_bytes(b"print('hi')\n")
stage: StagedFileMap = {
"new.py": make_entry(object_id=blob_id(b"print('hi')\n"), mode="A"),
}
write_stage(tmp_path, stage)
result = runner.invoke(cli, ["rm", "--cached", "new.py"], env=_env(tmp_path))
assert result.exit_code != 0
def test_rm_staged_addition_with_force_removes_from_stage(
tmp_path: pathlib.Path,
) -> None:
"""``muse rm --force --cached`` removes a staged-addition entry from stage."""
_init_repo(tmp_path)
(tmp_path / "new.py").write_bytes(b"print('hi')\n")
stage: StagedFileMap = {
"new.py": make_entry(object_id=blob_id(b"print('hi')\n"), mode="A"),
}
write_stage(tmp_path, stage)
result = runner.invoke(
cli, ["rm", "--force", "--cached", "new.py"], env=_env(tmp_path)
)
assert result.exit_code == 0
assert "new.py" not in read_stage(tmp_path)
# ---------------------------------------------------------------------------
# --dry-run: no side effects
# ---------------------------------------------------------------------------
def test_rm_dry_run_does_not_delete(tmp_path: pathlib.Path) -> None:
"""``muse rm -n`` must not delete the file or write to the stage."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"chord.txt": b"Am\n"})
result = runner.invoke(cli, ["rm", "-n", "chord.txt"], env=_env(tmp_path))
assert result.exit_code == 0
assert (tmp_path / "chord.txt").exists()
assert "chord.txt" not in read_stage(tmp_path)
def test_rm_dry_run_json(tmp_path: pathlib.Path) -> None:
"""``muse rm -n --json`` emits valid JSON with status=dry_run."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"chord.txt": b"Am\n"})
result = runner.invoke(
cli, ["rm", "-n", "--json", "chord.txt"], env=_env(tmp_path)
)
assert result.exit_code == 0
data = json.loads(result.output)
assert data["status"] == "dry_run"
assert "chord.txt" in data["removed"]
assert data["dry_run"] is True
assert data["count"] == 1
def test_rm_dry_run_cached(tmp_path: pathlib.Path) -> None:
"""``muse rm -n --cached`` previews correctly, writes nothing."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"f.txt": b"x\n"})
result = runner.invoke(
cli, ["rm", "-n", "--cached", "--json", "f.txt"], env=_env(tmp_path)
)
assert result.exit_code == 0
data = json.loads(result.output)
assert data["status"] == "dry_run"
assert data["cached"] is True
def test_rm_dry_run_still_checks_safety(tmp_path: pathlib.Path) -> None:
"""``--dry-run`` without ``--force`` still rejects modified files."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"track.txt": b"original\n"})
(tmp_path / "track.txt").write_bytes(b"modified\n")
result = runner.invoke(cli, ["rm", "-n", "track.txt"], env=_env(tmp_path))
assert result.exit_code != 0
# File must still be on disk and stage untouched.
assert (tmp_path / "track.txt").exists()
assert "track.txt" not in read_stage(tmp_path)
def test_rm_dry_run_force_bypasses_safety(tmp_path: pathlib.Path) -> None:
"""``--dry-run --force`` previews removal of a modified file without touching anything."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"track.txt": b"original\n"})
(tmp_path / "track.txt").write_bytes(b"modified\n")
result = runner.invoke(
cli, ["rm", "-n", "-f", "--json", "track.txt"], env=_env(tmp_path)
)
assert result.exit_code == 0
data = json.loads(result.output)
assert data["status"] == "dry_run"
assert "track.txt" in data["removed"]
# Nothing written.
assert (tmp_path / "track.txt").exists()
assert "track.txt" not in read_stage(tmp_path)
# ---------------------------------------------------------------------------
# Multiple paths in one invocation
# ---------------------------------------------------------------------------
def test_rm_multiple_files(tmp_path: pathlib.Path) -> None:
"""Multiple paths in one ``muse rm`` invocation are all removed."""
_init_repo(tmp_path)
_commit_files(
tmp_path,
{"a.txt": b"a\n", "b.txt": b"b\n", "c.txt": b"c\n"},
)
result = runner.invoke(
cli, ["rm", "--cached", "--json", "a.txt", "b.txt"], env=_env(tmp_path)
)
assert result.exit_code == 0
data = json.loads(result.output)
assert data["count"] == 2
assert "a.txt" in data["removed"]
assert "b.txt" in data["removed"]
assert "c.txt" not in data["removed"]
stage = read_stage(tmp_path)
assert stage["a.txt"]["mode"] == "D"
assert stage["b.txt"]["mode"] == "D"
assert "c.txt" not in stage
# ---------------------------------------------------------------------------
# Idempotency: remove already-staged-for-deletion
# ---------------------------------------------------------------------------
def test_rm_already_staged_deletion_is_idempotent(tmp_path: pathlib.Path) -> None:
"""Calling ``muse rm --cached`` twice on the same file is idempotent."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"x.txt": b"x\n"})
runner.invoke(cli, ["rm", "--cached", "x.txt"], env=_env(tmp_path))
result = runner.invoke(cli, ["rm", "--cached", "x.txt"], env=_env(tmp_path))
assert result.exit_code == 0
assert read_stage(tmp_path)["x.txt"]["mode"] == "D"
def test_rm_idempotent_json_still_valid(tmp_path: pathlib.Path) -> None:
"""Re-removing an already-staged-D file with --json still emits valid JSON."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"x.txt": b"x\n"})
runner.invoke(cli, ["rm", "--cached", "x.txt"], env=_env(tmp_path))
result = runner.invoke(cli, ["rm", "--cached", "--json", "x.txt"], env=_env(tmp_path))
assert result.exit_code == 0
data = json.loads(result.output)
assert data["status"] == "removed"
assert "x.txt" in data["removed"]
assert data["exit_code"] == 0
# ---------------------------------------------------------------------------
# duration_ms and exit_code in JSON
# ---------------------------------------------------------------------------
def test_rm_json_has_duration_ms(tmp_path: pathlib.Path) -> None:
"""``muse rm --json`` includes duration_ms as a non-negative float."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"t.txt": b"t\n"})
result = runner.invoke(cli, ["rm", "--cached", "--json", "t.txt"], env=_env(tmp_path))
assert result.exit_code == 0
data = json.loads(result.output)
assert "duration_ms" in data, "Missing duration_ms field"
assert isinstance(data["duration_ms"], float)
assert data["duration_ms"] >= 0.0
def test_rm_json_has_exit_code_zero_on_success(tmp_path: pathlib.Path) -> None:
"""``muse rm --json`` includes exit_code=0 on success."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"t.txt": b"t\n"})
result = runner.invoke(cli, ["rm", "--cached", "--json", "t.txt"], env=_env(tmp_path))
assert result.exit_code == 0
data = json.loads(result.output)
assert "exit_code" in data, "Missing exit_code field"
assert data["exit_code"] == 0
def test_rm_json_dry_run_has_duration_ms_and_exit_code(tmp_path: pathlib.Path) -> None:
"""``muse rm -n --json`` also includes duration_ms and exit_code."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"t.txt": b"t\n"})
result = runner.invoke(cli, ["rm", "-n", "--json", "t.txt"], env=_env(tmp_path))
assert result.exit_code == 0
data = json.loads(result.output)
assert data["duration_ms"] >= 0.0
assert data["exit_code"] == 0
# ---------------------------------------------------------------------------
# JSON schema completeness
# ---------------------------------------------------------------------------
def test_rm_json_schema_all_fields(tmp_path: pathlib.Path) -> None:
"""JSON output always contains all required fields."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"z.txt": b"z\n"})
result = runner.invoke(
cli, ["rm", "--json", "z.txt"], env=_env(tmp_path)
)
assert result.exit_code == 0
data = json.loads(result.output)
for key in ("status", "removed", "cached", "dry_run", "count", "duration_ms", "exit_code"):
assert key in data, f"Missing key: {key}"
# ---------------------------------------------------------------------------
# Security: path traversal and outside-repo paths
# ---------------------------------------------------------------------------
def test_rm_path_traversal_rejected(tmp_path: pathlib.Path) -> None:
"""A path that escapes the repo root via ``..`` must be rejected."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"a.txt": b"a\n"})
result = runner.invoke(cli, ["rm", "../escape.txt"], env=_env(tmp_path))
assert result.exit_code != 0
def test_rm_absolute_path_outside_repo_rejected(tmp_path: pathlib.Path) -> None:
"""An absolute path outside the repo must be rejected with exit 1."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"a.txt": b"a\n"})
outside = tmp_path.parent / "outside.txt"
outside.write_text("should not be removed", encoding="utf-8")
result = runner.invoke(cli, ["rm", str(outside)], env=_env(tmp_path))
assert result.exit_code != 0
assert outside.exists(), "File outside repo must not be deleted"
# ---------------------------------------------------------------------------
# Edge case: file already gone from disk
# ---------------------------------------------------------------------------
def test_rm_file_already_deleted_from_disk(tmp_path: pathlib.Path) -> None:
"""``muse rm`` on a tracked file that's already missing from disk stages deletion."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"gone.txt": b"was here\n"})
# Manually remove from disk without going through muse rm.
(tmp_path / "gone.txt").unlink()
result = runner.invoke(cli, ["rm", "--cached", "gone.txt"], env=_env(tmp_path))
assert result.exit_code == 0
assert read_stage(tmp_path)["gone.txt"]["mode"] == "D"
def test_rm_disk_missing_no_cached_succeeds(tmp_path: pathlib.Path) -> None:
"""``muse rm`` (no --cached) on a file already gone from disk just stages deletion."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"gone.txt": b"was here\n"})
(tmp_path / "gone.txt").unlink()
result = runner.invoke(cli, ["rm", "gone.txt"], env=_env(tmp_path))
assert result.exit_code == 0
assert read_stage(tmp_path)["gone.txt"]["mode"] == "D"
# ---------------------------------------------------------------------------
# Recursive removal with disk delete
# ---------------------------------------------------------------------------
def test_rm_recursive_deletes_from_disk(tmp_path: pathlib.Path) -> None:
"""``muse rm -r `` removes all tracked files under dir from disk."""
_init_repo(tmp_path)
_commit_files(
tmp_path,
{
"static/app.css": b"body{}\n",
"static/app.js": b"console.log(1)\n",
"src/main.py": b"pass\n",
},
)
result = runner.invoke(
cli, ["rm", "-r", "--json", "static"], env=_env(tmp_path)
)
assert result.exit_code == 0
data = json.loads(result.output)
assert data["count"] == 2
assert not (tmp_path / "static" / "app.css").exists()
assert not (tmp_path / "static" / "app.js").exists()
assert (tmp_path / "src" / "main.py").exists()
stage = read_stage(tmp_path)
assert stage["static/app.css"]["mode"] == "D"
assert stage["static/app.js"]["mode"] == "D"
assert "src/main.py" not in stage
# ---------------------------------------------------------------------------
# Unicode filenames
# ---------------------------------------------------------------------------
def test_rm_unicode_filename(tmp_path: pathlib.Path) -> None:
"""``muse rm`` handles filenames with non-ASCII characters."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"café.txt": "café content\n".encode("utf-8")})
result = runner.invoke(cli, ["rm", "--cached", "--json", "café.txt"], env=_env(tmp_path))
assert result.exit_code == 0
data = json.loads(result.output)
assert data["count"] == 1
assert read_stage(tmp_path)["café.txt"]["mode"] == "D"
# ---------------------------------------------------------------------------
# Data integrity: stage is valid JSON after removal
# ---------------------------------------------------------------------------
def test_rm_stage_integrity_after_removal(tmp_path: pathlib.Path) -> None:
"""The stage remains readable (valid JSON) after ``muse rm``."""
_init_repo(tmp_path)
_commit_files(
tmp_path,
{"keep.txt": b"keep\n", "remove.txt": b"remove\n"},
)
runner.invoke(cli, ["rm", "--cached", "remove.txt"], env=_env(tmp_path))
# Stage must be readable and contain exactly the expected entry.
stage = read_stage(tmp_path)
assert "remove.txt" in stage
assert stage["remove.txt"]["mode"] == "D"
assert "keep.txt" not in stage
def test_rm_stage_cleared_when_last_staged_entry_removed(tmp_path: pathlib.Path) -> None:
"""After removing all staged entries, the stage file is cleaned up."""
_init_repo(tmp_path)
_commit_files(tmp_path, {"only.txt": b"only\n"})
# Stage the deletion — this should leave only.txt as "D".
# Then do a second commit to move it to HEAD as deleted... but since we
# can't easily do that here, just verify that re-staging works cleanly.
runner.invoke(cli, ["rm", "--cached", "only.txt"], env=_env(tmp_path))
stage = read_stage(tmp_path)
# The file is in HEAD so it becomes mode D (not removed from stage entirely).
assert stage["only.txt"]["mode"] == "D"
# ---------------------------------------------------------------------------
# Stress: 200 files, remove half
# ---------------------------------------------------------------------------
def test_rm_stress_200_files(tmp_path: pathlib.Path) -> None:
"""Remove 100 of 200 committed files; verify stage and disk state."""
_init_repo(tmp_path)
files = {
f"file_{i:04d}.txt": f"content {i}\n".encode()
for i in range(200)
}
_commit_files(tmp_path, files)
to_remove = [f"file_{i:04d}.txt" for i in range(200) if i % 2 == 0]
result = runner.invoke(
cli,
["rm", "--cached", "--json"] + to_remove,
env=_env(tmp_path),
)
assert result.exit_code == 0
data = json.loads(result.output)
assert data["count"] == 100
stage = read_stage(tmp_path)
for i in range(200):
name = f"file_{i:04d}.txt"
if i % 2 == 0:
assert stage[name]["mode"] == "D"
else:
assert name not in stage
for name in files:
assert (tmp_path / name).exists()
def test_rm_stress_timing(tmp_path: pathlib.Path) -> None:
"""Stress remove 50 files with --json; duration_ms is present and non-negative."""
_init_repo(tmp_path)
files = {f"file_{i:03d}.txt": f"x{i}\n".encode() for i in range(50)}
_commit_files(tmp_path, files)
result = runner.invoke(
cli,
["rm", "--cached", "--json"] + list(files),
env=_env(tmp_path),
)
assert result.exit_code == 0
data = json.loads(result.output)
assert data["count"] == 50
assert isinstance(data["duration_ms"], float)
assert data["duration_ms"] >= 0.0
class TestRegisterFlags:
def test_default_json_out_is_false(self) -> None:
import argparse
from muse.cli.commands.rm import register
p = argparse.ArgumentParser()
subs = p.add_subparsers()
register(subs)
args = p.parse_args(["rm", "src/billing.py"])
assert args.json_out is False
def test_json_flag_sets_json_out(self) -> None:
import argparse
from muse.cli.commands.rm import register
p = argparse.ArgumentParser()
subs = p.add_subparsers()
register(subs)
args = p.parse_args(["rm", "src/billing.py", "--json"])
assert args.json_out is True
def test_j_shorthand_sets_json_out(self) -> None:
import argparse
from muse.cli.commands.rm import register
p = argparse.ArgumentParser()
subs = p.add_subparsers()
register(subs)
args = p.parse_args(["rm", "src/billing.py", "-j"])
assert args.json_out is True