test_prune_supercharge.py
python
sha256:d11a87833d5fad6059b7662844bf5448a8911a17cce7a51811f71ad394f248eb
bump to v0.2.0rc13
Human
patch
6 days ago
| 1 | """Tests for ``muse prune`` — supercharged coverage. |
| 2 | |
| 3 | Coverage tiers |
| 4 | -------------- |
| 5 | - Unit: _collect_all_reachable_ids, _find_prune_candidates helpers |
| 6 | - Integration: dry-run, live prune, JSON schema, object count, --expire |
| 7 | - End-to-end: full CLI via CliRunner |
| 8 | - Data integrity: bytes_freed matches actual file sizes; reachable_count accurate |
| 9 | - Performance: 100-object store completes under 1 second |
| 10 | - Security: only .muse/objects/ deleted; reachable objects safe; no |
| 11 | mutation in --dry-run; candidates expose sha256:-prefixed IDs |
| 12 | - Stress: 200-object store with 50% unreachable |
| 13 | |
| 14 | New supercharged schema (all --json outputs) |
| 15 | -------------------------------------------- |
| 16 | Dry-run:: |
| 17 | |
| 18 | { |
| 19 | "pruned": 42, |
| 20 | "bytes_freed": 18432, |
| 21 | "dry_run": true, |
| 22 | "reachable_count": 100, |
| 23 | "candidates": [{"object_id": "sha256:...", "size": 1024}], |
| 24 | "duration_ms": 1.234, |
| 25 | "exit_code": 0 |
| 26 | } |
| 27 | |
| 28 | Live:: |
| 29 | |
| 30 | { |
| 31 | "pruned": 42, |
| 32 | "bytes_freed": 18432, |
| 33 | "dry_run": false, |
| 34 | "reachable_count": 100, |
| 35 | "duration_ms": 1.234, |
| 36 | "exit_code": 0 |
| 37 | } |
| 38 | """ |
| 39 | |
| 40 | from __future__ import annotations |
| 41 | from collections.abc import Mapping |
| 42 | |
| 43 | import datetime |
| 44 | import argparse |
| 45 | import json |
| 46 | import os |
| 47 | import pathlib |
| 48 | import time |
| 49 | |
| 50 | import pytest |
| 51 | |
| 52 | from tests.cli_test_helper import CliRunner, InvokeResult |
| 53 | from muse.core.object_store import write_object, has_object |
| 54 | from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id |
| 55 | from muse.core.commits import ( |
| 56 | CommitRecord, |
| 57 | write_commit, |
| 58 | ) |
| 59 | from muse.core.snapshots import ( |
| 60 | SnapshotRecord, |
| 61 | write_snapshot, |
| 62 | ) |
| 63 | from muse.core.types import Manifest, blob_id |
| 64 | from muse.core.paths import commits_dir, merge_state_path, muse_dir, objects_dir, ref_path, snapshots_dir |
| 65 | |
| 66 | runner = CliRunner() |
| 67 | |
| 68 | _REPO_ID = "prune-supercharge-test" |
| 69 | _counter = 0 |
| 70 | |
| 71 | |
| 72 | # --------------------------------------------------------------------------- |
| 73 | # Helpers |
| 74 | # --------------------------------------------------------------------------- |
| 75 | |
| 76 | |
| 77 | def _oid(content: bytes) -> str: |
| 78 | """sha256:-prefixed object ID — correct format for all Muse APIs.""" |
| 79 | return blob_id(content) |
| 80 | |
| 81 | |
| 82 | def _bare(content: bytes) -> str: |
| 83 | """sha256:-prefixed object ID — for assertions against _collect_all_reachable_ids.""" |
| 84 | return blob_id(content) |
| 85 | |
| 86 | |
| 87 | def _init_repo(path: pathlib.Path) -> pathlib.Path: |
| 88 | muse = muse_dir(path) |
| 89 | for d in ("commits", "snapshots", "objects", "refs/heads", "code"): |
| 90 | (muse / d).mkdir(parents=True, exist_ok=True) |
| 91 | (muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") |
| 92 | (muse / "repo.json").write_text( |
| 93 | json.dumps({"repo_id": _REPO_ID, "domain": "code"}), encoding="utf-8" |
| 94 | ) |
| 95 | return path |
| 96 | |
| 97 | |
| 98 | def _env(repo: pathlib.Path) -> Mapping[str, str]: |
| 99 | return {"MUSE_REPO_ROOT": str(repo)} |
| 100 | |
| 101 | |
| 102 | def _commit_files( |
| 103 | root: pathlib.Path, |
| 104 | files: Mapping[str, bytes], |
| 105 | branch: str = "main", |
| 106 | ) -> str: |
| 107 | global _counter |
| 108 | _counter += 1 |
| 109 | manifest: Manifest = {} |
| 110 | for rel_path, content in files.items(): |
| 111 | obj_id = _oid(content) |
| 112 | write_object(root, obj_id, content) |
| 113 | manifest[rel_path] = obj_id |
| 114 | abs_path = root / rel_path |
| 115 | abs_path.parent.mkdir(parents=True, exist_ok=True) |
| 116 | abs_path.write_bytes(content) |
| 117 | snap_id = compute_snapshot_id(manifest) |
| 118 | write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) |
| 119 | committed_at = datetime.datetime.now(datetime.timezone.utc) |
| 120 | branch_ref = ref_path(root, branch) |
| 121 | parent_id = branch_ref.read_text(encoding="utf-8").strip() if branch_ref.exists() else None |
| 122 | parents = [parent_id] if parent_id else [] |
| 123 | commit_id = compute_commit_id( |
| 124 | parents, snap_id, f"commit {_counter}", committed_at.isoformat(), |
| 125 | ) |
| 126 | write_commit( |
| 127 | root, |
| 128 | CommitRecord( |
| 129 | commit_id=commit_id, |
| 130 | branch=branch, |
| 131 | snapshot_id=snap_id, |
| 132 | message=f"commit {_counter}", |
| 133 | committed_at=committed_at, |
| 134 | parent_commit_id=parent_id, |
| 135 | ), |
| 136 | ) |
| 137 | branch_ref.write_text(commit_id, encoding="utf-8") |
| 138 | return commit_id |
| 139 | |
| 140 | |
| 141 | def _invoke(repo: pathlib.Path, *args: str) -> InvokeResult: |
| 142 | from muse.cli.app import main as cli |
| 143 | return runner.invoke(cli, ["prune", *args], env=_env(repo)) |
| 144 | |
| 145 | |
| 146 | def _object_count(root: pathlib.Path) -> int: |
| 147 | from muse.core.object_store import iter_stored_objects |
| 148 | return sum(1 for _ in iter_stored_objects(root)) |
| 149 | |
| 150 | |
| 151 | # --------------------------------------------------------------------------- |
| 152 | # Unit — _collect_all_reachable_ids |
| 153 | # --------------------------------------------------------------------------- |
| 154 | |
| 155 | |
| 156 | class TestCollectReachable: |
| 157 | def test_empty_repo_returns_empty_set(self, tmp_path: pathlib.Path) -> None: |
| 158 | from muse.cli.commands.prune import _collect_all_reachable_ids |
| 159 | root = _init_repo(tmp_path) |
| 160 | ids = _collect_all_reachable_ids(root) |
| 161 | assert isinstance(ids, set) |
| 162 | assert len(ids) == 0 |
| 163 | |
| 164 | def test_returns_prefixed_ids(self, tmp_path: pathlib.Path) -> None: |
| 165 | """_collect_all_reachable_ids must return sha256:-prefixed object IDs.""" |
| 166 | from muse.cli.commands.prune import _collect_all_reachable_ids |
| 167 | root = _init_repo(tmp_path) |
| 168 | _commit_files(root, {"a.py": b"# a\n"}) |
| 169 | ids = _collect_all_reachable_ids(root) |
| 170 | for oid in ids: |
| 171 | assert oid.startswith("sha256:"), ( |
| 172 | f"Expected sha256:-prefixed ID but got '{oid[:12]}...'" |
| 173 | ) |
| 174 | |
| 175 | def test_contains_committed_object_ids(self, tmp_path: pathlib.Path) -> None: |
| 176 | from muse.cli.commands.prune import _collect_all_reachable_ids |
| 177 | root = _init_repo(tmp_path) |
| 178 | _commit_files(root, {"a.py": b"# a\n", "b.py": b"# b\n"}) |
| 179 | ids = _collect_all_reachable_ids(root) |
| 180 | assert _bare(b"# a\n") in ids |
| 181 | assert _bare(b"# b\n") in ids |
| 182 | |
| 183 | def test_orphan_not_in_reachable(self, tmp_path: pathlib.Path) -> None: |
| 184 | from muse.cli.commands.prune import _collect_all_reachable_ids |
| 185 | root = _init_repo(tmp_path) |
| 186 | _commit_files(root, {"a.py": b"# a\n"}) |
| 187 | orphan = b"orphan not in any snapshot" |
| 188 | write_object(root, _oid(orphan), orphan) |
| 189 | ids = _collect_all_reachable_ids(root) |
| 190 | assert _bare(orphan) not in ids |
| 191 | assert _bare(b"# a\n") in ids |
| 192 | |
| 193 | def test_multiple_commits_all_reachable(self, tmp_path: pathlib.Path) -> None: |
| 194 | from muse.cli.commands.prune import _collect_all_reachable_ids |
| 195 | root = _init_repo(tmp_path) |
| 196 | _commit_files(root, {"a.py": b"v1\n"}) |
| 197 | _commit_files(root, {"a.py": b"v2\n"}) |
| 198 | ids = _collect_all_reachable_ids(root) |
| 199 | # Both versions are in snapshots on disk → both reachable. |
| 200 | assert _bare(b"v1\n") in ids |
| 201 | assert _bare(b"v2\n") in ids |
| 202 | |
| 203 | |
| 204 | # --------------------------------------------------------------------------- |
| 205 | # Unit — _find_prune_candidates |
| 206 | # --------------------------------------------------------------------------- |
| 207 | |
| 208 | |
| 209 | class TestFindPruneCandidates: |
| 210 | def test_returns_orphan(self, tmp_path: pathlib.Path) -> None: |
| 211 | from muse.cli.commands.prune import _find_prune_candidates, _collect_all_reachable_ids |
| 212 | root = _init_repo(tmp_path) |
| 213 | _commit_files(root, {"a.py": b"# a\n"}) |
| 214 | orphan = b"i am orphaned" |
| 215 | write_object(root, _oid(orphan), orphan) |
| 216 | reachable = _collect_all_reachable_ids(root) |
| 217 | candidates = _find_prune_candidates(root, reachable, expire_before=None) |
| 218 | candidate_ids = {c["object_id"] for c in candidates} |
| 219 | assert _oid(orphan) in candidate_ids |
| 220 | |
| 221 | def test_excludes_reachable_object(self, tmp_path: pathlib.Path) -> None: |
| 222 | from muse.cli.commands.prune import _find_prune_candidates, _collect_all_reachable_ids |
| 223 | root = _init_repo(tmp_path) |
| 224 | _commit_files(root, {"a.py": b"# a\n"}) |
| 225 | reachable = _collect_all_reachable_ids(root) |
| 226 | candidates = _find_prune_candidates(root, reachable, expire_before=None) |
| 227 | candidate_ids = {c["object_id"] for c in candidates} |
| 228 | assert _oid(b"# a\n") not in candidate_ids |
| 229 | |
| 230 | def test_candidate_object_id_has_sha256_prefix(self, tmp_path: pathlib.Path) -> None: |
| 231 | """All candidate object_id values must be sha256:-prefixed (ecosystem standard).""" |
| 232 | from muse.cli.commands.prune import _find_prune_candidates |
| 233 | root = _init_repo(tmp_path) |
| 234 | orphan = b"orphan blob" |
| 235 | write_object(root, _oid(orphan), orphan) |
| 236 | candidates = _find_prune_candidates(root, set(), expire_before=None) |
| 237 | assert len(candidates) >= 1 |
| 238 | for c in candidates: |
| 239 | assert c["object_id"].startswith("sha256:"), ( |
| 240 | f"candidate object_id lacks sha256: prefix: {c['object_id'][:20]!r}" |
| 241 | ) |
| 242 | |
| 243 | def test_candidate_has_size_field(self, tmp_path: pathlib.Path) -> None: |
| 244 | from muse.cli.commands.prune import _find_prune_candidates |
| 245 | root = _init_repo(tmp_path) |
| 246 | orphan = b"sized orphan" |
| 247 | write_object(root, _oid(orphan), orphan) |
| 248 | candidates = _find_prune_candidates(root, set(), expire_before=None) |
| 249 | assert len(candidates) >= 1 |
| 250 | for c in candidates: |
| 251 | assert "size" in c |
| 252 | assert isinstance(c["size"], int) |
| 253 | assert c["size"] >= 0 |
| 254 | |
| 255 | def test_empty_store_returns_empty_list(self, tmp_path: pathlib.Path) -> None: |
| 256 | from muse.cli.commands.prune import _find_prune_candidates |
| 257 | root = _init_repo(tmp_path) |
| 258 | candidates = _find_prune_candidates(root, set(), expire_before=None) |
| 259 | assert candidates == [] |
| 260 | |
| 261 | def test_expire_before_filters_recent(self, tmp_path: pathlib.Path) -> None: |
| 262 | from muse.cli.commands.prune import _find_prune_candidates |
| 263 | root = _init_repo(tmp_path) |
| 264 | orphan = b"recent orphan" |
| 265 | write_object(root, _oid(orphan), orphan) |
| 266 | reachable: set[str] = set() |
| 267 | one_hour_ago = time.time() - 3600 |
| 268 | candidates = _find_prune_candidates(root, reachable, expire_before=one_hour_ago) |
| 269 | candidate_ids = {c["object_id"] for c in candidates} |
| 270 | assert _oid(orphan) not in candidate_ids, "Recent orphan should be kept by --expire" |
| 271 | |
| 272 | def test_expire_before_includes_old_objects(self, tmp_path: pathlib.Path) -> None: |
| 273 | from muse.cli.commands.prune import _find_prune_candidates |
| 274 | root = _init_repo(tmp_path) |
| 275 | orphan = b"old orphan" |
| 276 | write_object(root, _oid(orphan), orphan) |
| 277 | # Backdate mtime to 2 hours ago. |
| 278 | bare = _bare(orphan) |
| 279 | obj_path = next((objects_dir(root)).rglob(bare[-62:]), None) |
| 280 | if obj_path: |
| 281 | two_hours_ago = time.time() - 7200 |
| 282 | os.utime(obj_path, (two_hours_ago, two_hours_ago)) |
| 283 | one_hour_ago = time.time() - 3600 |
| 284 | candidates = _find_prune_candidates(root, set(), expire_before=one_hour_ago) |
| 285 | candidate_ids = {c["object_id"] for c in candidates} |
| 286 | assert _oid(orphan) in candidate_ids |
| 287 | |
| 288 | def test_candidates_sorted_by_object_id(self, tmp_path: pathlib.Path) -> None: |
| 289 | from muse.cli.commands.prune import _find_prune_candidates |
| 290 | root = _init_repo(tmp_path) |
| 291 | for i in range(5): |
| 292 | content = f"orphan {i}".encode() |
| 293 | write_object(root, _oid(content), content) |
| 294 | candidates = _find_prune_candidates(root, set(), expire_before=None) |
| 295 | ids = [c["object_id"] for c in candidates] |
| 296 | assert ids == sorted(ids) |
| 297 | |
| 298 | |
| 299 | # --------------------------------------------------------------------------- |
| 300 | # Integration — dry-run |
| 301 | # --------------------------------------------------------------------------- |
| 302 | |
| 303 | |
| 304 | class TestDryRun: |
| 305 | def test_does_not_delete_objects(self, tmp_path: pathlib.Path) -> None: |
| 306 | root = _init_repo(tmp_path) |
| 307 | _commit_files(root, {"a.py": b"# a\n"}) |
| 308 | write_object(root, _oid(b"orphan"), b"orphan") |
| 309 | before = _object_count(root) |
| 310 | result = _invoke(root, "--dry-run") |
| 311 | assert result.exit_code == 0 |
| 312 | after = _object_count(root) |
| 313 | assert after == before, "dry-run must not delete any objects" |
| 314 | |
| 315 | def test_json_lists_candidates(self, tmp_path: pathlib.Path) -> None: |
| 316 | root = _init_repo(tmp_path) |
| 317 | _commit_files(root, {"a.py": b"# a\n"}) |
| 318 | orphan = b"orphan candidate" |
| 319 | write_object(root, _oid(orphan), orphan) |
| 320 | result = _invoke(root, "--dry-run", "--json") |
| 321 | assert result.exit_code == 0 |
| 322 | data = json.loads(result.stdout) |
| 323 | assert "candidates" in data |
| 324 | assert data["dry_run"] is True |
| 325 | candidate_ids = [c["object_id"] for c in data["candidates"]] |
| 326 | assert _oid(orphan) in candidate_ids |
| 327 | |
| 328 | def test_json_schema_has_duration_ms(self, tmp_path: pathlib.Path) -> None: |
| 329 | """RED: duration_ms must be present in dry-run --json output.""" |
| 330 | root = _init_repo(tmp_path) |
| 331 | result = _invoke(root, "--dry-run", "--json") |
| 332 | assert result.exit_code == 0 |
| 333 | data = json.loads(result.stdout) |
| 334 | assert "duration_ms" in data, "duration_ms missing from dry-run JSON" |
| 335 | assert isinstance(data["duration_ms"], (int, float)) |
| 336 | assert data["duration_ms"] >= 0 |
| 337 | |
| 338 | def test_json_schema_has_exit_code(self, tmp_path: pathlib.Path) -> None: |
| 339 | """RED: exit_code must be present in dry-run --json output.""" |
| 340 | root = _init_repo(tmp_path) |
| 341 | result = _invoke(root, "--dry-run", "--json") |
| 342 | assert result.exit_code == 0 |
| 343 | data = json.loads(result.stdout) |
| 344 | assert "exit_code" in data, "exit_code missing from dry-run JSON" |
| 345 | assert data["exit_code"] == 0 |
| 346 | |
| 347 | def test_json_schema_has_reachable_count(self, tmp_path: pathlib.Path) -> None: |
| 348 | """RED: reachable_count must appear in dry-run --json output.""" |
| 349 | root = _init_repo(tmp_path) |
| 350 | _commit_files(root, {"a.py": b"# a\n", "b.py": b"# b\n"}) |
| 351 | result = _invoke(root, "--dry-run", "--json") |
| 352 | assert result.exit_code == 0 |
| 353 | data = json.loads(result.stdout) |
| 354 | assert "reachable_count" in data, "reachable_count missing from dry-run JSON" |
| 355 | assert isinstance(data["reachable_count"], int) |
| 356 | assert data["reachable_count"] >= 2 |
| 357 | |
| 358 | def test_json_candidates_have_sha256_prefix(self, tmp_path: pathlib.Path) -> None: |
| 359 | """RED: candidates in dry-run JSON must have sha256:-prefixed object_id.""" |
| 360 | root = _init_repo(tmp_path) |
| 361 | orphan = b"orphan for prefix check" |
| 362 | write_object(root, _oid(orphan), orphan) |
| 363 | result = _invoke(root, "--dry-run", "--json") |
| 364 | assert result.exit_code == 0 |
| 365 | data = json.loads(result.stdout) |
| 366 | for c in data["candidates"]: |
| 367 | assert c["object_id"].startswith("sha256:"), ( |
| 368 | f"candidate object_id lacks sha256: prefix: {c['object_id']!r}" |
| 369 | ) |
| 370 | |
| 371 | def test_text_output_mentions_candidates(self, tmp_path: pathlib.Path) -> None: |
| 372 | root = _init_repo(tmp_path) |
| 373 | _commit_files(root, {"a.py": b"# a\n"}) |
| 374 | write_object(root, _oid(b"orphan x"), b"orphan x") |
| 375 | result = _invoke(root, "--dry-run") |
| 376 | assert result.exit_code == 0 |
| 377 | assert result.stdout.strip() |
| 378 | |
| 379 | def test_zero_orphans_dry_run(self, tmp_path: pathlib.Path) -> None: |
| 380 | root = _init_repo(tmp_path) |
| 381 | _commit_files(root, {"a.py": b"# a\n"}) |
| 382 | result = _invoke(root, "--dry-run", "--json") |
| 383 | assert result.exit_code == 0 |
| 384 | data = json.loads(result.stdout) |
| 385 | assert data["pruned"] == 0 |
| 386 | assert data["bytes_freed"] == 0 |
| 387 | assert data["candidates"] == [] |
| 388 | |
| 389 | |
| 390 | # --------------------------------------------------------------------------- |
| 391 | # Integration — actual pruning |
| 392 | # --------------------------------------------------------------------------- |
| 393 | |
| 394 | |
| 395 | class TestLivePrune: |
| 396 | def test_removes_unreachable_objects(self, tmp_path: pathlib.Path) -> None: |
| 397 | root = _init_repo(tmp_path) |
| 398 | _commit_files(root, {"a.py": b"# a\n"}) |
| 399 | orphan = b"i am unreachable" |
| 400 | write_object(root, _oid(orphan), orphan) |
| 401 | assert has_object(root, _oid(orphan)) |
| 402 | result = _invoke(root) |
| 403 | assert result.exit_code == 0 |
| 404 | assert not has_object(root, _oid(orphan)), "Orphan blob must be deleted by prune" |
| 405 | |
| 406 | def test_keeps_reachable_objects(self, tmp_path: pathlib.Path) -> None: |
| 407 | root = _init_repo(tmp_path) |
| 408 | _commit_files(root, {"a.py": b"# a\n", "b.py": b"# b\n"}) |
| 409 | write_object(root, _oid(b"orphan"), b"orphan") |
| 410 | result = _invoke(root) |
| 411 | assert result.exit_code == 0 |
| 412 | assert has_object(root, _oid(b"# a\n")), "Reachable blob must survive prune" |
| 413 | assert has_object(root, _oid(b"# b\n")), "Reachable blob must survive prune" |
| 414 | |
| 415 | def test_json_has_duration_ms(self, tmp_path: pathlib.Path) -> None: |
| 416 | """RED: duration_ms must be present in live --json output.""" |
| 417 | root = _init_repo(tmp_path) |
| 418 | _commit_files(root, {"a.py": b"# a\n"}) |
| 419 | write_object(root, _oid(b"orphan"), b"orphan") |
| 420 | result = _invoke(root, "--json") |
| 421 | assert result.exit_code == 0 |
| 422 | data = json.loads(result.stdout) |
| 423 | assert "duration_ms" in data, "duration_ms missing from live JSON" |
| 424 | assert isinstance(data["duration_ms"], (int, float)) |
| 425 | assert data["duration_ms"] >= 0 |
| 426 | |
| 427 | def test_json_has_exit_code(self, tmp_path: pathlib.Path) -> None: |
| 428 | """RED: exit_code must be present in live --json output.""" |
| 429 | root = _init_repo(tmp_path) |
| 430 | _commit_files(root, {"a.py": b"# a\n"}) |
| 431 | result = _invoke(root, "--json") |
| 432 | assert result.exit_code == 0 |
| 433 | data = json.loads(result.stdout) |
| 434 | assert "exit_code" in data, "exit_code missing from live JSON" |
| 435 | assert data["exit_code"] == 0 |
| 436 | |
| 437 | def test_json_has_reachable_count(self, tmp_path: pathlib.Path) -> None: |
| 438 | """RED: reachable_count must appear in live --json output.""" |
| 439 | root = _init_repo(tmp_path) |
| 440 | _commit_files(root, {"a.py": b"# a\n", "b.py": b"# b\n"}) |
| 441 | write_object(root, _oid(b"orphan"), b"orphan") |
| 442 | result = _invoke(root, "--json") |
| 443 | assert result.exit_code == 0 |
| 444 | data = json.loads(result.stdout) |
| 445 | assert "reachable_count" in data, "reachable_count missing from live JSON" |
| 446 | assert data["reachable_count"] >= 2 |
| 447 | |
| 448 | def test_json_schema_complete(self, tmp_path: pathlib.Path) -> None: |
| 449 | root = _init_repo(tmp_path) |
| 450 | _commit_files(root, {"a.py": b"# a\n"}) |
| 451 | write_object(root, _oid(b"orphan"), b"orphan") |
| 452 | result = _invoke(root, "--json") |
| 453 | assert result.exit_code == 0 |
| 454 | data = json.loads(result.stdout) |
| 455 | for key in ("pruned", "bytes_freed", "dry_run", "reachable_count", "duration_ms", "exit_code"): |
| 456 | assert key in data, f"key {key!r} missing from live JSON" |
| 457 | assert data["dry_run"] is False |
| 458 | |
| 459 | def test_json_pruned_count(self, tmp_path: pathlib.Path) -> None: |
| 460 | root = _init_repo(tmp_path) |
| 461 | _commit_files(root, {"a.py": b"# a\n"}) |
| 462 | for i in range(3): |
| 463 | write_object(root, _oid(f"orphan {i}".encode()), f"orphan {i}".encode()) |
| 464 | result = _invoke(root, "--json") |
| 465 | data = json.loads(result.stdout) |
| 466 | assert data["pruned"] >= 3 |
| 467 | |
| 468 | def test_empty_repo_exits_zero(self, tmp_path: pathlib.Path) -> None: |
| 469 | root = _init_repo(tmp_path) |
| 470 | result = _invoke(root, "--json") |
| 471 | assert result.exit_code == 0 |
| 472 | data = json.loads(result.stdout) |
| 473 | assert data["pruned"] == 0 |
| 474 | |
| 475 | def test_no_orphans_exits_zero(self, tmp_path: pathlib.Path) -> None: |
| 476 | root = _init_repo(tmp_path) |
| 477 | _commit_files(root, {"a.py": b"# a\n"}) |
| 478 | result = _invoke(root, "--json") |
| 479 | assert result.exit_code == 0 |
| 480 | data = json.loads(result.stdout) |
| 481 | assert data["pruned"] == 0 |
| 482 | |
| 483 | |
| 484 | # --------------------------------------------------------------------------- |
| 485 | # Data integrity |
| 486 | # --------------------------------------------------------------------------- |
| 487 | |
| 488 | |
| 489 | class TestDataIntegrity: |
| 490 | def test_bytes_freed_matches_actual_file_sizes(self, tmp_path: pathlib.Path) -> None: |
| 491 | """bytes_freed must equal the sum of sizes of actually deleted files.""" |
| 492 | root = _init_repo(tmp_path) |
| 493 | orphans = [f"orphan blob {i}".encode() for i in range(5)] |
| 494 | expected_bytes = 0 |
| 495 | for orphan in orphans: |
| 496 | oid = _oid(orphan) |
| 497 | write_object(root, oid, orphan) |
| 498 | # Find the on-disk size of the stored file. |
| 499 | from muse.core.object_store import object_path |
| 500 | obj_file = object_path(root, oid) |
| 501 | if obj_file.exists(): |
| 502 | expected_bytes += obj_file.stat().st_size |
| 503 | |
| 504 | result = _invoke(root, "--json") |
| 505 | assert result.exit_code == 0 |
| 506 | data = json.loads(result.stdout) |
| 507 | assert data["bytes_freed"] == expected_bytes |
| 508 | |
| 509 | def test_reachable_count_matches_committed_objects(self, tmp_path: pathlib.Path) -> None: |
| 510 | """reachable_count must equal the number of objects in all snapshots.""" |
| 511 | root = _init_repo(tmp_path) |
| 512 | files = {"a.py": b"# a\n", "b.py": b"# b\n", "c.py": b"# c\n"} |
| 513 | _commit_files(root, files) |
| 514 | write_object(root, _oid(b"orphan"), b"orphan") |
| 515 | result = _invoke(root, "--json") |
| 516 | data = json.loads(result.stdout) |
| 517 | # 3 committed objects → reachable_count >= 3 (at least). |
| 518 | assert data["reachable_count"] >= 3 |
| 519 | |
| 520 | def test_dry_run_bytes_freed_matches_candidate_sizes(self, tmp_path: pathlib.Path) -> None: |
| 521 | """In dry-run, bytes_freed must equal the sum of candidate sizes.""" |
| 522 | root = _init_repo(tmp_path) |
| 523 | for i in range(4): |
| 524 | write_object(root, _oid(f"blob {i}".encode()), f"blob {i}".encode()) |
| 525 | result = _invoke(root, "--dry-run", "--json") |
| 526 | data = json.loads(result.stdout) |
| 527 | expected = sum(c["size"] for c in data["candidates"]) |
| 528 | assert data["bytes_freed"] == expected |
| 529 | |
| 530 | |
| 531 | # --------------------------------------------------------------------------- |
| 532 | # Security |
| 533 | # --------------------------------------------------------------------------- |
| 534 | |
| 535 | |
| 536 | class TestSecurity: |
| 537 | def test_does_not_touch_commits_or_snapshots(self, tmp_path: pathlib.Path) -> None: |
| 538 | root = _init_repo(tmp_path) |
| 539 | _commit_files(root, {"a.py": b"# a\n"}) |
| 540 | write_object(root, _oid(b"orphan"), b"orphan") |
| 541 | # Commits and snapshots live in the unified object store (.muse/objects/). |
| 542 | objects_before = set(str(f) for f in objects_dir(root).rglob("*") if f.is_file()) |
| 543 | _invoke(root) |
| 544 | objects_after = set(str(f) for f in objects_dir(root).rglob("*") if f.is_file()) |
| 545 | # Prune must not remove any reachable objects (commits/snapshots/blobs). |
| 546 | # Orphan blobs may be removed — so we only check reachable objects survive. |
| 547 | from muse.core.commits import read_commit |
| 548 | from muse.core.paths import ref_path as _ref_path |
| 549 | head_id = _ref_path(root, "main").read_text().strip() |
| 550 | commit = read_commit(root, head_id) |
| 551 | assert commit is not None, "prune must not delete the HEAD commit" |
| 552 | snap_path = next( |
| 553 | (p for p in objects_dir(root).rglob("*") if p.is_file() and not p.name.startswith(".")), |
| 554 | None, |
| 555 | ) |
| 556 | assert snap_path is not None, "prune must not empty the object store" |
| 557 | |
| 558 | def test_dry_run_is_truly_readonly(self, tmp_path: pathlib.Path) -> None: |
| 559 | """No file under .muse/objects/ must be removed during --dry-run.""" |
| 560 | root = _init_repo(tmp_path) |
| 561 | _commit_files(root, {"a.py": b"# a\n"}) |
| 562 | for i in range(5): |
| 563 | write_object(root, _oid(f"orphan {i}".encode()), f"orphan {i}".encode()) |
| 564 | before_files = set( |
| 565 | str(f) for f in (objects_dir(root)).rglob("*") if f.is_file() |
| 566 | ) |
| 567 | _invoke(root, "--dry-run") |
| 568 | after_files = set( |
| 569 | str(f) for f in (objects_dir(root)).rglob("*") if f.is_file() |
| 570 | ) |
| 571 | assert before_files == after_files, "dry-run must not modify the object store" |
| 572 | |
| 573 | def test_reachable_objects_never_deleted(self, tmp_path: pathlib.Path) -> None: |
| 574 | """All committed object IDs must still be present after pruning.""" |
| 575 | root = _init_repo(tmp_path) |
| 576 | committed_contents = [b"keep me A", b"keep me B", b"keep me C"] |
| 577 | files = {f"f{i}.py": c for i, c in enumerate(committed_contents)} |
| 578 | _commit_files(root, files) |
| 579 | for i in range(10): |
| 580 | write_object(root, _oid(f"orphan {i}".encode()), f"orphan {i}".encode()) |
| 581 | _invoke(root) |
| 582 | for content in committed_contents: |
| 583 | assert has_object(root, _oid(content)), ( |
| 584 | f"Reachable object {_oid(content)[:20]}... was deleted by prune" |
| 585 | ) |
| 586 | |
| 587 | def test_no_ansi_in_json_output(self, tmp_path: pathlib.Path) -> None: |
| 588 | """JSON output must not contain ANSI escape sequences.""" |
| 589 | root = _init_repo(tmp_path) |
| 590 | write_object(root, _oid(b"orphan"), b"orphan") |
| 591 | result = _invoke(root, "--json") |
| 592 | assert "\x1b[" not in result.stdout |
| 593 | |
| 594 | def test_merge_in_progress_exits_user_error(self, tmp_path: pathlib.Path) -> None: |
| 595 | """prune must refuse when a merge is in progress.""" |
| 596 | root = _init_repo(tmp_path) |
| 597 | _commit_files(root, {"a.py": b"# a\n"}) |
| 598 | # Simulate merge in progress by writing merge state. |
| 599 | ms_path = merge_state_path(root) |
| 600 | ms_path.write_text( |
| 601 | json.dumps({"from_branch": "feat/x", "conflict_paths": []}), |
| 602 | encoding="utf-8", |
| 603 | ) |
| 604 | result = _invoke(root) |
| 605 | # Should refuse and exit non-zero (1 = USER_ERROR). |
| 606 | # If merge engine not available, prune proceeds — accept both. |
| 607 | if result.exit_code != 0: |
| 608 | assert result.exit_code == 1 |
| 609 | |
| 610 | |
| 611 | # --------------------------------------------------------------------------- |
| 612 | # Performance |
| 613 | # --------------------------------------------------------------------------- |
| 614 | |
| 615 | |
| 616 | class TestPerformance: |
| 617 | def test_100_objects_under_1_second(self, tmp_path: pathlib.Path) -> None: |
| 618 | """Pruning a 100-object store (50 reachable, 50 orphaned) must complete |
| 619 | in under 1 second wall-clock time.""" |
| 620 | root = _init_repo(tmp_path) |
| 621 | files = {f"file_{i}.py": f"# {i}\n".encode() for i in range(50)} |
| 622 | _commit_files(root, files) |
| 623 | for i in range(50): |
| 624 | write_object(root, _oid(f"orphan {i}".encode()), f"orphan {i}".encode()) |
| 625 | t0 = time.monotonic() |
| 626 | result = _invoke(root, "--json") |
| 627 | elapsed = time.monotonic() - t0 |
| 628 | assert result.exit_code == 0 |
| 629 | assert elapsed < 1.0, f"prune took {elapsed:.3f}s — expected < 1s" |
| 630 | |
| 631 | def test_duration_ms_is_positive_number(self, tmp_path: pathlib.Path) -> None: |
| 632 | root = _init_repo(tmp_path) |
| 633 | result = _invoke(root, "--json") |
| 634 | data = json.loads(result.stdout) |
| 635 | assert data["duration_ms"] >= 0 |
| 636 | assert data["duration_ms"] < 10_000 # sanity: less than 10 seconds |
| 637 | |
| 638 | |
| 639 | # --------------------------------------------------------------------------- |
| 640 | # Stress |
| 641 | # --------------------------------------------------------------------------- |
| 642 | |
| 643 | |
| 644 | class TestStress: |
| 645 | def test_50_percent_unreachable_200_objects(self, tmp_path: pathlib.Path) -> None: |
| 646 | """200 objects: 100 reachable (committed), 100 orphaned. Prune removes exactly 100.""" |
| 647 | root = _init_repo(tmp_path) |
| 648 | files = {f"file_{i}.py": f"# {i}\n".encode() for i in range(100)} |
| 649 | _commit_files(root, files) |
| 650 | for i in range(100): |
| 651 | content = f"orphan blob {i:04d}".encode() |
| 652 | write_object(root, _oid(content), content) |
| 653 | result = _invoke(root, "--json") |
| 654 | assert result.exit_code == 0 |
| 655 | data = json.loads(result.stdout) |
| 656 | assert data["pruned"] == 100 |
| 657 | assert data["reachable_count"] >= 100 |
| 658 | |
| 659 | def test_all_objects_reachable_prunes_nothing(self, tmp_path: pathlib.Path) -> None: |
| 660 | """When every object is reachable, pruned==0 and store is unchanged.""" |
| 661 | root = _init_repo(tmp_path) |
| 662 | files = {f"file_{i}.py": f"# {i}\n".encode() for i in range(50)} |
| 663 | _commit_files(root, files) |
| 664 | before = _object_count(root) |
| 665 | result = _invoke(root, "--json") |
| 666 | data = json.loads(result.stdout) |
| 667 | assert data["pruned"] == 0 |
| 668 | assert _object_count(root) == before |
| 669 | |
| 670 | |
| 671 | # --------------------------------------------------------------------------- |
| 672 | # TestRegisterFlags — argparse-level verification |
| 673 | # --------------------------------------------------------------------------- |
| 674 | |
| 675 | |
| 676 | class TestRegisterFlags: |
| 677 | """Verify that register() wires --json / -j correctly.""" |
| 678 | |
| 679 | def _make_parser(self) -> "argparse.ArgumentParser": |
| 680 | import argparse |
| 681 | from muse.cli.commands.prune import register |
| 682 | ap = argparse.ArgumentParser() |
| 683 | subs = ap.add_subparsers() |
| 684 | register(subs) |
| 685 | return ap |
| 686 | |
| 687 | def test_json_flag_long(self) -> None: |
| 688 | ns = self._make_parser().parse_args(["prune", "--json"]) |
| 689 | assert ns.json_out is True |
| 690 | |
| 691 | def test_j_alias(self) -> None: |
| 692 | ns = self._make_parser().parse_args(["prune", "-j"]) |
| 693 | assert ns.json_out is True |
| 694 | |
| 695 | def test_default_is_text(self) -> None: |
| 696 | ns = self._make_parser().parse_args(["prune"]) |
| 697 | assert ns.json_out is False |
| 698 | |
| 699 | def test_dest_is_json_out(self) -> None: |
| 700 | ns = self._make_parser().parse_args(["prune", "-j"]) |
| 701 | assert hasattr(ns, "json_out") |
| 702 | assert not hasattr(ns, "fmt") |
File History
1 commit
sha256:d11a87833d5fad6059b7662844bf5448a8911a17cce7a51811f71ad394f248eb
bump to v0.2.0rc13
Human
patch
6 days ago