test_cmd_clean.py
file-level
1
files
1
commits
0
hotspots
0
🧊 dead
0
💥 blast risk
| 1 | """Tests for ``muse clean``. |
| 2 | |
| 3 | Covers: --dry-run preview, --force delete, --directories, no-force error, |
| 4 | already-clean repo, multiple untracked files, stress: 500 untracked files. |
| 5 | """ |
| 6 | |
| 7 | from __future__ import annotations |
| 8 | |
| 9 | import json |
| 10 | import pathlib |
| 11 | |
| 12 | import pytest |
| 13 | from tests.cli_test_helper import CliRunner |
| 14 | |
| 15 | cli = None # argparse migration — CliRunner ignores this arg |
| 16 | from muse.core.object_store import write_object |
| 17 | from muse.core.ids import hash_commit, hash_snapshot |
| 18 | from muse.core.commits import ( |
| 19 | CommitRecord, |
| 20 | write_commit, |
| 21 | ) |
| 22 | from muse.core.snapshots import ( |
| 23 | SnapshotRecord, |
| 24 | write_snapshot, |
| 25 | ) |
| 26 | from muse.core.types import Manifest, blob_id |
| 27 | from muse.core.paths import heads_dir, muse_dir |
| 28 | |
| 29 | import datetime |
| 30 | |
| 31 | runner = CliRunner() |
| 32 | |
| 33 | |
| 34 | # --------------------------------------------------------------------------- |
| 35 | # Helpers |
| 36 | # --------------------------------------------------------------------------- |
| 37 | |
| 38 | |
def _init_repo(path: pathlib.Path) -> pathlib.Path:
    """Lay out a minimal repository skeleton under *path* and return *path*.

    Inside the directory returned by ``muse_dir(path)`` this creates the
    ``commits``, ``snapshots``, ``objects`` and ``refs/heads`` directories,
    a ``HEAD`` file pointing at ``refs/heads/main``, and a ``repo.json``
    file holding the repo id and domain.
    """
    meta_root = muse_dir(path)

    # Same directories, same order as before; ``parents=True`` lets the first
    # call create the metadata directory itself.
    for parts in (("commits",), ("snapshots",), ("objects",), ("refs", "heads")):
        meta_root.joinpath(*parts).mkdir(parents=True)

    repo_config = {"repo_id": "clean-test", "domain": "midi"}
    meta_root.joinpath("HEAD").write_text("ref: refs/heads/main", encoding="utf-8")
    meta_root.joinpath("repo.json").write_text(
        json.dumps(repo_config), encoding="utf-8"
    )
    return path
| 50 | |
| 51 | |
def _env(repo: pathlib.Path) -> dict[str, str]:
    """Return the environment mapping passed to ``runner.invoke``.

    The mapping has a single ``MUSE_REPO_ROOT`` entry carrying *repo* as a
    string.  It is annotated as a plain ``dict[str, str]`` because it is an
    environment-variable mapping, not a ``Manifest``.
    """
    return {"MUSE_REPO_ROOT": str(repo)}
| 54 | |
| 55 | |
def _commit_file(root: pathlib.Path, rel_path: str, content: bytes) -> str:
    """Write a file, store its object, and commit it. Returns commit_id.

    Steps, in order: store *content* in the object store, write it to
    ``root / rel_path``, record a snapshot whose manifest contains only that
    one path, write a commit on branch ``main`` (hashed with an empty parent
    list), and point the ``main`` head ref at the new commit.

    Args:
        root: Repository root directory.
        rel_path: Path of the file relative to *root*; may contain
            sub-directories, which are created on demand.
        content: Raw bytes to write and commit.

    Returns:
        The id of the commit that was written.
    """
    # One source of truth so the hashed message and the recorded message
    # cannot drift apart.
    message = "initial"

    obj_id = blob_id(content)
    write_object(root, obj_id, content)

    target = root / rel_path
    # A nested rel_path (e.g. "sub/dir/file.txt") needs its parents created
    # first; this is a no-op when the parent directory already exists.
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(content)

    manifest = {rel_path: obj_id}
    snap_id = hash_snapshot(manifest)
    write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))

    committed_at = datetime.datetime.now(datetime.timezone.utc)
    commit_id = hash_commit(
        parent_ids=[],
        snapshot_id=snap_id,
        message=message,
        committed_at_iso=committed_at.isoformat(),
    )
    write_commit(
        root,
        CommitRecord(
            commit_id=commit_id,
            branch="main",
            snapshot_id=snap_id,
            message=message,
            committed_at=committed_at,
        ),
    )
    (heads_dir(root) / "main").write_text(commit_id, encoding="utf-8")
    return commit_id
| 80 | |
| 81 | |
| 82 | # --------------------------------------------------------------------------- |
| 83 | # Unit: safety guard — no flags |
| 84 | # --------------------------------------------------------------------------- |
| 85 | |
| 86 | |
def test_clean_no_force_exits_with_error(tmp_path: pathlib.Path) -> None:
    """``clean`` with no flags must exit non-zero and delete nothing."""
    _init_repo(tmp_path)
    untracked = tmp_path / "untracked.txt"
    untracked.write_text("hello", encoding="utf-8")

    result = runner.invoke(cli, ["clean"], env=_env(tmp_path))

    assert result.exit_code != 0
    # The safety guard only means something if the refused run left the
    # untracked file in place.
    assert untracked.exists()
| 92 | |
| 93 | |
def test_clean_help() -> None:
    """``clean --help`` exits 0 and its output mentions the force flag."""
    result = runner.invoke(cli, ["clean", "--help"])
    assert result.exit_code == 0

    help_text = result.output
    # Either spelling is accepted: the long form may be split by ANSI codes
    # injected between the dashes, while the short '-f' form is reliable.
    assert any(flag in help_text for flag in ("--force", "-f"))
| 99 | |
| 100 | |
| 101 | # --------------------------------------------------------------------------- |
| 102 | # Unit: dry-run shows but does not delete |
| 103 | # --------------------------------------------------------------------------- |
| 104 | |
| 105 | |
def test_clean_dry_run_shows_untracked(tmp_path: pathlib.Path) -> None:
    """``clean -n`` lists an untracked file in its output but leaves it on disk."""
    _init_repo(tmp_path)
    _commit_file(tmp_path, "tracked.txt", b"I am tracked")

    ghost = tmp_path / "ghost.txt"
    ghost.write_text("untracked", encoding="utf-8")

    outcome = runner.invoke(cli, ["clean", "-n"], env=_env(tmp_path))

    assert outcome.exit_code == 0
    assert "ghost.txt" in outcome.output
    # Dry run: the file is reported, not removed.
    assert ghost.exists()
| 116 | |
| 117 | |
def test_clean_dry_run_short_flag(tmp_path: pathlib.Path) -> None:
    """``clean -n`` succeeds in a repo with no commits and deletes nothing."""
    _init_repo(tmp_path)
    junk = tmp_path / "junk.txt"
    junk.write_text("junk", encoding="utf-8")

    result = runner.invoke(cli, ["clean", "-n"], env=_env(tmp_path))

    assert result.exit_code == 0
    # Same check as the other dry-run test: a preview must not remove
    # the untracked file.
    assert junk.exists()
| 123 | |
| 124 | |
| 125 | # --------------------------------------------------------------------------- |
| 126 | # Unit: --force deletes untracked files |
| 127 | # --------------------------------------------------------------------------- |
| 128 | |
| 129 | |
def test_clean_force_deletes_untracked(tmp_path: pathlib.Path) -> None:
    """``clean -f`` removes an untracked file and keeps the committed one."""
    _init_repo(tmp_path)
    _commit_file(tmp_path, "kept.txt", b"keep me")

    committed = tmp_path / "kept.txt"
    doomed = tmp_path / "delete_me.txt"
    doomed.write_text("bye", encoding="utf-8")

    outcome = runner.invoke(cli, ["clean", "-f"], env=_env(tmp_path))

    assert outcome.exit_code == 0
    assert not doomed.exists()
    assert committed.exists()
| 140 | |
| 141 | |
def test_clean_force_nothing_to_clean(tmp_path: pathlib.Path) -> None:
    """With only a committed file present, ``clean -f`` exits 0 and says "nothing"."""
    _init_repo(tmp_path)
    _commit_file(tmp_path, "tracked.txt", b"tracked")

    outcome = runner.invoke(cli, ["clean", "-f"], env=_env(tmp_path))

    assert outcome.exit_code == 0
    # Case-insensitive match on the message text.
    lowered_output = outcome.output.lower()
    assert "nothing" in lowered_output
| 149 | |
| 150 | |
| 151 | # --------------------------------------------------------------------------- |
| 152 | # Unit: --directories removes empty dirs |
| 153 | # --------------------------------------------------------------------------- |
| 154 | |
| 155 | |
def test_clean_directories_removes_empty_dir(tmp_path: pathlib.Path) -> None:
    """``clean -f -d`` removes an untracked file that lives inside a directory.

    NOTE(review): despite the test name, the directory holds ``junk.txt``
    when the command runs, and only that file's removal is asserted.
    Whether the directory itself is removed is not checked here; confirm
    the intended coverage.
    """
    _init_repo(tmp_path)
    _commit_file(tmp_path, "kept.txt", b"kept")

    empty_dir = tmp_path / "empty_dir"
    empty_dir.mkdir()
    nested_junk = empty_dir / "junk.txt"
    nested_junk.write_text("junk", encoding="utf-8")

    outcome = runner.invoke(cli, ["clean", "-f", "-d"], env=_env(tmp_path))

    assert outcome.exit_code == 0
    assert not nested_junk.exists()
| 166 | |
| 167 | |
| 168 | # --------------------------------------------------------------------------- |
| 169 | # Integration: multiple untracked files |
| 170 | # --------------------------------------------------------------------------- |
| 171 | |
| 172 | |
def test_clean_multiple_untracked(tmp_path: pathlib.Path) -> None:
    """``clean -f`` removes all ten untracked files from a repo with no commits."""
    _init_repo(tmp_path)
    for index in range(10):
        target = tmp_path / f"untracked_{index}.txt"
        target.write_text(f"data {index}", encoding="utf-8")

    outcome = runner.invoke(cli, ["clean", "-f"], env=_env(tmp_path))

    assert outcome.exit_code == 0
    # No top-level entry whose name starts with "untracked" may survive.
    leftovers = [
        entry for entry in tmp_path.iterdir() if entry.name.startswith("untracked")
    ]
    assert leftovers == []
| 182 | |
| 183 | |
| 184 | # --------------------------------------------------------------------------- |
| 185 | # Stress: 500 untracked files |
| 186 | # --------------------------------------------------------------------------- |
| 187 | |
| 188 | |
def test_clean_stress_500_untracked(tmp_path: pathlib.Path) -> None:
    """``clean -f`` removes 500 untracked 100-byte files in a single run."""
    _init_repo(tmp_path)
    payload = b"x" * 100
    for index in range(500):
        (tmp_path / f"stress_{index}.dat").write_bytes(payload)

    outcome = runner.invoke(cli, ["clean", "-f"], env=_env(tmp_path))

    assert outcome.exit_code == 0
    # Every generated file must be gone afterwards.
    leftovers = list(tmp_path.glob("stress_*.dat"))
    assert leftovers == []