"""Section 41 — GC & Background Tasks: 7-layer test suite. Covers: - musehub/services/musehub_gc.py::GCResult, run_gc Background jobs (symbol indexing, GC) are now dispatched via enqueue_job and executed by the job runner — there are no standalone _build_symbol_index_async or _run_gc_async coroutines in wire.py. """ from __future__ import annotations import asyncio import secrets import time from datetime import UTC, datetime import msgpack import pytest from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from musehub.core.genesis import compute_identity_id, compute_repo_id from musehub.db.musehub_repo_models import ( MusehubBranch, MusehubCommit, MusehubCommitRef, MusehubRepo, MusehubSnapshot, MusehubSnapshotRef, ) from musehub.services.musehub_gc import GCResult, run_gc # ───────────────────────────────────────────────────────────────────────────── # Helpers # ───────────────────────────────────────────────────────────────────────────── def _uid() -> str: return secrets.token_hex(16) def _hex(n: int = 32) -> str: return secrets.token_hex(n) async def _mk_repo(session: AsyncSession, suffix: str = "") -> MusehubRepo: slug = f"gc-test{suffix}-{_uid()[:8]}" created_at = datetime.now(tz=UTC) owner_id = compute_identity_id(b"testuser") repo = MusehubRepo( repo_id=compute_repo_id(owner_id, slug, "code", created_at.isoformat()), name=f"gc-test{suffix}", owner="testuser", slug=slug, owner_user_id=owner_id, created_at=created_at, updated_at=created_at, ) session.add(repo) await session.flush() return repo async def _mk_branch( session: AsyncSession, repo_id: str, head_commit_id: str | None = None, name: str = "main", ) -> MusehubBranch: branch = MusehubBranch( branch_id=_uid(), repo_id=repo_id, name=name, head_commit_id=head_commit_id, ) session.add(branch) await session.flush() return branch async def _mk_commit( session: AsyncSession, repo_id: str, commit_id: str | None = None, parent_ids: list[str] | None = None, snapshot_id: str | None = None, ) -> MusehubCommit: commit = MusehubCommit( commit_id=commit_id or _hex(), branch="main", parent_ids=parent_ids or [], message="test commit", author="testuser", timestamp=datetime.now(UTC), snapshot_id=snapshot_id, ) session.add(commit) session.add(MusehubCommitRef(repo_id=repo_id, commit_id=commit.commit_id)) await session.flush() return commit async def _mk_snapshot( session: AsyncSession, repo_id: str, snapshot_id: str | None = None ) -> MusehubSnapshot: snap = MusehubSnapshot( snapshot_id=snapshot_id or _hex(), manifest_blob=msgpack.packb({}, use_bin_type=True), entry_count=0, ) session.add(snap) session.add(MusehubSnapshotRef(repo_id=repo_id, snapshot_id=snap.snapshot_id)) await session.flush() return snap async def _count_commits(session: AsyncSession, repo_id: str) -> int: result = await session.execute( select(MusehubCommit) .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id) .where(MusehubCommitRef.repo_id == repo_id) ) return len(result.scalars().all()) # ───────────────────────────────────────────────────────────────────────────── # LAYER 1 — UNIT # ───────────────────────────────────────────────────────────────────────────── class TestGCResultUnit: """Unit: GCResult dataclass shape and defaults.""" def test_default_counts_are_zero(self) -> None: r = GCResult(repo_id="abc") assert r.commits_deleted == 0 assert r.snapshots_deleted == 0 assert r.reachable_commit_count == 0 def test_default_errors_is_empty_list(self) -> None: r = GCResult(repo_id="abc") assert r.errors == [] def test_errors_is_independent_per_instance(self) -> None: a = GCResult(repo_id="a") b = GCResult(repo_id="b") a.errors.append("x") assert b.errors == [] def test_fields_set_correctly(self) -> None: r = GCResult(repo_id="x", commits_deleted=3, snapshots_deleted=1, reachable_commit_count=5) assert r.repo_id == "x" assert r.commits_deleted == 3 assert r.snapshots_deleted == 1 assert r.reachable_commit_count == 5 class TestRunGcNoBranches: """Unit: run_gc early-exit when repo has no branches.""" async def test_no_branches_returns_zero_deletions(self, db_session: AsyncSession) -> None: repo = await _mk_repo(db_session) result = await run_gc(db_session, repo.repo_id) assert result.commits_deleted == 0 assert result.reachable_commit_count == 0 async def test_branch_with_null_head_is_ignored(self, db_session: AsyncSession) -> None: repo = await _mk_repo(db_session) await _mk_branch(db_session, repo.repo_id, head_commit_id=None) result = await run_gc(db_session, repo.repo_id) assert result.commits_deleted == 0 async def test_unknown_repo_id_returns_empty_result(self, db_session: AsyncSession) -> None: result = await run_gc(db_session, "nonexistent-repo-id") assert result.commits_deleted == 0 assert result.reachable_commit_count == 0 # ───────────────────────────────────────────────────────────────────────────── # LAYER 2 — INTEGRATION # ───────────────────────────────────────────────────────────────────────────── class TestRunGcIntegration: """Integration: run_gc with real in-memory DB.""" async def test_clean_repo_deletes_nothing(self, db_session: AsyncSession) -> None: repo = await _mk_repo(db_session) commit = await _mk_commit(db_session, repo.repo_id) await _mk_branch(db_session, repo.repo_id, head_commit_id=commit.commit_id) result = await run_gc(db_session, repo.repo_id) assert result.commits_deleted == 0 assert result.reachable_commit_count == 1 async def test_orphaned_commit_is_deleted(self, db_session: AsyncSession) -> None: repo = await _mk_repo(db_session) # reachable commit head = await _mk_commit(db_session, repo.repo_id) await _mk_branch(db_session, repo.repo_id, head_commit_id=head.commit_id) # orphaned commit — not on any branch await _mk_commit(db_session, repo.repo_id) await db_session.commit() result = await run_gc(db_session, repo.repo_id) assert result.commits_deleted == 1 assert result.reachable_commit_count == 1 async def test_orphaned_chain_all_deleted(self, db_session: AsyncSession) -> None: repo = await _mk_repo(db_session) head = await _mk_commit(db_session, repo.repo_id) await _mk_branch(db_session, repo.repo_id, head_commit_id=head.commit_id) # chain of 3 orphaned commits c1 = await _mk_commit(db_session, repo.repo_id) c2 = await _mk_commit(db_session, repo.repo_id, parent_ids=[c1.commit_id]) c3 = await _mk_commit(db_session, repo.repo_id, parent_ids=[c2.commit_id]) await db_session.commit() result = await run_gc(db_session, repo.repo_id) assert result.commits_deleted == 3 assert result.reachable_commit_count == 1 async def test_reachable_commit_chain_untouched(self, db_session: AsyncSession) -> None: repo = await _mk_repo(db_session) c1 = await _mk_commit(db_session, repo.repo_id) c2 = await _mk_commit(db_session, repo.repo_id, parent_ids=[c1.commit_id]) c3 = await _mk_commit(db_session, repo.repo_id, parent_ids=[c2.commit_id]) await _mk_branch(db_session, repo.repo_id, head_commit_id=c3.commit_id) await db_session.commit() result = await run_gc(db_session, repo.repo_id) assert result.commits_deleted == 0 assert result.reachable_commit_count == 3 async def test_gc_scoped_to_repo(self, db_session: AsyncSession) -> None: """Orphaned commits in repo_a are not touched when GC runs on repo_b.""" repo_a = await _mk_repo(db_session, "-a") repo_b = await _mk_repo(db_session, "-b") # repo_b: clean head_b = await _mk_commit(db_session, repo_b.repo_id) await _mk_branch(db_session, repo_b.repo_id, head_commit_id=head_b.commit_id) # repo_a: orphaned commit await _mk_commit(db_session, repo_a.repo_id) await db_session.commit() result = await run_gc(db_session, repo_b.repo_id) assert result.commits_deleted == 0 # repo_a's orphan still exists assert await _count_commits(db_session, repo_a.repo_id) == 1 async def test_orphaned_snapshot_deleted_with_commit(self, db_session: AsyncSession) -> None: repo = await _mk_repo(db_session) snap = await _mk_snapshot(db_session, repo.repo_id) head = await _mk_commit(db_session, repo.repo_id) await _mk_branch(db_session, repo.repo_id, head_commit_id=head.commit_id) # orphaned commit references a snapshot await _mk_commit(db_session, repo.repo_id, snapshot_id=snap.snapshot_id) await db_session.commit() result = await run_gc(db_session, repo.repo_id) assert result.commits_deleted == 1 assert result.snapshots_deleted == 1 # ───────────────────────────────────────────────────────────────────────────── # LAYER 3 — E2E # ───────────────────────────────────────────────────────────────────────────── class TestBackgroundTaskE2E: """E2E: run_gc runs to completion against the test DB.""" async def test_run_gc_completes_on_real_db(self, db_session: AsyncSession) -> None: repo = await _mk_repo(db_session) head = await _mk_commit(db_session, repo.repo_id) await _mk_branch(db_session, repo.repo_id, head_commit_id=head.commit_id) await db_session.commit() result = await run_gc(db_session, repo.repo_id) assert result.commits_deleted == 0 async def test_run_gc_with_orphan_completes(self, db_session: AsyncSession) -> None: repo = await _mk_repo(db_session) head = await _mk_commit(db_session, repo.repo_id) await _mk_branch(db_session, repo.repo_id, head_commit_id=head.commit_id) await _mk_commit(db_session, repo.repo_id) # orphan await db_session.commit() result = await run_gc(db_session, repo.repo_id) assert result.commits_deleted == 1 # ───────────────────────────────────────────────────────────────────────────── # LAYER 4 — STRESS # ───────────────────────────────────────────────────────────────────────────── class TestGCStress: """Stress: GC under large commit volumes.""" async def test_gc_100_commit_linear_chain_all_reachable(self, db_session: AsyncSession) -> None: repo = await _mk_repo(db_session) parent_id: str | None = None commits = [] for _ in range(100): c = await _mk_commit(db_session, repo.repo_id, parent_ids=[parent_id] if parent_id else []) commits.append(c) parent_id = c.commit_id await _mk_branch(db_session, repo.repo_id, head_commit_id=commits[-1].commit_id) await db_session.commit() result = await run_gc(db_session, repo.repo_id) assert result.commits_deleted == 0 assert result.reachable_commit_count == 100 async def test_gc_50_orphaned_commits_all_deleted(self, db_session: AsyncSession) -> None: repo = await _mk_repo(db_session) head = await _mk_commit(db_session, repo.repo_id) await _mk_branch(db_session, repo.repo_id, head_commit_id=head.commit_id) for _ in range(50): await _mk_commit(db_session, repo.repo_id) await db_session.commit() result = await run_gc(db_session, repo.repo_id) assert result.commits_deleted == 50 async def test_gc_diamond_merge_topology(self, db_session: AsyncSession) -> None: """Diamond: base → left + right → merge; all 4 commits are reachable.""" repo = await _mk_repo(db_session) base = await _mk_commit(db_session, repo.repo_id) left = await _mk_commit(db_session, repo.repo_id, parent_ids=[base.commit_id]) right = await _mk_commit(db_session, repo.repo_id, parent_ids=[base.commit_id]) merge = await _mk_commit( db_session, repo.repo_id, parent_ids=[left.commit_id, right.commit_id] ) await _mk_branch(db_session, repo.repo_id, head_commit_id=merge.commit_id) await db_session.commit() result = await run_gc(db_session, repo.repo_id) assert result.commits_deleted == 0 assert result.reachable_commit_count == 4 async def test_sequential_gc_calls_idempotent(self, db_session: AsyncSession) -> None: """Two sequential GC calls on the same repo both complete without error.""" repo = await _mk_repo(db_session) head = await _mk_commit(db_session, repo.repo_id) await _mk_branch(db_session, repo.repo_id, head_commit_id=head.commit_id) await db_session.commit() first = await run_gc(db_session, repo.repo_id) second = await run_gc(db_session, repo.repo_id) assert first.commits_deleted == 0 assert second.commits_deleted == 0 # ───────────────────────────────────────────────────────────────────────────── # LAYER 5 — DATA INTEGRITY # ───────────────────────────────────────────────────────────────────────────── class TestGCDataIntegrity: """Data Integrity: GC preserves reachable objects and correctly tracks counts.""" async def test_reachable_commits_still_in_db_after_gc(self, db_session: AsyncSession) -> None: repo = await _mk_repo(db_session) c1 = await _mk_commit(db_session, repo.repo_id) c2 = await _mk_commit(db_session, repo.repo_id, parent_ids=[c1.commit_id]) await _mk_branch(db_session, repo.repo_id, head_commit_id=c2.commit_id) await _mk_commit(db_session, repo.repo_id) # orphan await db_session.commit() await run_gc(db_session, repo.repo_id) remaining = await _count_commits(db_session, repo.repo_id) assert remaining == 2 async def test_orphaned_commits_gone_after_gc(self, db_session: AsyncSession) -> None: repo = await _mk_repo(db_session) head = await _mk_commit(db_session, repo.repo_id) await _mk_branch(db_session, repo.repo_id, head_commit_id=head.commit_id) orphan = await _mk_commit(db_session, repo.repo_id) orphan_id = orphan.commit_id await db_session.commit() await run_gc(db_session, repo.repo_id) row = await db_session.get(MusehubCommit, orphan_id) assert row is None async def test_reachable_count_plus_deleted_equals_total(self, db_session: AsyncSession) -> None: repo = await _mk_repo(db_session) head = await _mk_commit(db_session, repo.repo_id) await _mk_branch(db_session, repo.repo_id, head_commit_id=head.commit_id) for _ in range(5): await _mk_commit(db_session, repo.repo_id) await db_session.commit() result = await run_gc(db_session, repo.repo_id) assert result.reachable_commit_count + result.commits_deleted == 6 async def test_shared_snapshot_not_deleted_when_reachable(self, db_session: AsyncSession) -> None: """Snapshot referenced by both a reachable and orphaned commit must not be deleted.""" repo = await _mk_repo(db_session) snap = await _mk_snapshot(db_session, repo.repo_id) head = await _mk_commit(db_session, repo.repo_id, snapshot_id=snap.snapshot_id) await _mk_branch(db_session, repo.repo_id, head_commit_id=head.commit_id) await _mk_commit(db_session, repo.repo_id, snapshot_id=snap.snapshot_id) # orphan await db_session.commit() result = await run_gc(db_session, repo.repo_id) assert result.commits_deleted == 1 assert result.snapshots_deleted == 0 # snapshot still present snap_row = await db_session.get(MusehubSnapshot, snap.snapshot_id) assert snap_row is not None async def test_gc_idempotent_second_run_deletes_nothing(self, db_session: AsyncSession) -> None: repo = await _mk_repo(db_session) head = await _mk_commit(db_session, repo.repo_id) await _mk_branch(db_session, repo.repo_id, head_commit_id=head.commit_id) await _mk_commit(db_session, repo.repo_id) # orphan await db_session.commit() first = await run_gc(db_session, repo.repo_id) assert first.commits_deleted == 1 second = await run_gc(db_session, repo.repo_id) assert second.commits_deleted == 0 async def test_multi_branch_all_heads_reachable(self, db_session: AsyncSession) -> None: """Commits pointed to by multiple branches are all preserved.""" repo = await _mk_repo(db_session) c_main = await _mk_commit(db_session, repo.repo_id) c_dev = await _mk_commit(db_session, repo.repo_id) c_feat = await _mk_commit(db_session, repo.repo_id) await _mk_branch(db_session, repo.repo_id, head_commit_id=c_main.commit_id, name="main") await _mk_branch(db_session, repo.repo_id, head_commit_id=c_dev.commit_id, name="dev") await _mk_branch(db_session, repo.repo_id, head_commit_id=c_feat.commit_id, name="feat") await db_session.commit() result = await run_gc(db_session, repo.repo_id) assert result.commits_deleted == 0 assert result.reachable_commit_count == 3 # ───────────────────────────────────────────────────────────────────────────── # LAYER 6 — SECURITY # ───────────────────────────────────────────────────────────────────────────── class TestGCBackgroundSecurity: """Security: isolation, error suppression, no cross-repo contamination.""" async def test_gc_cannot_delete_commits_in_other_repo(self, db_session: AsyncSession) -> None: repo_a = await _mk_repo(db_session, "-sec-a") repo_b = await _mk_repo(db_session, "-sec-b") # repo_b: clean head_b = await _mk_commit(db_session, repo_b.repo_id) await _mk_branch(db_session, repo_b.repo_id, head_commit_id=head_b.commit_id) # repo_a: orphaned commit orphan = await _mk_commit(db_session, repo_a.repo_id) orphan_id = orphan.commit_id await db_session.commit() await run_gc(db_session, repo_b.repo_id) # repo_a's commit untouched still_there = await db_session.get(MusehubCommit, orphan_id) assert still_there is not None async def test_gc_invalid_repo_id_no_exception(self, db_session: AsyncSession) -> None: result = await run_gc(db_session, "totally-invalid-id") assert result.commits_deleted == 0 async def test_gc_large_repo_id_no_exception(self, db_session: AsyncSession) -> None: result = await run_gc(db_session, "x" * 1000) assert result.commits_deleted == 0 async def test_gc_does_not_expose_internal_state_via_result(self, db_session: AsyncSession) -> None: """GCResult contains no raw SQL or stack traces.""" repo = await _mk_repo(db_session) await db_session.commit() result = await run_gc(db_session, repo.repo_id) result_str = str(result) assert "SELECT" not in result_str assert "Traceback" not in result_str # ───────────────────────────────────────────────────────────────────────────── # LAYER 7 — PERFORMANCE # ───────────────────────────────────────────────────────────────────────────── class TestGCPerformance: """Performance: GC latency budgets.""" async def test_gc_clean_repo_under_100ms(self, db_session: AsyncSession) -> None: repo = await _mk_repo(db_session) head = await _mk_commit(db_session, repo.repo_id) await _mk_branch(db_session, repo.repo_id, head_commit_id=head.commit_id) await db_session.commit() t0 = time.perf_counter() await run_gc(db_session, repo.repo_id) elapsed = time.perf_counter() - t0 assert elapsed < 0.1, f"GC on clean repo took {elapsed:.3f}s" async def test_gc_100_commit_chain_under_2s(self, db_session: AsyncSession) -> None: repo = await _mk_repo(db_session) parent_id: str | None = None commits = [] for _ in range(100): c = await _mk_commit( db_session, repo.repo_id, parent_ids=[parent_id] if parent_id else [] ) commits.append(c) parent_id = c.commit_id await _mk_branch(db_session, repo.repo_id, head_commit_id=commits[-1].commit_id) await db_session.commit() t0 = time.perf_counter() await run_gc(db_session, repo.repo_id) elapsed = time.perf_counter() - t0 assert elapsed < 2.0, f"GC on 100-commit chain took {elapsed:.3f}s" async def test_gc_50_orphans_under_1s(self, db_session: AsyncSession) -> None: repo = await _mk_repo(db_session) head = await _mk_commit(db_session, repo.repo_id) await _mk_branch(db_session, repo.repo_id, head_commit_id=head.commit_id) for _ in range(50): await _mk_commit(db_session, repo.repo_id) await db_session.commit() t0 = time.perf_counter() await run_gc(db_session, repo.repo_id) elapsed = time.perf_counter() - t0 assert elapsed < 1.0, f"GC on 50 orphans took {elapsed:.3f}s" async def test_gc_result_construction_is_negligible(self) -> None: t0 = time.perf_counter() for _ in range(10_000): GCResult(repo_id="x") elapsed = time.perf_counter() - t0 assert elapsed < 0.1, f"10K GCResult() took {elapsed:.3f}s" async def test_gc_result_construction_fast(self) -> None: """GCResult construction overhead is negligible.""" t0 = time.perf_counter() for _ in range(10_000): GCResult(repo_id="x", reachable_commit_count=5) elapsed = time.perf_counter() - t0 assert elapsed < 0.1, f"10K GCResult() took {elapsed:.3f}s"