"""Comprehensive tests for ``muse gc --full`` — orphaned commit + snapshot pruning. Coverage dimensions ------------------- Unit ~~~~ - ``_collect_reachable_commits``: empty repo, single branch, multi-branch, parent chain traversal, merge commits (2 parents), missing files, corrupt files, symlink guard, cycle resistance, tags included - ``_collect_reachable_snapshots``: snapshot IDs from reachable commits, blob IDs from manifests, shelf objects preserved - ``_list_stored_msgpack``: enumerates files, grace period, symlink guard, non-.msgpack files skipped - ``GcResult``: new fields default to zero Integration (run_gc) ~~~~~~~~~~~~~~~~~~~~ - Orphaned commit deleted; reachable commit preserved - Orphaned snapshot deleted; reachable snapshot preserved - Orphaned blobs from orphaned commits deleted under --full - dry_run=True never deletes commits or snapshots - Multiple branches: any-branch reachability preserved - Linear commit chain: all intermediates preserved - Merge commit (2 parents): both parent chains preserved - Grace period protects recently-written commits and snapshots - GcResult fields populated correctly - run_gc(full=False) does NOT delete orphaned commits/snapshots - Idempotency: second run collects nothing CLI ~~~ - ``muse gc --full`` text output has commits + snapshots lines - ``muse gc --full --dry-run`` text output prefixed with [dry-run] - ``muse gc --full --json`` output includes all new fields with correct types - ``muse gc --full --json --dry-run`` dry_run field is True - ``muse gc --full`` without orphans reports 0 collected - ``muse gc --json`` (no --full) schema unchanged — new fields present at 0 E2E ~~~ - Full lifecycle: orphaned commits/snapshots accumulate, gc --full reclaims them - After branch deletion, unique commits/snapshots GCed under --full - Shelf blob objects protected under --full - Rewritten history: old commits removed, new commits preserved Security ~~~~~~~~ - Symlinked commit file not deleted by --full - Symlinked snapshot file not deleted by --full - Non-.msgpack file in commits dir skipped (not deleted) - Non-.msgpack file in snapshots dir skipped (not deleted) Stress ~~~~~~ - 200 orphaned commits + 200 orphaned snapshots collected correctly - Deep 100-commit chain: all commits preserved under --full """ from __future__ import annotations type _FileStore = dict[str, bytes] import datetime import json import os import pathlib from collections.abc import Mapping import msgpack import pytest from muse.core.gc import ( GcResult, _collect_reachable_commits, _collect_reachable_snapshots, _list_stored_msgpack, run_gc, ) from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import Manifest, blob_id, fake_id, long_id, split_id from muse.core.object_store import write_object as _write_obj_atomic from muse.core.object_store import object_path from muse.core.paths import muse_dir, commits_dir, heads_dir, ref_path, shelf_dir, snapshots_dir from tests.cli_test_helper import CliRunner, InvokeResult cli = None runner = CliRunner() _EPOCH = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: muse = muse_dir(tmp_path) for sub in ("objects", "commits", "snapshots", "refs/heads"): (muse / sub).mkdir(parents=True, exist_ok=True) (muse / "repo.json").write_text( json.dumps({"repo_id": fake_id("repo"), "domain": "code"}), encoding="utf-8", ) (muse / "HEAD").write_text("ref: refs/heads/main\n", encoding="utf-8") return tmp_path def _write_object(root: pathlib.Path, content: bytes) -> str: oid = blob_id(content) _write_obj_atomic(root, oid, content) return oid def _write_snapshot_with_objects( root: pathlib.Path, files: _FileStore ) -> tuple[str, dict[str, str]]: """Write objects + snapshot. Returns (snapshot_id, manifest).""" manifest: Manifest = {} for name, content in files.items(): manifest[name] = _write_object(root, content) snap_id = compute_snapshot_id(manifest) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) return snap_id, manifest def _write_commit_record( root: pathlib.Path, snapshot_id: str, *, parent1: str | None = None, parent2: str | None = None, message: str = "test", ts_offset: int = 0, ) -> str: """Write a commit object to the unified object store and return its commit_id.""" parent_ids = [p for p in [parent1, parent2] if p] ts = (_EPOCH + datetime.timedelta(seconds=ts_offset)).isoformat() commit_id = compute_commit_id( parent_ids=parent_ids, snapshot_id=snapshot_id, message=message, committed_at_iso=ts, ) data = { "commit_id": commit_id, "repo_id": "test-repo", "branch": "main", "snapshot_id": snapshot_id, "message": message, "committed_at": ts, "parent_commit_id": parent1, "parent2_commit_id": parent2, "author": "", "metadata": {}, } payload = json.dumps(data, separators=(",", ":")).encode() obj_file = object_path(root, commit_id) obj_file.parent.mkdir(parents=True, exist_ok=True) obj_file.write_bytes(f"commit {len(payload)}\0".encode() + payload) return commit_id def _write_shelf_entry(root: pathlib.Path, snapshot: Mapping[str, str]) -> pathlib.Path: """Write a shelf entry msgpack file under .muse/shelf/sha256/ and return its path.""" entry = {"snapshot": snapshot, "branch": "main", "created_at": "2026-01-01T00:00:00+00:00"} packed = msgpack.packb(entry, use_bin_type=True) _, hex_id = split_id(blob_id(packed)) s_dir = shelf_dir(root) / "sha256" s_dir.mkdir(parents=True, exist_ok=True) path = s_dir / f"{hex_id}.msgpack" path.write_bytes(packed) return path def _set_branch(root: pathlib.Path, branch: str, commit_id: str) -> None: branch_ref = ref_path(root, branch) branch_ref.parent.mkdir(parents=True, exist_ok=True) branch_ref.write_text(commit_id, encoding="utf-8") def _make_linear_chain( root: pathlib.Path, length: int, branch: str = "main", ) -> list[str]: """Create a linear chain of *length* commits on *branch*. Returns all commit IDs.""" snap_id, _ = _write_snapshot_with_objects(root, {f"f{i}.txt": f"v{i}".encode() for i in range(length)}) commit_ids: list[str] = [] parent: str | None = None for i in range(length): cid = _write_commit_record(root, snap_id, parent1=parent, message=f"commit {i}", ts_offset=i) commit_ids.append(cid) parent = cid _set_branch(root, branch, commit_ids[-1]) return commit_ids def _env(root: pathlib.Path) -> Manifest: return {"MUSE_REPO_ROOT": str(root)} def _invoke_gc(root: pathlib.Path, *extra_args: str) -> InvokeResult: args = list(extra_args) if "--grace-period" not in args: args = ["--grace-period", "0"] + args return runner.invoke(cli, ["gc"] + args, env=_env(root), catch_exceptions=False) # --------------------------------------------------------------------------- # Unit — GcResult new fields default to zero # --------------------------------------------------------------------------- class TestGcResultDefaults: def test_commits_fields_default_to_zero(self) -> None: r = GcResult() assert r.commits_reachable == 0 assert r.commits_collected == 0 assert r.commits_collected_bytes == 0 def test_snapshots_fields_default_to_zero(self) -> None: r = GcResult() assert r.snapshots_reachable == 0 assert r.snapshots_collected == 0 assert r.snapshots_collected_bytes == 0 def test_full_field_defaults_to_false(self) -> None: assert GcResult().full is False # --------------------------------------------------------------------------- # Unit — _collect_reachable_commits # --------------------------------------------------------------------------- class TestCollectReachableCommits: def test_empty_repo_returns_empty_set(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) assert _collect_reachable_commits(root) == set() def test_single_branch_single_commit(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {"a.py": b"x"}) cid = _write_commit_record(root, snap_id) _set_branch(root, "main", cid) assert _collect_reachable_commits(root) == {cid} def test_traverses_parent_chain(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {}) c1 = _write_commit_record(root, snap_id, message="c1", ts_offset=0) c2 = _write_commit_record(root, snap_id, parent1=c1, message="c2", ts_offset=1) c3 = _write_commit_record(root, snap_id, parent1=c2, message="c3", ts_offset=2) _set_branch(root, "main", c3) reachable = _collect_reachable_commits(root) assert {c1, c2, c3} == reachable def test_merge_commit_both_parents_reachable(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {}) base = _write_commit_record(root, snap_id, message="base", ts_offset=0) feat = _write_commit_record(root, snap_id, parent1=base, message="feat", ts_offset=1) merge = _write_commit_record(root, snap_id, parent1=base, parent2=feat, message="merge", ts_offset=2) _set_branch(root, "main", merge) reachable = _collect_reachable_commits(root) assert {base, feat, merge} == reachable def test_multiple_branches_union(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {}) c1 = _write_commit_record(root, snap_id, message="c1", ts_offset=0) c2 = _write_commit_record(root, snap_id, message="c2", ts_offset=1) _set_branch(root, "main", c1) _set_branch(root, "dev", c2) reachable = _collect_reachable_commits(root) assert {c1, c2} == reachable def test_nested_branch_ref_traversed(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {}) cid = _write_commit_record(root, snap_id) _set_branch(root, "feat/my-feature", cid) reachable = _collect_reachable_commits(root) assert cid in reachable def test_missing_commit_file_skipped(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) # Write a branch ref pointing to a commit that doesn't exist in store. ghost_hex = "a" * 64 _set_branch(root, "main", long_id(ghost_hex)) # Should not raise; ghost commit counted as reachable (it's the tip). reachable = _collect_reachable_commits(root) assert long_id(ghost_hex) in reachable def test_corrupt_commit_file_skipped_gracefully(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {}) cid = _write_commit_record(root, snap_id) _set_branch(root, "main", cid) # Corrupt the commit file. object_path(root, cid).write_bytes(b"not msgpack") # Should not raise. reachable = _collect_reachable_commits(root) assert cid in reachable # tip is still reachable; parents can't be walked def test_symlinked_ref_file_skipped(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) # Create a symlinked ref file — should be ignored. ref_dir = heads_dir(root) link = ref_dir / "malicious" target = tmp_path / "target.txt" target.write_text("a" * 64) link.symlink_to(target) # Shouldn't crash; symlinked ref is not followed. reachable = _collect_reachable_commits(root) assert len(reachable) == 0 def test_commit_only_in_orphaned_file_not_reachable(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {}) orphan = _write_commit_record(root, snap_id, message="orphan") # No branch ref points to orphan. reachable = _collect_reachable_commits(root) assert orphan not in reachable def test_diamond_dag_no_duplicate_walk(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {}) base = _write_commit_record(root, snap_id, message="base", ts_offset=0) left = _write_commit_record(root, snap_id, parent1=base, message="left", ts_offset=1) right = _write_commit_record(root, snap_id, parent1=base, message="right", ts_offset=2) tip = _write_commit_record(root, snap_id, parent1=left, parent2=right, message="tip", ts_offset=3) _set_branch(root, "main", tip) reachable = _collect_reachable_commits(root) assert reachable == {base, left, right, tip} # --------------------------------------------------------------------------- # Unit — _collect_reachable_snapshots # --------------------------------------------------------------------------- class TestCollectReachableSnapshots: def test_returns_snapshot_ids_from_reachable_commits(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, manifest = _write_snapshot_with_objects(root, {"f.py": b"code"}) cid = _write_commit_record(root, snap_id) snaps, objs = _collect_reachable_snapshots(root, {cid}) assert snap_id in snaps def test_returns_blob_ids_from_manifest(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) obj_id = _write_object(root, b"file content") snap_id = compute_snapshot_id({"f.py": obj_id}) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest={"f.py": obj_id})) cid = _write_commit_record(root, snap_id) _, objs = _collect_reachable_snapshots(root, {cid}) assert obj_id in objs def test_empty_reachable_commits_returns_empty(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snaps, objs = _collect_reachable_snapshots(root, set()) assert snaps == set() assert objs == set() def test_multiple_commits_same_snapshot_deduplicated(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {"f": b"x"}) c1 = _write_commit_record(root, snap_id, message="c1", ts_offset=0) c2 = _write_commit_record(root, snap_id, message="c2", ts_offset=1) snaps, _ = _collect_reachable_snapshots(root, {c1, c2}) assert snaps == {snap_id} def test_shelf_blobs_included(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) shelf_obj = _write_object(root, b"shelved content") _write_shelf_entry(root, {"file.py": shelf_obj}) _, objs = _collect_reachable_snapshots(root, set()) assert shelf_obj in objs def test_missing_snapshot_file_skipped(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) ghost_snap_id = long_id("b" * 64) cid = _write_commit_record(root, ghost_snap_id) # No snapshot file on disk — should not crash. snaps, objs = _collect_reachable_snapshots(root, {cid}) # ghost snapshot is in snaps set but its blobs can't be collected assert ghost_snap_id in snaps assert len(objs) == 0 # --------------------------------------------------------------------------- # Unit — _list_stored_msgpack # --------------------------------------------------------------------------- class TestListStoredMsgpack: def test_returns_msgpack_files(self, tmp_path: pathlib.Path) -> None: d = tmp_path / "store" shard = d / "sha256" shard.mkdir(parents=True) (shard / "abc123.msgpack").write_bytes(b"data") (shard / "def456.msgpack").write_bytes(b"data2") pairs = _list_stored_msgpack(d, grace_period_seconds=0) stems = {stem for stem, _ in pairs} assert stems == {"abc123", "def456"} def test_non_msgpack_files_excluded(self, tmp_path: pathlib.Path) -> None: d = tmp_path / "store" d.mkdir() (d / "abc.json").write_bytes(b"data") (d / "abc.txt").write_bytes(b"data") pairs = _list_stored_msgpack(d, grace_period_seconds=0) assert pairs == [] def test_symlinked_file_excluded(self, tmp_path: pathlib.Path) -> None: d = tmp_path / "store" d.mkdir() real = tmp_path / "real.msgpack" real.write_bytes(b"data") (d / "linked.msgpack").symlink_to(real) pairs = _list_stored_msgpack(d, grace_period_seconds=0) assert pairs == [] def test_grace_period_protects_recent_files(self, tmp_path: pathlib.Path) -> None: d = tmp_path / "store" shard = d / "sha256" shard.mkdir(parents=True) (shard / "recent.msgpack").write_bytes(b"data") pairs = _list_stored_msgpack(d, grace_period_seconds=9999) assert pairs == [] def test_grace_period_zero_includes_all(self, tmp_path: pathlib.Path) -> None: d = tmp_path / "store" shard = d / "sha256" shard.mkdir(parents=True) (shard / "old.msgpack").write_bytes(b"data") pairs = _list_stored_msgpack(d, grace_period_seconds=0) assert len(pairs) == 1 def test_nonexistent_directory_returns_empty(self, tmp_path: pathlib.Path) -> None: pairs = _list_stored_msgpack(tmp_path / "does_not_exist", grace_period_seconds=0) assert pairs == [] # --------------------------------------------------------------------------- # Integration — run_gc(full=True) # --------------------------------------------------------------------------- class TestRunGcFull: def test_orphaned_commit_deleted(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {}) orphan = _write_commit_record(root, snap_id, message="orphan") orphan_path = object_path(root, orphan) assert orphan_path.exists() result = run_gc(root, full=True, grace_period_seconds=0) assert result.commits_collected == 1 assert not orphan_path.exists() def test_reachable_commit_preserved(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {}) cid = _write_commit_record(root, snap_id) _set_branch(root, "main", cid) cp = object_path(root, cid) result = run_gc(root, full=True, grace_period_seconds=0) assert result.commits_collected == 0 assert cp.exists() def test_orphaned_snapshot_deleted(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {}) snap_path = object_path(root, snap_id) assert snap_path.exists() # No commit references this snapshot. result = run_gc(root, full=True, grace_period_seconds=0) assert result.snapshots_collected == 1 assert not snap_path.exists() def test_reachable_snapshot_preserved(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {"f.py": b"code"}) cid = _write_commit_record(root, snap_id) _set_branch(root, "main", cid) snap_path = object_path(root, snap_id) result = run_gc(root, full=True, grace_period_seconds=0) assert result.snapshots_collected == 0 assert snap_path.exists() def test_orphaned_blob_from_orphaned_commit_deleted(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) orphan_blob = _write_object(root, b"only in orphaned commit") snap_id = compute_snapshot_id({"f": orphan_blob}) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest={"f": orphan_blob})) _write_commit_record(root, snap_id, message="orphan") # No branch ref → orphan commit + snapshot + blob all unreachable. blob_path = object_path(root, orphan_blob) result = run_gc(root, full=True, grace_period_seconds=0) assert result.commits_collected == 1 assert result.snapshots_collected == 1 assert result.collected_count == 1 assert not blob_path.exists() def test_dry_run_never_deletes(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {}) orphan = _write_commit_record(root, snap_id, message="orphan") result = run_gc(root, full=True, dry_run=True, grace_period_seconds=0) assert result.dry_run is True assert result.commits_collected == 1 assert object_path(root, orphan).exists() assert object_path(root, snap_id).exists() def test_commit_reachable_from_any_branch_preserved(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {}) shared_base = _write_commit_record(root, snap_id, message="base", ts_offset=0) tip_main = _write_commit_record(root, snap_id, parent1=shared_base, message="main-tip", ts_offset=1) tip_dev = _write_commit_record(root, snap_id, parent1=shared_base, message="dev-tip", ts_offset=2) _set_branch(root, "main", tip_main) _set_branch(root, "dev", tip_dev) result = run_gc(root, full=True, grace_period_seconds=0) assert result.commits_collected == 0 for cid in (shared_base, tip_main, tip_dev): assert object_path(root, cid).exists() def test_linear_chain_all_intermediates_preserved(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) commit_ids = _make_linear_chain(root, 10) result = run_gc(root, full=True, grace_period_seconds=0) assert result.commits_collected == 0 for cid in commit_ids: assert object_path(root, cid).exists() def test_merge_commit_both_parent_chains_preserved(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {}) base = _write_commit_record(root, snap_id, message="base", ts_offset=0) left = _write_commit_record(root, snap_id, parent1=base, message="left", ts_offset=1) right = _write_commit_record(root, snap_id, parent1=base, message="right", ts_offset=2) merge = _write_commit_record(root, snap_id, parent1=left, parent2=right, message="merge", ts_offset=3) _set_branch(root, "main", merge) result = run_gc(root, full=True, grace_period_seconds=0) assert result.commits_collected == 0 for cid in (base, left, right, merge): assert object_path(root, cid).exists() def test_grace_period_protects_recent_commit(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {}) orphan = _write_commit_record(root, snap_id) # No branch ref; orphan is unreachable — but grace period protects it. result = run_gc(root, full=True, grace_period_seconds=9999) assert result.commits_collected == 0 assert object_path(root, orphan).exists() def test_grace_period_protects_recent_snapshot(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {}) result = run_gc(root, full=True, grace_period_seconds=9999) assert result.snapshots_collected == 0 assert object_path(root, snap_id).exists() def test_gcresult_commits_reachable_count(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {}) cid = _write_commit_record(root, snap_id) _set_branch(root, "main", cid) result = run_gc(root, full=True, grace_period_seconds=0) assert result.commits_reachable == 1 assert result.commits_collected == 0 def test_gcresult_snapshots_reachable_count(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {}) cid = _write_commit_record(root, snap_id) _set_branch(root, "main", cid) result = run_gc(root, full=True, grace_period_seconds=0) assert result.snapshots_reachable == 1 assert result.snapshots_collected == 0 def test_gcresult_full_flag_set(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) result = run_gc(root, full=True, grace_period_seconds=0) assert result.full is True def test_without_full_flag_orphaned_commits_not_deleted(self, tmp_path: pathlib.Path) -> None: """Default run_gc (full=False) must NOT prune commits or snapshots.""" root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {}) orphan = _write_commit_record(root, snap_id, message="orphan") result = run_gc(root, full=False, grace_period_seconds=0) assert result.commits_collected == 0 assert result.snapshots_collected == 0 assert object_path(root, orphan).exists() def test_idempotent_second_run_collects_nothing(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {}) orphan = _write_commit_record(root, snap_id) run_gc(root, full=True, grace_period_seconds=0) result2 = run_gc(root, full=True, grace_period_seconds=0) assert result2.commits_collected == 0 assert result2.snapshots_collected == 0 assert result2.collected_count == 0 def test_collected_bytes_nonzero_for_deleted_commit(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {}) _write_commit_record(root, snap_id, message="orphan") result = run_gc(root, full=True, grace_period_seconds=0) assert result.commits_collected_bytes > 0 def test_collected_bytes_nonzero_for_deleted_snapshot(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {"f": b"content"}) result = run_gc(root, full=True, grace_period_seconds=0) assert result.snapshots_collected_bytes > 0 # --------------------------------------------------------------------------- # CLI integration # --------------------------------------------------------------------------- class TestCliGcFull: def test_full_text_output_has_three_lines(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) r = _invoke_gc(root, "--full") assert r.exit_code == 0 lines = [ln for ln in r.output.strip().splitlines() if ln.strip()] assert len(lines) == 3 # objects, commits, snapshots def test_full_text_output_contains_commit_line(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) r = _invoke_gc(root, "--full") assert "commit" in r.output def test_full_text_output_contains_snapshot_line(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) r = _invoke_gc(root, "--full") assert "snapshot" in r.output def test_full_dry_run_prefix_on_all_lines(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) r = _invoke_gc(root, "--full", "--dry-run") assert r.exit_code == 0 content_lines = [ln for ln in r.output.strip().splitlines() if ln.strip()] assert all("[dry-run]" in ln for ln in content_lines) def test_full_json_includes_commits_fields(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) r = _invoke_gc(root, "--full", "--json") assert r.exit_code == 0 data = json.loads(r.output.strip()) assert "commits_reachable" in data assert "commits_collected" in data assert "commits_collected_bytes" in data def test_full_json_includes_snapshots_fields(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) r = _invoke_gc(root, "--full", "--json") data = json.loads(r.output.strip()) assert "snapshots_reachable" in data assert "snapshots_collected" in data assert "snapshots_collected_bytes" in data def test_full_json_field_types(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) r = _invoke_gc(root, "--full", "--json") data = json.loads(r.output.strip()) assert isinstance(data["commits_reachable"], int) assert isinstance(data["commits_collected"], int) assert isinstance(data["commits_collected_bytes"], int) assert isinstance(data["snapshots_reachable"], int) assert isinstance(data["snapshots_collected"], int) assert isinstance(data["snapshots_collected_bytes"], int) assert isinstance(data["full"], bool) def test_full_json_full_field_true(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) r = _invoke_gc(root, "--full", "--json") data = json.loads(r.output.strip()) assert data["full"] is True def test_no_full_json_full_field_false(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) r = _invoke_gc(root, "--json") data = json.loads(r.output.strip()) assert data["full"] is False def test_full_json_dry_run_field(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) r = _invoke_gc(root, "--full", "--dry-run", "--json") data = json.loads(r.output.strip()) assert data["dry_run"] is True def test_full_zero_orphans_reports_zeros(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {}) cid = _write_commit_record(root, snap_id) _set_branch(root, "main", cid) r = _invoke_gc(root, "--full", "--json") data = json.loads(r.output.strip()) assert data["commits_collected"] == 0 assert data["snapshots_collected"] == 0 def test_full_reports_correct_collected_counts(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) # 3 orphaned commits, 3 orphaned snapshots for i in range(3): snap_id, _ = _write_snapshot_with_objects(root, {f"f{i}": f"v{i}".encode()}) _write_commit_record(root, snap_id, message=f"orphan-{i}", ts_offset=i) r = _invoke_gc(root, "--full", "--json") data = json.loads(r.output.strip()) assert data["commits_collected"] == 3 assert data["snapshots_collected"] == 3 def test_no_full_json_schema_unchanged(self, tmp_path: pathlib.Path) -> None: """Without --full, the new fields are present but zero.""" root = _make_repo(tmp_path) r = _invoke_gc(root, "--json") data = json.loads(r.output.strip()) # Old fields still present. assert "collected_count" in data assert "reachable_count" in data # New fields present but zero. assert data["commits_collected"] == 0 assert data["snapshots_collected"] == 0 # --------------------------------------------------------------------------- # E2E tests # --------------------------------------------------------------------------- class TestGcFullE2E: def test_full_lifecycle_orphans_accumulate_then_freed(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) # Create a live commit on main. live_snap, _ = _write_snapshot_with_objects(root, {"app.py": b"app code"}) live_cid = _write_commit_record(root, live_snap) _set_branch(root, "main", live_cid) # Simulate abandoned work: write orphaned commits/snapshots. for i in range(5): snap_id, _ = _write_snapshot_with_objects(root, {f"draft{i}.py": f"draft{i}".encode()}) _write_commit_record(root, snap_id, message=f"abandoned-{i}", ts_offset=i + 10) result = run_gc(root, full=True, grace_period_seconds=0) assert result.commits_collected == 5 assert result.snapshots_collected == 5 assert result.commits_reachable == 1 assert result.snapshots_reachable == 1 # Live commit and snapshot still intact. assert object_path(root, live_cid).exists() assert object_path(root, live_snap).exists() def test_shelf_blobs_protected_under_full(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) shelf_obj = _write_object(root, b"shelved work") _write_shelf_entry(root, {"work.py": shelf_obj}) result = run_gc(root, full=True, grace_period_seconds=0) assert result.collected_count == 0 blob_path = object_path(root, shelf_obj) assert blob_path.exists() def test_rewrite_history_old_commits_removed(self, tmp_path: pathlib.Path) -> None: """Simulate a history rewrite: old commits orphaned, new commits on branch.""" root = _make_repo(tmp_path) snap1, _ = _write_snapshot_with_objects(root, {"v1.py": b"v1"}) old_cid = _write_commit_record(root, snap1, message="old") snap2, _ = _write_snapshot_with_objects(root, {"v2.py": b"v2"}) new_cid = _write_commit_record(root, snap2, message="new (rewrite)") _set_branch(root, "main", new_cid) # old_cid is now orphaned. result = run_gc(root, full=True, grace_period_seconds=0) assert result.commits_collected == 1 assert result.snapshots_collected == 1 assert not object_path(root, old_cid).exists() assert object_path(root, new_cid).exists() def test_two_branches_then_one_deleted_unique_commits_freed( self, tmp_path: pathlib.Path ) -> None: root = _make_repo(tmp_path) shared_snap, _ = _write_snapshot_with_objects(root, {"base.py": b"base"}) base_cid = _write_commit_record(root, shared_snap, message="base", ts_offset=0) feat_snap, _ = _write_snapshot_with_objects(root, {"feat.py": b"feat"}) feat_cid = _write_commit_record(root, feat_snap, parent1=base_cid, message="feat", ts_offset=1) _set_branch(root, "main", base_cid) _set_branch(root, "dev", feat_cid) # "Delete" feat branch by removing its ref. (heads_dir(root) / "dev").unlink() result = run_gc(root, full=True, grace_period_seconds=0) # feat_cid unique to dev is now orphaned. assert result.commits_collected == 1 assert not object_path(root, feat_cid).exists() # base_cid still on main is preserved. assert object_path(root, base_cid).exists() # --------------------------------------------------------------------------- # Security tests # --------------------------------------------------------------------------- class TestGcFullSecurity: def test_symlinked_commit_file_not_deleted(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) real_file = tmp_path / "real_commit.msgpack" real_file.write_bytes(msgpack.packb({"commit_id": "a" * 64}, use_bin_type=True)) link = commits_dir(root) / "linked.msgpack" link.symlink_to(real_file) run_gc(root, full=True, grace_period_seconds=0) assert real_file.exists(), "Target of symlink must not be deleted" def test_symlinked_snapshot_file_not_deleted(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) real_file = tmp_path / "real_snap.msgpack" real_file.write_bytes(msgpack.packb({"snapshot_id": "b" * 64}, use_bin_type=True)) link = snapshots_dir(root) / "linked.msgpack" link.symlink_to(real_file) run_gc(root, full=True, grace_period_seconds=0) assert real_file.exists(), "Target of snapshot symlink must not be deleted" def test_non_msgpack_file_in_commits_dir_not_deleted(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) stray = commits_dir(root) / "README.txt" stray.write_text("not a commit") run_gc(root, full=True, grace_period_seconds=0) assert stray.exists() def test_non_msgpack_file_in_snapshots_dir_not_deleted(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) stray = snapshots_dir(root) / ".DS_Store" stray.write_bytes(b"junk") run_gc(root, full=True, grace_period_seconds=0) assert stray.exists() # --------------------------------------------------------------------------- # Stress tests # --------------------------------------------------------------------------- class TestGcFullStress: def test_200_orphaned_commits_and_snapshots_collected(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) # Live commit that must survive. live_snap, _ = _write_snapshot_with_objects(root, {"live.py": b"live"}) live_cid = _write_commit_record(root, live_snap) _set_branch(root, "main", live_cid) # 200 orphaned commit+snapshot pairs. for i in range(200): snap_id, _ = _write_snapshot_with_objects(root, {f"f{i}.py": f"v{i}".encode()}) _write_commit_record(root, snap_id, message=f"orphan-{i}", ts_offset=i + 1) result = run_gc(root, full=True, grace_period_seconds=0) assert result.commits_collected == 200 assert result.snapshots_collected == 200 assert result.commits_reachable == 1 assert result.snapshots_reachable == 1 # Live commit and snapshot intact. assert object_path(root, live_cid).exists() assert object_path(root, live_snap).exists() def test_deep_100_commit_chain_all_preserved(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) commit_ids = _make_linear_chain(root, 100) result = run_gc(root, full=True, grace_period_seconds=0) assert result.commits_collected == 0 assert result.commits_reachable == 100 for cid in commit_ids: assert object_path(root, cid).exists() def test_50_branches_all_commits_preserved(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) snap_id, _ = _write_snapshot_with_objects(root, {}) for i in range(50): cid = _write_commit_record(root, snap_id, message=f"branch-{i}", ts_offset=i) _set_branch(root, f"feat/branch-{i:02d}", cid) result = run_gc(root, full=True, grace_period_seconds=0) assert result.commits_collected == 0 assert result.commits_reachable == 50