gabriel / muse public

test_gc_full.py file-level

at sha256:8 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:4 Merge branch 'dev' into main · gabriel · Jun 17, 2026
1 """Comprehensive tests for ``muse gc --full`` — orphaned commit + snapshot pruning.
2
3 Coverage dimensions
4 -------------------
5
6 Unit
7 ~~~~
8 - ``_collect_reachable_commits``: empty repo, single branch, multi-branch,
9 parent chain traversal, merge commits (2 parents), missing files, corrupt
10 files, symlink guard, cycle resistance, tags included
11 - ``_collect_reachable_snapshots``: snapshot IDs from reachable commits,
12 blob IDs from manifests, shelf objects preserved
13 - ``_list_stored_msgpack``: enumerates files, grace period, symlink guard,
14 non-.msgpack files skipped
15 - ``GcResult``: new fields default to zero
16
17 Integration (run_gc)
18 ~~~~~~~~~~~~~~~~~~~~
19 - Orphaned commit deleted; reachable commit preserved
20 - Orphaned snapshot deleted; reachable snapshot preserved
21 - Orphaned blobs from orphaned commits deleted under --full
22 - dry_run=True never deletes commits or snapshots
23 - Multiple branches: any-branch reachability preserved
24 - Linear commit chain: all intermediates preserved
25 - Merge commit (2 parents): both parent chains preserved
26 - Grace period protects recently-written commits and snapshots
27 - GcResult fields populated correctly
28 - run_gc(full=False) does NOT delete orphaned commits/snapshots
29 - Idempotency: second run collects nothing
30
31 CLI
32 ~~~
33 - ``muse gc --full`` text output has commits + snapshots lines
34 - ``muse gc --full --dry-run`` text output prefixed with [dry-run]
35 - ``muse gc --full --json`` output includes all new fields with correct types
36 - ``muse gc --full --json --dry-run`` dry_run field is True
37 - ``muse gc --full`` without orphans reports 0 collected
38 - ``muse gc --json`` (no --full) schema unchanged — new fields present at 0
39
40 E2E
41 ~~~
42 - Full lifecycle: orphaned commits/snapshots accumulate, gc --full reclaims them
43 - After branch deletion, unique commits/snapshots GCed under --full
44 - Shelf blob objects protected under --full
45 - Rewritten history: old commits removed, new commits preserved
46
47 Security
48 ~~~~~~~~
49 - Symlinked commit file not deleted by --full
50 - Symlinked snapshot file not deleted by --full
51 - Non-.msgpack file in commits dir skipped (not deleted)
52 - Non-.msgpack file in snapshots dir skipped (not deleted)
53
54 Stress
55 ~~~~~~
56 - 200 orphaned commits + 200 orphaned snapshots collected correctly
57 - Deep 100-commit chain: all commits preserved under --full
58 """
59
60 from __future__ import annotations
61
62 type _FileStore = dict[str, bytes]
63
64 import datetime
65 import json
66 import os
67 import pathlib
68 from collections.abc import Mapping
69
70 import msgpack
71 import pytest
72
73 from muse.core.gc import (
74 GcResult,
75 _collect_reachable_commits,
76 _collect_reachable_snapshots,
77 _list_stored_msgpack,
78 run_gc,
79 )
80 from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id
81 from muse.core.snapshots import (
82 SnapshotRecord,
83 write_snapshot,
84 )
85 from muse.core.types import Manifest, blob_id, fake_id, long_id, split_id
86 from muse.core.object_store import write_object as _write_obj_atomic
87 from muse.core.object_store import object_path
88 from muse.core.paths import muse_dir, commits_dir, heads_dir, ref_path, shelf_dir, snapshots_dir
89
90 from tests.cli_test_helper import CliRunner, InvokeResult
91
# Handed to ``runner.invoke`` as the application argument by ``_invoke_gc``.
cli = None
# One runner instance shared by every CLI-level test in this module.
runner = CliRunner()

# Fixed base timestamp; ``_write_commit_record`` offsets it by ``ts_offset`` seconds.
_EPOCH = datetime.datetime(year=2026, month=1, day=1, tzinfo=datetime.timezone.utc)
96
97
98 # ---------------------------------------------------------------------------
99 # Helpers
100 # ---------------------------------------------------------------------------
101
102
def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Create a minimal repository skeleton under *tmp_path* and return *tmp_path*.

    Inside the directory returned by ``muse_dir`` this creates the
    ``objects``, ``commits``, ``snapshots`` and ``refs/heads`` directories,
    a ``repo.json`` file, and a ``HEAD`` file referencing ``refs/heads/main``.
    """
    dot_muse = muse_dir(tmp_path)
    layout = ("objects", "commits", "snapshots", "refs/heads")
    for relative in layout:
        (dot_muse / relative).mkdir(parents=True, exist_ok=True)

    repo_config = {"repo_id": fake_id("repo"), "domain": "code"}
    (dot_muse / "repo.json").write_text(json.dumps(repo_config), encoding="utf-8")
    (dot_muse / "HEAD").write_text("ref: refs/heads/main\n", encoding="utf-8")
    return tmp_path
113
114
def _write_object(root: pathlib.Path, content: bytes) -> str:
    """Write *content* to the object store under its blob ID and return that ID."""
    object_id = blob_id(content)
    _write_obj_atomic(root, object_id, content)
    return object_id
119
120
def _write_snapshot_with_objects(
    root: pathlib.Path, files: _FileStore
) -> tuple[str, dict[str, str]]:
    """Store every file in *files* as an object, then write a snapshot of them.

    Returns:
        ``(snapshot_id, manifest)`` where *manifest* maps each file name to the
        ID returned by ``_write_object`` for its content.
    """
    manifest: Manifest = {
        name: _write_object(root, data) for name, data in files.items()
    }
    snapshot_id = compute_snapshot_id(manifest)
    record = SnapshotRecord(snapshot_id=snapshot_id, manifest=manifest)
    write_snapshot(root, record)
    return snapshot_id, manifest
131
132
def _write_commit_record(
    root: pathlib.Path,
    snapshot_id: str,
    *,
    parent1: str | None = None,
    parent2: str | None = None,
    message: str = "test",
    ts_offset: int = 0,
) -> str:
    """Write a commit object into the object store and return its commit ID.

    The commit timestamp is ``_EPOCH`` plus *ts_offset* seconds. The record is
    serialised as compact JSON and stored at ``object_path(root, commit_id)``
    behind a ``commit <length>\\0`` header.
    """
    # Falsy parents (None or "") are left out of the ID computation.
    parents = [pid for pid in (parent1, parent2) if pid]
    committed_at = (_EPOCH + datetime.timedelta(seconds=ts_offset)).isoformat()
    commit_id = compute_commit_id(
        parent_ids=parents,
        snapshot_id=snapshot_id,
        message=message,
        committed_at_iso=committed_at,
    )

    # Key order is kept stable because it determines the serialised bytes.
    record = {
        "commit_id": commit_id,
        "repo_id": "test-repo",
        "branch": "main",
        "snapshot_id": snapshot_id,
        "message": message,
        "committed_at": committed_at,
        "parent_commit_id": parent1,
        "parent2_commit_id": parent2,
        "author": "",
        "metadata": {},
    }
    body = json.dumps(record, separators=(",", ":")).encode()
    header = f"commit {len(body)}\0".encode()

    destination = object_path(root, commit_id)
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_bytes(header + body)
    return commit_id
168
169
def _write_shelf_entry(root: pathlib.Path, snapshot: Mapping[str, str]) -> pathlib.Path:
    """Write a msgpack shelf entry under the shelf's ``sha256`` directory; return its path.

    The file is named after the hex part of the blob ID of the packed bytes.
    """
    record = {
        "snapshot": snapshot,
        "branch": "main",
        "created_at": "2026-01-01T00:00:00+00:00",
    }
    blob = msgpack.packb(record, use_bin_type=True)
    _algo, hex_digest = split_id(blob_id(blob))

    shard = shelf_dir(root) / "sha256"
    shard.mkdir(parents=True, exist_ok=True)
    entry_path = shard / f"{hex_digest}.msgpack"
    entry_path.write_bytes(blob)
    return entry_path
180
181
def _set_branch(root: pathlib.Path, branch: str, commit_id: str) -> None:
    """Point *branch* at *commit_id* by writing the ID into the branch's ref file."""
    target = ref_path(root, branch)
    # Branch names such as "feat/my-feature" need intermediate directories.
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(commit_id, encoding="utf-8")
186
187
def _make_linear_chain(
    root: pathlib.Path,
    length: int,
    branch: str = "main",
) -> list[str]:
    """Create a linear chain of *length* commits on *branch*.

    All commits share a single snapshot holding *length* small files. Each
    commit's parent is the previous commit in the chain, and *branch* is
    pointed at the last one.

    Returns:
        The commit IDs, oldest first. When *length* is zero or negative no
        commit is written, *branch* is left untouched, and the list is empty.
    """
    snap_id, _ = _write_snapshot_with_objects(
        root, {f"f{i}.txt": f"v{i}".encode() for i in range(length)}
    )
    commit_ids: list[str] = []
    parent: str | None = None
    for i in range(length):
        cid = _write_commit_record(
            root, snap_id, parent1=parent, message=f"commit {i}", ts_offset=i
        )
        commit_ids.append(cid)
        parent = cid
    # An empty chain has no tip; indexing [-1] here used to raise IndexError.
    if commit_ids:
        _set_branch(root, branch, commit_ids[-1])
    return commit_ids
203
204
205 def _env(root: pathlib.Path) -> Manifest:
206 return {"MUSE_REPO_ROOT": str(root)}
207
208
def _invoke_gc(root: pathlib.Path, *extra_args: str) -> InvokeResult:
    """Invoke ``gc`` through the shared CLI runner against the repo at *root*.

    ``--grace-period 0`` is prepended unless *extra_args* already contains a
    ``--grace-period`` token. Exceptions are not caught by the runner.
    """
    argv = [*extra_args]
    if "--grace-period" not in argv:
        argv = ["--grace-period", "0", *argv]
    return runner.invoke(cli, ["gc", *argv], env=_env(root), catch_exceptions=False)
214
215
216 # ---------------------------------------------------------------------------
217 # Unit — GcResult new fields default to zero
218 # ---------------------------------------------------------------------------
219
220
class TestGcResultDefaults:
    """A freshly constructed ``GcResult`` reports nothing collected and ``full`` off."""

    def test_commits_fields_default_to_zero(self) -> None:
        """Commit counters start at zero."""
        result = GcResult()
        assert result.commits_reachable == 0
        assert result.commits_collected == 0
        assert result.commits_collected_bytes == 0

    def test_snapshots_fields_default_to_zero(self) -> None:
        """Snapshot counters start at zero."""
        result = GcResult()
        assert result.snapshots_reachable == 0
        assert result.snapshots_collected == 0
        assert result.snapshots_collected_bytes == 0

    def test_full_field_defaults_to_false(self) -> None:
        """``full`` is ``False`` unless set."""
        result = GcResult()
        assert result.full is False
236
237
238 # ---------------------------------------------------------------------------
239 # Unit — _collect_reachable_commits
240 # ---------------------------------------------------------------------------
241
242
class TestCollectReachableCommits:
    """Unit tests for ``_collect_reachable_commits``."""

    def test_empty_repo_returns_empty_set(self, tmp_path: pathlib.Path) -> None:
        """No branch refs means nothing is reachable."""
        repo = _make_repo(tmp_path)
        reachable = _collect_reachable_commits(repo)
        assert reachable == set()

    def test_single_branch_single_commit(self, tmp_path: pathlib.Path) -> None:
        """A single branch tip is the only reachable commit."""
        repo = _make_repo(tmp_path)
        snapshot, _ = _write_snapshot_with_objects(repo, {"a.py": b"x"})
        tip = _write_commit_record(repo, snapshot)
        _set_branch(repo, "main", tip)
        assert _collect_reachable_commits(repo) == {tip}

    def test_traverses_parent_chain(self, tmp_path: pathlib.Path) -> None:
        """Every ancestor of the branch tip is reachable."""
        repo = _make_repo(tmp_path)
        snapshot, _ = _write_snapshot_with_objects(repo, {})
        first = _write_commit_record(repo, snapshot, message="c1", ts_offset=0)
        second = _write_commit_record(repo, snapshot, parent1=first, message="c2", ts_offset=1)
        third = _write_commit_record(repo, snapshot, parent1=second, message="c3", ts_offset=2)
        _set_branch(repo, "main", third)
        assert _collect_reachable_commits(repo) == {first, second, third}

    def test_merge_commit_both_parents_reachable(self, tmp_path: pathlib.Path) -> None:
        """Both parents of a merge commit are walked."""
        repo = _make_repo(tmp_path)
        snapshot, _ = _write_snapshot_with_objects(repo, {})
        base = _write_commit_record(repo, snapshot, message="base", ts_offset=0)
        feature = _write_commit_record(repo, snapshot, parent1=base, message="feat", ts_offset=1)
        merged = _write_commit_record(
            repo, snapshot, parent1=base, parent2=feature, message="merge", ts_offset=2
        )
        _set_branch(repo, "main", merged)
        assert _collect_reachable_commits(repo) == {base, feature, merged}

    def test_multiple_branches_union(self, tmp_path: pathlib.Path) -> None:
        """Tips of different branches are all reachable."""
        repo = _make_repo(tmp_path)
        snapshot, _ = _write_snapshot_with_objects(repo, {})
        on_main = _write_commit_record(repo, snapshot, message="c1", ts_offset=0)
        on_dev = _write_commit_record(repo, snapshot, message="c2", ts_offset=1)
        _set_branch(repo, "main", on_main)
        _set_branch(repo, "dev", on_dev)
        assert _collect_reachable_commits(repo) == {on_main, on_dev}

    def test_nested_branch_ref_traversed(self, tmp_path: pathlib.Path) -> None:
        """A branch name containing a slash is still discovered."""
        repo = _make_repo(tmp_path)
        snapshot, _ = _write_snapshot_with_objects(repo, {})
        tip = _write_commit_record(repo, snapshot)
        _set_branch(repo, "feat/my-feature", tip)
        assert tip in _collect_reachable_commits(repo)

    def test_missing_commit_file_skipped(self, tmp_path: pathlib.Path) -> None:
        """A ref to a commit absent from the store does not raise."""
        repo = _make_repo(tmp_path)
        # The branch ref names a commit that was never written to the store.
        ghost_id = long_id("a" * 64)
        _set_branch(repo, "main", ghost_id)
        # The ghost is the tip, so it still counts as reachable.
        assert ghost_id in _collect_reachable_commits(repo)

    def test_corrupt_commit_file_skipped_gracefully(self, tmp_path: pathlib.Path) -> None:
        """A tip whose stored bytes are garbage does not raise."""
        repo = _make_repo(tmp_path)
        snapshot, _ = _write_snapshot_with_objects(repo, {})
        tip = _write_commit_record(repo, snapshot)
        _set_branch(repo, "main", tip)
        # Overwrite the stored commit with bytes that are not a valid record.
        object_path(repo, tip).write_bytes(b"not msgpack")
        # The tip stays reachable; its parents simply cannot be walked.
        assert tip in _collect_reachable_commits(repo)

    def test_symlinked_ref_file_skipped(self, tmp_path: pathlib.Path) -> None:
        """A ref that is a symlink contributes no reachable commit."""
        repo = _make_repo(tmp_path)
        outside = tmp_path / "target.txt"
        outside.write_text("a" * 64)
        # The only ref in the repo is a symlink to a file outside the refs dir.
        (heads_dir(repo) / "malicious").symlink_to(outside)
        reachable = _collect_reachable_commits(repo)
        assert not reachable

    def test_commit_only_in_orphaned_file_not_reachable(self, tmp_path: pathlib.Path) -> None:
        """A stored commit that no ref points to is not reachable."""
        repo = _make_repo(tmp_path)
        snapshot, _ = _write_snapshot_with_objects(repo, {})
        unreferenced = _write_commit_record(repo, snapshot, message="orphan")
        assert unreferenced not in _collect_reachable_commits(repo)

    def test_diamond_dag_no_duplicate_walk(self, tmp_path: pathlib.Path) -> None:
        """A diamond-shaped history yields each commit exactly once."""
        repo = _make_repo(tmp_path)
        snapshot, _ = _write_snapshot_with_objects(repo, {})
        base = _write_commit_record(repo, snapshot, message="base", ts_offset=0)
        left = _write_commit_record(repo, snapshot, parent1=base, message="left", ts_offset=1)
        right = _write_commit_record(repo, snapshot, parent1=base, message="right", ts_offset=2)
        tip = _write_commit_record(
            repo, snapshot, parent1=left, parent2=right, message="tip", ts_offset=3
        )
        _set_branch(repo, "main", tip)
        assert _collect_reachable_commits(repo) == {base, left, right, tip}
343
344
345 # ---------------------------------------------------------------------------
346 # Unit — _collect_reachable_snapshots
347 # ---------------------------------------------------------------------------
348
349
class TestCollectReachableSnapshots:
    """Unit tests for ``_collect_reachable_snapshots``."""

    def test_returns_snapshot_ids_from_reachable_commits(self, tmp_path: pathlib.Path) -> None:
        """The snapshot referenced by a reachable commit is reported."""
        repo = _make_repo(tmp_path)
        snapshot, _manifest = _write_snapshot_with_objects(repo, {"f.py": b"code"})
        commit = _write_commit_record(repo, snapshot)
        snapshot_ids, _object_ids = _collect_reachable_snapshots(repo, {commit})
        assert snapshot in snapshot_ids

    def test_returns_blob_ids_from_manifest(self, tmp_path: pathlib.Path) -> None:
        """Blob IDs listed in a reachable snapshot's manifest are reported."""
        repo = _make_repo(tmp_path)
        blob = _write_object(repo, b"file content")
        manifest = {"f.py": blob}
        snapshot = compute_snapshot_id(manifest)
        write_snapshot(repo, SnapshotRecord(snapshot_id=snapshot, manifest={"f.py": blob}))
        commit = _write_commit_record(repo, snapshot)
        _, object_ids = _collect_reachable_snapshots(repo, {commit})
        assert blob in object_ids

    def test_empty_reachable_commits_returns_empty(self, tmp_path: pathlib.Path) -> None:
        """No reachable commits (and no shelf) gives two empty sets."""
        repo = _make_repo(tmp_path)
        snapshot_ids, object_ids = _collect_reachable_snapshots(repo, set())
        assert snapshot_ids == set()
        assert object_ids == set()

    def test_multiple_commits_same_snapshot_deduplicated(self, tmp_path: pathlib.Path) -> None:
        """Two commits sharing a snapshot report it once."""
        repo = _make_repo(tmp_path)
        snapshot, _ = _write_snapshot_with_objects(repo, {"f": b"x"})
        commits = {
            _write_commit_record(repo, snapshot, message="c1", ts_offset=0),
            _write_commit_record(repo, snapshot, message="c2", ts_offset=1),
        }
        snapshot_ids, _ = _collect_reachable_snapshots(repo, commits)
        assert snapshot_ids == {snapshot}

    def test_shelf_blobs_included(self, tmp_path: pathlib.Path) -> None:
        """Blobs named by a shelf entry are reported even without commits."""
        repo = _make_repo(tmp_path)
        shelved_blob = _write_object(repo, b"shelved content")
        _write_shelf_entry(repo, {"file.py": shelved_blob})
        _, object_ids = _collect_reachable_snapshots(repo, set())
        assert shelved_blob in object_ids

    def test_missing_snapshot_file_skipped(self, tmp_path: pathlib.Path) -> None:
        """A commit whose snapshot was never written does not raise."""
        repo = _make_repo(tmp_path)
        ghost_snapshot = long_id("b" * 64)
        commit = _write_commit_record(repo, ghost_snapshot)
        # The snapshot ID is reported, but with no file on disk it yields no blobs.
        snapshot_ids, object_ids = _collect_reachable_snapshots(repo, {commit})
        assert ghost_snapshot in snapshot_ids
        assert len(object_ids) == 0
397
398
399 # ---------------------------------------------------------------------------
400 # Unit — _list_stored_msgpack
401 # ---------------------------------------------------------------------------
402
403
class TestListStoredMsgpack:
    """Unit tests for ``_list_stored_msgpack``.

    Positive tests place files in a ``sha256`` shard under the store
    directory. The exclusion tests place their stray files both at the store
    root and inside that shard, so they cannot pass merely because the root
    level is never scanned.
    """

    def test_returns_msgpack_files(self, tmp_path: pathlib.Path) -> None:
        """``.msgpack`` files in the shard are listed by stem."""
        d = tmp_path / "store"
        shard = d / "sha256"
        shard.mkdir(parents=True)
        (shard / "abc123.msgpack").write_bytes(b"data")
        (shard / "def456.msgpack").write_bytes(b"data2")
        pairs = _list_stored_msgpack(d, grace_period_seconds=0)
        stems = {stem for stem, _ in pairs}
        assert stems == {"abc123", "def456"}

    def test_non_msgpack_files_excluded(self, tmp_path: pathlib.Path) -> None:
        """Files without the ``.msgpack`` suffix are never listed."""
        d = tmp_path / "store"
        shard = d / "sha256"
        shard.mkdir(parents=True)
        # Same stray files at the store root and in the shard used elsewhere.
        for location in (d, shard):
            (location / "abc.json").write_bytes(b"data")
            (location / "abc.txt").write_bytes(b"data")
        pairs = _list_stored_msgpack(d, grace_period_seconds=0)
        assert pairs == []

    def test_symlinked_file_excluded(self, tmp_path: pathlib.Path) -> None:
        """A ``.msgpack`` symlink is never listed."""
        d = tmp_path / "store"
        shard = d / "sha256"
        shard.mkdir(parents=True)
        real = tmp_path / "real.msgpack"
        real.write_bytes(b"data")
        # The symlink target lives outside the store directory.
        (d / "linked.msgpack").symlink_to(real)
        (shard / "linked.msgpack").symlink_to(real)
        pairs = _list_stored_msgpack(d, grace_period_seconds=0)
        assert pairs == []

    def test_grace_period_protects_recent_files(self, tmp_path: pathlib.Path) -> None:
        """A just-written file is not listed under a long grace period."""
        d = tmp_path / "store"
        shard = d / "sha256"
        shard.mkdir(parents=True)
        (shard / "recent.msgpack").write_bytes(b"data")
        pairs = _list_stored_msgpack(d, grace_period_seconds=9999)
        assert pairs == []

    def test_grace_period_zero_includes_all(self, tmp_path: pathlib.Path) -> None:
        """With a zero grace period a just-written file is listed."""
        d = tmp_path / "store"
        shard = d / "sha256"
        shard.mkdir(parents=True)
        (shard / "old.msgpack").write_bytes(b"data")
        pairs = _list_stored_msgpack(d, grace_period_seconds=0)
        assert len(pairs) == 1

    def test_nonexistent_directory_returns_empty(self, tmp_path: pathlib.Path) -> None:
        """A missing store directory yields an empty list rather than an error."""
        pairs = _list_stored_msgpack(tmp_path / "does_not_exist", grace_period_seconds=0)
        assert pairs == []
451
452
453 # ---------------------------------------------------------------------------
454 # Integration — run_gc(full=True)
455 # ---------------------------------------------------------------------------
456
457
class TestRunGcFull:
    """Integration tests for ``run_gc`` with ``full=True`` (plus one ``full=False`` control)."""

    def test_orphaned_commit_deleted(self, tmp_path: pathlib.Path) -> None:
        """A commit no ref points to is removed from the store."""
        root = _make_repo(tmp_path)
        snap_id, _ = _write_snapshot_with_objects(root, {})
        orphan = _write_commit_record(root, snap_id, message="orphan")
        orphan_path = object_path(root, orphan)
        assert orphan_path.exists()

        result = run_gc(root, full=True, grace_period_seconds=0)
        assert result.commits_collected == 1
        assert not orphan_path.exists()

    def test_reachable_commit_preserved(self, tmp_path: pathlib.Path) -> None:
        """A branch tip survives a full gc."""
        root = _make_repo(tmp_path)
        snap_id, _ = _write_snapshot_with_objects(root, {})
        cid = _write_commit_record(root, snap_id)
        _set_branch(root, "main", cid)
        cp = object_path(root, cid)

        result = run_gc(root, full=True, grace_period_seconds=0)
        assert result.commits_collected == 0
        assert cp.exists()

    def test_orphaned_snapshot_deleted(self, tmp_path: pathlib.Path) -> None:
        """A snapshot no commit references is removed."""
        root = _make_repo(tmp_path)
        snap_id, _ = _write_snapshot_with_objects(root, {})
        snap_path = object_path(root, snap_id)
        assert snap_path.exists()
        # No commit references this snapshot.

        result = run_gc(root, full=True, grace_period_seconds=0)
        assert result.snapshots_collected == 1
        assert not snap_path.exists()

    def test_reachable_snapshot_preserved(self, tmp_path: pathlib.Path) -> None:
        """The snapshot of a reachable commit survives."""
        root = _make_repo(tmp_path)
        snap_id, _ = _write_snapshot_with_objects(root, {"f.py": b"code"})
        cid = _write_commit_record(root, snap_id)
        _set_branch(root, "main", cid)
        snap_path = object_path(root, snap_id)

        result = run_gc(root, full=True, grace_period_seconds=0)
        assert result.snapshots_collected == 0
        assert snap_path.exists()

    def test_orphaned_blob_from_orphaned_commit_deleted(self, tmp_path: pathlib.Path) -> None:
        """An orphaned commit takes its snapshot and blob with it."""
        root = _make_repo(tmp_path)
        orphan_blob = _write_object(root, b"only in orphaned commit")
        snap_id = compute_snapshot_id({"f": orphan_blob})
        write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest={"f": orphan_blob}))
        _write_commit_record(root, snap_id, message="orphan")
        # No branch ref → orphan commit + snapshot + blob all unreachable.
        blob_path = object_path(root, orphan_blob)

        result = run_gc(root, full=True, grace_period_seconds=0)
        assert result.commits_collected == 1
        assert result.snapshots_collected == 1
        assert result.collected_count == 1
        assert not blob_path.exists()

    def test_dry_run_never_deletes(self, tmp_path: pathlib.Path) -> None:
        """``dry_run=True`` reports the orphan but leaves files on disk."""
        root = _make_repo(tmp_path)
        snap_id, _ = _write_snapshot_with_objects(root, {})
        orphan = _write_commit_record(root, snap_id, message="orphan")

        result = run_gc(root, full=True, dry_run=True, grace_period_seconds=0)
        assert result.dry_run is True
        assert result.commits_collected == 1
        assert object_path(root, orphan).exists()
        assert object_path(root, snap_id).exists()

    def test_commit_reachable_from_any_branch_preserved(self, tmp_path: pathlib.Path) -> None:
        """Commits reachable from either of two branches are kept."""
        root = _make_repo(tmp_path)
        snap_id, _ = _write_snapshot_with_objects(root, {})
        shared_base = _write_commit_record(root, snap_id, message="base", ts_offset=0)
        tip_main = _write_commit_record(
            root, snap_id, parent1=shared_base, message="main-tip", ts_offset=1
        )
        tip_dev = _write_commit_record(
            root, snap_id, parent1=shared_base, message="dev-tip", ts_offset=2
        )
        _set_branch(root, "main", tip_main)
        _set_branch(root, "dev", tip_dev)

        result = run_gc(root, full=True, grace_period_seconds=0)
        assert result.commits_collected == 0
        for cid in (shared_base, tip_main, tip_dev):
            assert object_path(root, cid).exists()

    def test_linear_chain_all_intermediates_preserved(self, tmp_path: pathlib.Path) -> None:
        """Every commit in a 10-commit chain is kept."""
        root = _make_repo(tmp_path)
        commit_ids = _make_linear_chain(root, 10)

        result = run_gc(root, full=True, grace_period_seconds=0)
        assert result.commits_collected == 0
        for cid in commit_ids:
            assert object_path(root, cid).exists()

    def test_merge_commit_both_parent_chains_preserved(self, tmp_path: pathlib.Path) -> None:
        """Both sides of a merge are kept."""
        root = _make_repo(tmp_path)
        snap_id, _ = _write_snapshot_with_objects(root, {})
        base = _write_commit_record(root, snap_id, message="base", ts_offset=0)
        left = _write_commit_record(root, snap_id, parent1=base, message="left", ts_offset=1)
        right = _write_commit_record(root, snap_id, parent1=base, message="right", ts_offset=2)
        merge = _write_commit_record(
            root, snap_id, parent1=left, parent2=right, message="merge", ts_offset=3
        )
        _set_branch(root, "main", merge)

        result = run_gc(root, full=True, grace_period_seconds=0)
        assert result.commits_collected == 0
        for cid in (base, left, right, merge):
            assert object_path(root, cid).exists()

    def test_grace_period_protects_recent_commit(self, tmp_path: pathlib.Path) -> None:
        """A just-written orphan commit is kept under a long grace period."""
        root = _make_repo(tmp_path)
        snap_id, _ = _write_snapshot_with_objects(root, {})
        orphan = _write_commit_record(root, snap_id)
        # No branch ref; orphan is unreachable — but grace period protects it.
        result = run_gc(root, full=True, grace_period_seconds=9999)
        assert result.commits_collected == 0
        assert object_path(root, orphan).exists()

    def test_grace_period_protects_recent_snapshot(self, tmp_path: pathlib.Path) -> None:
        """A just-written orphan snapshot is kept under a long grace period."""
        root = _make_repo(tmp_path)
        snap_id, _ = _write_snapshot_with_objects(root, {})
        result = run_gc(root, full=True, grace_period_seconds=9999)
        assert result.snapshots_collected == 0
        assert object_path(root, snap_id).exists()

    def test_gcresult_commits_reachable_count(self, tmp_path: pathlib.Path) -> None:
        """``commits_reachable`` counts the single branch tip."""
        root = _make_repo(tmp_path)
        snap_id, _ = _write_snapshot_with_objects(root, {})
        cid = _write_commit_record(root, snap_id)
        _set_branch(root, "main", cid)

        result = run_gc(root, full=True, grace_period_seconds=0)
        assert result.commits_reachable == 1
        assert result.commits_collected == 0

    def test_gcresult_snapshots_reachable_count(self, tmp_path: pathlib.Path) -> None:
        """``snapshots_reachable`` counts the tip's snapshot."""
        root = _make_repo(tmp_path)
        snap_id, _ = _write_snapshot_with_objects(root, {})
        cid = _write_commit_record(root, snap_id)
        _set_branch(root, "main", cid)

        result = run_gc(root, full=True, grace_period_seconds=0)
        assert result.snapshots_reachable == 1
        assert result.snapshots_collected == 0

    def test_gcresult_full_flag_set(self, tmp_path: pathlib.Path) -> None:
        """The result echoes ``full=True``."""
        root = _make_repo(tmp_path)
        result = run_gc(root, full=True, grace_period_seconds=0)
        assert result.full is True

    def test_without_full_flag_orphaned_commits_not_deleted(self, tmp_path: pathlib.Path) -> None:
        """Default run_gc (full=False) must NOT prune commits or snapshots."""
        root = _make_repo(tmp_path)
        snap_id, _ = _write_snapshot_with_objects(root, {})
        orphan = _write_commit_record(root, snap_id, message="orphan")

        result = run_gc(root, full=False, grace_period_seconds=0)
        assert result.commits_collected == 0
        assert result.snapshots_collected == 0
        assert object_path(root, orphan).exists()

    def test_idempotent_second_run_collects_nothing(self, tmp_path: pathlib.Path) -> None:
        """A second full gc finds nothing left after the first one did real work."""
        root = _make_repo(tmp_path)
        snap_id, _ = _write_snapshot_with_objects(root, {})
        _write_commit_record(root, snap_id)

        # Without this check the test would pass even if gc never collected anything.
        result1 = run_gc(root, full=True, grace_period_seconds=0)
        assert result1.commits_collected == 1

        result2 = run_gc(root, full=True, grace_period_seconds=0)
        assert result2.commits_collected == 0
        assert result2.snapshots_collected == 0
        assert result2.collected_count == 0

    def test_collected_bytes_nonzero_for_deleted_commit(self, tmp_path: pathlib.Path) -> None:
        """Deleting a commit reports a positive byte count."""
        root = _make_repo(tmp_path)
        snap_id, _ = _write_snapshot_with_objects(root, {})
        _write_commit_record(root, snap_id, message="orphan")

        result = run_gc(root, full=True, grace_period_seconds=0)
        assert result.commits_collected_bytes > 0

    def test_collected_bytes_nonzero_for_deleted_snapshot(self, tmp_path: pathlib.Path) -> None:
        """Deleting a snapshot reports a positive byte count."""
        root = _make_repo(tmp_path)
        _write_snapshot_with_objects(root, {"f": b"content"})

        result = run_gc(root, full=True, grace_period_seconds=0)
        assert result.snapshots_collected_bytes > 0
643
644
645 # ---------------------------------------------------------------------------
646 # CLI integration
647 # ---------------------------------------------------------------------------
648
649
class TestCliGcFull:
    """CLI-level tests for ``gc --full`` in text and ``--json`` modes.

    Every test checks the exit code before looking at the output, so a failed
    invocation is reported as such rather than as a parse or content error.
    """

    @staticmethod
    def _gc_json(root: pathlib.Path, *args: str) -> dict[str, object]:
        """Run ``gc *args --json``, require exit code 0, and return the parsed output."""
        r = _invoke_gc(root, *args, "--json")
        assert r.exit_code == 0, r.output
        return json.loads(r.output.strip())

    def test_full_text_output_has_three_lines(self, tmp_path: pathlib.Path) -> None:
        """Text mode prints three non-blank lines."""
        root = _make_repo(tmp_path)
        r = _invoke_gc(root, "--full")
        assert r.exit_code == 0
        lines = [ln for ln in r.output.strip().splitlines() if ln.strip()]
        assert len(lines) == 3  # objects, commits, snapshots

    def test_full_text_output_contains_commit_line(self, tmp_path: pathlib.Path) -> None:
        """Text mode mentions commits."""
        root = _make_repo(tmp_path)
        r = _invoke_gc(root, "--full")
        assert r.exit_code == 0
        assert "commit" in r.output

    def test_full_text_output_contains_snapshot_line(self, tmp_path: pathlib.Path) -> None:
        """Text mode mentions snapshots."""
        root = _make_repo(tmp_path)
        r = _invoke_gc(root, "--full")
        assert r.exit_code == 0
        assert "snapshot" in r.output

    def test_full_dry_run_prefix_on_all_lines(self, tmp_path: pathlib.Path) -> None:
        """Every non-blank line carries the ``[dry-run]`` marker."""
        root = _make_repo(tmp_path)
        r = _invoke_gc(root, "--full", "--dry-run")
        assert r.exit_code == 0
        content_lines = [ln for ln in r.output.strip().splitlines() if ln.strip()]
        assert all("[dry-run]" in ln for ln in content_lines)

    def test_full_json_includes_commits_fields(self, tmp_path: pathlib.Path) -> None:
        """JSON output carries the three commit fields."""
        root = _make_repo(tmp_path)
        data = self._gc_json(root, "--full")
        assert "commits_reachable" in data
        assert "commits_collected" in data
        assert "commits_collected_bytes" in data

    def test_full_json_includes_snapshots_fields(self, tmp_path: pathlib.Path) -> None:
        """JSON output carries the three snapshot fields."""
        root = _make_repo(tmp_path)
        data = self._gc_json(root, "--full")
        assert "snapshots_reachable" in data
        assert "snapshots_collected" in data
        assert "snapshots_collected_bytes" in data

    def test_full_json_field_types(self, tmp_path: pathlib.Path) -> None:
        """Counters are ints and ``full`` is a bool."""
        root = _make_repo(tmp_path)
        data = self._gc_json(root, "--full")
        assert isinstance(data["commits_reachable"], int)
        assert isinstance(data["commits_collected"], int)
        assert isinstance(data["commits_collected_bytes"], int)
        assert isinstance(data["snapshots_reachable"], int)
        assert isinstance(data["snapshots_collected"], int)
        assert isinstance(data["snapshots_collected_bytes"], int)
        assert isinstance(data["full"], bool)

    def test_full_json_full_field_true(self, tmp_path: pathlib.Path) -> None:
        """``--full`` is echoed as ``full: true``."""
        root = _make_repo(tmp_path)
        data = self._gc_json(root, "--full")
        assert data["full"] is True

    def test_no_full_json_full_field_false(self, tmp_path: pathlib.Path) -> None:
        """Without ``--full`` the field is ``false``."""
        root = _make_repo(tmp_path)
        data = self._gc_json(root)
        assert data["full"] is False

    def test_full_json_dry_run_field(self, tmp_path: pathlib.Path) -> None:
        """``--dry-run`` is echoed as ``dry_run: true``."""
        root = _make_repo(tmp_path)
        data = self._gc_json(root, "--full", "--dry-run")
        assert data["dry_run"] is True

    def test_full_zero_orphans_reports_zeros(self, tmp_path: pathlib.Path) -> None:
        """With only reachable data nothing is collected."""
        root = _make_repo(tmp_path)
        snap_id, _ = _write_snapshot_with_objects(root, {})
        cid = _write_commit_record(root, snap_id)
        _set_branch(root, "main", cid)
        data = self._gc_json(root, "--full")
        assert data["commits_collected"] == 0
        assert data["snapshots_collected"] == 0

    def test_full_reports_correct_collected_counts(self, tmp_path: pathlib.Path) -> None:
        """Three orphaned commit/snapshot pairs are all counted."""
        root = _make_repo(tmp_path)
        # 3 orphaned commits, 3 orphaned snapshots
        for i in range(3):
            snap_id, _ = _write_snapshot_with_objects(root, {f"f{i}": f"v{i}".encode()})
            _write_commit_record(root, snap_id, message=f"orphan-{i}", ts_offset=i)
        data = self._gc_json(root, "--full")
        assert data["commits_collected"] == 3
        assert data["snapshots_collected"] == 3

    def test_no_full_json_schema_unchanged(self, tmp_path: pathlib.Path) -> None:
        """Without --full, the new fields are present but zero."""
        root = _make_repo(tmp_path)
        data = self._gc_json(root)
        # Old fields still present.
        assert "collected_count" in data
        assert "reachable_count" in data
        # New fields present but zero.
        assert data["commits_collected"] == 0
        assert data["snapshots_collected"] == 0
754
755
756 # ---------------------------------------------------------------------------
757 # E2E tests
758 # ---------------------------------------------------------------------------
759
760
761 class TestGcFullE2E:
def test_full_lifecycle_orphans_accumulate_then_freed(self, tmp_path: pathlib.Path) -> None:
    """Five abandoned commit/snapshot pairs are reclaimed; the live pair is kept."""
    repo = _make_repo(tmp_path)

    # One commit that stays reachable from main.
    kept_snapshot, _ = _write_snapshot_with_objects(repo, {"app.py": b"app code"})
    kept_commit = _write_commit_record(repo, kept_snapshot)
    _set_branch(repo, "main", kept_commit)

    # Abandoned work: commits and snapshots that no ref ever points to.
    for index in range(5):
        draft_files = {f"draft{index}.py": f"draft{index}".encode()}
        draft_snapshot, _ = _write_snapshot_with_objects(repo, draft_files)
        _write_commit_record(
            repo, draft_snapshot, message=f"abandoned-{index}", ts_offset=index + 10
        )

    outcome = run_gc(repo, full=True, grace_period_seconds=0)
    assert outcome.commits_collected == 5
    assert outcome.snapshots_collected == 5
    assert outcome.commits_reachable == 1
    assert outcome.snapshots_reachable == 1

    # The live commit and its snapshot are still on disk.
    for kept_id in (kept_commit, kept_snapshot):
        assert object_path(repo, kept_id).exists()
784
785 def test_shelf_blobs_protected_under_full(self, tmp_path: pathlib.Path) -> None:
786 root = _make_repo(tmp_path)
787 shelf_obj = _write_object(root, b"shelved work")
788 _write_shelf_entry(root, {"work.py": shelf_obj})
789
790 result = run_gc(root, full=True, grace_period_seconds=0)
791 assert result.collected_count == 0
792 blob_path = object_path(root, shelf_obj)
793 assert blob_path.exists()
794
795 def test_rewrite_history_old_commits_removed(self, tmp_path: pathlib.Path) -> None:
796 """Simulate a history rewrite: old commits orphaned, new commits on branch."""
797 root = _make_repo(tmp_path)
798 snap1, _ = _write_snapshot_with_objects(root, {"v1.py": b"v1"})
799 old_cid = _write_commit_record(root, snap1, message="old")
800
801 snap2, _ = _write_snapshot_with_objects(root, {"v2.py": b"v2"})
802 new_cid = _write_commit_record(root, snap2, message="new (rewrite)")
803 _set_branch(root, "main", new_cid)
804 # old_cid is now orphaned.
805
806 result = run_gc(root, full=True, grace_period_seconds=0)
807 assert result.commits_collected == 1
808 assert result.snapshots_collected == 1
809 assert not object_path(root, old_cid).exists()
810 assert object_path(root, new_cid).exists()
811
812 def test_two_branches_then_one_deleted_unique_commits_freed(
813 self, tmp_path: pathlib.Path
814 ) -> None:
815 root = _make_repo(tmp_path)
816 shared_snap, _ = _write_snapshot_with_objects(root, {"base.py": b"base"})
817 base_cid = _write_commit_record(root, shared_snap, message="base", ts_offset=0)
818
819 feat_snap, _ = _write_snapshot_with_objects(root, {"feat.py": b"feat"})
820 feat_cid = _write_commit_record(root, feat_snap, parent1=base_cid, message="feat", ts_offset=1)
821
822 _set_branch(root, "main", base_cid)
823 _set_branch(root, "dev", feat_cid)
824
825 # "Delete" feat branch by removing its ref.
826 (heads_dir(root) / "dev").unlink()
827
828 result = run_gc(root, full=True, grace_period_seconds=0)
829 # feat_cid unique to dev is now orphaned.
830 assert result.commits_collected == 1
831 assert not object_path(root, feat_cid).exists()
832 # base_cid still on main is preserved.
833 assert object_path(root, base_cid).exists()
834
835
836 # ---------------------------------------------------------------------------
837 # Security tests
838 # ---------------------------------------------------------------------------
839
840
class TestGcFullSecurity:
    """Checks that ``run_gc(full=True)`` leaves symlink targets and non-msgpack files alone.

    NOTE(review): the symlink tests only assert that the link *target* still
    exists; they do not check whether the link itself was removed. Confirm
    that this is the intended strength of the check.
    """

    def test_symlinked_commit_file_not_deleted(self, tmp_path: pathlib.Path) -> None:
        """A file outside the repo, symlinked into the commits directory, survives gc."""
        repo = _make_repo(tmp_path)
        target = tmp_path / "real_commit.msgpack"
        encoded = msgpack.packb({"commit_id": "a" * 64}, use_bin_type=True)
        target.write_bytes(encoded)
        (commits_dir(repo) / "linked.msgpack").symlink_to(target)

        run_gc(repo, full=True, grace_period_seconds=0)

        assert target.exists(), "Target of symlink must not be deleted"

    def test_symlinked_snapshot_file_not_deleted(self, tmp_path: pathlib.Path) -> None:
        """A file outside the repo, symlinked into the snapshots directory, survives gc."""
        repo = _make_repo(tmp_path)
        target = tmp_path / "real_snap.msgpack"
        encoded = msgpack.packb({"snapshot_id": "b" * 64}, use_bin_type=True)
        target.write_bytes(encoded)
        (snapshots_dir(repo) / "linked.msgpack").symlink_to(target)

        run_gc(repo, full=True, grace_period_seconds=0)

        assert target.exists(), "Target of snapshot symlink must not be deleted"

    def test_non_msgpack_file_in_commits_dir_not_deleted(self, tmp_path: pathlib.Path) -> None:
        """A stray text file in the commits directory is left in place."""
        repo = _make_repo(tmp_path)
        stray_file = commits_dir(repo) / "README.txt"
        stray_file.write_text("not a commit")

        run_gc(repo, full=True, grace_period_seconds=0)

        assert stray_file.exists()

    def test_non_msgpack_file_in_snapshots_dir_not_deleted(self, tmp_path: pathlib.Path) -> None:
        """A stray ``.DS_Store`` file in the snapshots directory is left in place."""
        repo = _make_repo(tmp_path)
        stray_file = snapshots_dir(repo) / ".DS_Store"
        stray_file.write_bytes(b"junk")

        run_gc(repo, full=True, grace_period_seconds=0)

        assert stray_file.exists()
877
878
879 # ---------------------------------------------------------------------------
880 # Stress tests
881 # ---------------------------------------------------------------------------
882
883
class TestGcFullStress:
    """Larger-volume scenarios for ``run_gc(full=True)``: many orphans, a long chain, many branches."""

    def test_200_orphaned_commits_and_snapshots_collected(self, tmp_path: pathlib.Path) -> None:
        """200 orphaned commit/snapshot pairs are collected while the commit on ``main`` survives."""
        root = _make_repo(tmp_path)

        # The one commit that must survive: ``main`` points at it.
        live_snap, _ignored = _write_snapshot_with_objects(root, {"live.py": b"live"})
        live_cid = _write_commit_record(root, live_snap)
        _set_branch(root, "main", live_cid)

        # 200 commit+snapshot pairs that this test never attaches to a branch.
        for i in range(200):
            manifest = {f"f{i}.py": f"v{i}".encode()}
            orphan_snap, _ignored = _write_snapshot_with_objects(root, manifest)
            _write_commit_record(root, orphan_snap, message=f"orphan-{i}", ts_offset=i + 1)

        result = run_gc(root, full=True, grace_period_seconds=0)

        assert result.commits_collected == 200
        assert result.snapshots_collected == 200
        assert result.commits_reachable == 1
        assert result.snapshots_reachable == 1

        # The live commit and its snapshot must still be on disk.
        for kept_id in (live_cid, live_snap):
            assert object_path(root, kept_id).exists()

    def test_deep_100_commit_chain_all_preserved(self, tmp_path: pathlib.Path) -> None:
        """Every commit produced by ``_make_linear_chain(root, 100)`` is reported reachable and kept."""
        root = _make_repo(tmp_path)
        chain_ids = _make_linear_chain(root, 100)

        result = run_gc(root, full=True, grace_period_seconds=0)

        assert result.commits_collected == 0
        assert result.commits_reachable == 100
        assert all(object_path(root, chain_cid).exists() for chain_cid in chain_ids)

    def test_50_branches_all_commits_preserved(self, tmp_path: pathlib.Path) -> None:
        """Fifty branches, each pointing at its own commit, leave nothing to collect."""
        root = _make_repo(tmp_path)

        # All fifty commits share one snapshot built from an empty manifest.
        snap_id, _ = _write_snapshot_with_objects(root, {})
        for i in range(50):
            branch_name = f"feat/branch-{i:02d}"
            cid = _write_commit_record(root, snap_id, message=f"branch-{i}", ts_offset=i)
            _set_branch(root, branch_name, cid)

        result = run_gc(root, full=True, grace_period_seconds=0)
        assert result.commits_collected == 0
        assert result.commits_reachable == 50