musehub_gc.py
python
sha256:f58d788df3ccdda8f8987b428418db655a38582309239b99d7b9715ea6dff618
feat(#92): phase 5 — GC expired fetch mpack cache entries (…
Sonnet 4.6
patch
5 days ago
"""MuseHub garbage collection — prune commits, snapshots, object refs, orphaned
objects, and expired fetch mpack cache entries.

After a force push, the old commit chain remains in the database but is no longer
reachable from any branch. This service identifies those orphaned rows and
removes them, keeping the DB consistent with the actual repo state.

Architecture:
    Objects are content-addressed and globally shared. A single ``musehub_objects``
    row may be referenced by many repos via ``musehub_object_refs``. GC must never
    delete a storage object that is still referenced by another repo. A repo's
    commits are found through its ``MusehubCommitRef`` (repo_id, commit_id) links.

    Phase 1 — Commits: walk the commit DAG (via ``walk_dag``) from every branch
              head; prune commits unreachable from all of them.
    Phase 2 — Snapshots: delete snapshots not referenced by any live commit.
    Phase 3 — Refs: delete ref rows for objects no longer in any live snapshot
              of *this* repo.
    Phase 4 — Objects: delete ``musehub_objects`` rows (and storage bytes) for
              objects with zero remaining refs across *all* repos.
    Phase 5 — Fetch mpack cache: ``gc_fetch_mpack_cache`` deletes this repo's
              expired ``MusehubFetchMPackCache`` rows and their storage objects.
              It is a separate entry point; ``run_gc`` does not call it.

Usage:

    from musehub.services.musehub_gc import gc_fetch_mpack_cache, run_gc
    result = await run_gc(session, repo_id)
    cache_rows_deleted = await gc_fetch_mpack_cache(session, repo_id)

``run_gc`` returns a ``GCResult`` containing counts of what was deleted;
``gc_fetch_mpack_cache`` returns the number of cache rows it deleted.
"""
| 26 | |
| 27 | import logging |
| 28 | from dataclasses import dataclass, field |
| 29 | |
| 30 | import msgpack |
| 31 | from sqlalchemy import delete, select |
| 32 | from sqlalchemy.engine import CursorResult |
| 33 | from sqlalchemy.ext.asyncio import AsyncSession |
| 34 | |
| 35 | from musehub.db.musehub_repo_models import ( |
| 36 | MusehubBranch, |
| 37 | MusehubCommit, |
| 38 | MusehubCommitRef, |
| 39 | MusehubFetchMPackCache, |
| 40 | MusehubObject, |
| 41 | MusehubObjectRef, |
| 42 | MusehubSnapshot, |
| 43 | ) |
| 44 | from musehub.storage import get_backend |
| 45 | |
| 46 | type _CommitGraph = dict[str, list[str]] |
| 47 | |
| 48 | logger = logging.getLogger(__name__) |
| 49 | |
@dataclass
class GCResult:
    """Tally of everything removed by a single GC pass over one repo.

    Attributes:
        repo_id: Repository this GC pass targeted.
        commits_deleted: Number of commits pruned as unreachable from every branch head.
        snapshots_deleted: Number of snapshots pruned because no live commit uses them.
        object_refs_deleted: Number of ``musehub_object_refs`` rows removed for this repo.
        objects_deleted: Number of globally orphaned ``musehub_objects`` rows removed.
        storage_bytes_freed: Bytes reclaimed from storage (best-effort). ``run_gc`` as
            written never populates it, so it stays 0.
        reachable_commit_count: Number of commits that survived, for observability.
        errors: Non-fatal problems hit along the way (e.g. one storage delete failing).
    """

    repo_id: str
    commits_deleted: int = field(default=0)
    snapshots_deleted: int = field(default=0)
    object_refs_deleted: int = field(default=0)
    objects_deleted: int = field(default=0)
    storage_bytes_freed: int = field(default=0)
    reachable_commit_count: int = field(default=0)
    # default_factory gives every result its own list (never a shared mutable default).
    errors: list[str] = field(default_factory=list)
| 74 | |
| 75 | def _decode_manifest(blob: bytes | None) -> set[str]: |
| 76 | """Decode a msgpack manifest blob and return the set of object_ids it contains. |
| 77 | |
| 78 | Manifests are stored as ``{path: object_id}`` msgpack maps. Returns an empty |
| 79 | set if ``blob`` is None or malformed. |
| 80 | """ |
| 81 | if not blob: |
| 82 | return set() |
| 83 | try: |
| 84 | mapping = msgpack.unpackb(blob, raw=False) |
| 85 | return {v for v in mapping.values() if isinstance(v, str)} |
| 86 | except Exception: |
| 87 | return set() |
| 88 | |
def _decode_manifest_checked(blob: bytes | None) -> set[str] | None:
    """Decode a ``{path: object_id}`` msgpack manifest, telling failure apart from emptiness.

    Returns an empty set for a ``None``/empty blob, the set of string values for a
    well-formed map, and ``None`` when the blob cannot be decoded into a map.
    """
    if not blob:
        return set()
    try:
        mapping = msgpack.unpackb(blob, raw=False)
    except Exception:
        return None
    if not isinstance(mapping, dict):
        return None
    return {v for v in mapping.values() if isinstance(v, str)}


async def _load_commit_graph(session: AsyncSession, repo_id: str) -> _CommitGraph:
    """Return ``{commit_id: parent_ids}`` for every commit linked to *repo_id*."""
    rows = await session.execute(
        select(MusehubCommit.commit_id, MusehubCommit.parent_ids)
        .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
        .where(MusehubCommitRef.repo_id == repo_id)
    )
    return {commit_id: (parent_ids or []) for commit_id, parent_ids in rows.fetchall()}


async def _load_branch_heads(session: AsyncSession, repo_id: str) -> list[str]:
    """Return the non-null head commit IDs of every branch in *repo_id*."""
    rows = await session.execute(
        select(MusehubBranch.head_commit_id).where(MusehubBranch.repo_id == repo_id)
    )
    return [head for (head,) in rows.fetchall() if head]


async def _snapshot_manifests(
    session: AsyncSession, commit_ids: list[str]
) -> dict[str, bytes | None]:
    """Return ``{snapshot_id: manifest_blob}`` for the snapshots of *commit_ids*."""
    if not commit_ids:
        return {}
    rows = await session.execute(
        select(MusehubSnapshot.snapshot_id, MusehubSnapshot.manifest_blob)
        .join(MusehubCommit, MusehubCommit.snapshot_id == MusehubSnapshot.snapshot_id)
        .where(
            MusehubCommit.commit_id.in_(commit_ids),
            MusehubCommit.snapshot_id.isnot(None),
        )
    )
    # Several commits may share one snapshot; keep a single entry per snapshot_id.
    return {snapshot_id: blob for snapshot_id, blob in rows.fetchall()}


async def _prune_commits(
    session: AsyncSession, repo_id: str, orphaned_commit_ids: list[str]
) -> int:
    """Unlink orphaned commits from *repo_id*; delete the rows no other repo still links.

    Returns the number of ``MusehubCommit`` rows actually deleted.
    """
    await session.execute(
        delete(MusehubCommitRef).where(
            MusehubCommitRef.repo_id == repo_id,
            MusehubCommitRef.commit_id.in_(orphaned_commit_ids),
        )
    )
    linked = await session.execute(
        select(MusehubCommitRef.commit_id)
        .where(MusehubCommitRef.commit_id.in_(orphaned_commit_ids))
        .distinct()
    )
    still_linked = {commit_id for (commit_id,) in linked.fetchall()}
    deletable = [cid for cid in orphaned_commit_ids if cid not in still_linked]
    if not deletable:
        return 0
    deleted = await session.execute(
        delete(MusehubCommit).where(MusehubCommit.commit_id.in_(deletable))
    )
    assert isinstance(deleted, CursorResult)
    return deleted.rowcount


async def _prune_snapshots(session: AsyncSession, candidate_ids: list[str]) -> int:
    """Delete candidate snapshots that no remaining ``MusehubCommit`` row points at."""
    if not candidate_ids:
        return 0
    in_use = await session.execute(
        select(MusehubCommit.snapshot_id)
        .where(MusehubCommit.snapshot_id.in_(candidate_ids))
        .distinct()
    )
    still_used = {snapshot_id for (snapshot_id,) in in_use.fetchall()}
    deletable = [sid for sid in candidate_ids if sid not in still_used]
    if not deletable:
        return 0
    deleted = await session.execute(
        delete(MusehubSnapshot).where(MusehubSnapshot.snapshot_id.in_(deletable))
    )
    assert isinstance(deleted, CursorResult)
    return deleted.rowcount


async def _prune_object_refs(
    session: AsyncSession, repo_id: str, object_ids: list[str]
) -> int:
    """Delete *repo_id*'s ``MusehubObjectRef`` rows for *object_ids*; return rows deleted."""
    deleted = await session.execute(
        delete(MusehubObjectRef).where(
            MusehubObjectRef.repo_id == repo_id,
            MusehubObjectRef.object_id.in_(object_ids),
        )
    )
    assert isinstance(deleted, CursorResult)
    return deleted.rowcount


async def _unreferenced_objects(session: AsyncSession, object_ids: list[str]) -> set[str]:
    """Return the subset of *object_ids* with no ``MusehubObjectRef`` row in any repo."""
    referenced = await session.execute(
        select(MusehubObjectRef.object_id)
        .where(MusehubObjectRef.object_id.in_(object_ids))
        .distinct()
    )
    return set(object_ids) - {oid for (oid,) in referenced.fetchall()}


async def run_gc(session: AsyncSession, repo_id: str) -> GCResult:
    """Prune all commits, snapshots, object refs, and orphaned objects for a repo.

    Algorithm:
        1. Load this repo's commit graph (commits linked via ``MusehubCommitRef``),
           then its branch heads. The graph is read first so that, if a push
           lands between the two reads, its new head appears "unresolved" (and the
           run aborts) instead of its new commits appearing orphaned.
        2. Abort if any head is missing from the graph (walking from it would mark
           its entire history orphaned). Otherwise walk from every head via
           ``walk_dag`` to collect reachable commit IDs.
        3. Read snapshots + manifests of orphaned and live commits before deleting.
        4. Unlink orphaned commits from this repo; delete ``MusehubCommit`` rows no
           other repo still links (``commits_deleted`` counts only those rows).
        5. Delete orphaned snapshots that no remaining commit row points at.
        6. Delete this repo's ``MusehubObjectRef`` rows for objects present only in
           orphaned snapshots. Skipped (with an error recorded) if any live
           manifest is undecodable, since its objects would otherwise look stale.
        7. Of those objects, delete the ``MusehubObject`` rows with zero refs across
           all repos, commit, then delete their storage bytes best-effort. The DB
           goes first so a failure never leaves a row pointing at missing bytes.

    Args:
        session: Async session; this function commits on it after each phase.
        repo_id: Repository to collect.

    Returns:
        A ``GCResult`` with per-phase counts and any non-fatal errors.
    """
    result = GCResult(repo_id=repo_id)
    backend = get_backend()

    # ── 1. Commit graph first, then branch heads (ordering explained above) ──
    all_commits = await _load_commit_graph(session, repo_id)
    heads = await _load_branch_heads(session, repo_id)

    if not heads:
        logger.warning("GC: no branch heads found for repo %s — skipping", repo_id)
        return result

    # ── 2. Reachability ──────────────────────────────────────────────────────
    unresolved = sorted({head for head in heads if head not in all_commits})
    if unresolved:
        msg = f"branch head(s) missing from commit graph: {', '.join(unresolved)}"
        logger.warning("GC: repo %s — %s — skipping", repo_id, msg)
        result.errors.append(msg)
        return result

    from muse.core.graph import walk_dag

    # Heads are seeded explicitly so they can never be pruned, whether or not
    # walk_dag yields its start nodes.
    reachable: set[str] = set(heads)
    reachable.update(
        cid
        for cid in walk_dag(heads, lambda cid: all_commits.get(cid, []))
        if cid in all_commits
    )

    result.reachable_commit_count = len(reachable)
    orphaned_commit_ids = [cid for cid in all_commits if cid not in reachable]

    if not orphaned_commit_ids:
        logger.info("GC: repo %s is clean — no orphaned commits", repo_id)
        return result

    logger.info(
        "GC: repo %s — %d reachable, %d orphaned commits to prune",
        repo_id, len(reachable), len(orphaned_commit_ids),
    )

    # ── 3. Snapshots + manifest object IDs (read before any deletion) ────────
    orphaned_manifests = await _snapshot_manifests(session, orphaned_commit_ids)
    live_manifests = await _snapshot_manifests(session, list(reachable))

    orphaned_object_ids: set[str] = set()
    for blob in orphaned_manifests.values():
        orphaned_object_ids |= _decode_manifest_checked(blob) or set()

    live_object_ids: set[str] = set()
    undecodable_live: list[str] = []
    for snapshot_id, blob in live_manifests.items():
        decoded = _decode_manifest_checked(blob)
        if decoded is None:
            undecodable_live.append(snapshot_id)
        else:
            live_object_ids |= decoded

    candidate_snapshots = [
        sid for sid in orphaned_manifests if sid and sid not in live_manifests
    ]

    # ── 4-5. Commits and snapshots ───────────────────────────────────────────
    result.commits_deleted = await _prune_commits(session, repo_id, orphaned_commit_ids)
    result.snapshots_deleted = await _prune_snapshots(session, candidate_snapshots)
    await session.commit()

    # ── 6. Stale object refs for this repo ───────────────────────────────────
    stale_oids: set[str] = set()
    if undecodable_live:
        # An unreadable live manifest hides live objects; pruning now could delete them.
        msg = (
            f"{len(undecodable_live)} live snapshot manifest(s) could not be decoded; "
            "skipping object-ref pruning"
        )
        logger.warning("GC: repo %s — %s", repo_id, msg)
        result.errors.append(msg)
    else:
        stale_oids = orphaned_object_ids - live_object_ids

    if stale_oids:
        result.object_refs_deleted = await _prune_object_refs(
            session, repo_id, list(stale_oids)
        )
        await session.commit()
        logger.info(
            "GC: repo %s — pruned %d stale object refs",
            repo_id, result.object_refs_deleted,
        )

    # ── 7. Globally orphaned objects: DB rows first, then storage bytes ──────
    globally_orphaned = (
        await _unreferenced_objects(session, list(stale_oids)) if stale_oids else set()
    )

    if globally_orphaned:
        logger.info(
            "GC: repo %s — %d globally orphaned objects to delete from storage",
            repo_id, len(globally_orphaned),
        )
        del_objs = await session.execute(
            delete(MusehubObject).where(
                MusehubObject.object_id.in_(list(globally_orphaned))
            )
        )
        assert isinstance(del_objs, CursorResult)
        result.objects_deleted = del_objs.rowcount
        await session.commit()

        logger.info(
            "GC: repo %s — deleted %d objects from DB",
            repo_id, result.objects_deleted,
        )

        # Rows are committed as gone, so a failure here only leaks bytes; it
        # never leaves a DB row that points at missing storage.
        for oid in sorted(globally_orphaned):
            try:
                await backend.delete(oid)
            except Exception as exc:
                msg = f"storage delete failed for {oid}: {exc}"
                logger.warning("GC: %s", msg)
                result.errors.append(msg)

    logger.info(
        "GC complete: repo %s — commits=%d snapshots=%d refs=%d objects=%d errors=%d",
        repo_id,
        result.commits_deleted,
        result.snapshots_deleted,
        result.object_refs_deleted,
        result.objects_deleted,
        len(result.errors),
    )
    return result
| 282 | |
| 283 | |
async def gc_fetch_mpack_cache(session: AsyncSession, repo_id: str) -> int:
    """Delete expired fetch mpack cache rows and their storage objects for a repo.

    Called by the gc worker handler after every normal GC run. Only rows whose
    ``expires_at`` is at or before the current UTC time are considered; fresh
    entries are never removed.

    Each distinct ``mpack_id`` among this repo's expired rows is handled as follows:

    * If a non-expired cache row (in any repo) still points at it, the storage
      object is kept and only the expired rows are removed.
    * Otherwise the storage object is deleted. The rows are removed only if that
      delete succeeded, so a failed delete is retried on the next run instead of
      leaking the object with no row left pointing at it.

    Rows without an ``mpack_id`` have nothing in storage and are simply removed.
    Changes are committed on *session*, matching ``run_gc``.

    Args:
        session: Async session; committed if any rows are deleted.
        repo_id: Repository whose expired cache rows are collected.

    Returns:
        The number of cache rows deleted.
    """
    from datetime import datetime, timezone as _tz

    now = datetime.now(tz=_tz.utc)

    expired_q = await session.execute(
        select(MusehubFetchMPackCache.cache_id, MusehubFetchMPackCache.mpack_id)
        .where(MusehubFetchMPackCache.repo_id == repo_id)
        .where(MusehubFetchMPackCache.expires_at <= now)
    )
    expired_rows = expired_q.all()
    if not expired_rows:
        return 0

    # Group rows by storage object so each object is deleted at most once.
    cache_ids_by_mpack: dict[str, list[object]] = {}
    deletable_cache_ids: list[object] = []
    for cache_id, mpack_id in expired_rows:
        if mpack_id:
            cache_ids_by_mpack.setdefault(mpack_id, []).append(cache_id)
        else:
            deletable_cache_ids.append(cache_id)

    # If mpack IDs can be shared across cache rows (e.g. content-addressed), a
    # fresh row may still need these bytes; leave such objects in storage.
    still_needed: set[str] = set()
    if cache_ids_by_mpack:
        fresh_q = await session.execute(
            select(MusehubFetchMPackCache.mpack_id)
            .where(MusehubFetchMPackCache.mpack_id.in_(list(cache_ids_by_mpack)))
            .where(MusehubFetchMPackCache.expires_at > now)
            .distinct()
        )
        still_needed = {mpack_id for (mpack_id,) in fresh_q.all()}

    backend = get_backend()
    for mpack_id, cache_ids in cache_ids_by_mpack.items():
        if mpack_id not in still_needed:
            try:
                await backend.delete(mpack_id)
            except Exception as exc:
                logger.warning(
                    "gc_fetch_mpack_cache: R2 delete failed mpack_id=%s: %s",
                    mpack_id[:20] if mpack_id else "?", exc,
                )
                # Keep these rows so the next GC run retries the storage delete.
                continue
        deletable_cache_ids.extend(cache_ids)

    if not deletable_cache_ids:
        return 0

    del_q = await session.execute(
        delete(MusehubFetchMPackCache)
        .where(MusehubFetchMPackCache.cache_id.in_(deletable_cache_ids))
    )
    assert isinstance(del_q, CursorResult)
    deleted = del_q.rowcount
    await session.commit()

    logger.info(
        "gc_fetch_mpack_cache: repo=%s deleted=%d expired cache rows",
        repo_id, deleted,
    )
    return deleted
File History
1 commit
sha256:f58d788df3ccdda8f8987b428418db655a38582309239b99d7b9715ea6dff618
feat(#92): phase 5 — GC expired fetch mpack cache entries (…
Sonnet 4.6
patch
5 days ago