test_cmd_clean_hardening.py
python
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a
fix: repair four test failures from post-migration audit
Sonnet 4.6
patch
28 days ago
| 1 | """Hardening test suite for ``muse clean``. |
| 2 | |
| 3 | Coverage: |
| 4 | - Unit: _is_ignored, _safe_to_delete, _safe_to_rmdir helpers |
| 5 | - Security: path-traversal guard, .muse/ protection, symlink skipping |
| 6 | - Error routing: all user errors go to stderr |
| 7 | - JSON schema: _CleanResultJson shape for all outcomes |
| 8 | - --dry-run: no side effects with and without --json |
| 9 | - --include-ignored: respects .museignore patterns |
| 10 | - --directories: empty-dir removal, .muse/ immune |
| 11 | - Integration: clean lifecycle (commit → add untracked → clean) |
| 12 | - E2E: help output, combined flags |
| 13 | - Stress: 1 000 untracked files, concurrent reads, 50-pattern ignore list |
| 14 | """ |
| 15 | |
| 16 | from __future__ import annotations |
| 17 | |
| 18 | import datetime |
| 19 | import json |
| 20 | import os |
| 21 | import pathlib |
| 22 | import threading |
| 23 | from unittest.mock import patch |
| 24 | |
| 25 | import pytest |
| 26 | from tests.cli_test_helper import CliRunner, InvokeResult |
| 27 | from typing import TypedDict |
| 28 | |
| 29 | from muse.cli.commands.clean import _is_ignored, _safe_to_delete, _safe_to_rmdir |
| 30 | from muse.core.object_store import write_object |
| 31 | from muse.core.ids import hash_commit, hash_snapshot |
| 32 | from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot |
| 33 | from muse.core.types import Manifest, blob_id |
| 34 | |
| 35 | runner = CliRunner() |
| 36 | |
| 37 | |
| 38 | # --------------------------------------------------------------------------- |
| 39 | # Typed output shape (mirrors _CleanResultJson in clean.py) |
| 40 | # --------------------------------------------------------------------------- |
| 41 | |
| 42 | |
| 43 | class _CleanOut(TypedDict, total=False): |
| 44 | status: str |
| 45 | removed: list[str] |
| 46 | dirs_removed: list[str] |
| 47 | count: int |
| 48 | dry_run: bool |
| 49 | duration_ms: float |
| 50 | exit_code: int |
| 51 | |
| 52 | |
| 53 | # --------------------------------------------------------------------------- |
| 54 | # Helpers |
| 55 | # --------------------------------------------------------------------------- |
| 56 | |
| 57 | |
def _init_repo(path: pathlib.Path, *, domain: str = "midi") -> pathlib.Path:
    """Lay out a minimal repository skeleton for *path* and return *path*.

    Creates the ``commits``, ``snapshots``, ``objects`` and ``refs/heads``
    directories under ``muse_dir(path)``, points ``HEAD`` at
    ``refs/heads/main`` and writes a ``repo.json`` holding a fixed repo id
    plus the requested *domain*.
    """
    store = muse_dir(path)
    for name in ("commits", "snapshots", "objects", "refs/heads"):
        (store / name).mkdir(parents=True, exist_ok=True)

    repo_meta = {"repo_id": "clean-hard-test", "domain": domain}
    (store / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")
    (store / "repo.json").write_text(json.dumps(repo_meta), encoding="utf-8")
    return path
| 68 | |
| 69 | |
def _commit_file(root: pathlib.Path, rel_path: str, content: bytes) -> str:
    """Write *content* to *rel_path*, store the object and commit it.

    The snapshot manifest contains only *rel_path*, the commit has no parents
    and carries the message ``"initial"``, and ``refs/heads/main`` is
    overwritten with the new commit id.

    Returns:
        The id of the commit that was written.
    """
    # Object store + working tree.
    object_id = blob_id(content)
    write_object(root, object_id, content)
    (root / rel_path).write_bytes(content)

    # Single-entry snapshot.
    manifest = {rel_path: object_id}
    snapshot_id = hash_snapshot(manifest)
    write_snapshot(root, SnapshotRecord(snapshot_id=snapshot_id, manifest=manifest))

    # Root commit on "main"; the same timestamp feeds both the hash and the record.
    now = datetime.datetime.now(datetime.timezone.utc)
    commit_id = hash_commit(
        parent_ids=[],
        snapshot_id=snapshot_id,
        message="initial",
        committed_at_iso=now.isoformat(),
    )
    record = CommitRecord(
        commit_id=commit_id,
        branch="main",
        snapshot_id=snapshot_id,
        message="initial",
        committed_at=now,
    )
    write_commit(root, record)

    branch_ref = heads_dir(root) / "main"
    branch_ref.write_text(commit_id, encoding="utf-8")
    return commit_id
| 99 | |
| 100 | |
| 101 | def _env(repo: pathlib.Path) -> Manifest: |
| 102 | return {"MUSE_REPO_ROOT": str(repo)} |
| 103 | |
| 104 | |
def _invoke(args: list[str], env: dict[str, str]) -> InvokeResult:
    """Run the CLI with *args* through the module-level ``runner``.

    Args:
        args: Command-line tokens, e.g. ``["clean", "-n"]``.
        env: Environment variables forwarded to ``runner.invoke`` (annotated
            as ``dict[str, str]`` because it is not a file manifest).

    Returns:
        Whatever ``runner.invoke`` returns for this invocation.
    """
    # NOTE(review): ``None`` is passed as the first positional argument;
    # presumably the helper resolves the CLI entry point itself — confirm.
    return runner.invoke(None, args, env=env)
| 107 | |
| 108 | |
def _parse_json(result: InvokeResult) -> _CleanOut:
    """Decode the first output line that starts with ``{`` into a ``_CleanOut``.

    ``status``, ``removed``, ``dirs_removed``, ``count`` and ``dry_run`` must
    be present in the payload (a missing one raises ``KeyError``);
    ``duration_ms`` and ``exit_code`` are copied only when present.

    Raises:
        AssertionError: if no output line starts with ``{``.
    """
    stripped_lines = (text.strip() for text in result.output.splitlines())
    payload = next((text for text in stripped_lines if text.startswith("{")), None)
    if payload is None:
        raise AssertionError(f"No JSON line found in output:\n{result.output}")

    raw = json.loads(payload)
    parsed = _CleanOut(
        status=raw["status"],
        removed=raw["removed"],
        dirs_removed=raw["dirs_removed"],
        count=raw["count"],
        dry_run=raw["dry_run"],
    )
    if "duration_ms" in raw:
        parsed["duration_ms"] = raw["duration_ms"]
    if "exit_code" in raw:
        parsed["exit_code"] = raw["exit_code"]
    return parsed
| 127 | |
| 128 | |
| 129 | # --------------------------------------------------------------------------- |
| 130 | # Unit: _is_ignored |
| 131 | # --------------------------------------------------------------------------- |
| 132 | |
| 133 | |
def test_is_ignored_exact_match() -> None:
    """A ``build/*`` pattern ignores a file directly under ``build/``."""
    patterns = ["build/*"]
    assert _is_ignored("build/out.o", patterns) is True
| 136 | |
| 137 | |
def test_is_ignored_basename_match() -> None:
    """A bare ``*.pyc`` pattern ignores a nested ``.pyc`` file."""
    patterns = ["*.pyc"]
    assert _is_ignored("deep/nested/file.pyc", patterns) is True
| 140 | |
| 141 | |
def test_is_ignored_no_match() -> None:
    """A path matched by none of the patterns is not ignored."""
    patterns = ["*.pyc", "build/*"]
    assert _is_ignored("src/main.py", patterns) is False
| 144 | |
| 145 | |
def test_is_ignored_negation_unignores() -> None:
    """A later ``!keep.log`` cancels an earlier ``*.log`` for that file."""
    ignore_then_keep = ["*.log", "!keep.log"]
    assert _is_ignored("keep.log", ignore_then_keep) is False
| 149 | |
| 150 | |
def test_is_ignored_negation_last_match_wins() -> None:
    """With ``!keep.log`` listed before ``*.log``, the later pattern re-ignores."""
    keep_then_ignore = ["!keep.log", "*.log"]
    assert _is_ignored("keep.log", keep_then_ignore) is True
| 154 | |
| 155 | |
def test_is_ignored_empty_patterns() -> None:
    """An empty pattern list ignores nothing."""
    no_patterns: list[str] = []
    assert _is_ignored("anything.txt", no_patterns) is False
| 158 | |
| 159 | |
def test_is_ignored_deep_path() -> None:
    """A ``*.tmp`` pattern ignores a file several directories deep."""
    patterns = ["*.tmp"]
    assert _is_ignored("a/b/c/d.tmp", patterns) is True
| 162 | |
| 163 | |
| 164 | # --------------------------------------------------------------------------- |
| 165 | # Unit: _safe_to_delete |
| 166 | # --------------------------------------------------------------------------- |
| 167 | |
| 168 | |
def test_safe_to_delete_normal_file(tmp_path: pathlib.Path) -> None:
    """An ordinary file in the repository root may be deleted."""
    _init_repo(tmp_path)
    ordinary = tmp_path / "file.txt"
    ordinary.write_text("x", encoding="utf-8")

    verdict = _safe_to_delete(tmp_path, ordinary)
    assert verdict is True
| 174 | |
| 175 | |
def test_safe_to_delete_blocks_muse_dir(tmp_path: pathlib.Path) -> None:
    """The repository's HEAD file is never reported as safe to delete."""
    _init_repo(tmp_path)
    verdict = _safe_to_delete(tmp_path, head_path(tmp_path))
    assert verdict is False
| 180 | |
| 181 | |
def test_safe_to_delete_blocks_deep_muse(tmp_path: pathlib.Path) -> None:
    """A branch ref under the heads directory is not safe to delete."""
    _init_repo(tmp_path)
    main_ref = heads_dir(tmp_path) / "main"
    verdict = _safe_to_delete(tmp_path, main_ref)
    assert verdict is False
| 186 | |
| 187 | |
| 188 | # --------------------------------------------------------------------------- |
| 189 | # Unit: _safe_to_rmdir |
| 190 | # --------------------------------------------------------------------------- |
| 191 | |
| 192 | |
def test_safe_to_rmdir_normal_dir(tmp_path: pathlib.Path) -> None:
    """An ordinary empty directory inside the repo may be removed."""
    _init_repo(tmp_path)
    empty = tmp_path / "empty_dir"
    empty.mkdir()

    verdict = _safe_to_rmdir(tmp_path, empty)
    assert verdict is True
| 198 | |
| 199 | |
def test_safe_to_rmdir_blocks_root(tmp_path: pathlib.Path) -> None:
    """The repository root itself is never removable."""
    _init_repo(tmp_path)
    verdict = _safe_to_rmdir(tmp_path, tmp_path)
    assert verdict is False
| 203 | |
| 204 | |
def test_safe_to_rmdir_blocks_muse(tmp_path: pathlib.Path) -> None:
    """The repository metadata directory is never removable."""
    _init_repo(tmp_path)
    metadata_dir = muse_dir(tmp_path)
    verdict = _safe_to_rmdir(tmp_path, metadata_dir)
    assert verdict is False
| 208 | |
| 209 | |
def test_safe_to_rmdir_blocks_muse_subtree(tmp_path: pathlib.Path) -> None:
    """Directories below the metadata directory are not removable either."""
    _init_repo(tmp_path)
    refs_dir = muse_dir(tmp_path) / "refs"
    verdict = _safe_to_rmdir(tmp_path, refs_dir)
    assert verdict is False
| 213 | |
| 214 | |
| 215 | # --------------------------------------------------------------------------- |
| 216 | # Security: path traversal guard |
| 217 | # --------------------------------------------------------------------------- |
| 218 | |
| 219 | |
def test_path_traversal_skipped(tmp_path: pathlib.Path) -> None:
    """walk_workdir returning a path that resolves outside root is skipped."""
    # Nest the repository one level below tmp_path so the "outside" target
    # still lives inside this test's private directory. Writing it into
    # tmp_path.parent would leak a file into the directory shared by every
    # test of the session and never clean it up.
    repo = tmp_path / "repo"
    repo.mkdir()
    _init_repo(repo)
    outside = tmp_path / "outside_target.txt"
    outside.write_text("secret", encoding="utf-8")

    # Relative to the repo root this entry escapes to tmp_path/outside_target.txt.
    fake_workdir: Manifest = {"../outside_target.txt": "deadbeef"}

    with patch("muse.cli.commands.clean.walk_workdir", return_value=fake_workdir):
        result = _invoke(["clean", "-f"], _env(repo))

    # Exit 0 — skipped file is not treated as an error.
    assert result.exit_code == 0
    # The outside file must still exist, untouched.
    assert outside.exists()
    assert outside.read_text(encoding="utf-8") == "secret"
| 235 | |
| 236 | |
def test_muse_dir_protected_even_if_listed(tmp_path: pathlib.Path) -> None:
    """Even if walk_workdir incorrectly lists .muse/HEAD, it must not be deleted."""
    _init_repo(tmp_path)
    fake_workdir: Manifest = {".muse/HEAD": "deadbeef"}

    with patch("muse.cli.commands.clean.walk_workdir", return_value=fake_workdir):
        result = _invoke(["clean", "-f"], _env(tmp_path))

    # The result used to be bound but never inspected; a protected entry is
    # expected to be skipped without failing the command, matching
    # test_clean_respects_muse_dir_immune.
    assert result.exit_code == 0
    assert head_path(tmp_path).exists()
| 246 | |
| 247 | |
| 248 | # --------------------------------------------------------------------------- |
| 249 | # Error routing: all user errors go to stderr |
| 250 | # --------------------------------------------------------------------------- |
| 251 | |
| 252 | |
def test_no_flags_error_on_stderr(tmp_path: pathlib.Path) -> None:
    """Running ``clean`` with no flags fails and mentions "force"."""
    _init_repo(tmp_path)
    (tmp_path / "junk.txt").write_text("junk", encoding="utf-8")

    outcome = _invoke(["clean"], _env(tmp_path))

    assert outcome.exit_code != 0
    # NOTE(review): the fallback to ``output`` means this does not strictly
    # prove the message went to stderr — confirm whether ``output`` includes
    # stderr in this CliRunner before tightening.
    streams = (outcome.stderr.lower(), outcome.output.lower())
    assert "force" in streams[0] or "force" in streams[1]
| 259 | |
| 260 | |
def test_ignore_load_failure_logs_warning_not_crash(tmp_path: pathlib.Path) -> None:
    """OSError from load_ignore_config must not abort the command."""
    _init_repo(tmp_path)
    (tmp_path / "junk.txt").write_text("junk", encoding="utf-8")

    failing_loader = patch(
        "muse.cli.commands.clean.load_ignore_config",
        side_effect=OSError("disk full"),
    )
    with failing_loader:
        outcome = _invoke(["clean", "-n"], _env(tmp_path))

    # The dry run still succeeds and still lists the untracked file.
    assert outcome.exit_code == 0
    assert "junk.txt" in outcome.output
| 274 | |
| 275 | |
| 276 | # --------------------------------------------------------------------------- |
| 277 | # JSON schema: _CleanResultJson |
| 278 | # --------------------------------------------------------------------------- |
| 279 | |
| 280 | |
def test_json_nothing_to_clean(tmp_path: pathlib.Path) -> None:
    """With only a tracked file present, ``-f --json`` reports status ``clean``."""
    _init_repo(tmp_path)
    _commit_file(tmp_path, "tracked.txt", b"tracked")

    outcome = _invoke(["clean", "-f", "--json"], _env(tmp_path))
    assert outcome.exit_code == 0

    payload = _parse_json(outcome)
    assert payload["status"] == "clean"
    assert payload["removed"] == []
    assert payload["dirs_removed"] == []
    assert payload["count"] == 0
    assert payload["dry_run"] is False
| 292 | |
| 293 | |
def test_json_dry_run_shows_files(tmp_path: pathlib.Path) -> None:
    """``-n --json`` lists the untracked file but leaves it on disk."""
    _init_repo(tmp_path)
    ghost = tmp_path / "ghost.txt"
    ghost.write_text("ghost", encoding="utf-8")

    outcome = _invoke(["clean", "-n", "--json"], _env(tmp_path))
    assert outcome.exit_code == 0

    payload = _parse_json(outcome)
    assert payload["status"] == "would_remove"
    assert "ghost.txt" in payload["removed"]
    assert payload["count"] == 1
    assert payload["dry_run"] is True
    assert ghost.exists()  # not deleted
| 305 | |
| 306 | |
def test_json_removed_files(tmp_path: pathlib.Path) -> None:
    """``-f --json`` reports the one untracked file as removed."""
    _init_repo(tmp_path)
    _commit_file(tmp_path, "kept.txt", b"kept")
    (tmp_path / "remove_me.txt").write_text("bye", encoding="utf-8")

    outcome = _invoke(["clean", "-f", "--json"], _env(tmp_path))
    assert outcome.exit_code == 0

    payload = _parse_json(outcome)
    assert payload["status"] == "removed"
    assert "remove_me.txt" in payload["removed"]
    assert payload["count"] == 1
    assert payload["dry_run"] is False
| 318 | |
| 319 | |
def test_json_dirs_removed(tmp_path: pathlib.Path) -> None:
    """``-f -d --json`` reports both the removed file and its emptied directory."""
    _init_repo(tmp_path)
    _commit_file(tmp_path, "kept.txt", b"kept")
    subdir = tmp_path / "empty_subdir"
    subdir.mkdir()
    (subdir / "junk.txt").write_text("junk", encoding="utf-8")

    outcome = _invoke(["clean", "-f", "-d", "--json"], _env(tmp_path))
    assert outcome.exit_code == 0

    payload = _parse_json(outcome)
    assert "empty_subdir/junk.txt" in payload["removed"]
    assert "empty_subdir" in payload["dirs_removed"]
| 331 | |
| 332 | |
def test_json_schema_fields_present(tmp_path: pathlib.Path) -> None:
    """The five core keys are present in the dry-run JSON of an empty repo."""
    _init_repo(tmp_path)

    outcome = _invoke(["clean", "-n", "--json"], _env(tmp_path))
    assert outcome.exit_code == 0

    payload = _parse_json(outcome)
    required = ("status", "removed", "dirs_removed", "count", "dry_run")
    for key in required:
        assert key in payload, f"Missing key: {key}"
| 340 | |
| 341 | |
| 342 | # --------------------------------------------------------------------------- |
| 343 | # --dry-run: no side effects |
| 344 | # --------------------------------------------------------------------------- |
| 345 | |
| 346 | |
def test_dry_run_no_deletion(tmp_path: pathlib.Path) -> None:
    """``-n`` exits 0 and leaves the untracked file in place."""
    _init_repo(tmp_path)
    survivor = tmp_path / "ephemeral.txt"
    survivor.write_text("keep me", encoding="utf-8")

    outcome = _invoke(["clean", "-n"], _env(tmp_path))

    assert outcome.exit_code == 0
    assert survivor.exists()
| 353 | |
| 354 | |
def test_dry_run_shows_count(tmp_path: pathlib.Path) -> None:
    """With five untracked files, the dry-run output contains the digit 5."""
    _init_repo(tmp_path)
    for index in range(5):
        target = tmp_path / f"file_{index}.txt"
        target.write_text(str(index), encoding="utf-8")

    outcome = _invoke(["clean", "-n"], _env(tmp_path))

    assert outcome.exit_code == 0
    # Substring check only: the file names stop at file_4, so "5" is not
    # contributed by any of them.
    assert "5" in outcome.output
| 362 | |
| 363 | |
def test_dry_run_json_reports_all(tmp_path: pathlib.Path) -> None:
    """``-n --json`` lists all three untracked files and flags the dry run."""
    _init_repo(tmp_path)
    for index in range(3):
        target = tmp_path / f"tmp_{index}.txt"
        target.write_text(str(index), encoding="utf-8")

    outcome = _invoke(["clean", "-n", "--json"], _env(tmp_path))
    assert outcome.exit_code == 0

    payload = _parse_json(outcome)
    assert payload["count"] == 3
    assert len(payload["removed"]) == 3
    assert payload["dry_run"] is True
| 374 | |
| 375 | |
| 376 | # --------------------------------------------------------------------------- |
| 377 | # --include-ignored: respects and overrides .museignore |
| 378 | # --------------------------------------------------------------------------- |
| 379 | |
| 380 | |
def test_include_ignored_deletes_ignored_files(tmp_path: pathlib.Path) -> None:
    """An ignored file is listed by a dry run only when ``-x`` is given."""
    _init_repo(tmp_path)
    _commit_file(tmp_path, "tracked.txt", b"tracked")
    (tmp_path / "debug.log").write_text("log", encoding="utf-8")

    env = _env(tmp_path)
    ignore_logs = patch(
        "muse.cli.commands.clean.resolve_patterns", return_value=["*.log"]
    )
    with ignore_logs:
        # Without -x, the .log file is excluded from cleaning.
        without_x = _invoke(["clean", "-n"], env)
        assert "debug.log" not in without_x.output

        # With -x, the file is included.
        with_x = _invoke(["clean", "-n", "-x"], env)
        assert "debug.log" in with_x.output
| 395 | |
| 396 | |
| 397 | # --------------------------------------------------------------------------- |
| 398 | # --directories: empty-dir removal |
| 399 | # --------------------------------------------------------------------------- |
| 400 | |
| 401 | |
def test_directories_removes_empty_dir_after_file_deletion(
    tmp_path: pathlib.Path,
) -> None:
    """``-f -d`` removes a directory once its only (untracked) file is gone."""
    _init_repo(tmp_path)
    _commit_file(tmp_path, "kept.txt", b"kept")
    doomed_dir = tmp_path / "subdir"
    doomed_dir.mkdir()
    (doomed_dir / "junk.txt").write_text("junk", encoding="utf-8")

    outcome = _invoke(["clean", "-f", "-d"], _env(tmp_path))

    assert outcome.exit_code == 0
    assert not doomed_dir.exists()
| 414 | |
| 415 | |
def test_directories_leaves_non_empty_dir(tmp_path: pathlib.Path) -> None:
    """``-f -d`` keeps a directory that still holds a tracked file."""
    _init_repo(tmp_path)
    mixed_dir = tmp_path / "mixed"
    mixed_dir.mkdir()
    (mixed_dir / "untracked.txt").write_text("bye", encoding="utf-8")
    (mixed_dir / "kept.txt").write_bytes(b"keep me")
    # Committing rewrites mixed/kept.txt with the same bytes and tracks it.
    _commit_file(tmp_path, "mixed/kept.txt", b"keep me")

    outcome = _invoke(["clean", "-f", "-d"], _env(tmp_path))

    assert outcome.exit_code == 0
    # Directory still exists (kept.txt is inside it and tracked).
    assert mixed_dir.is_dir()
| 428 | |
| 429 | |
def test_directories_dry_run_does_not_remove_dir(tmp_path: pathlib.Path) -> None:
    """``-n -d`` exits 0 and leaves the directory on disk."""
    _init_repo(tmp_path)
    kept_dir = tmp_path / "dry_subdir"
    kept_dir.mkdir()
    (kept_dir / "junk.txt").write_text("junk", encoding="utf-8")

    outcome = _invoke(["clean", "-n", "-d"], _env(tmp_path))

    assert outcome.exit_code == 0
    assert kept_dir.is_dir()
| 439 | |
| 440 | |
| 441 | # --------------------------------------------------------------------------- |
| 442 | # Integration: full lifecycle |
| 443 | # --------------------------------------------------------------------------- |
| 444 | |
| 445 | |
def test_integration_commit_then_clean(tmp_path: pathlib.Path) -> None:
    """``-f`` deletes the untracked file and keeps the committed one."""
    _init_repo(tmp_path)
    _commit_file(tmp_path, "tracked.txt", b"tracked")
    tracked = tmp_path / "tracked.txt"
    untracked = tmp_path / "untracked.txt"
    untracked.write_text("bye", encoding="utf-8")

    outcome = _invoke(["clean", "-f"], _env(tmp_path))

    assert outcome.exit_code == 0
    assert not untracked.exists()
    assert tracked.exists()
| 455 | |
| 456 | |
def test_integration_already_clean(tmp_path: pathlib.Path) -> None:
    """With nothing untracked, ``-f`` exits 0 and the output says "nothing"."""
    _init_repo(tmp_path)
    _commit_file(tmp_path, "everything.txt", b"all tracked")

    outcome = _invoke(["clean", "-f"], _env(tmp_path))

    assert outcome.exit_code == 0
    lowered = outcome.output.lower()
    assert "nothing" in lowered
| 464 | |
| 465 | |
def test_integration_no_commits_cleans_all(tmp_path: pathlib.Path) -> None:
    """With no HEAD commit every file is untracked."""
    _init_repo(tmp_path)
    orphan = tmp_path / "orphan.txt"
    orphan.write_text("orphan", encoding="utf-8")

    outcome = _invoke(["clean", "-f"], _env(tmp_path))

    assert outcome.exit_code == 0
    assert not orphan.exists()
| 474 | |
| 475 | |
def test_integration_json_full_cycle(tmp_path: pathlib.Path) -> None:
    """``-f --json`` removes both untracked files and reports exactly those."""
    _init_repo(tmp_path)
    _commit_file(tmp_path, "a.txt", b"a")
    for name in ("b", "c"):
        (tmp_path / f"{name}.txt").write_text(name, encoding="utf-8")

    outcome = _invoke(["clean", "-f", "--json"], _env(tmp_path))
    assert outcome.exit_code == 0

    payload = _parse_json(outcome)
    assert payload["count"] == 2
    assert set(payload["removed"]) == {"b.txt", "c.txt"}
    assert not (tmp_path / "b.txt").exists()
    assert not (tmp_path / "c.txt").exists()
    assert (tmp_path / "a.txt").exists()
| 490 | |
| 491 | |
| 492 | # --------------------------------------------------------------------------- |
| 493 | # E2E: help output |
| 494 | # --------------------------------------------------------------------------- |
| 495 | |
| 496 | |
def test_help_output() -> None:
    """``clean --help`` exits 0 and mentions the main flags."""
    outcome = _invoke(["clean", "--help"], {})
    assert outcome.exit_code == 0

    expected_flags = ("-f", "--force", "-n", "--dry-run", "--json")
    for flag in expected_flags:
        assert flag in outcome.output
| 502 | |
| 503 | |
def test_help_describes_json_flag() -> None:
    """The help text mentions "json" (case-insensitively)."""
    help_text = _invoke(["clean", "--help"], {}).output
    assert "json" in help_text.lower()
| 507 | |
| 508 | |
| 509 | # --------------------------------------------------------------------------- |
| 510 | # Stress: 1 000 untracked files |
| 511 | # --------------------------------------------------------------------------- |
| 512 | |
| 513 | |
def test_stress_1000_untracked(tmp_path: pathlib.Path) -> None:
    """``-f --json`` removes 1 000 untracked files and reports that count."""
    _init_repo(tmp_path)
    payload_bytes = b"x" * 64
    for index in range(1_000):
        (tmp_path / f"stress_{index:04d}.dat").write_bytes(payload_bytes)

    outcome = _invoke(["clean", "-f", "--json"], _env(tmp_path))
    assert outcome.exit_code == 0

    report = _parse_json(outcome)
    assert report["count"] == 1_000
    leftovers = list(tmp_path.glob("stress_*.dat"))
    assert not leftovers
| 525 | |
| 526 | |
def test_stress_1000_dry_run(tmp_path: pathlib.Path) -> None:
    """``-n --json`` reports 1 000 files and deletes none of them."""
    _init_repo(tmp_path)
    payload_bytes = b"y" * 64
    for index in range(1_000):
        (tmp_path / f"dry_{index:04d}.dat").write_bytes(payload_bytes)

    outcome = _invoke(["clean", "-n", "--json"], _env(tmp_path))
    assert outcome.exit_code == 0

    report = _parse_json(outcome)
    assert report["count"] == 1_000
    assert report["dry_run"] is True
    # Nothing deleted.
    survivors = list(tmp_path.glob("dry_*.dat"))
    assert len(survivors) == 1_000
| 540 | |
| 541 | |
def test_stress_50_ignore_patterns(tmp_path: pathlib.Path) -> None:
    """_is_ignored with 50 patterns must not crash and must filter correctly."""
    patterns = []
    for index in range(50):
        patterns.append(f"*.ext{index}")

    matched = _is_ignored("file.ext25", patterns)
    unmatched = _is_ignored("file.py", patterns)
    assert matched is True
    assert unmatched is False
| 547 | |
| 548 | |
def test_stress_concurrent_json_reads(tmp_path: pathlib.Path) -> None:
    """Dry-run invocations from several threads must all exit 0.

    CliRunner serialises stdout capture per invocation, so we guard each call
    with a lock and check only the exit code and JSON parse-ability rather
    than racing on the shared capture buffer.
    """
    _init_repo(tmp_path)
    _commit_file(tmp_path, "tracked.txt", b"tracked")
    for i in range(20):
        (tmp_path / f"concurrent_{i}.txt").write_text(str(i), encoding="utf-8")

    invoke_lock = threading.Lock()
    errors: list[str] = []

    def _worker() -> None:
        # The invocation lives inside the try block: an exception raised by
        # the CLI call itself would otherwise only terminate this thread and
        # never reach ``errors``, letting the test pass silently.
        try:
            with invoke_lock:
                r = _invoke(["clean", "-n", "--json"], _env(tmp_path))
            assert r.exit_code == 0
            data = _parse_json(r)
            assert data["count"] == 20
        except Exception as exc:
            # Include the type so a message-less exception is still visible.
            errors.append(f"{type(exc).__name__}: {exc}")

    threads = [threading.Thread(target=_worker) for _ in range(8)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    assert errors == [], f"Concurrent read failures: {errors}"
| 581 | |
| 582 | |
| 583 | # --------------------------------------------------------------------------- |
| 584 | # Edge cases |
| 585 | # --------------------------------------------------------------------------- |
| 586 | |
| 587 | |
def test_force_and_dry_run_together_dry_wins(tmp_path: pathlib.Path) -> None:
    """When both -f and -n are given, -n wins (no deletion)."""
    _init_repo(tmp_path)
    survivor = tmp_path / "both_flags.txt"
    survivor.write_text("keep", encoding="utf-8")

    outcome = _invoke(["clean", "-f", "-n"], _env(tmp_path))

    assert outcome.exit_code == 0
    assert survivor.exists()
| 595 | |
| 596 | |
def test_ansi_in_filename_sanitized(tmp_path: pathlib.Path) -> None:
    """ANSI escape codes embedded in filenames must not leak to output."""
    _init_repo(tmp_path)
    # File name carrying a raw "red" colour sequence and a reset sequence.
    hostile_name = "malicious\x1b[31mred\x1b[0m.txt"
    hostile_path = tmp_path / hostile_name
    try:
        hostile_path.write_text("malicious", encoding="utf-8")
    except (OSError, ValueError):
        pytest.skip("filesystem does not support ANSI chars in filenames")

    outcome = _invoke(["clean", "-n"], _env(tmp_path))

    # Only the colour-start sequence is checked.
    assert "\x1b[31m" not in outcome.output
| 609 | |
| 610 | |
def test_clean_respects_muse_dir_immune(tmp_path: pathlib.Path) -> None:
    """Under no circumstances should clean delete anything inside .muse/."""
    _init_repo(tmp_path)
    head_file = head_path(tmp_path)
    original_head = head_file.read_text()

    # Pretend the workdir walk reported repository metadata as plain files.
    bogus_listing = {".muse/HEAD": "abc", ".muse/repo.json": "def"}
    with patch("muse.cli.commands.clean.walk_workdir", return_value=bogus_listing):
        outcome = _invoke(["clean", "-f"], _env(tmp_path))

    assert outcome.exit_code == 0
    assert head_file.read_text() == original_head
| 627 | |
| 628 | |
| 629 | # --------------------------------------------------------------------------- |
| 630 | # Agent supercharge — duration_ms and exit_code in every JSON output |
| 631 | # --------------------------------------------------------------------------- |
| 632 | |
| 633 | |
class TestElapsed:
    """Every JSON output path must include ``duration_ms`` as a float."""

    def test_nothing_to_clean_has_elapsed(self, tmp_path: pathlib.Path) -> None:
        """Dry run with nothing untracked still reports ``duration_ms``."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        result = _invoke(["clean", "-n", "--json"], _env(tmp_path))
        assert result.exit_code == 0
        data = _parse_json(result)
        assert "duration_ms" in data
        assert isinstance(data["duration_ms"], float)

    def test_dry_run_with_files_has_elapsed(self, tmp_path: pathlib.Path) -> None:
        """Dry run with an untracked file reports ``duration_ms``."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        # Explicit encoding, matching every other write_text call in this file.
        (tmp_path / "untracked.txt").write_text("x", encoding="utf-8")
        result = _invoke(["clean", "-n", "--json"], _env(tmp_path))
        assert result.exit_code == 0
        data = _parse_json(result)
        assert "duration_ms" in data
        assert isinstance(data["duration_ms"], float)

    def test_force_clean_has_elapsed(self, tmp_path: pathlib.Path) -> None:
        """Forced clean with an untracked file reports ``duration_ms``."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        (tmp_path / "untracked.txt").write_text("x", encoding="utf-8")
        result = _invoke(["clean", "-f", "--json"], _env(tmp_path))
        assert result.exit_code == 0
        data = _parse_json(result)
        assert "duration_ms" in data
        assert isinstance(data["duration_ms"], float)
| 665 | |
| 666 | |
class TestExitCode:
    """Every JSON output path must include ``exit_code`` mirroring process exit.

    Each test checks the process exit code first and then that the JSON
    field agrees with it, so a payload value that diverges from the real
    exit status is caught.
    """

    def test_nothing_to_clean_exit_code_0(self, tmp_path: pathlib.Path) -> None:
        """Dry run with nothing untracked: process and JSON both report 0."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        result = _invoke(["clean", "-n", "--json"], _env(tmp_path))
        assert result.exit_code == 0
        data = _parse_json(result)
        assert data["exit_code"] == result.exit_code

    def test_dry_run_with_files_exit_code_0(self, tmp_path: pathlib.Path) -> None:
        """Dry run with an untracked file: process and JSON both report 0."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        # Explicit encoding, matching every other write_text call in this file.
        (tmp_path / "untracked.txt").write_text("x", encoding="utf-8")
        result = _invoke(["clean", "-n", "--json"], _env(tmp_path))
        assert result.exit_code == 0
        data = _parse_json(result)
        assert data["exit_code"] == result.exit_code

    def test_force_clean_exit_code_0(self, tmp_path: pathlib.Path) -> None:
        """Forced clean with an untracked file: process and JSON both report 0."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        (tmp_path / "untracked.txt").write_text("x", encoding="utf-8")
        result = _invoke(["clean", "-f", "--json"], _env(tmp_path))
        assert result.exit_code == 0
        data = _parse_json(result)
        assert data["exit_code"] == result.exit_code
| 692 | |
| 693 | |
class TestDryRunStatus:
    """Dry-run with files to remove must report status ``would_remove``, not ``clean``."""

    def test_dry_run_with_files_status_is_would_remove(self, tmp_path: pathlib.Path) -> None:
        """Dry run with an untracked file reports ``would_remove``."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        # Explicit encoding, matching every other write_text call in this file.
        (tmp_path / "untracked.txt").write_text("x", encoding="utf-8")
        result = _invoke(["clean", "-n", "--json"], _env(tmp_path))
        data = _parse_json(result)
        assert data["status"] == "would_remove", (
            f"dry-run with files should be 'would_remove', got {data['status']!r}"
        )

    def test_dry_run_no_files_status_is_clean(self, tmp_path: pathlib.Path) -> None:
        """Dry run with nothing untracked reports ``clean``."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        result = _invoke(["clean", "-n", "--json"], _env(tmp_path))
        data = _parse_json(result)
        assert data["status"] == "clean"

    def test_force_with_files_status_is_removed(self, tmp_path: pathlib.Path) -> None:
        """Forced clean with an untracked file reports ``removed``."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        (tmp_path / "untracked.txt").write_text("x", encoding="utf-8")
        result = _invoke(["clean", "-f", "--json"], _env(tmp_path))
        data = _parse_json(result)
        assert data["status"] == "removed"
| 721 | |
| 722 | |
class TestJsonSchemaComplete:
    """Full schema must include all fields including duration_ms and exit_code."""

    # Every key expected in the emitted JSON, on every output path.
    _FULL_KEYS = {"status", "removed", "dirs_removed", "count", "dry_run",
                  "duration_ms", "exit_code"}

    def test_nothing_to_clean_schema_complete(self, tmp_path: pathlib.Path) -> None:
        """Dry run with nothing untracked emits the full key set."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        result = _invoke(["clean", "-n", "--json"], _env(tmp_path))
        data = _parse_json(result)
        missing = self._FULL_KEYS - data.keys()
        assert not missing, f"Missing keys in clean JSON: {missing}"

    def test_dry_run_with_files_schema_complete(self, tmp_path: pathlib.Path) -> None:
        """Dry run with an untracked file emits the full key set."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        # Explicit encoding, matching every other write_text call in this file.
        (tmp_path / "untracked.txt").write_text("x", encoding="utf-8")
        result = _invoke(["clean", "-n", "--json"], _env(tmp_path))
        data = _parse_json(result)
        missing = self._FULL_KEYS - data.keys()
        assert not missing, f"Missing keys in dry-run JSON: {missing}"

    def test_force_clean_schema_complete(self, tmp_path: pathlib.Path) -> None:
        """Forced clean with an untracked file emits the full key set."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        (tmp_path / "untracked.txt").write_text("x", encoding="utf-8")
        result = _invoke(["clean", "-f", "--json"], _env(tmp_path))
        data = _parse_json(result)
        missing = self._FULL_KEYS - data.keys()
        assert not missing, f"Missing keys in force JSON: {missing}"
| 754 | |
| 755 | |
| 756 | # --------------------------------------------------------------------------- |
| 757 | # Flag registration tests |
| 758 | # --------------------------------------------------------------------------- |
| 759 | |
| 760 | import argparse as _argparse |
| 761 | from muse.cli.commands.clean import register as _register_clean |
| 762 | from muse.core.paths import head_path, heads_dir, muse_dir |
| 763 | |
| 764 | |
def _parse_clean(*args: str) -> _argparse.Namespace:
    """Parse ``clean`` plus *args* with a throwaway parser.

    A fresh root parser is built on every call and ``clean`` is registered
    on it through ``_register_clean``.
    """
    parser = _argparse.ArgumentParser()
    subcommands = parser.add_subparsers(dest="cmd")
    _register_clean(subcommands)
    argv = ["clean", *args]
    return parser.parse_args(argv)
| 770 | |
| 771 | |
class TestRegisterFlags:
    """Flags registered for ``clean`` land on the expected namespace attributes."""

    def test_default_json_out_is_false(self) -> None:
        """Without any flag, ``json_out`` defaults to False."""
        parsed = _parse_clean()
        assert parsed.json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        """``--json`` turns ``json_out`` on."""
        parsed = _parse_clean("--json")
        assert parsed.json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        """``-j`` turns ``json_out`` on."""
        parsed = _parse_clean("-j")
        assert parsed.json_out is True

    def test_force_flag(self) -> None:
        """``--force`` sets ``force``."""
        parsed = _parse_clean("--force")
        assert parsed.force is True

    def test_dry_run_n_shorthand(self) -> None:
        """``-n`` sets ``dry_run``."""
        parsed = _parse_clean("-n")
        assert parsed.dry_run is True
File History
2 commits
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a
fix: repair four test failures from post-migration audit
Sonnet 4.6
patch
28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf
fix: unified object store migration — idempotent writes, JS…
Sonnet 4.6
minor
⚠
28 days ago