"""TDD: GC object-ref pruning and global object cleanup. After a force push or branch rewrite, old commits and their snapshots become orphaned. Objects referenced exclusively by orphaned snapshots should have their ref rows removed. Objects with zero remaining refs across all repos should be deleted from musehub_objects and from storage. Coverage matrix: 1. GC deletes ref row for object unreachable from all live snapshots. 2. GC does NOT delete ref when object is still referenced by a live snapshot. 3. GC deletes musehub_objects row when no refs remain globally. 4. GC does NOT delete musehub_objects row when another repo still holds a ref. 5. GC on a clean repo (no orphaned commits) is a no-op — no refs disturbed. 6. GCResult fields are populated correctly. """ from __future__ import annotations import secrets from datetime import datetime, timezone import msgpack import pytest import sqlalchemy as sa from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import blob_id from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubCommitRef, MusehubObject, MusehubObjectRef, MusehubSnapshot, MusehubSnapshotRef from musehub.services.musehub_gc import run_gc from musehub.types.json_types import StrDict from tests.factories import create_repo def _now() -> datetime: return datetime.now(tz=timezone.utc) def _oid(seed: str) -> str: return blob_id(seed.encode()) def _manifest(mapping: StrDict) -> bytes: """Encode a {path: object_id} dict as msgpack.""" return msgpack.packb(mapping, use_bin_type=True) # --------------------------------------------------------------------------- # Low-level DB helpers # --------------------------------------------------------------------------- async def _insert_object(session: AsyncSession, oid: str, repo_id: str) -> None: """Insert a minimal musehub_objects row and ref (skips if already present).""" exists = (await session.execute( sa.select(MusehubObject.object_id).where(MusehubObject.object_id == oid) )).scalar_one_or_none() if exists: return obj = MusehubObject( object_id=oid, size_bytes=10, path="test.md", content_cache=b"test content", ) session.add(obj) await session.flush() async def _insert_ref(session: AsyncSession, repo_id: str, oid: str) -> None: """Insert a musehub_object_refs row (idempotent).""" existing = (await session.execute( sa.select(MusehubObjectRef).where( MusehubObjectRef.repo_id == repo_id, MusehubObjectRef.object_id == oid, ) )).scalar_one_or_none() if existing: return session.add(MusehubObjectRef(repo_id=repo_id, object_id=oid)) await session.flush() async def _insert_snapshot( session: AsyncSession, snapshot_id: str, manifest: dict[str, str], repo_id: str = "", ) -> MusehubSnapshot: snap = MusehubSnapshot( snapshot_id=snapshot_id, manifest_blob=_manifest(manifest), entry_count=len(manifest), directories=[], ) session.add(snap) if repo_id: session.add(MusehubSnapshotRef(repo_id=repo_id, snapshot_id=snapshot_id)) await session.flush() return snap async def _insert_commit( session: AsyncSession, repo_id: str, commit_id: str, snapshot_id: str | None = None, parent_ids: list[str] | None = None, branch: str = "main", ) -> MusehubCommit: commit = MusehubCommit( commit_id=commit_id, message="test commit", author="test-user", branch=branch, parent_ids=parent_ids or [], snapshot_id=snapshot_id, timestamp=_now(), ) session.add(commit) session.add(MusehubCommitRef(repo_id=repo_id, commit_id=commit_id)) await session.flush() return commit async def _insert_branch( session: AsyncSession, repo_id: str, head_commit_id: str, name: str = "main", ) -> MusehubBranch: from musehub.core.genesis import compute_branch_id branch = MusehubBranch( branch_id=compute_branch_id(repo_id, name), repo_id=repo_id, name=name, head_commit_id=head_commit_id, ) session.add(branch) await session.flush() return branch async def _ref_exists(session: AsyncSession, repo_id: str, oid: str) -> bool: row = (await session.execute( sa.select(MusehubObjectRef).where( MusehubObjectRef.repo_id == repo_id, MusehubObjectRef.object_id == oid, ) )).scalar_one_or_none() return row is not None async def _object_exists(session: AsyncSession, oid: str) -> bool: row = (await session.execute( sa.select(MusehubObject.object_id).where(MusehubObject.object_id == oid) )).scalar_one_or_none() return row is not None # --------------------------------------------------------------------------- # Test 1: GC removes ref for object only in orphaned snapshot # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_gc_removes_stale_ref_for_orphaned_object( db_session: AsyncSession, ) -> None: """A ref row for an object that only appears in an orphaned snapshot must be deleted.""" repo = await create_repo(db_session, slug="gc-stale-ref", owner="test-user-wire") oid = _oid("stale-object-only-in-orphaned-snapshot") # Orphaned commit chain: C1 -> C2 (orphaned after force-push to C3) snap_orphan_id = f"snap_{secrets.token_hex(4)}" c1_id = secrets.token_hex(16) c2_id = secrets.token_hex(16) c3_id = secrets.token_hex(16) snap_live_id = f"snap_{secrets.token_hex(4)}" await _insert_object(db_session, oid, repo.repo_id) await _insert_ref(db_session, repo.repo_id, oid) # Orphaned snapshot references the object await _insert_snapshot(db_session, snap_orphan_id, {"file.md": oid}, repo_id=repo.repo_id) # Live snapshot is empty (object not referenced by any live snapshot) await _insert_snapshot(db_session, snap_live_id, {}, repo_id=repo.repo_id) await _insert_commit(db_session, repo.repo_id, c1_id) await _insert_commit(db_session, repo.repo_id, c2_id, snapshot_id=snap_orphan_id, parent_ids=[c1_id]) await _insert_commit(db_session, repo.repo_id, c3_id, snapshot_id=snap_live_id) # force-push resets branch await _insert_branch(db_session, repo.repo_id, c3_id) await db_session.commit() result = await run_gc(db_session, repo.repo_id) assert result.object_refs_deleted >= 1, "stale ref must be deleted" assert not await _ref_exists(db_session, repo.repo_id, oid), \ "ref row must be gone after GC" # --------------------------------------------------------------------------- # Test 2: GC keeps ref when object is still in a live snapshot # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_gc_keeps_ref_for_live_object( db_session: AsyncSession, ) -> None: """A ref for an object that appears in both orphaned and live snapshots must survive.""" repo = await create_repo(db_session, slug="gc-live-ref", owner="test-user-wire") oid = _oid("object-in-both-orphaned-and-live-snapshot") snap_orphan_id = f"snap_{secrets.token_hex(4)}" snap_live_id = f"snap_{secrets.token_hex(4)}" c_orphan_id = secrets.token_hex(16) c_live_id = secrets.token_hex(16) await _insert_object(db_session, oid, repo.repo_id) await _insert_ref(db_session, repo.repo_id, oid) # Both snapshots reference the same object await _insert_snapshot(db_session, snap_orphan_id, {"a.md": oid}, repo_id=repo.repo_id) await _insert_snapshot(db_session, snap_live_id, {"b.md": oid}, repo_id=repo.repo_id) await _insert_commit(db_session, repo.repo_id, c_orphan_id, snapshot_id=snap_orphan_id) await _insert_commit(db_session, repo.repo_id, c_live_id, snapshot_id=snap_live_id) await _insert_branch(db_session, repo.repo_id, c_live_id) await db_session.commit() result = await run_gc(db_session, repo.repo_id) assert result.object_refs_deleted == 0, "ref to live object must not be deleted" assert await _ref_exists(db_session, repo.repo_id, oid), \ "ref row must survive GC when object is live" # --------------------------------------------------------------------------- # Test 3: GC deletes musehub_objects row when globally orphaned # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_gc_deletes_globally_orphaned_object( db_session: AsyncSession, ) -> None: """After the last ref is deleted, the musehub_objects row must be deleted too.""" repo = await create_repo(db_session, slug="gc-global-orphan", owner="test-user-wire") oid = _oid("globally-orphaned-no-other-repo-refs") snap_orphan_id = f"snap_{secrets.token_hex(4)}" snap_live_id = f"snap_{secrets.token_hex(4)}" c_orphan_id = secrets.token_hex(16) c_live_id = secrets.token_hex(16) await _insert_object(db_session, oid, repo.repo_id) await _insert_ref(db_session, repo.repo_id, oid) await _insert_snapshot(db_session, snap_orphan_id, {"file.md": oid}, repo_id=repo.repo_id) await _insert_snapshot(db_session, snap_live_id, {}, repo_id=repo.repo_id) # live snapshot has no objects await _insert_commit(db_session, repo.repo_id, c_orphan_id, snapshot_id=snap_orphan_id) await _insert_commit(db_session, repo.repo_id, c_live_id, snapshot_id=snap_live_id) await _insert_branch(db_session, repo.repo_id, c_live_id) await db_session.commit() result = await run_gc(db_session, repo.repo_id) assert result.objects_deleted >= 1, "globally orphaned object must be deleted from DB" assert not await _object_exists(db_session, oid), \ "musehub_objects row must be gone after GC" # --------------------------------------------------------------------------- # Test 4: GC does NOT delete musehub_objects when another repo still refs it # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_gc_keeps_object_when_other_repo_holds_ref( db_session: AsyncSession, ) -> None: """An object shared with another repo must NOT be deleted from musehub_objects.""" repo_a = await create_repo(db_session, slug="gc-shared-a", owner="test-user-wire") repo_b = await create_repo(db_session, slug="gc-shared-b", owner="test-user-wire") oid = _oid("shared-object-two-repos") # Set up repo_a with orphaned snapshot referencing the object snap_orphan_id = f"snap_{secrets.token_hex(4)}" snap_live_id = f"snap_{secrets.token_hex(4)}" c_orphan_id = secrets.token_hex(16) c_live_id = secrets.token_hex(16) await _insert_object(db_session, oid, repo_a.repo_id) # Both repos hold a ref await _insert_ref(db_session, repo_a.repo_id, oid) await _insert_ref(db_session, repo_b.repo_id, oid) await _insert_snapshot(db_session, snap_orphan_id, {"file.md": oid}, repo_id=repo_a.repo_id) await _insert_snapshot(db_session, snap_live_id, {}, repo_id=repo_a.repo_id) await _insert_commit(db_session, repo_a.repo_id, c_orphan_id, snapshot_id=snap_orphan_id) await _insert_commit(db_session, repo_a.repo_id, c_live_id, snapshot_id=snap_live_id) await _insert_branch(db_session, repo_a.repo_id, c_live_id) await db_session.commit() # GC repo_a — prunes repo_a's ref but repo_b's ref survives result = await run_gc(db_session, repo_a.repo_id) assert result.objects_deleted == 0, \ "must not delete object still referenced by repo_b" assert await _object_exists(db_session, oid), \ "musehub_objects row must survive because repo_b still holds a ref" assert await _ref_exists(db_session, repo_b.repo_id, oid), \ "repo_b ref must be untouched by repo_a GC" # --------------------------------------------------------------------------- # Test 5: Clean repo (no orphaned commits) — no refs disturbed # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_gc_clean_repo_does_not_touch_refs( db_session: AsyncSession, ) -> None: """GC on a fully-reachable repo must be a no-op for object refs.""" repo = await create_repo(db_session, slug="gc-clean-repo", owner="test-user-wire") oid = _oid("clean-repo-live-object") snap_id = f"snap_{secrets.token_hex(4)}" c_id = secrets.token_hex(16) await _insert_object(db_session, oid, repo.repo_id) await _insert_ref(db_session, repo.repo_id, oid) await _insert_snapshot(db_session, snap_id, {"readme.md": oid}, repo_id=repo.repo_id) await _insert_commit(db_session, repo.repo_id, c_id, snapshot_id=snap_id) await _insert_branch(db_session, repo.repo_id, c_id) await db_session.commit() result = await run_gc(db_session, repo.repo_id) assert result.commits_deleted == 0 assert result.snapshots_deleted == 0 assert result.object_refs_deleted == 0 assert result.objects_deleted == 0 assert await _ref_exists(db_session, repo.repo_id, oid), \ "live object ref must survive a clean GC run" # --------------------------------------------------------------------------- # Test 6: GCResult fields are correctly populated # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_gc_result_fields( db_session: AsyncSession, ) -> None: """GCResult must accurately reflect what was deleted.""" repo = await create_repo(db_session, slug="gc-result-fields", owner="test-user-wire") oid = _oid("result-fields-object") snap_orphan_id = f"snap_{secrets.token_hex(4)}" snap_live_id = f"snap_{secrets.token_hex(4)}" c_orphan_id = secrets.token_hex(16) c_live_id = secrets.token_hex(16) await _insert_object(db_session, oid, repo.repo_id) await _insert_ref(db_session, repo.repo_id, oid) await _insert_snapshot(db_session, snap_orphan_id, {"file.md": oid}, repo_id=repo.repo_id) await _insert_snapshot(db_session, snap_live_id, {}, repo_id=repo.repo_id) await _insert_commit(db_session, repo.repo_id, c_orphan_id, snapshot_id=snap_orphan_id) await _insert_commit(db_session, repo.repo_id, c_live_id, snapshot_id=snap_live_id) await _insert_branch(db_session, repo.repo_id, c_live_id) await db_session.commit() result = await run_gc(db_session, repo.repo_id) assert result.repo_id == repo.repo_id assert result.commits_deleted == 1, "one orphaned commit" assert result.snapshots_deleted == 1, "one orphaned snapshot" assert result.object_refs_deleted == 1, "one stale ref" assert result.objects_deleted == 1, "one globally orphaned object" assert result.reachable_commit_count == 1, "one live commit" # errors list exists even when empty assert isinstance(result.errors, list)