test_cmd_clean_hardening.py
python
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2
fix: remove commit_exists filter from have anchors — server…
Sonnet 4.6
patch
20 days ago
| 1 | """Hardening test suite for ``muse clean``. |
| 2 | |
| 3 | Coverage: |
| 4 | - Unit: _is_ignored, _safe_to_delete, _safe_to_rmdir helpers |
| 5 | - Security: path-traversal guard, .muse/ protection, symlink skipping |
| 6 | - Error routing: all user errors go to stderr |
| 7 | - JSON schema: _CleanResultJson shape for all outcomes |
| 8 | - --dry-run: no side effects with and without --json |
| 9 | - --include-ignored: respects .museignore patterns |
| 10 | - --directories: empty-dir removal, .muse/ immune |
| 11 | - Integration: clean lifecycle (commit → add untracked → clean) |
| 12 | - E2E: help output, combined flags |
| 13 | - Stress: 1 000 untracked files, concurrent reads, 50-pattern ignore list |
| 14 | """ |
| 15 | |
| 16 | from __future__ import annotations |
| 17 | |
| 18 | import datetime |
| 19 | import json |
| 20 | import os |
| 21 | import pathlib |
| 22 | import threading |
| 23 | from unittest.mock import patch |
| 24 | |
| 25 | import pytest |
| 26 | from tests.cli_test_helper import CliRunner, InvokeResult |
| 27 | from typing import TypedDict |
| 28 | |
| 29 | from muse.cli.commands.clean import _is_ignored, _safe_to_delete, _safe_to_rmdir |
| 30 | from muse.core.object_store import write_object |
| 31 | from muse.core.ids import hash_commit, hash_snapshot |
| 32 | from muse.core.commits import ( |
| 33 | CommitRecord, |
| 34 | write_commit, |
| 35 | ) |
| 36 | from muse.core.snapshots import ( |
| 37 | SnapshotRecord, |
| 38 | write_snapshot, |
| 39 | ) |
| 40 | from muse.core.types import Manifest, blob_id |
| 41 | |
| 42 | runner = CliRunner() |
| 43 | |
| 44 | |
| 45 | # --------------------------------------------------------------------------- |
| 46 | # Typed output shape (mirrors _CleanResultJson in clean.py) |
| 47 | # --------------------------------------------------------------------------- |
| 48 | |
| 49 | |
| 50 | class _CleanOut(TypedDict, total=False): |
| 51 | status: str |
| 52 | removed: list[str] |
| 53 | dirs_removed: list[str] |
| 54 | count: int |
| 55 | dry_run: bool |
| 56 | duration_ms: float |
| 57 | exit_code: int |
| 58 | |
| 59 | |
| 60 | # --------------------------------------------------------------------------- |
| 61 | # Helpers |
| 62 | # --------------------------------------------------------------------------- |
| 63 | |
| 64 | |
def _init_repo(path: pathlib.Path, *, domain: str = "midi") -> pathlib.Path:
    """Lay out a minimal repository skeleton for *path* and return *path*.

    Creates the ``commits``, ``snapshots``, ``objects`` and ``refs/heads``
    directories under ``muse_dir(path)``, writes a ``HEAD`` file pointing at
    ``refs/heads/main`` and a ``repo.json`` holding a fixed repo id together
    with *domain*.
    """
    meta_root = muse_dir(path)
    for name in ("commits", "snapshots", "objects", "refs/heads"):
        meta_root.joinpath(name).mkdir(parents=True, exist_ok=True)

    meta_root.joinpath("HEAD").write_text("ref: refs/heads/main", encoding="utf-8")

    repo_meta = {"repo_id": "clean-hard-test", "domain": domain}
    meta_root.joinpath("repo.json").write_text(json.dumps(repo_meta), encoding="utf-8")
    return path
| 75 | |
| 76 | |
def _commit_file(root: pathlib.Path, rel_path: str, content: bytes) -> str:
    """Write *content* to *rel_path*, store the object and commit it.

    The file is written into the working tree, its blob is stored, a
    single-entry snapshot and a parentless commit with the message
    ``"initial"`` are recorded, and the ``main`` head file is overwritten
    with the new commit id.

    Args:
        root: Repository root previously prepared with ``_init_repo``.
        rel_path: Path of the file relative to *root*; may contain
            sub-directories, which are created on demand.
        content: Raw bytes to store and write.

    Returns:
        The id of the commit that was written.
    """
    obj_id = blob_id(content)
    write_object(root, obj_id, content)

    target = root / rel_path
    # A nested rel_path ("dir/file.txt") would otherwise fail with
    # FileNotFoundError unless the caller had created "dir" beforehand.
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(content)

    manifest = {rel_path: obj_id}
    snap_id = hash_snapshot(manifest)
    write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))

    committed_at = datetime.datetime.now(datetime.timezone.utc)
    commit_id = hash_commit(
        parent_ids=[],
        snapshot_id=snap_id,
        message="initial",
        committed_at_iso=committed_at.isoformat(),
    )
    write_commit(
        root,
        CommitRecord(
            commit_id=commit_id,
            branch="main",
            snapshot_id=snap_id,
            message="initial",
            committed_at=committed_at,
        ),
    )
    # Point the "main" head at the commit just written.
    (heads_dir(root) / "main").write_text(commit_id, encoding="utf-8")
    return commit_id
| 106 | |
| 107 | |
| 108 | def _env(repo: pathlib.Path) -> Manifest: |
| 109 | return {"MUSE_REPO_ROOT": str(repo)} |
| 110 | |
| 111 | |
def _invoke(args: list[str], env: Manifest) -> InvokeResult:
    """Run the CLI through the module-level ``runner`` with *args* and *env*.

    ``None`` is passed as the first positional argument of ``runner.invoke``,
    exactly as every call in this module has always done.
    """
    outcome = runner.invoke(None, args, env=env)
    return outcome
| 114 | |
| 115 | |
def _parse_json(result: InvokeResult) -> _CleanOut:
    """Decode the first output line that starts with ``{`` into a ``_CleanOut``.

    The five core keys (``status``, ``removed``, ``dirs_removed``, ``count``,
    ``dry_run``) are read unconditionally, so a payload missing any of them
    raises ``KeyError``. ``duration_ms`` and ``exit_code`` are copied only
    when present.

    Raises:
        AssertionError: If no line of the output starts with ``{``.
    """
    stripped_lines = (text.strip() for text in result.output.splitlines())
    first_json = next((text for text in stripped_lines if text.startswith("{")), None)
    if first_json is None:
        raise AssertionError(f"No JSON line found in output:\n{result.output}")

    payload = json.loads(first_json)
    parsed = _CleanOut(
        status=payload["status"],
        removed=payload["removed"],
        dirs_removed=payload["dirs_removed"],
        count=payload["count"],
        dry_run=payload["dry_run"],
    )
    # Optional keys are forwarded only when the command emitted them.
    if "duration_ms" in payload:
        parsed["duration_ms"] = payload["duration_ms"]
    if "exit_code" in payload:
        parsed["exit_code"] = payload["exit_code"]
    return parsed
| 134 | |
| 135 | |
| 136 | # --------------------------------------------------------------------------- |
| 137 | # Unit: _is_ignored |
| 138 | # --------------------------------------------------------------------------- |
| 139 | |
| 140 | |
def test_is_ignored_exact_match() -> None:
    """``build/*`` must ignore a file located directly inside ``build/``."""
    patterns = ["build/*"]
    verdict = _is_ignored("build/out.o", patterns)
    assert verdict is True
| 143 | |
| 144 | |
def test_is_ignored_basename_match() -> None:
    """A bare ``*.pyc`` glob must also ignore a file nested in sub-directories."""
    nested_file = "deep/nested/file.pyc"
    verdict = _is_ignored(nested_file, ["*.pyc"])
    assert verdict is True
| 147 | |
| 148 | |
def test_is_ignored_no_match() -> None:
    """A path that matches none of the patterns is not ignored."""
    patterns = ["*.pyc", "build/*"]
    verdict = _is_ignored("src/main.py", patterns)
    assert verdict is False
| 151 | |
| 152 | |
def test_is_ignored_negation_unignores() -> None:
    """A later ``!`` pattern re-includes a path that an earlier glob ignored."""
    # "*.log" ignores every log file; "!keep.log" then un-ignores keep.log.
    patterns = ["*.log", "!keep.log"]
    verdict = _is_ignored("keep.log", patterns)
    assert verdict is False
| 156 | |
| 157 | |
def test_is_ignored_negation_last_match_wins() -> None:
    """When the negation comes first, the later glob ignores the path again."""
    # "!keep.log" is listed before "*.log", so the last matching pattern
    # re-ignores the file.
    patterns = ["!keep.log", "*.log"]
    verdict = _is_ignored("keep.log", patterns)
    assert verdict is True
| 161 | |
| 162 | |
def test_is_ignored_empty_patterns() -> None:
    """With an empty pattern list nothing is ignored."""
    no_patterns: list[str] = []
    verdict = _is_ignored("anything.txt", no_patterns)
    assert verdict is False
| 165 | |
| 166 | |
def test_is_ignored_deep_path() -> None:
    """``*.tmp`` must ignore a file that sits three directories deep."""
    deep_file = "a/b/c/d.tmp"
    verdict = _is_ignored(deep_file, ["*.tmp"])
    assert verdict is True
| 169 | |
| 170 | |
| 171 | # --------------------------------------------------------------------------- |
| 172 | # Unit: _safe_to_delete |
| 173 | # --------------------------------------------------------------------------- |
| 174 | |
| 175 | |
def test_safe_to_delete_normal_file(tmp_path: pathlib.Path) -> None:
    """An ordinary file at the repository root is reported as safe to delete."""
    repo = _init_repo(tmp_path)
    victim = repo / "file.txt"
    victim.write_text("x", encoding="utf-8")

    verdict = _safe_to_delete(repo, victim)

    assert verdict is True
| 181 | |
| 182 | |
def test_safe_to_delete_blocks_muse_dir(tmp_path: pathlib.Path) -> None:
    """The repository's HEAD file must never be reported as safe to delete."""
    repo = _init_repo(tmp_path)

    verdict = _safe_to_delete(repo, head_path(repo))

    assert verdict is False
| 187 | |
| 188 | |
def test_safe_to_delete_blocks_deep_muse(tmp_path: pathlib.Path) -> None:
    """A path inside the heads directory is also refused for deletion."""
    repo = _init_repo(tmp_path)
    main_ref = heads_dir(repo) / "main"

    verdict = _safe_to_delete(repo, main_ref)

    assert verdict is False
| 193 | |
| 194 | |
| 195 | # --------------------------------------------------------------------------- |
| 196 | # Unit: _safe_to_rmdir |
| 197 | # --------------------------------------------------------------------------- |
| 198 | |
| 199 | |
def test_safe_to_rmdir_normal_dir(tmp_path: pathlib.Path) -> None:
    """A plain empty directory under the root may be removed."""
    repo = _init_repo(tmp_path)
    empty = repo / "empty_dir"
    empty.mkdir()

    verdict = _safe_to_rmdir(repo, empty)

    assert verdict is True
| 205 | |
| 206 | |
def test_safe_to_rmdir_blocks_root(tmp_path: pathlib.Path) -> None:
    """The repository root itself must never be reported as removable."""
    repo = _init_repo(tmp_path)

    verdict = _safe_to_rmdir(repo, repo)

    assert verdict is False
| 210 | |
| 211 | |
def test_safe_to_rmdir_blocks_muse(tmp_path: pathlib.Path) -> None:
    """The muse directory itself must never be reported as removable."""
    repo = _init_repo(tmp_path)
    protected = muse_dir(repo)

    verdict = _safe_to_rmdir(repo, protected)

    assert verdict is False
| 215 | |
| 216 | |
def test_safe_to_rmdir_blocks_muse_subtree(tmp_path: pathlib.Path) -> None:
    """A directory nested inside the muse directory is refused as well."""
    repo = _init_repo(tmp_path)
    refs_dir = muse_dir(repo) / "refs"

    verdict = _safe_to_rmdir(repo, refs_dir)

    assert verdict is False
| 220 | |
| 221 | |
| 222 | # --------------------------------------------------------------------------- |
| 223 | # Security: path traversal guard |
| 224 | # --------------------------------------------------------------------------- |
| 225 | |
| 226 | |
def test_path_traversal_skipped(tmp_path: pathlib.Path) -> None:
    """walk_workdir returning a path that resolves outside root is skipped.

    The repository lives in a sub-directory of ``tmp_path`` so that the
    "outside" sentinel file is still inside the per-test temporary directory.
    Writing it to ``tmp_path.parent`` would leave a stray file in the
    directory shared by all tests of the session.
    """
    repo = tmp_path / "repo"
    repo.mkdir()
    _init_repo(repo)

    # Sibling of the repository root, i.e. reachable from it via "..".
    outside = tmp_path / "outside_target.txt"
    outside.write_text("secret", encoding="utf-8")

    fake_workdir: Manifest = {"../outside_target.txt": "deadbeef"}

    with patch("muse.cli.commands.clean.walk_workdir", return_value=fake_workdir):
        result = _invoke(["clean", "-f"], _env(repo))

    # Exit 0 — skipped file is not treated as an error.
    assert result.exit_code == 0
    # The outside file must still exist, untouched.
    assert outside.exists()
    assert outside.read_text(encoding="utf-8") == "secret"
| 242 | |
| 243 | |
def test_muse_dir_protected_even_if_listed(tmp_path: pathlib.Path) -> None:
    """Even if walk_workdir incorrectly lists .muse/HEAD, it must not be deleted.

    The command is also expected to exit 0, in line with
    ``test_clean_respects_muse_dir_immune``, which exercises the same
    scenario.
    """
    _init_repo(tmp_path)
    fake_workdir: Manifest = {".muse/HEAD": "deadbeef"}

    with patch("muse.cli.commands.clean.walk_workdir", return_value=fake_workdir):
        result = _invoke(["clean", "-f"], _env(tmp_path))

    # A refused path is skipped, not reported as a failure.
    assert result.exit_code == 0
    assert head_path(tmp_path).exists()
| 253 | |
| 254 | |
| 255 | # --------------------------------------------------------------------------- |
| 256 | # Error routing: all user errors go to stderr |
| 257 | # --------------------------------------------------------------------------- |
| 258 | |
| 259 | |
def test_no_flags_error_on_stderr(tmp_path: pathlib.Path) -> None:
    """``clean`` without ``-f``/``-n`` must fail and mention "force".

    NOTE(review): the hint is accepted on either ``stderr`` or ``output``, so
    this check alone does not prove the message was routed to stderr.
    """
    repo = _init_repo(tmp_path)
    repo.joinpath("junk.txt").write_text("junk", encoding="utf-8")

    outcome = _invoke(["clean"], _env(repo))

    assert outcome.exit_code != 0
    mentions_force = "force" in outcome.stderr.lower()
    if not mentions_force:
        # Fall back to the captured output only when stderr lacks the hint.
        mentions_force = "force" in outcome.output.lower()
    assert mentions_force
| 266 | |
| 267 | |
def test_ignore_load_failure_logs_warning_not_crash(tmp_path: pathlib.Path) -> None:
    """OSError from load_ignore_config must not abort the command.

    The dry run is still expected to exit 0 and to list the untracked file.
    """
    repo = _init_repo(tmp_path)
    repo.joinpath("junk.txt").write_text("junk", encoding="utf-8")

    failing_loader = patch(
        "muse.cli.commands.clean.load_ignore_config",
        side_effect=OSError("disk full"),
    )
    with failing_loader:
        outcome = _invoke(["clean", "-n"], _env(repo))

    assert outcome.exit_code == 0
    assert "junk.txt" in outcome.output
| 281 | |
| 282 | |
| 283 | # --------------------------------------------------------------------------- |
| 284 | # JSON schema: _CleanResultJson |
| 285 | # --------------------------------------------------------------------------- |
| 286 | |
| 287 | |
def test_json_nothing_to_clean(tmp_path: pathlib.Path) -> None:
    """With only a tracked file present, ``-f --json`` reports an empty result."""
    repo = _init_repo(tmp_path)
    _commit_file(repo, "tracked.txt", b"tracked")

    outcome = _invoke(["clean", "-f", "--json"], _env(repo))

    assert outcome.exit_code == 0
    payload = _parse_json(outcome)
    assert payload["status"] == "clean"
    # Nothing was removed, so both lists are empty and the count is zero.
    assert payload["removed"] == []
    assert payload["dirs_removed"] == []
    assert payload["count"] == 0
    assert payload["dry_run"] is False
| 299 | |
| 300 | |
def test_json_dry_run_shows_files(tmp_path: pathlib.Path) -> None:
    """``-n --json`` lists the untracked file as ``would_remove`` and keeps it."""
    repo = _init_repo(tmp_path)
    ghost = repo / "ghost.txt"
    ghost.write_text("ghost", encoding="utf-8")

    outcome = _invoke(["clean", "-n", "--json"], _env(repo))

    assert outcome.exit_code == 0
    payload = _parse_json(outcome)
    assert payload["status"] == "would_remove"
    assert "ghost.txt" in payload["removed"]
    assert payload["count"] == 1
    assert payload["dry_run"] is True
    # A dry run must leave the file on disk.
    assert ghost.exists()
| 312 | |
| 313 | |
def test_json_removed_files(tmp_path: pathlib.Path) -> None:
    """``-f --json`` reports the single untracked file as removed."""
    repo = _init_repo(tmp_path)
    _commit_file(repo, "kept.txt", b"kept")
    repo.joinpath("remove_me.txt").write_text("bye", encoding="utf-8")

    outcome = _invoke(["clean", "-f", "--json"], _env(repo))

    assert outcome.exit_code == 0
    payload = _parse_json(outcome)
    assert payload["status"] == "removed"
    assert "remove_me.txt" in payload["removed"]
    assert payload["count"] == 1
    assert payload["dry_run"] is False
| 325 | |
| 326 | |
def test_json_dirs_removed(tmp_path: pathlib.Path) -> None:
    """``-f -d --json`` reports both the removed file and its emptied directory."""
    repo = _init_repo(tmp_path)
    _commit_file(repo, "kept.txt", b"kept")
    # The directory only becomes empty once its single untracked file is removed.
    nested = repo / "empty_subdir"
    nested.mkdir()
    nested.joinpath("junk.txt").write_text("junk", encoding="utf-8")

    outcome = _invoke(["clean", "-f", "-d", "--json"], _env(repo))

    assert outcome.exit_code == 0
    payload = _parse_json(outcome)
    assert "empty_subdir/junk.txt" in payload["removed"]
    assert "empty_subdir" in payload["dirs_removed"]
| 338 | |
| 339 | |
def test_json_schema_fields_present(tmp_path: pathlib.Path) -> None:
    """The five core keys are present in the JSON of a dry run on a fresh repo."""
    repo = _init_repo(tmp_path)

    outcome = _invoke(["clean", "-n", "--json"], _env(repo))

    assert outcome.exit_code == 0
    payload = _parse_json(outcome)
    required = ("status", "removed", "dirs_removed", "count", "dry_run")
    for key in required:
        assert key in payload, f"Missing key: {key}"
| 347 | |
| 348 | |
| 349 | # --------------------------------------------------------------------------- |
| 350 | # --dry-run: no side effects |
| 351 | # --------------------------------------------------------------------------- |
| 352 | |
| 353 | |
def test_dry_run_no_deletion(tmp_path: pathlib.Path) -> None:
    """``clean -n`` exits 0 and leaves the untracked file in place."""
    repo = _init_repo(tmp_path)
    survivor = repo / "ephemeral.txt"
    survivor.write_text("keep me", encoding="utf-8")

    outcome = _invoke(["clean", "-n"], _env(repo))

    assert outcome.exit_code == 0
    assert survivor.exists()
| 360 | |
| 361 | |
def test_dry_run_shows_count(tmp_path: pathlib.Path) -> None:
    """With five untracked files the dry-run output must contain a ``5``."""
    repo = _init_repo(tmp_path)
    for index in range(5):
        repo.joinpath(f"file_{index}.txt").write_text(str(index), encoding="utf-8")

    outcome = _invoke(["clean", "-n"], _env(repo))

    assert outcome.exit_code == 0
    # File names only use the digits 0-4, so a "5" is not produced by them.
    assert "5" in outcome.output
| 369 | |
| 370 | |
def test_dry_run_json_reports_all(tmp_path: pathlib.Path) -> None:
    """``-n --json`` reports every one of three untracked files."""
    repo = _init_repo(tmp_path)
    for index in range(3):
        repo.joinpath(f"tmp_{index}.txt").write_text(str(index), encoding="utf-8")

    outcome = _invoke(["clean", "-n", "--json"], _env(repo))

    assert outcome.exit_code == 0
    payload = _parse_json(outcome)
    assert payload["count"] == 3
    assert len(payload["removed"]) == 3
    assert payload["dry_run"] is True
| 381 | |
| 382 | |
| 383 | # --------------------------------------------------------------------------- |
| 384 | # --include-ignored: respects and overrides .museignore |
| 385 | # --------------------------------------------------------------------------- |
| 386 | |
| 387 | |
def test_include_ignored_deletes_ignored_files(tmp_path: pathlib.Path) -> None:
    """An ignored file is listed by a dry run only when ``-x`` is given.

    ``resolve_patterns`` is patched to return ``["*.log"]`` for both
    invocations.
    """
    repo = _init_repo(tmp_path)
    _commit_file(repo, "tracked.txt", b"tracked")
    repo.joinpath("debug.log").write_text("log", encoding="utf-8")

    ignore_logs = patch(
        "muse.cli.commands.clean.resolve_patterns", return_value=["*.log"]
    )
    with ignore_logs:
        # Without -x, the .log file is excluded from cleaning.
        plain_run = _invoke(["clean", "-n"], _env(repo))
        assert "debug.log" not in plain_run.output

        # With -x, the file is included.
        include_run = _invoke(["clean", "-n", "-x"], _env(repo))
        assert "debug.log" in include_run.output
| 402 | |
| 403 | |
| 404 | # --------------------------------------------------------------------------- |
| 405 | # --directories: empty-dir removal |
| 406 | # --------------------------------------------------------------------------- |
| 407 | |
| 408 | |
def test_directories_removes_empty_dir_after_file_deletion(
    tmp_path: pathlib.Path,
) -> None:
    """``-f -d`` removes a directory whose only content was an untracked file."""
    repo = _init_repo(tmp_path)
    _commit_file(repo, "kept.txt", b"kept")
    doomed_dir = repo / "subdir"
    doomed_dir.mkdir()
    doomed_dir.joinpath("junk.txt").write_text("junk", encoding="utf-8")

    outcome = _invoke(["clean", "-f", "-d"], _env(repo))

    assert outcome.exit_code == 0
    assert not doomed_dir.exists()
| 421 | |
| 422 | |
def test_directories_leaves_non_empty_dir(tmp_path: pathlib.Path) -> None:
    """``-f -d`` keeps a directory that still holds a tracked file."""
    repo = _init_repo(tmp_path)
    mixed_dir = repo / "mixed"
    mixed_dir.mkdir()
    mixed_dir.joinpath("untracked.txt").write_text("bye", encoding="utf-8")
    mixed_dir.joinpath("kept.txt").write_bytes(b"keep me")
    # Committing records mixed/kept.txt as tracked (and rewrites the same bytes).
    _commit_file(repo, "mixed/kept.txt", b"keep me")

    outcome = _invoke(["clean", "-f", "-d"], _env(repo))

    assert outcome.exit_code == 0
    # Directory still exists (kept.txt is inside it and tracked).
    assert mixed_dir.is_dir()
| 435 | |
| 436 | |
def test_directories_dry_run_does_not_remove_dir(tmp_path: pathlib.Path) -> None:
    """``-n -d`` exits 0 and leaves the directory on disk."""
    repo = _init_repo(tmp_path)
    kept_dir = repo / "dry_subdir"
    kept_dir.mkdir()
    kept_dir.joinpath("junk.txt").write_text("junk", encoding="utf-8")

    outcome = _invoke(["clean", "-n", "-d"], _env(repo))

    assert outcome.exit_code == 0
    assert kept_dir.is_dir()
| 446 | |
| 447 | |
| 448 | # --------------------------------------------------------------------------- |
| 449 | # Integration: full lifecycle |
| 450 | # --------------------------------------------------------------------------- |
| 451 | |
| 452 | |
def test_integration_commit_then_clean(tmp_path: pathlib.Path) -> None:
    """After a commit, ``clean -f`` deletes the untracked file and keeps the tracked one."""
    repo = _init_repo(tmp_path)
    _commit_file(repo, "tracked.txt", b"tracked")
    stray = repo / "untracked.txt"
    stray.write_text("bye", encoding="utf-8")

    outcome = _invoke(["clean", "-f"], _env(repo))

    assert outcome.exit_code == 0
    assert not stray.exists()
    assert repo.joinpath("tracked.txt").exists()
| 462 | |
| 463 | |
def test_integration_already_clean(tmp_path: pathlib.Path) -> None:
    """With every file tracked, ``clean -f`` exits 0 and says "nothing"."""
    repo = _init_repo(tmp_path)
    _commit_file(repo, "everything.txt", b"all tracked")

    outcome = _invoke(["clean", "-f"], _env(repo))

    assert outcome.exit_code == 0
    # The match is case-insensitive.
    assert "nothing" in outcome.output.lower()
| 471 | |
| 472 | |
def test_integration_no_commits_cleans_all(tmp_path: pathlib.Path) -> None:
    """With no HEAD commit every file is untracked."""
    repo = _init_repo(tmp_path)
    orphan = repo / "orphan.txt"
    orphan.write_text("orphan", encoding="utf-8")

    outcome = _invoke(["clean", "-f"], _env(repo))

    assert outcome.exit_code == 0
    assert not orphan.exists()
| 481 | |
| 482 | |
def test_integration_json_full_cycle(tmp_path: pathlib.Path) -> None:
    """``-f --json`` removes exactly the two untracked files and reports them."""
    repo = _init_repo(tmp_path)
    _commit_file(repo, "a.txt", b"a")
    tracked = repo / "a.txt"
    stray_b = repo / "b.txt"
    stray_c = repo / "c.txt"
    stray_b.write_text("b", encoding="utf-8")
    stray_c.write_text("c", encoding="utf-8")

    outcome = _invoke(["clean", "-f", "--json"], _env(repo))

    assert outcome.exit_code == 0
    payload = _parse_json(outcome)
    assert payload["count"] == 2
    # Order of the reported names is irrelevant.
    assert set(payload["removed"]) == {"b.txt", "c.txt"}
    assert not stray_b.exists()
    assert not stray_c.exists()
    assert tracked.exists()
| 497 | |
| 498 | |
| 499 | # --------------------------------------------------------------------------- |
| 500 | # E2E: help output |
| 501 | # --------------------------------------------------------------------------- |
| 502 | |
| 503 | |
def test_help_output() -> None:
    """``clean --help`` exits 0 and lists the force, dry-run and JSON flags."""
    outcome = _invoke(["clean", "--help"], {})
    assert outcome.exit_code == 0

    expected_flags = ("-f", "--force", "-n", "--dry-run", "--json")
    for flag in expected_flags:
        assert flag in outcome.output
| 509 | |
| 510 | |
def test_help_describes_json_flag() -> None:
    """The help text mentions "json" (checked case-insensitively)."""
    outcome = _invoke(["clean", "--help"], {})
    help_text = outcome.output.lower()
    assert "json" in help_text
| 514 | |
| 515 | |
| 516 | # --------------------------------------------------------------------------- |
| 517 | # Stress: 1 000 untracked files |
| 518 | # --------------------------------------------------------------------------- |
| 519 | |
| 520 | |
def test_stress_1000_untracked(tmp_path: pathlib.Path) -> None:
    """``-f --json`` removes 1 000 untracked files and reports the same count."""
    repo = _init_repo(tmp_path)
    filler = b"x" * 64
    for index in range(1_000):
        repo.joinpath(f"stress_{index:04d}.dat").write_bytes(filler)

    outcome = _invoke(["clean", "-f", "--json"], _env(repo))

    assert outcome.exit_code == 0
    payload = _parse_json(outcome)
    assert payload["count"] == 1_000
    leftovers = list(repo.glob("stress_*.dat"))
    assert leftovers == []
| 532 | |
| 533 | |
def test_stress_1000_dry_run(tmp_path: pathlib.Path) -> None:
    """``-n --json`` reports 1 000 untracked files and deletes none of them."""
    repo = _init_repo(tmp_path)
    filler = b"y" * 64
    for index in range(1_000):
        repo.joinpath(f"dry_{index:04d}.dat").write_bytes(filler)

    outcome = _invoke(["clean", "-n", "--json"], _env(repo))

    assert outcome.exit_code == 0
    payload = _parse_json(outcome)
    assert payload["count"] == 1_000
    assert payload["dry_run"] is True
    # Nothing deleted.
    survivors = sum(1 for _ in repo.glob("dry_*.dat"))
    assert survivors == 1_000
| 547 | |
| 548 | |
def test_stress_50_ignore_patterns(tmp_path: pathlib.Path) -> None:
    """_is_ignored with 50 patterns must not crash and must filter correctly."""
    # Patterns "*.ext0" .. "*.ext49".
    patterns = ["*.ext" + str(index) for index in range(50)]

    matched = _is_ignored("file.ext25", patterns)
    unmatched = _is_ignored("file.py", patterns)

    assert matched is True
    assert unmatched is False
| 554 | |
| 555 | |
def test_stress_concurrent_json_reads(tmp_path: pathlib.Path) -> None:
    """Concurrent dry-run invocations must all exit 0 without data races.

    CliRunner serialises stdout capture per invocation, so each call is
    guarded with a lock; only the exit code and the parsed JSON count are
    checked, instead of racing on the shared capture buffer.
    """
    repo = _init_repo(tmp_path)
    _commit_file(repo, "tracked.txt", b"tracked")
    for index in range(20):
        repo.joinpath(f"concurrent_{index}.txt").write_text(str(index), encoding="utf-8")

    serialise = threading.Lock()
    failures: list[str] = []

    def _dry_run_once() -> None:
        # Only the invocation itself runs under the lock.
        with serialise:
            outcome = _invoke(["clean", "-n", "--json"], _env(repo))
        try:
            assert outcome.exit_code == 0
            assert _parse_json(outcome)["count"] == 20
        except Exception as exc:
            # Collected here and asserted on from the main thread below.
            failures.append(str(exc))

    workers = []
    for _ in range(8):
        workers.append(threading.Thread(target=_dry_run_once))
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    assert failures == [], f"Concurrent read failures: {failures}"
| 588 | |
| 589 | |
| 590 | # --------------------------------------------------------------------------- |
| 591 | # Edge cases |
| 592 | # --------------------------------------------------------------------------- |
| 593 | |
| 594 | |
def test_force_and_dry_run_together_dry_wins(tmp_path: pathlib.Path) -> None:
    """When both -f and -n are given, -n wins (no deletion)."""
    repo = _init_repo(tmp_path)
    survivor = repo / "both_flags.txt"
    survivor.write_text("keep", encoding="utf-8")

    outcome = _invoke(["clean", "-f", "-n"], _env(repo))

    assert outcome.exit_code == 0
    assert survivor.exists()
| 602 | |
| 603 | |
def test_ansi_in_filename_sanitized(tmp_path: pathlib.Path) -> None:
    """ANSI escape codes embedded in filenames must not leak to output.

    The test is skipped when the file with the escape sequence in its name
    cannot be created.
    """
    repo = _init_repo(tmp_path)
    # File name carrying a raw "red" colour sequence and its reset code.
    hostile_name = "malicious\x1b[31mred\x1b[0m.txt"
    try:
        repo.joinpath(hostile_name).write_text("malicious", encoding="utf-8")
    except (OSError, ValueError):
        pytest.skip("filesystem does not support ANSI chars in filenames")

    outcome = _invoke(["clean", "-n"], _env(repo))

    assert "\x1b[31m" not in outcome.output
| 616 | |
| 617 | |
def test_clean_respects_muse_dir_immune(tmp_path: pathlib.Path) -> None:
    """Under no circumstances should clean delete anything inside .muse/.

    ``walk_workdir`` is patched to list both ``.muse/HEAD`` and
    ``.muse/repo.json`` as untracked; after ``clean -f`` both files must still
    hold their original contents.
    """
    _init_repo(tmp_path)
    head_file = head_path(tmp_path)
    repo_json = muse_dir(tmp_path) / "repo.json"
    # Read with the encoding _init_repo used to write these files.
    head_before = head_file.read_text(encoding="utf-8")
    repo_json_before = repo_json.read_text(encoding="utf-8")

    with patch(
        "muse.cli.commands.clean.walk_workdir",
        return_value={
            ".muse/HEAD": "abc",
            ".muse/repo.json": "def",
        },
    ):
        result = _invoke(["clean", "-f"], _env(tmp_path))

    assert result.exit_code == 0
    assert head_file.read_text(encoding="utf-8") == head_before
    # The second listed file must be protected as well.
    assert repo_json.read_text(encoding="utf-8") == repo_json_before
| 634 | |
| 635 | |
| 636 | # --------------------------------------------------------------------------- |
| 637 | # Agent supercharge — duration_ms and exit_code in every JSON output |
| 638 | # --------------------------------------------------------------------------- |
| 639 | |
| 640 | |
class TestElapsed:
    """Every JSON output path must include ``duration_ms`` as a float."""

    def test_nothing_to_clean_has_elapsed(self, tmp_path: pathlib.Path) -> None:
        """Dry run with nothing to remove still reports ``duration_ms``."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        result = _invoke(["clean", "-n", "--json"], _env(tmp_path))
        assert result.exit_code == 0
        data = _parse_json(result)
        assert "duration_ms" in data
        assert isinstance(data["duration_ms"], float)

    def test_dry_run_with_files_has_elapsed(self, tmp_path: pathlib.Path) -> None:
        """Dry run with one untracked file reports ``duration_ms``."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        # Explicit encoding, as everywhere else in this module.
        (tmp_path / "untracked.txt").write_text("x", encoding="utf-8")
        result = _invoke(["clean", "-n", "--json"], _env(tmp_path))
        assert result.exit_code == 0
        data = _parse_json(result)
        assert "duration_ms" in data
        assert isinstance(data["duration_ms"], float)

    def test_force_clean_has_elapsed(self, tmp_path: pathlib.Path) -> None:
        """Forced clean of one untracked file reports ``duration_ms``."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        (tmp_path / "untracked.txt").write_text("x", encoding="utf-8")
        result = _invoke(["clean", "-f", "--json"], _env(tmp_path))
        assert result.exit_code == 0
        data = _parse_json(result)
        assert "duration_ms" in data
        assert isinstance(data["duration_ms"], float)
| 672 | |
| 673 | |
class TestExitCode:
    """Every JSON output path must include ``exit_code`` mirroring process exit.

    Each test checks both that the JSON field is 0 and that it equals the
    exit code of the invocation itself.
    """

    def test_nothing_to_clean_exit_code_0(self, tmp_path: pathlib.Path) -> None:
        """Dry run with nothing to remove: JSON and process exit codes are 0."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        result = _invoke(["clean", "-n", "--json"], _env(tmp_path))
        data = _parse_json(result)
        assert data["exit_code"] == 0
        assert data["exit_code"] == result.exit_code

    def test_dry_run_with_files_exit_code_0(self, tmp_path: pathlib.Path) -> None:
        """Dry run with one untracked file: JSON and process exit codes are 0."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        # Explicit encoding, as everywhere else in this module.
        (tmp_path / "untracked.txt").write_text("x", encoding="utf-8")
        result = _invoke(["clean", "-n", "--json"], _env(tmp_path))
        data = _parse_json(result)
        assert data["exit_code"] == 0
        assert data["exit_code"] == result.exit_code

    def test_force_clean_exit_code_0(self, tmp_path: pathlib.Path) -> None:
        """Forced clean of one untracked file: JSON and process exit codes are 0."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        (tmp_path / "untracked.txt").write_text("x", encoding="utf-8")
        result = _invoke(["clean", "-f", "--json"], _env(tmp_path))
        data = _parse_json(result)
        assert data["exit_code"] == 0
        assert data["exit_code"] == result.exit_code
| 699 | |
| 700 | |
class TestDryRunStatus:
    """Dry-run with files to remove must report status ``would_remove``, not ``clean``."""

    def test_dry_run_with_files_status_is_would_remove(self, tmp_path: pathlib.Path) -> None:
        """Dry run with one untracked file reports ``would_remove``."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        # Explicit encoding, as everywhere else in this module.
        (tmp_path / "untracked.txt").write_text("x", encoding="utf-8")
        result = _invoke(["clean", "-n", "--json"], _env(tmp_path))
        data = _parse_json(result)
        assert data["status"] == "would_remove", (
            f"dry-run with files should be 'would_remove', got {data['status']!r}"
        )

    def test_dry_run_no_files_status_is_clean(self, tmp_path: pathlib.Path) -> None:
        """Dry run with only tracked files reports ``clean``."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        result = _invoke(["clean", "-n", "--json"], _env(tmp_path))
        data = _parse_json(result)
        assert data["status"] == "clean"

    def test_force_with_files_status_is_removed(self, tmp_path: pathlib.Path) -> None:
        """Forced clean of one untracked file reports ``removed``."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        (tmp_path / "untracked.txt").write_text("x", encoding="utf-8")
        result = _invoke(["clean", "-f", "--json"], _env(tmp_path))
        data = _parse_json(result)
        assert data["status"] == "removed"
| 728 | |
| 729 | |
class TestJsonSchemaComplete:
    """Full schema must include all fields including duration_ms and exit_code."""

    # Keys every JSON payload is required to carry.
    _FULL_KEYS = {"status", "removed", "dirs_removed", "count", "dry_run",
                  "duration_ms", "exit_code"}

    def test_nothing_to_clean_schema_complete(self, tmp_path: pathlib.Path) -> None:
        """Dry run with nothing to remove emits every required key."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        result = _invoke(["clean", "-n", "--json"], _env(tmp_path))
        data = _parse_json(result)
        missing = self._FULL_KEYS - data.keys()
        assert not missing, f"Missing keys in clean JSON: {missing}"

    def test_dry_run_with_files_schema_complete(self, tmp_path: pathlib.Path) -> None:
        """Dry run with one untracked file emits every required key."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        # Explicit encoding, as everywhere else in this module.
        (tmp_path / "untracked.txt").write_text("x", encoding="utf-8")
        result = _invoke(["clean", "-n", "--json"], _env(tmp_path))
        data = _parse_json(result)
        missing = self._FULL_KEYS - data.keys()
        assert not missing, f"Missing keys in dry-run JSON: {missing}"

    def test_force_clean_schema_complete(self, tmp_path: pathlib.Path) -> None:
        """Forced clean of one untracked file emits every required key."""
        _init_repo(tmp_path)
        _commit_file(tmp_path, "tracked.txt", b"tracked")
        (tmp_path / "untracked.txt").write_text("x", encoding="utf-8")
        result = _invoke(["clean", "-f", "--json"], _env(tmp_path))
        data = _parse_json(result)
        missing = self._FULL_KEYS - data.keys()
        assert not missing, f"Missing keys in force JSON: {missing}"
| 761 | |
| 762 | |
| 763 | # --------------------------------------------------------------------------- |
| 764 | # Flag registration tests |
| 765 | # --------------------------------------------------------------------------- |
| 766 | |
| 767 | import argparse as _argparse |
| 768 | from muse.cli.commands.clean import register as _register_clean |
| 769 | from muse.core.paths import head_path, heads_dir, muse_dir |
| 770 | |
| 771 | |
def _parse_clean(*args: str) -> _argparse.Namespace:
    """Parse ``clean`` followed by *args* with a throwaway argument parser.

    A fresh root parser is built on every call and only the ``clean``
    subcommand is registered on it, via ``_register_clean``.
    """
    parser = _argparse.ArgumentParser()
    _register_clean(parser.add_subparsers(dest="cmd"))
    argv = ["clean"]
    argv.extend(args)
    return parser.parse_args(argv)
| 777 | |
| 778 | |
class TestRegisterFlags:
    """Check the namespace attributes that the ``clean`` flags populate."""

    def test_default_json_out_is_false(self) -> None:
        """Without any flag ``json_out`` defaults to ``False``."""
        assert _parse_clean().json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        """``--json`` switches ``json_out`` on."""
        parsed = _parse_clean("--json")
        assert parsed.json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        """``-j`` switches ``json_out`` on as well."""
        parsed = _parse_clean("-j")
        assert parsed.json_out is True

    def test_force_flag(self) -> None:
        """``--force`` sets ``force``."""
        parsed = _parse_clean("--force")
        assert parsed.force is True

    def test_dry_run_n_shorthand(self) -> None:
        """``-n`` sets ``dry_run``."""
        parsed = _parse_clean("-n")
        assert parsed.dry_run is True
File History
4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2
fix: remove commit_exists filter from have anchors — server…
Sonnet 4.6
patch
20 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e
fix: rename objects→blobs in push client and all stale test…
Sonnet 4.6
patch
22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a
fix: repair four test failures from post-migration audit
Sonnet 4.6
patch
28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf
fix: unified object store migration — idempotent writes, JS…
Sonnet 4.6
minor
⚠
28 days ago