"""Section 32 — Garbage Collector: 7-layer test suite.

Covers:
  musehub/services/musehub_gc.py — GCResult, run_gc
  musehub/api/routes/wire.py — _run_gc_async (fire-and-forget background task)

Key behaviour:
  - run_gc collects all branch head commit IDs then BFS through parent_ids
  - Commits reachable from any branch head are preserved
  - Orphaned commits (not reachable from any branch) are deleted
  - Snapshots referenced only by orphaned commits are deleted
  - Snapshots also referenced by a reachable commit are preserved
  - Repos with no branch heads → GC skips (returns empty result)
  - Repos already clean → GC is a no-op (returns 0 deletes)
  - run_gc commits the session itself
"""
from __future__ import annotations

import dataclasses
import secrets
from collections import deque
from collections.abc import Iterable, Mapping, Sequence
from datetime import datetime, timezone

import pytest
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.core.genesis import compute_identity_id, compute_repo_id
from musehub.db.musehub_repo_models import (
    MusehubBranch,
    MusehubCommit,
    MusehubCommitRef,
    MusehubRepo,
    MusehubSnapshot,
    MusehubSnapshotRef,
)
from musehub.services.musehub_gc import GCResult, run_gc


# ── helpers ───────────────────────────────────────────────────────────────────


def _uid() -> str:
    """Return a random 32-character hex identifier."""
    return secrets.token_hex(16)


def _cid() -> str:
    """Return a random 16-character hex identifier for commits and snapshots."""
    return secrets.token_hex(8)


def _now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)


def _reachable_from(
    heads: Iterable[str], graph: Mapping[str, Sequence[str]]
) -> set[str]:
    """Return every commit ID in *graph* reachable from *heads* via parent links.

    The walk is breadth-first (FIFO queue). IDs that are not keys of *graph*
    are skipped, and the visited set prevents revisiting a commit, so merge
    commits and cyclic parent references both terminate.
    """
    reachable: set[str] = set()
    queue = deque(heads)
    while queue:
        cid = queue.popleft()
        if cid in reachable or cid not in graph:
            continue
        reachable.add(cid)
        queue.extend(graph[cid])
    return reachable


async def _db_repo(session: AsyncSession) -> MusehubRepo:
    """Add a private repo owned by ``testuser`` with a random slug and flush it."""
    slug = f"gc-repo-{_uid()[:8]}"
    created_at = _now()
    owner_id = compute_identity_id(b"testuser")
    repo = MusehubRepo(
        repo_id=compute_repo_id(owner_id, slug, "code", created_at.isoformat()),
        name=slug,
        slug=slug,
        owner="testuser",
        owner_user_id=owner_id,
        visibility="private",
        created_at=created_at,
        updated_at=created_at,
    )
    session.add(repo)
    await session.flush()
    return repo


async def _db_branch(
    session: AsyncSession,
    repo_id: str,
    *,
    name: str = "dev",
    head_commit_id: str | None = None,
) -> MusehubBranch:
    """Add a branch for *repo_id* (optionally without a head commit) and flush it."""
    branch = MusehubBranch(
        branch_id=_uid(),
        repo_id=repo_id,
        name=name,
        head_commit_id=head_commit_id,
    )
    session.add(branch)
    await session.flush()
    return branch


async def _db_commit(
    session: AsyncSession,
    repo_id: str,
    *,
    commit_id: str | None = None,
    parent_ids: list[str] | None = None,
    snapshot_id: str | None = None,
    branch: str = "dev",
) -> MusehubCommit:
    """Add a commit plus the ``MusehubCommitRef`` tying it to *repo_id*, then flush.

    A random commit ID is generated when *commit_id* is omitted, and a missing
    *parent_ids* is stored as an empty list.
    """
    commit = MusehubCommit(
        commit_id=commit_id or _cid(),
        branch=branch,
        parent_ids=parent_ids or [],
        message="test commit",
        author="testuser",
        timestamp=_now(),
        snapshot_id=snapshot_id,
    )
    session.add(commit)
    session.add(MusehubCommitRef(repo_id=repo_id, commit_id=commit.commit_id))
    await session.flush()
    return commit


async def _db_snapshot(
    session: AsyncSession,
    repo_id: str,
    *,
    snapshot_id: str | None = None,
) -> MusehubSnapshot:
    """Add an empty-manifest snapshot plus its ``MusehubSnapshotRef``, then flush."""
    snap = MusehubSnapshot(
        snapshot_id=snapshot_id or _cid(),
        manifest_blob=b"",
        created_at=_now(),
    )
    session.add(snap)
    session.add(MusehubSnapshotRef(repo_id=repo_id, snapshot_id=snap.snapshot_id))
    await session.flush()
    return snap


# ═══════════════════════════════════════════════════════════════════════════════
# Layer 1 — Unit
# ═══════════════════════════════════════════════════════════════════════════════


class TestUnitGC:
    """Pure-Python checks: GCResult shape and the reachability walk in isolation."""

    def test_gcresult_defaults(self) -> None:
        r = GCResult(repo_id="repo-abc")
        assert r.commits_deleted == 0
        assert r.snapshots_deleted == 0
        assert r.reachable_commit_count == 0
        assert r.errors == []

    def test_gcresult_is_dataclass(self) -> None:
        assert dataclasses.is_dataclass(GCResult)

    def test_gcresult_with_values(self) -> None:
        r = GCResult(
            repo_id="abc",
            commits_deleted=5,
            snapshots_deleted=3,
            reachable_commit_count=10,
        )
        assert r.commits_deleted == 5
        assert r.snapshots_deleted == 3
        assert r.reachable_commit_count == 10

    def test_gcresult_errors_is_list(self) -> None:
        r = GCResult(repo_id="x")
        r.errors.append("something failed")
        assert len(r.errors) == 1

    def test_bfs_reachability_logic(self) -> None:
        """Verify the BFS reachability walk in isolation on a linear history."""
        # Simulate a simple commit graph:
        #   head → c2 → c1 → root
        #                     ↑
        #                  orphan  (not reachable from head)
        all_commits = {
            "head": ["c2"],
            "c2": ["c1"],
            "c1": ["root"],
            "root": [],
            "orphan": ["root"],  # orphan points to root but no branch points to it
        }

        reachable = _reachable_from(["head"], all_commits)

        assert "head" in reachable
        assert "c2" in reachable
        assert "c1" in reachable
        assert "root" in reachable
        assert "orphan" not in reachable

    def test_bfs_handles_merge_commits(self) -> None:
        """Merge commits have two parents — BFS must traverse both."""
        all_commits = {
            "merge": ["left", "right"],
            "left": ["base"],
            "right": ["base"],
            "base": [],
        }

        reachable = _reachable_from(["merge"], all_commits)

        assert reachable == {"merge", "left", "right", "base"}

    def test_bfs_handles_cycle_guard(self) -> None:
        """Circular parent references must not infinite-loop (already-visited guard)."""
        all_commits = {"a": ["b"], "b": ["a"]}

        reachable = _reachable_from(["a"], all_commits)

        assert reachable == {"a", "b"}


# ═══════════════════════════════════════════════════════════════════════════════
# Layer 2 — Integration
# ═══════════════════════════════════════════════════════════════════════════════


class TestIntegrationGC:
    """run_gc against a real session: reachable rows stay, orphaned rows go."""

    async def test_gc_clean_repo_no_deletes(self, db_session: AsyncSession) -> None:
        repo = await _db_repo(db_session)
        c1 = await _db_commit(db_session, repo.repo_id)
        await _db_branch(db_session, repo.repo_id, head_commit_id=c1.commit_id)
        await db_session.commit()

        result = await run_gc(db_session, repo.repo_id)

        assert result.commits_deleted == 0
        assert result.snapshots_deleted == 0
        assert result.reachable_commit_count == 1

    async def test_gc_no_branch_heads_skips(self, db_session: AsyncSession) -> None:
        repo = await _db_repo(db_session)
        # Branch with no head_commit_id
        await _db_branch(db_session, repo.repo_id, head_commit_id=None)
        await _db_commit(db_session, repo.repo_id)
        await db_session.commit()

        result = await run_gc(db_session, repo.repo_id)

        # No heads → GC skips immediately; nothing deleted
        assert result.commits_deleted == 0
        assert result.reachable_commit_count == 0

    async def test_gc_deletes_orphaned_commit(self, db_session: AsyncSession) -> None:
        repo = await _db_repo(db_session)
        live = await _db_commit(db_session, repo.repo_id)
        orphan = await _db_commit(db_session, repo.repo_id)
        await _db_branch(db_session, repo.repo_id, head_commit_id=live.commit_id)
        await db_session.commit()

        result = await run_gc(db_session, repo.repo_id)

        assert result.commits_deleted == 1
        assert result.reachable_commit_count == 1
        # Verify orphan is gone
        row = await db_session.get(MusehubCommit, orphan.commit_id)
        assert row is None

    async def test_gc_preserves_reachable_chain(self, db_session: AsyncSession) -> None:
        repo = await _db_repo(db_session)
        root_cid = _cid()
        mid_cid = _cid()
        head_cid = _cid()
        # Linear chain root ← mid ← head; only the IDs are needed afterwards.
        await _db_commit(db_session, repo.repo_id, commit_id=root_cid)
        await _db_commit(
            db_session, repo.repo_id, commit_id=mid_cid, parent_ids=[root_cid]
        )
        await _db_commit(
            db_session, repo.repo_id, commit_id=head_cid, parent_ids=[mid_cid]
        )
        await _db_branch(db_session, repo.repo_id, head_commit_id=head_cid)
        await db_session.commit()

        result = await run_gc(db_session, repo.repo_id)

        assert result.commits_deleted == 0
        assert result.reachable_commit_count == 3
        for cid in [root_cid, mid_cid, head_cid]:
            row = await db_session.get(MusehubCommit, cid)
            assert row is not None

    async def test_gc_deletes_orphaned_snapshot(self, db_session: AsyncSession) -> None:
        repo = await _db_repo(db_session)
        snap = await _db_snapshot(db_session, repo.repo_id)
        live = await _db_commit(db_session, repo.repo_id)
        # Unreachable commit is the snapshot's only referrer.
        await _db_commit(db_session, repo.repo_id, snapshot_id=snap.snapshot_id)
        await _db_branch(db_session, repo.repo_id, head_commit_id=live.commit_id)
        await db_session.commit()

        result = await run_gc(db_session, repo.repo_id)

        assert result.commits_deleted == 1
        assert result.snapshots_deleted == 1
        snap_row = await db_session.get(MusehubSnapshot, snap.snapshot_id)
        assert snap_row is None

    async def test_gc_preserves_snapshot_referenced_by_live_commit(
        self, db_session: AsyncSession
    ) -> None:
        repo = await _db_repo(db_session)
        shared_snap = await _db_snapshot(db_session, repo.repo_id)
        # Both reachable and orphan point to same snapshot
        live_cid = _cid()
        orphan_cid = _cid()
        await _db_commit(
            db_session,
            repo.repo_id,
            commit_id=live_cid,
            snapshot_id=shared_snap.snapshot_id,
        )
        await _db_commit(
            db_session,
            repo.repo_id,
            commit_id=orphan_cid,
            snapshot_id=shared_snap.snapshot_id,
        )
        await _db_branch(db_session, repo.repo_id, head_commit_id=live_cid)
        await db_session.commit()

        result = await run_gc(db_session, repo.repo_id)

        assert result.commits_deleted == 1  # orphan commit removed
        assert result.snapshots_deleted == 0  # snapshot still used by live commit
        snap_row = await db_session.get(MusehubSnapshot, shared_snap.snapshot_id)
        assert snap_row is not None

    async def test_gc_multiple_branches_union_of_reachable(
        self, db_session: AsyncSession
    ) -> None:
        repo = await _db_repo(db_session)
        c1 = await _db_commit(db_session, repo.repo_id)
        c2 = await _db_commit(db_session, repo.repo_id)
        await _db_branch(
            db_session, repo.repo_id, name="dev", head_commit_id=c1.commit_id
        )
        await _db_branch(
            db_session, repo.repo_id, name="main", head_commit_id=c2.commit_id
        )
        await db_session.commit()

        result = await run_gc(db_session, repo.repo_id)

        assert result.commits_deleted == 0
        assert result.reachable_commit_count == 2

    async def test_gc_returns_gcresult(self, db_session: AsyncSession) -> None:
        repo = await _db_repo(db_session)
        c = await _db_commit(db_session, repo.repo_id)
        await _db_branch(db_session, repo.repo_id, head_commit_id=c.commit_id)
        await db_session.commit()

        result = await run_gc(db_session, repo.repo_id)

        assert isinstance(result, GCResult)
        assert result.repo_id == repo.repo_id

    async def test_gc_only_affects_target_repo(self, db_session: AsyncSession) -> None:
        repo1 = await _db_repo(db_session)
        repo2 = await _db_repo(db_session)
        live = await _db_commit(db_session, repo1.repo_id)
        r2_orphan = await _db_commit(db_session, repo2.repo_id)
        await _db_branch(db_session, repo1.repo_id, head_commit_id=live.commit_id)
        await _db_branch(db_session, repo2.repo_id, head_commit_id=None)
        await db_session.commit()

        # Run GC on repo1 only
        result = await run_gc(db_session, repo1.repo_id)

        assert result.commits_deleted == 0
        # repo2's commit must still exist
        row = await db_session.get(MusehubCommit, r2_orphan.commit_id)
        assert row is not None


# ═══════════════════════════════════════════════════════════════════════════════
# Layer 3 — End-to-End
# ═══════════════════════════════════════════════════════════════════════════════


class TestE2EGC:
    """GC has no direct HTTP endpoint; _run_gc_async is fire-and-forget after push.

    We test GC end-to-end by calling run_gc directly after setting up realistic
    repo state and verifying full database consistency.
    """

    async def test_e2e_gc_linear_history(self, db_session: AsyncSession) -> None:
        """Full pipeline: 5-commit linear chain, 2 orphans, run GC, verify state."""
        repo = await _db_repo(db_session)
        ids = [_cid() for _ in range(7)]
        # Chain: 0←1←2←3←4 (reachable); 5, 6 (orphans)
        for i, cid in enumerate(ids[:5]):
            parents = [ids[i - 1]] if i > 0 else []
            await _db_commit(
                db_session, repo.repo_id, commit_id=cid, parent_ids=parents
            )
        for cid in ids[5:]:
            await _db_commit(db_session, repo.repo_id, commit_id=cid)
        await _db_branch(db_session, repo.repo_id, head_commit_id=ids[4])
        await db_session.commit()

        result = await run_gc(db_session, repo.repo_id)

        assert result.reachable_commit_count == 5
        assert result.commits_deleted == 2
        for cid in ids[:5]:
            assert await db_session.get(MusehubCommit, cid) is not None
        for cid in ids[5:]:
            assert await db_session.get(MusehubCommit, cid) is None

    async def test_e2e_gc_empty_repo(self, db_session: AsyncSession) -> None:
        repo = await _db_repo(db_session)
        await db_session.commit()

        result = await run_gc(db_session, repo.repo_id)

        assert result.commits_deleted == 0
        assert result.snapshots_deleted == 0

    async def test_e2e_gc_snapshot_lifecycle(self, db_session: AsyncSession) -> None:
        repo = await _db_repo(db_session)
        live_snap = await _db_snapshot(db_session, repo.repo_id)
        dead_snap = await _db_snapshot(db_session, repo.repo_id)
        live_cid = _cid()
        dead_cid = _cid()
        await _db_commit(
            db_session,
            repo.repo_id,
            commit_id=live_cid,
            snapshot_id=live_snap.snapshot_id,
        )
        await _db_commit(
            db_session,
            repo.repo_id,
            commit_id=dead_cid,
            snapshot_id=dead_snap.snapshot_id,
        )
        await _db_branch(db_session, repo.repo_id, head_commit_id=live_cid)
        await db_session.commit()

        result = await run_gc(db_session, repo.repo_id)

        assert result.commits_deleted == 1
        assert result.snapshots_deleted == 1
        assert await db_session.get(MusehubSnapshot, live_snap.snapshot_id) is not None
        assert await db_session.get(MusehubSnapshot, dead_snap.snapshot_id) is None
# ═══════════════════════════════════════════════════════════════════════════════
# Layer 4 — Stress
# ═══════════════════════════════════════════════════════════════════════════════


class TestStressGC:
    """Volume scenarios: many orphans, a long parent chain, repeated runs."""

    async def test_gc_large_orphan_set(self, db_session: AsyncSession) -> None:
        """GC must handle a repo with 200 orphaned commits."""
        orphan_total = 200
        target = await _db_repo(db_session)
        head = await _db_commit(db_session, target.repo_id)
        await _db_branch(db_session, target.repo_id, head_commit_id=head.commit_id)
        created = 0
        while created < orphan_total:
            await _db_commit(db_session, target.repo_id)
            created += 1
        await db_session.commit()

        outcome = await run_gc(db_session, target.repo_id)

        assert outcome.commits_deleted == 200
        assert outcome.reachable_commit_count == 1

    async def test_gc_deep_chain(self, db_session: AsyncSession) -> None:
        """GC must traverse a 100-commit linear chain without stack overflow."""
        target = await _db_repo(db_session)
        chain = [_cid() for _ in range(100)]
        previous: str | None = None
        for current in chain:
            # The first commit is the root and has no parent.
            lineage = [] if previous is None else [previous]
            await _db_commit(
                db_session, target.repo_id, commit_id=current, parent_ids=lineage
            )
            previous = current
        await _db_branch(db_session, target.repo_id, head_commit_id=chain[-1])
        await db_session.commit()

        outcome = await run_gc(db_session, target.repo_id)

        assert outcome.commits_deleted == 0
        assert outcome.reachable_commit_count == 100

    async def test_gc_many_orphaned_snapshots(self, db_session: AsyncSession) -> None:
        """Fifty orphan commits, each with its own snapshot, are all collected."""
        target = await _db_repo(db_session)
        head = await _db_commit(db_session, target.repo_id)
        await _db_branch(db_session, target.repo_id, head_commit_id=head.commit_id)
        for _ in range(50):
            doomed = await _db_snapshot(db_session, target.repo_id)
            await _db_commit(
                db_session, target.repo_id, snapshot_id=doomed.snapshot_id
            )
        await db_session.commit()

        outcome = await run_gc(db_session, target.repo_id)

        assert outcome.commits_deleted == 50
        assert outcome.snapshots_deleted == 50

    async def test_gc_idempotent_on_clean_repo(self, db_session: AsyncSession) -> None:
        """Running GC twice on an already-clean repo must be a no-op both times."""
        target = await _db_repo(db_session)
        only_commit = await _db_commit(db_session, target.repo_id)
        await _db_branch(
            db_session, target.repo_id, head_commit_id=only_commit.commit_id
        )
        await db_session.commit()

        first_pass = await run_gc(db_session, target.repo_id)
        second_pass = await run_gc(db_session, target.repo_id)

        assert first_pass.commits_deleted == 0
        assert second_pass.commits_deleted == 0


# ═══════════════════════════════════════════════════════════════════════════════
# Layer 5 — Data Integrity
# ═══════════════════════════════════════════════════════════════════════════════


class TestDataIntegrityGC:
    """Live rows survive GC and reported counts agree with the database."""

    async def test_gc_does_not_delete_head_commit(self, db_session: AsyncSession) -> None:
        """The commit a branch points at is still present after GC."""
        target = await _db_repo(db_session)
        tip = await _db_commit(db_session, target.repo_id)
        await _db_branch(db_session, target.repo_id, head_commit_id=tip.commit_id)
        await db_session.commit()

        await run_gc(db_session, target.repo_id)

        survivor = await db_session.get(MusehubCommit, tip.commit_id)
        assert survivor is not None

    async def test_gc_does_not_delete_live_snapshot(self, db_session: AsyncSession) -> None:
        """A snapshot referenced by the branch head commit is still present after GC."""
        target = await _db_repo(db_session)
        kept = await _db_snapshot(db_session, target.repo_id)
        tip = await _db_commit(
            db_session, target.repo_id, snapshot_id=kept.snapshot_id
        )
        await _db_branch(db_session, target.repo_id, head_commit_id=tip.commit_id)
        await db_session.commit()

        await run_gc(db_session, target.repo_id)

        survivor = await db_session.get(MusehubSnapshot, kept.snapshot_id)
        assert survivor is not None

    async def test_gc_counts_match_actual_deletes(self, db_session: AsyncSession) -> None:
        """Reported delete counts equal the rows actually missing afterwards."""
        target = await _db_repo(db_session)
        tip = await _db_commit(db_session, target.repo_id)
        doomed_snaps = []
        for _ in range(3):
            doomed_snaps.append(await _db_snapshot(db_session, target.repo_id))
        for doomed in doomed_snaps:
            await _db_commit(
                db_session, target.repo_id, snapshot_id=doomed.snapshot_id
            )
        await _db_branch(db_session, target.repo_id, head_commit_id=tip.commit_id)
        await db_session.commit()

        outcome = await run_gc(db_session, target.repo_id)

        assert outcome.commits_deleted == 3
        assert outcome.snapshots_deleted == 3

        # Verify actual DB state matches reported counts
        commit_query = (
            select(MusehubCommit)
            .join(
                MusehubCommitRef,
                MusehubCommitRef.commit_id == MusehubCommit.commit_id,
            )
            .where(MusehubCommitRef.repo_id == target.repo_id)
        )
        commit_rows = await db_session.execute(commit_query)
        assert len(commit_rows.scalars().all()) == 1

        snapshot_query = (
            select(MusehubSnapshot)
            .join(
                MusehubSnapshotRef,
                MusehubSnapshotRef.snapshot_id == MusehubSnapshot.snapshot_id,
            )
            .where(MusehubSnapshotRef.repo_id == target.repo_id)
        )
        snapshot_rows = await db_session.execute(snapshot_query)
        assert len(snapshot_rows.scalars().all()) == 0

    async def test_gc_merge_commit_both_parents_preserved(
        self, db_session: AsyncSession
    ) -> None:
        """Both sides of a merge and their common base are kept."""
        target = await _db_repo(db_session)
        base_cid, left_cid, right_cid, merge_cid = _cid(), _cid(), _cid(), _cid()
        # Diamond: base ← left, base ← right, (left, right) ← merge
        await _db_commit(db_session, target.repo_id, commit_id=base_cid)
        await _db_commit(
            db_session, target.repo_id, commit_id=left_cid, parent_ids=[base_cid]
        )
        await _db_commit(
            db_session, target.repo_id, commit_id=right_cid, parent_ids=[base_cid]
        )
        await _db_commit(
            db_session,
            target.repo_id,
            commit_id=merge_cid,
            parent_ids=[left_cid, right_cid],
        )
        await _db_branch(db_session, target.repo_id, head_commit_id=merge_cid)
        await db_session.commit()

        outcome = await run_gc(db_session, target.repo_id)

        assert outcome.commits_deleted == 0
        assert outcome.reachable_commit_count == 4
        for expected in (base_cid, left_cid, right_cid, merge_cid):
            assert await db_session.get(MusehubCommit, expected) is not None

    async def test_gc_commit_with_no_snapshot_skips_snapshot_delete(
        self, db_session: AsyncSession
    ) -> None:
        """Deleting an orphan that has no snapshot reports zero snapshot deletes."""
        target = await _db_repo(db_session)
        tip = await _db_commit(db_session, target.repo_id)
        # orphan has no snapshot
        await _db_commit(db_session, target.repo_id, snapshot_id=None)
        await _db_branch(db_session, target.repo_id, head_commit_id=tip.commit_id)
        await db_session.commit()

        outcome = await run_gc(db_session, target.repo_id)

        assert outcome.commits_deleted == 1
        assert outcome.snapshots_deleted == 0
# ═══════════════════════════════════════════════════════════════════════════════
# Layer 6 — Security
# ═══════════════════════════════════════════════════════════════════════════════


class TestSecurityGC:
    """Isolation and hostile-input cases: other repos, unknown IDs, dangling parents."""

    async def test_gc_does_not_cross_repo_boundaries(self, db_session: AsyncSession) -> None:
        """GC for repo1 must never delete commits belonging to repo2."""
        collected_repo = await _db_repo(db_session)
        bystander_repo = await _db_repo(db_session)
        # repo2 has a commit not referenced by any branch
        bystander_commit = await _db_commit(db_session, bystander_repo.repo_id)
        # repo1 has a live commit
        live_commit = await _db_commit(db_session, collected_repo.repo_id)
        await _db_branch(
            db_session,
            collected_repo.repo_id,
            head_commit_id=live_commit.commit_id,
        )
        await _db_branch(db_session, bystander_repo.repo_id, head_commit_id=None)
        await db_session.commit()

        await run_gc(db_session, collected_repo.repo_id)

        # repo2's commit must still be there
        still_there = await db_session.get(MusehubCommit, bystander_commit.commit_id)
        assert still_there is not None

    async def test_gc_nonexistent_repo_returns_empty_result(
        self, db_session: AsyncSession
    ) -> None:
        """Calling GC on a non-existent repo_id must not raise and return empty result."""
        outcome = await run_gc(db_session, "nonexistent-repo-id")

        assert outcome.commits_deleted == 0
        assert outcome.reachable_commit_count == 0

    async def test_gc_with_unknown_parent_ids_does_not_crash(
        self, db_session: AsyncSession
    ) -> None:
        """Commits that reference parent IDs not in the DB (dangling refs) are handled."""
        target = await _db_repo(db_session)
        tip_id = _cid()
        # parent_ids references a commit that doesn't exist in DB
        dangling_parents = ["phantom-commit-id-not-in-db"]
        await _db_commit(
            db_session,
            target.repo_id,
            commit_id=tip_id,
            parent_ids=dangling_parents,
        )
        await _db_branch(db_session, target.repo_id, head_commit_id=tip_id)
        await db_session.commit()

        # BFS encounters unknown parent, skips it — must not raise
        outcome = await run_gc(db_session, target.repo_id)

        assert outcome.commits_deleted == 0
        assert outcome.reachable_commit_count == 1
# ═══════════════════════════════════════════════════════════════════════════════
# Layer 7 — Performance
# ═══════════════════════════════════════════════════════════════════════════════


class TestPerformanceGC:
    """Wall-clock budgets for run_gc on small repositories."""

    async def test_gc_completes_quickly_small_repo(self, db_session: AsyncSession) -> None:
        """A 20-commit linear chain is collected in under one second."""
        import time

        target = await _db_repo(db_session)
        chain = [_cid() for _ in range(20)]
        previous: str | None = None
        for current in chain:
            # The first commit is the root and has no parent.
            lineage = [] if previous is None else [previous]
            await _db_commit(
                db_session, target.repo_id, commit_id=current, parent_ids=lineage
            )
            previous = current
        await _db_branch(db_session, target.repo_id, head_commit_id=chain[-1])
        await db_session.commit()

        # Only the run_gc call itself is timed; fixture setup is excluded.
        began = time.perf_counter()
        outcome = await run_gc(db_session, target.repo_id)
        duration = time.perf_counter() - began

        assert outcome.commits_deleted == 0
        assert duration < 1.0

    async def test_gc_with_mixed_load(self, db_session: AsyncSession) -> None:
        """30 live commits plus 50 orphans with snapshots finish in under two seconds."""
        import time

        target = await _db_repo(db_session)
        live_chain = [_cid() for _ in range(30)]
        previous: str | None = None
        for current in live_chain:
            lineage = [] if previous is None else [previous]
            await _db_commit(
                db_session, target.repo_id, commit_id=current, parent_ids=lineage
            )
            previous = current
        for _ in range(50):
            doomed = await _db_snapshot(db_session, target.repo_id)
            await _db_commit(
                db_session, target.repo_id, snapshot_id=doomed.snapshot_id
            )
        await _db_branch(db_session, target.repo_id, head_commit_id=live_chain[-1])
        await db_session.commit()

        # Only the run_gc call itself is timed; fixture setup is excluded.
        began = time.perf_counter()
        outcome = await run_gc(db_session, target.repo_id)
        duration = time.perf_counter() - began

        assert outcome.commits_deleted == 50
        assert outcome.snapshots_deleted == 50
        assert duration < 2.0