"""Tests for ``muse prune`` — supercharged coverage.

Coverage tiers
--------------
- Unit: _collect_all_reachable_ids, _find_prune_candidates helpers
- Integration: dry-run, live prune, JSON schema, object count, --expire
- End-to-end: full CLI via CliRunner
- Data integrity: bytes_freed matches actual file sizes; reachable_count accurate
- Performance: 100-object store completes under 1 second
- Security: only .muse/objects/ deleted; reachable objects safe; no mutation in
  --dry-run; candidates expose sha256:-prefixed IDs
- Stress: 200-object store with 50% unreachable

New supercharged schema (all --json outputs)
--------------------------------------------
Dry-run::

    {
      "pruned": 42,
      "bytes_freed": 18432,
      "dry_run": true,
      "reachable_count": 100,
      "candidates": [{"object_id": "sha256:...", "size": 1024}],
      "duration_ms": 1.234,
      "exit_code": 0
    }

Live::

    {
      "pruned": 42,
      "bytes_freed": 18432,
      "dry_run": false,
      "reachable_count": 100,
      "duration_ms": 1.234,
      "exit_code": 0
    }
"""

from __future__ import annotations

import argparse
import datetime
import json
import os
import pathlib
import time
from collections.abc import Mapping

import pytest

from tests.cli_test_helper import CliRunner, InvokeResult
from muse.core.object_store import write_object, has_object
from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id
from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot
from muse.core.types import Manifest, blob_id
from muse.core.paths import commits_dir, merge_state_path, muse_dir, objects_dir, ref_path, snapshots_dir

runner = CliRunner()

_REPO_ID = "prune-supercharge-test"
# Monotonic counter that keeps commit messages unique across the whole module.
_counter = 0


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _oid(content: bytes) -> str:
    """sha256:-prefixed object ID — correct format for all Muse APIs."""
    return blob_id(content)


def _bare(content: bytes) -> str:
    """sha256:-prefixed object ID — for assertions against _collect_all_reachable_ids.

    Despite the name, this returns exactly what ``_oid`` returns; it is kept
    as a separate helper so existing call sites keep working.
    """
    return _oid(content)


def _init_repo(path: pathlib.Path) -> pathlib.Path:
    """Create a minimal ``.muse`` layout under *path* and return *path*.

    Creates the ``commits``, ``snapshots``, ``objects``, ``refs/heads`` and
    ``code`` directories, points ``HEAD`` at ``refs/heads/main`` and writes a
    ``repo.json`` carrying the test repo ID and the ``code`` domain.
    """
    muse = muse_dir(path)
    for d in ("commits", "snapshots", "objects", "refs/heads", "code"):
        (muse / d).mkdir(parents=True, exist_ok=True)
    (muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")
    (muse / "repo.json").write_text(
        json.dumps({"repo_id": _REPO_ID, "domain": "code"}), encoding="utf-8"
    )
    return path


def _env(repo: pathlib.Path) -> Mapping[str, str]:
    """Return the environment mapping that points the CLI at *repo*."""
    return {"MUSE_REPO_ROOT": str(repo)}


def _commit_files(
    root: pathlib.Path,
    files: Mapping[str, bytes],
    branch: str = "main",
) -> str:
    """Store *files* as objects, snapshot them and commit on *branch*.

    Each file is written to the object store and to the working tree under
    *root*. The new commit's parent is whatever the branch ref currently
    holds (none when the ref is missing or empty), and the ref is advanced
    to the new commit.

    Returns:
        The ID of the commit that was written.
    """
    global _counter
    _counter += 1
    message = f"commit {_counter}"

    manifest: Manifest = {}
    for rel_path, content in files.items():
        obj_id = _oid(content)
        write_object(root, obj_id, content)
        manifest[rel_path] = obj_id
        abs_path = root / rel_path
        abs_path.parent.mkdir(parents=True, exist_ok=True)
        abs_path.write_bytes(content)

    snap_id = compute_snapshot_id(manifest)
    write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))

    committed_at = datetime.datetime.now(datetime.timezone.utc)
    branch_ref = ref_path(root, branch)
    parent_id: str | None = None
    if branch_ref.exists():
        # An empty ref file means "no parent"; normalise to None so the
        # hashed parents list and the stored parent_commit_id always agree.
        parent_id = branch_ref.read_text(encoding="utf-8").strip() or None
    parents = [parent_id] if parent_id else []

    commit_id = compute_commit_id(
        parents,
        snap_id,
        message,
        committed_at.isoformat(),
    )
    write_commit(
        root,
        CommitRecord(
            commit_id=commit_id,
            branch=branch,
            snapshot_id=snap_id,
            message=message,
            committed_at=committed_at,
            parent_commit_id=parent_id,
        ),
    )
    # _init_repo only creates refs/heads; a nested branch name (e.g. "feat/x")
    # presumably maps to a sub-directory that must exist before the write.
    branch_ref.parent.mkdir(parents=True, exist_ok=True)
    branch_ref.write_text(commit_id, encoding="utf-8")
    return commit_id


def _invoke(repo: pathlib.Path, *args: str) -> InvokeResult:
    """Run ``muse prune`` with *args* against *repo* through the CLI runner."""
    # Imported lazily, as in the rest of this module's CLI-touching helpers.
    from muse.cli.app import main as cli

    return runner.invoke(cli, ["prune", *args], env=_env(repo))


def _object_count(root: pathlib.Path) -> int:
    """Return how many objects ``iter_stored_objects`` yields for *root*."""
    from muse.core.object_store import iter_stored_objects

    return sum(1 for _ in iter_stored_objects(root))
# ---------------------------------------------------------------------------
# Unit — _collect_all_reachable_ids
# ---------------------------------------------------------------------------


class TestCollectReachable:
    """Unit tests for ``_collect_all_reachable_ids``."""

    def test_empty_repo_returns_empty_set(self, tmp_path: pathlib.Path) -> None:
        """A freshly initialised repo has no reachable objects."""
        from muse.cli.commands.prune import _collect_all_reachable_ids

        root = _init_repo(tmp_path)
        ids = _collect_all_reachable_ids(root)
        assert isinstance(ids, set)
        assert len(ids) == 0

    def test_returns_prefixed_ids(self, tmp_path: pathlib.Path) -> None:
        """_collect_all_reachable_ids must return sha256:-prefixed object IDs."""
        from muse.cli.commands.prune import _collect_all_reachable_ids

        root = _init_repo(tmp_path)
        _commit_files(root, {"a.py": b"# a\n"})
        ids = _collect_all_reachable_ids(root)
        for oid in ids:
            assert oid.startswith("sha256:"), (
                f"Expected sha256:-prefixed ID but got '{oid[:12]}...'"
            )

    def test_contains_committed_object_ids(self, tmp_path: pathlib.Path) -> None:
        """Every blob of a committed snapshot is reported as reachable."""
        from muse.cli.commands.prune import _collect_all_reachable_ids

        root = _init_repo(tmp_path)
        _commit_files(root, {"a.py": b"# a\n", "b.py": b"# b\n"})
        ids = _collect_all_reachable_ids(root)
        assert _bare(b"# a\n") in ids
        assert _bare(b"# b\n") in ids

    def test_orphan_not_in_reachable(self, tmp_path: pathlib.Path) -> None:
        """An object written outside any snapshot is not reachable."""
        from muse.cli.commands.prune import _collect_all_reachable_ids

        root = _init_repo(tmp_path)
        _commit_files(root, {"a.py": b"# a\n"})
        orphan = b"orphan not in any snapshot"
        write_object(root, _oid(orphan), orphan)
        ids = _collect_all_reachable_ids(root)
        assert _bare(orphan) not in ids
        assert _bare(b"# a\n") in ids

    def test_multiple_commits_all_reachable(self, tmp_path: pathlib.Path) -> None:
        """Blobs from older commits stay reachable after a newer commit."""
        from muse.cli.commands.prune import _collect_all_reachable_ids

        root = _init_repo(tmp_path)
        _commit_files(root, {"a.py": b"v1\n"})
        _commit_files(root, {"a.py": b"v2\n"})
        ids = _collect_all_reachable_ids(root)
        # Both versions are in snapshots on disk → both reachable.
        assert _bare(b"v1\n") in ids
        assert _bare(b"v2\n") in ids


# ---------------------------------------------------------------------------
# Unit — _find_prune_candidates
# ---------------------------------------------------------------------------


class TestFindPruneCandidates:
    """Unit tests for ``_find_prune_candidates``."""

    def test_returns_orphan(self, tmp_path: pathlib.Path) -> None:
        """An unreachable object shows up in the candidate list."""
        from muse.cli.commands.prune import _find_prune_candidates, _collect_all_reachable_ids

        root = _init_repo(tmp_path)
        _commit_files(root, {"a.py": b"# a\n"})
        orphan = b"i am orphaned"
        write_object(root, _oid(orphan), orphan)
        reachable = _collect_all_reachable_ids(root)
        candidates = _find_prune_candidates(root, reachable, expire_before=None)
        candidate_ids = {c["object_id"] for c in candidates}
        assert _oid(orphan) in candidate_ids

    def test_excludes_reachable_object(self, tmp_path: pathlib.Path) -> None:
        """A committed blob is never offered as a candidate."""
        from muse.cli.commands.prune import _find_prune_candidates, _collect_all_reachable_ids

        root = _init_repo(tmp_path)
        _commit_files(root, {"a.py": b"# a\n"})
        reachable = _collect_all_reachable_ids(root)
        candidates = _find_prune_candidates(root, reachable, expire_before=None)
        candidate_ids = {c["object_id"] for c in candidates}
        assert _oid(b"# a\n") not in candidate_ids

    def test_candidate_object_id_has_sha256_prefix(self, tmp_path: pathlib.Path) -> None:
        """All candidate object_id values must be sha256:-prefixed (ecosystem standard)."""
        from muse.cli.commands.prune import _find_prune_candidates

        root = _init_repo(tmp_path)
        orphan = b"orphan blob"
        write_object(root, _oid(orphan), orphan)
        candidates = _find_prune_candidates(root, set(), expire_before=None)
        assert len(candidates) >= 1
        for c in candidates:
            assert c["object_id"].startswith("sha256:"), (
                f"candidate object_id lacks sha256: prefix: {c['object_id'][:20]!r}"
            )

    def test_candidate_has_size_field(self, tmp_path: pathlib.Path) -> None:
        """Every candidate carries a non-negative integer ``size``."""
        from muse.cli.commands.prune import _find_prune_candidates

        root = _init_repo(tmp_path)
        orphan = b"sized orphan"
        write_object(root, _oid(orphan), orphan)
        candidates = _find_prune_candidates(root, set(), expire_before=None)
        assert len(candidates) >= 1
        for c in candidates:
            assert "size" in c
            assert isinstance(c["size"], int)
            assert c["size"] >= 0

    def test_empty_store_returns_empty_list(self, tmp_path: pathlib.Path) -> None:
        """An empty object store yields an empty candidate list."""
        from muse.cli.commands.prune import _find_prune_candidates

        root = _init_repo(tmp_path)
        candidates = _find_prune_candidates(root, set(), expire_before=None)
        assert candidates == []

    def test_expire_before_filters_recent(self, tmp_path: pathlib.Path) -> None:
        """An orphan written just now is newer than the cutoff and is kept."""
        from muse.cli.commands.prune import _find_prune_candidates

        root = _init_repo(tmp_path)
        orphan = b"recent orphan"
        write_object(root, _oid(orphan), orphan)
        reachable: set[str] = set()
        one_hour_ago = time.time() - 3600
        candidates = _find_prune_candidates(root, reachable, expire_before=one_hour_ago)
        candidate_ids = {c["object_id"] for c in candidates}
        assert _oid(orphan) not in candidate_ids, "Recent orphan should be kept by --expire"

    def test_expire_before_includes_old_objects(self, tmp_path: pathlib.Path) -> None:
        """An orphan whose mtime predates the cutoff becomes a candidate."""
        from muse.cli.commands.prune import _find_prune_candidates

        root = _init_repo(tmp_path)
        orphan = b"old orphan"
        write_object(root, _oid(orphan), orphan)
        # Backdate mtime to 2 hours ago.
        bare = _bare(orphan)
        # The last 62 characters of the ID are used as the on-disk file name
        # (presumably a 2-character fan-out directory holds the rest).
        obj_path = next((objects_dir(root)).rglob(bare[-62:]), None)
        # Fail here, not at the final assertion, when the file cannot be
        # located: without backdating, the test below cannot be meaningful.
        assert obj_path is not None, (
            f"could not locate stored object file for {bare[:20]}..."
        )
        two_hours_ago = time.time() - 7200
        os.utime(obj_path, (two_hours_ago, two_hours_ago))
        one_hour_ago = time.time() - 3600
        candidates = _find_prune_candidates(root, set(), expire_before=one_hour_ago)
        candidate_ids = {c["object_id"] for c in candidates}
        assert _oid(orphan) in candidate_ids

    def test_candidates_sorted_by_object_id(self, tmp_path: pathlib.Path) -> None:
        """Candidates are returned in ascending ``object_id`` order."""
        from muse.cli.commands.prune import _find_prune_candidates

        root = _init_repo(tmp_path)
        for i in range(5):
            content = f"orphan {i}".encode()
            write_object(root, _oid(content), content)
        candidates = _find_prune_candidates(root, set(), expire_before=None)
        ids = [c["object_id"] for c in candidates]
        assert ids == sorted(ids)


# ---------------------------------------------------------------------------
# Integration — dry-run
# ---------------------------------------------------------------------------


class TestDryRun:
    """Integration tests for ``muse prune --dry-run``."""

    def test_does_not_delete_objects(self, tmp_path: pathlib.Path) -> None:
        """The object count is identical before and after a dry run."""
        root = _init_repo(tmp_path)
        _commit_files(root, {"a.py": b"# a\n"})
        write_object(root, _oid(b"orphan"), b"orphan")
        before = _object_count(root)
        result = _invoke(root, "--dry-run")
        assert result.exit_code == 0
        after = _object_count(root)
        assert after == before, "dry-run must not delete any objects"

    def test_json_lists_candidates(self, tmp_path: pathlib.Path) -> None:
        """Dry-run JSON lists the orphan under ``candidates``."""
        root = _init_repo(tmp_path)
        _commit_files(root, {"a.py": b"# a\n"})
        orphan = b"orphan candidate"
        write_object(root, _oid(orphan), orphan)
        result = _invoke(root, "--dry-run", "--json")
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert "candidates" in data
        assert data["dry_run"] is True
        candidate_ids = [c["object_id"] for c in data["candidates"]]
        assert _oid(orphan) in candidate_ids

    def test_json_schema_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
        """duration_ms must be present in dry-run --json output."""
        root = _init_repo(tmp_path)
        result = _invoke(root, "--dry-run", "--json")
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert "duration_ms" in data, "duration_ms missing from dry-run JSON"
        assert isinstance(data["duration_ms"], (int, float))
        assert data["duration_ms"] >= 0

    def test_json_schema_has_exit_code(self, tmp_path: pathlib.Path) -> None:
        """exit_code must be present in dry-run --json output."""
        root = _init_repo(tmp_path)
        result = _invoke(root, "--dry-run", "--json")
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert "exit_code" in data, "exit_code missing from dry-run JSON"
        assert data["exit_code"] == 0

    def test_json_schema_has_reachable_count(self, tmp_path: pathlib.Path) -> None:
        """reachable_count must appear in dry-run --json output."""
        root = _init_repo(tmp_path)
        _commit_files(root, {"a.py": b"# a\n", "b.py": b"# b\n"})
        result = _invoke(root, "--dry-run", "--json")
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert "reachable_count" in data, "reachable_count missing from dry-run JSON"
        assert isinstance(data["reachable_count"], int)
        assert data["reachable_count"] >= 2

    def test_json_candidates_have_sha256_prefix(self, tmp_path: pathlib.Path) -> None:
        """Candidates in dry-run JSON must have sha256:-prefixed object_id."""
        root = _init_repo(tmp_path)
        orphan = b"orphan for prefix check"
        write_object(root, _oid(orphan), orphan)
        result = _invoke(root, "--dry-run", "--json")
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        for c in data["candidates"]:
            assert c["object_id"].startswith("sha256:"), (
                f"candidate object_id lacks sha256: prefix: {c['object_id']!r}"
            )

    def test_text_output_mentions_candidates(self, tmp_path: pathlib.Path) -> None:
        """Text-mode dry run prints something to stdout."""
        root = _init_repo(tmp_path)
        _commit_files(root, {"a.py": b"# a\n"})
        write_object(root, _oid(b"orphan x"), b"orphan x")
        result = _invoke(root, "--dry-run")
        assert result.exit_code == 0
        assert result.stdout.strip()

    def test_zero_orphans_dry_run(self, tmp_path: pathlib.Path) -> None:
        """With nothing to prune, the counters are zero and candidates is empty."""
        root = _init_repo(tmp_path)
        _commit_files(root, {"a.py": b"# a\n"})
        result = _invoke(root, "--dry-run", "--json")
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["pruned"] == 0
        assert data["bytes_freed"] == 0
        assert data["candidates"] == []


# ---------------------------------------------------------------------------
# Integration — actual pruning
# ---------------------------------------------------------------------------


class TestLivePrune:
    """Integration tests for ``muse prune`` without ``--dry-run``."""

    def test_removes_unreachable_objects(self, tmp_path: pathlib.Path) -> None:
        """An orphan blob is gone from the store after pruning."""
        root = _init_repo(tmp_path)
        _commit_files(root, {"a.py": b"# a\n"})
        orphan = b"i am unreachable"
        write_object(root, _oid(orphan), orphan)
        assert has_object(root, _oid(orphan))
        result = _invoke(root)
        assert result.exit_code == 0
        assert not has_object(root, _oid(orphan)), "Orphan blob must be deleted by prune"

    def test_keeps_reachable_objects(self, tmp_path: pathlib.Path) -> None:
        """Committed blobs survive a prune that removes an orphan."""
        root = _init_repo(tmp_path)
        _commit_files(root, {"a.py": b"# a\n", "b.py": b"# b\n"})
        write_object(root, _oid(b"orphan"), b"orphan")
        result = _invoke(root)
        assert result.exit_code == 0
        assert has_object(root, _oid(b"# a\n")), "Reachable blob must survive prune"
        assert has_object(root, _oid(b"# b\n")), "Reachable blob must survive prune"

    def test_json_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
        """duration_ms must be present in live --json output."""
        root = _init_repo(tmp_path)
        _commit_files(root, {"a.py": b"# a\n"})
        write_object(root, _oid(b"orphan"), b"orphan")
        result = _invoke(root, "--json")
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert "duration_ms" in data, "duration_ms missing from live JSON"
        assert isinstance(data["duration_ms"], (int, float))
        assert data["duration_ms"] >= 0

    def test_json_has_exit_code(self, tmp_path: pathlib.Path) -> None:
        """exit_code must be present in live --json output."""
        root = _init_repo(tmp_path)
        _commit_files(root, {"a.py": b"# a\n"})
        result = _invoke(root, "--json")
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert "exit_code" in data, "exit_code missing from live JSON"
        assert data["exit_code"] == 0

    def test_json_has_reachable_count(self, tmp_path: pathlib.Path) -> None:
        """reachable_count must appear in live --json output."""
        root = _init_repo(tmp_path)
        _commit_files(root, {"a.py": b"# a\n", "b.py": b"# b\n"})
        write_object(root, _oid(b"orphan"), b"orphan")
        result = _invoke(root, "--json")
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert "reachable_count" in data, "reachable_count missing from live JSON"
        assert data["reachable_count"] >= 2

    def test_json_schema_complete(self, tmp_path: pathlib.Path) -> None:
        """Live JSON carries every documented key and ``dry_run`` is False."""
        root = _init_repo(tmp_path)
        _commit_files(root, {"a.py": b"# a\n"})
        write_object(root, _oid(b"orphan"), b"orphan")
        result = _invoke(root, "--json")
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        for key in ("pruned", "bytes_freed", "dry_run", "reachable_count", "duration_ms", "exit_code"):
            assert key in data, f"key {key!r} missing from live JSON"
        assert data["dry_run"] is False

    def test_json_pruned_count(self, tmp_path: pathlib.Path) -> None:
        """``pruned`` counts at least the three orphans that were written."""
        root = _init_repo(tmp_path)
        _commit_files(root, {"a.py": b"# a\n"})
        for i in range(3):
            content = f"orphan {i}".encode()
            write_object(root, _oid(content), content)
        result = _invoke(root, "--json")
        # Check the exit code first so a CLI failure is not reported as a
        # JSON decode error.
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["pruned"] >= 3

    def test_empty_repo_exits_zero(self, tmp_path: pathlib.Path) -> None:
        """Pruning an empty repo succeeds and prunes nothing."""
        root = _init_repo(tmp_path)
        result = _invoke(root, "--json")
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["pruned"] == 0

    def test_no_orphans_exits_zero(self, tmp_path: pathlib.Path) -> None:
        """Pruning a repo with only reachable objects prunes nothing."""
        root = _init_repo(tmp_path)
        _commit_files(root, {"a.py": b"# a\n"})
        result = _invoke(root, "--json")
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["pruned"] == 0


# ---------------------------------------------------------------------------
# Data integrity
# ---------------------------------------------------------------------------
class TestDataIntegrity:
    """Checks that the numbers reported by prune match the store on disk."""

    def test_bytes_freed_matches_actual_file_sizes(self, tmp_path: pathlib.Path) -> None:
        """bytes_freed must equal the sum of sizes of actually deleted files."""
        from muse.core.object_store import object_path

        root = _init_repo(tmp_path)
        orphans = [f"orphan blob {i}".encode() for i in range(5)]
        expected_bytes = 0
        for orphan in orphans:
            oid = _oid(orphan)
            write_object(root, oid, orphan)
            # Find the on-disk size of the stored file.
            obj_file = object_path(root, oid)
            # A missing file must fail loudly; silently skipping it could let
            # the final comparison pass vacuously (0 == 0).
            assert obj_file.exists(), f"stored object file missing for {oid[:20]}..."
            expected_bytes += obj_file.stat().st_size
        result = _invoke(root, "--json")
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["bytes_freed"] == expected_bytes

    def test_reachable_count_matches_committed_objects(self, tmp_path: pathlib.Path) -> None:
        """reachable_count must cover at least every object in the committed snapshot."""
        root = _init_repo(tmp_path)
        files = {"a.py": b"# a\n", "b.py": b"# b\n", "c.py": b"# c\n"}
        _commit_files(root, files)
        write_object(root, _oid(b"orphan"), b"orphan")
        result = _invoke(root, "--json")
        data = json.loads(result.stdout)
        # 3 committed objects → reachable_count >= 3 (at least).
        assert data["reachable_count"] >= 3

    def test_dry_run_bytes_freed_matches_candidate_sizes(self, tmp_path: pathlib.Path) -> None:
        """In dry-run, bytes_freed must equal the sum of candidate sizes."""
        root = _init_repo(tmp_path)
        for i in range(4):
            content = f"blob {i}".encode()
            write_object(root, _oid(content), content)
        result = _invoke(root, "--dry-run", "--json")
        data = json.loads(result.stdout)
        expected = sum(c["size"] for c in data["candidates"])
        assert data["bytes_freed"] == expected


# ---------------------------------------------------------------------------
# Security
# ---------------------------------------------------------------------------


class TestSecurity:
    """Checks that prune only removes what it is allowed to remove."""

    def test_does_not_touch_commits_or_snapshots(self, tmp_path: pathlib.Path) -> None:
        """The HEAD commit is still readable and the store is non-empty after prune."""
        from muse.core.store import read_commit

        root = _init_repo(tmp_path)
        _commit_files(root, {"a.py": b"# a\n"})
        write_object(root, _oid(b"orphan"), b"orphan")
        _invoke(root)
        # Prune must not remove any reachable objects (commits/snapshots/blobs).
        # Orphan blobs may be removed — so we only check reachable objects survive.
        head_id = ref_path(root, "main").read_text(encoding="utf-8").strip()
        commit = read_commit(root, head_id)
        assert commit is not None, "prune must not delete the HEAD commit"
        # At least one non-hidden file must remain under the objects directory.
        survivor = next(
            (p for p in objects_dir(root).rglob("*") if p.is_file() and not p.name.startswith(".")),
            None,
        )
        assert survivor is not None, "prune must not empty the object store"

    def test_dry_run_is_truly_readonly(self, tmp_path: pathlib.Path) -> None:
        """No file under .muse/objects/ must be removed during --dry-run."""
        root = _init_repo(tmp_path)
        _commit_files(root, {"a.py": b"# a\n"})
        for i in range(5):
            content = f"orphan {i}".encode()
            write_object(root, _oid(content), content)
        before_files = {str(f) for f in objects_dir(root).rglob("*") if f.is_file()}
        _invoke(root, "--dry-run")
        after_files = {str(f) for f in objects_dir(root).rglob("*") if f.is_file()}
        assert before_files == after_files, "dry-run must not modify the object store"

    def test_reachable_objects_never_deleted(self, tmp_path: pathlib.Path) -> None:
        """All committed object IDs must still be present after pruning."""
        root = _init_repo(tmp_path)
        committed_contents = [b"keep me A", b"keep me B", b"keep me C"]
        files = {f"f{i}.py": c for i, c in enumerate(committed_contents)}
        _commit_files(root, files)
        for i in range(10):
            content = f"orphan {i}".encode()
            write_object(root, _oid(content), content)
        _invoke(root)
        for content in committed_contents:
            assert has_object(root, _oid(content)), (
                f"Reachable object {_oid(content)[:20]}... was deleted by prune"
            )

    def test_no_ansi_in_json_output(self, tmp_path: pathlib.Path) -> None:
        """JSON output must not contain ANSI escape sequences."""
        root = _init_repo(tmp_path)
        write_object(root, _oid(b"orphan"), b"orphan")
        result = _invoke(root, "--json")
        assert "\x1b[" not in result.stdout

    def test_merge_in_progress_exits_user_error(self, tmp_path: pathlib.Path) -> None:
        """prune must refuse when a merge is in progress."""
        root = _init_repo(tmp_path)
        _commit_files(root, {"a.py": b"# a\n"})
        # Simulate merge in progress by writing merge state.
        ms_path = merge_state_path(root)
        ms_path.write_text(
            json.dumps({"from_branch": "feat/x", "conflict_paths": []}),
            encoding="utf-8",
        )
        result = _invoke(root)
        # Should refuse and exit non-zero (1 = USER_ERROR).
        # If merge engine not available, prune proceeds — accept both.
        assert result.exit_code in (0, 1)


# ---------------------------------------------------------------------------
# Performance
# ---------------------------------------------------------------------------


class TestPerformance:
    """Coarse wall-clock and timing-field sanity checks."""

    def test_100_objects_under_1_second(self, tmp_path: pathlib.Path) -> None:
        """Pruning a 100-object store (50 reachable, 50 orphaned) must complete
        in under 1 second wall-clock time."""
        root = _init_repo(tmp_path)
        files = {f"file_{i}.py": f"# {i}\n".encode() for i in range(50)}
        _commit_files(root, files)
        for i in range(50):
            content = f"orphan {i}".encode()
            write_object(root, _oid(content), content)
        t0 = time.monotonic()
        result = _invoke(root, "--json")
        elapsed = time.monotonic() - t0
        assert result.exit_code == 0
        assert elapsed < 1.0, f"prune took {elapsed:.3f}s — expected < 1s"

    def test_duration_ms_is_positive_number(self, tmp_path: pathlib.Path) -> None:
        """duration_ms is non-negative and below a generous upper bound."""
        root = _init_repo(tmp_path)
        result = _invoke(root, "--json")
        data = json.loads(result.stdout)
        assert data["duration_ms"] >= 0
        assert data["duration_ms"] < 10_000  # sanity: less than 10 seconds
# ---------------------------------------------------------------------------
# Stress
# ---------------------------------------------------------------------------


class TestStress:
    """Larger stores: half orphaned, and fully reachable."""

    def test_50_percent_unreachable_200_objects(self, tmp_path: pathlib.Path) -> None:
        """200 objects: 100 reachable (committed), 100 orphaned. Prune removes exactly 100."""
        root = _init_repo(tmp_path)
        files = {f"file_{i}.py": f"# {i}\n".encode() for i in range(100)}
        _commit_files(root, files)
        for i in range(100):
            content = f"orphan blob {i:04d}".encode()
            write_object(root, _oid(content), content)
        result = _invoke(root, "--json")
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["pruned"] == 100
        assert data["reachable_count"] >= 100

    def test_all_objects_reachable_prunes_nothing(self, tmp_path: pathlib.Path) -> None:
        """When every object is reachable, pruned==0 and store is unchanged."""
        root = _init_repo(tmp_path)
        files = {f"file_{i}.py": f"# {i}\n".encode() for i in range(50)}
        _commit_files(root, files)
        before = _object_count(root)
        result = _invoke(root, "--json")
        # Same guard as the sibling stress test: surface a CLI failure before
        # trying to parse its output.
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["pruned"] == 0
        assert _object_count(root) == before


# ---------------------------------------------------------------------------
# TestRegisterFlags — argparse-level verification
# ---------------------------------------------------------------------------


class TestRegisterFlags:
    """Verify that register() wires --json / -j correctly."""

    def _make_parser(self) -> argparse.ArgumentParser:
        """Build a fresh parser with only the ``prune`` subcommand registered."""
        # argparse comes from the module-level import; only the command
        # module is imported lazily, as elsewhere in this file.
        from muse.cli.commands.prune import register

        ap = argparse.ArgumentParser()
        subs = ap.add_subparsers()
        register(subs)
        return ap

    def test_json_flag_long(self) -> None:
        """``--json`` sets ``json_out``."""
        ns = self._make_parser().parse_args(["prune", "--json"])
        assert ns.json_out is True

    def test_j_alias(self) -> None:
        """``-j`` is accepted as an alias for ``--json``."""
        ns = self._make_parser().parse_args(["prune", "-j"])
        assert ns.json_out is True

    def test_default_is_text(self) -> None:
        """Without the flag, ``json_out`` is False."""
        ns = self._make_parser().parse_args(["prune"])
        assert ns.json_out is False

    def test_dest_is_json_out(self) -> None:
        """The flag is stored under ``json_out``, not ``fmt``."""
        ns = self._make_parser().parse_args(["prune", "-j"])
        assert hasattr(ns, "json_out")
        assert not hasattr(ns, "fmt")