gabriel / musehub public
musehub_gc.py python
323 lines 12.6 KB
Raw
sha256:f58d788df3ccdda8f8987b428418db655a38582309239b99d7b9715ea6dff618 feat(#92): phase 5 — GC expired fetch mpack cache entries (… Sonnet 4.6 patch 4 days ago
1 """MuseHub garbage collection — prune commits, snapshots, object refs, and orphaned objects.
2
3 After a force push, the old commit chain remains in the database but is no longer
4 reachable from any branch. This service identifies those orphaned objects and
5 removes them, keeping the DB consistent with the actual repo state.
6
7 Architecture:
8 Objects are content-addressed and globally shared. A single ``musehub_objects``
9 row may be referenced by many repos via ``musehub_object_refs``. GC must never
10 delete a storage object that is still referenced by another repo.
11
12 Phase 1 — Commits: BFS from branch heads; delete unreachable commits.
13 Phase 2 — Snapshots: delete snapshots not referenced by any live commit.
14 Phase 3 — Refs: delete ref rows for objects no longer in any live snapshot
15 of *this* repo.
16 Phase 4 — Objects: delete ``musehub_objects`` rows (and storage bytes) for
17 objects with zero remaining refs across *all* repos.
18
19 Usage:
20
21 from musehub.services.musehub_gc import run_gc
22 result = await run_gc(session, repo_id)
23
24 The returned ``GCResult`` contains counts of what was deleted.
25 """
26
27 import logging
28 from dataclasses import dataclass, field
29
30 import msgpack
31 from sqlalchemy import delete, select
32 from sqlalchemy.engine import CursorResult
33 from sqlalchemy.ext.asyncio import AsyncSession
34
35 from musehub.db.musehub_repo_models import (
36 MusehubBranch,
37 MusehubCommit,
38 MusehubCommitRef,
39 MusehubFetchMPackCache,
40 MusehubObject,
41 MusehubObjectRef,
42 MusehubSnapshot,
43 )
44 from musehub.storage import get_backend
45
# Adjacency map of a repo's commit DAG: commit_id -> list of parent commit_ids.
type _CommitGraph = dict[str, list[str]]

# Module-level logger shared by every GC routine in this file.
logger = logging.getLogger(__name__)
49
@dataclass
class GCResult:
    """Summary of one garbage-collection pass over a single repo.

    Every counter starts at zero and ``errors`` starts as a fresh empty list,
    so a run that exits early still yields a well-formed result.

    Attributes:
        repo_id: Identifier of the repo the run targeted.
        commits_deleted: Number of commit rows removed because no branch head
            reaches them.
        snapshots_deleted: Number of snapshot rows removed because no
            surviving commit references them.
        object_refs_deleted: Number of ``musehub_object_refs`` rows removed
            for this repo.
        objects_deleted: Number of ``musehub_objects`` rows removed because no
            repo references them any more.
        storage_bytes_freed: Bytes reclaimed from R2/local storage
            (best-effort; stays 0 when the backend does not report sizes).
        reachable_commit_count: Number of commits that survived the run
            (kept for observability).
        errors: Human-readable descriptions of non-fatal failures, e.g. a
            storage delete that failed for one object.
    """

    # Only required field; everything below has a default.
    repo_id: str

    # Deletion counters, one per GC phase.
    commits_deleted: int = 0
    snapshots_deleted: int = 0
    object_refs_deleted: int = 0
    objects_deleted: int = 0

    # Observability extras.
    storage_bytes_freed: int = 0
    reachable_commit_count: int = 0

    # default_factory gives each instance its own list instead of a shared one.
    errors: list[str] = field(default_factory=list)
74
75 def _decode_manifest(blob: bytes | None) -> set[str]:
76 """Decode a msgpack manifest blob and return the set of object_ids it contains.
77
78 Manifests are stored as ``{path: object_id}`` msgpack maps. Returns an empty
79 set if ``blob`` is None or malformed.
80 """
81 if not blob:
82 return set()
83 try:
84 mapping = msgpack.unpackb(blob, raw=False)
85 return {v for v in mapping.values() if isinstance(v, str)}
86 except Exception:
87 return set()
88
async def run_gc(session: AsyncSession, repo_id: str) -> GCResult:
    """Prune unreachable commits, snapshots, object refs, and orphaned objects for a repo.

    Algorithm:
        1. Load all branch heads for the repo from ``musehub_branches``.
        2. Load the repo's commit graph and walk it from every head via
           ``walk_dag`` to collect reachable commit IDs; the rest are orphaned.
        3. Read snapshot IDs + manifest blobs of orphaned commits (before
           anything is deleted).
        4. Read snapshot IDs + manifest blobs of surviving commits.
        5. DELETE orphaned commits.
        6. DELETE snapshots referenced only by orphaned commits.
        7. DELETE ref rows in ``musehub_object_refs`` for objects that appear
           only in orphaned manifests of this repo.
        8. Of those objects, find the ones with zero ref rows left across all
           repos.
        9. Delete their bytes from storage and their ``musehub_objects`` rows.

    Transactions:
        Steps 5-7 are committed together. The stale-object candidates exist
        only in memory during the run that deletes the orphaned commits, and a
        later run returns early once no orphaned commits remain. Committing
        the commit/snapshot deletes separately from the ref deletes would let
        a failure in between strand the stale ref rows permanently. The
        commit still happens *before* any storage bytes are destroyed, and
        step 9 is committed on its own afterwards.

    Args:
        session: Async SQLAlchemy session used for every query; this function
            commits on it.
        repo_id: Repo to collect.

    Returns:
        A ``GCResult`` with per-phase deletion counts. When the repo has no
        branch heads the result is returned untouched; when nothing is
        orphaned only ``reachable_commit_count`` is set. Storage delete
        failures are non-fatal and are recorded in ``GCResult.errors``.
    """
    result = GCResult(repo_id=repo_id)
    backend = get_backend()

    # ── 1. Collect branch heads ──────────────────────────────────────────────
    branches_result = await session.execute(
        select(MusehubBranch.head_commit_id).where(
            MusehubBranch.repo_id == repo_id
        )
    )
    heads = [row[0] for row in branches_result.fetchall() if row[0]]

    if not heads:
        # Without heads every commit would look orphaned, so do nothing.
        logger.warning("GC: no branch heads found for repo %s — skipping", repo_id)
        return result

    # ── 2. Load all commits for this repo, then walk from every head ─────────
    all_result = await session.execute(
        select(MusehubCommit.commit_id, MusehubCommit.parent_ids)
        .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
        .where(MusehubCommitRef.repo_id == repo_id)
    )
    all_commits: _CommitGraph = {
        row[0]: (row[1] or []) for row in all_result.fetchall()
    }

    from muse.core.graph import walk_dag

    # Keep only ids that belong to this repo's commit set; the walk may yield
    # ids (heads or parents) that have no row here.
    # NOTE(review): if none of the heads resolve to a known commit, `reachable`
    # is empty and every commit of the repo is treated as orphaned — confirm
    # that this is intended rather than a state that should abort the run.
    reachable: set[str] = {
        cid
        for cid in walk_dag(heads, lambda cid: all_commits.get(cid, []))
        if cid in all_commits
    }

    result.reachable_commit_count = len(reachable)
    orphaned_commit_ids = [cid for cid in all_commits if cid not in reachable]

    if not orphaned_commit_ids:
        logger.info("GC: repo %s is clean — no orphaned commits", repo_id)
        return result

    logger.info(
        "GC: repo %s — %d reachable, %d orphaned commits to prune",
        repo_id, len(reachable), len(orphaned_commit_ids),
    )

    # ── 3. Collect orphaned snapshot IDs + manifest blobs (before deletion) ──
    # Explicit join, matching the live-snapshot query in step 4.
    orphaned_snap_result = await session.execute(
        select(MusehubSnapshot.snapshot_id, MusehubSnapshot.manifest_blob)
        .join(MusehubCommit, MusehubCommit.snapshot_id == MusehubSnapshot.snapshot_id)
        .where(
            MusehubCommit.commit_id.in_(orphaned_commit_ids),
            MusehubCommit.snapshot_id.isnot(None),
        )
    )
    orphaned_snap_rows = orphaned_snap_result.fetchall()
    # A set: several orphaned commits may point at the same snapshot.
    orphaned_snapshot_ids = {row[0] for row in orphaned_snap_rows if row[0]}

    # Decode object_ids referenced by orphaned snapshots.
    orphaned_object_ids: set[str] = set()
    for _, blob in orphaned_snap_rows:
        orphaned_object_ids.update(_decode_manifest(blob))

    # ── 4. Collect live snapshot object_ids (snapshots reachable from heads) ─
    live_snapshot_ids: set[str] = set()
    live_object_ids: set[str] = set()
    if reachable:
        live_snap_result = await session.execute(
            select(MusehubSnapshot.snapshot_id, MusehubSnapshot.manifest_blob)
            .join(MusehubCommit, MusehubCommit.snapshot_id == MusehubSnapshot.snapshot_id)
            .where(
                MusehubCommit.commit_id.in_(list(reachable)),
                MusehubCommit.snapshot_id.isnot(None),
            )
        )
        live_snap_rows = live_snap_result.fetchall()
        live_snapshot_ids = {row[0] for row in live_snap_rows if row[0]}
        # NOTE(review): _decode_manifest yields an empty set for a manifest it
        # cannot decode, so an undecodable *live* manifest would make its
        # objects look stale below — confirm this cannot happen in practice.
        for _, blob in live_snap_rows:
            live_object_ids.update(_decode_manifest(blob))

    # A snapshot shared with a surviving commit must be kept.
    truly_orphaned_snapshots = list(orphaned_snapshot_ids - live_snapshot_ids)

    # ── 5. Delete orphaned commits ────────────────────────────────────────────
    # NOTE(review): repo membership is resolved through MusehubCommitRef, but
    # this delete targets MusehubCommit by commit_id only — verify that commit
    # rows are never shared with another repo.
    del_commits = await session.execute(
        delete(MusehubCommit).where(
            MusehubCommit.commit_id.in_(orphaned_commit_ids)
        )
    )
    assert isinstance(del_commits, CursorResult)  # type narrowing for .rowcount
    result.commits_deleted = del_commits.rowcount

    # ── 6. Delete orphaned snapshots ─────────────────────────────────────────
    if truly_orphaned_snapshots:
        del_snaps = await session.execute(
            delete(MusehubSnapshot).where(
                MusehubSnapshot.snapshot_id.in_(truly_orphaned_snapshots)
            )
        )
        assert isinstance(del_snaps, CursorResult)
        result.snapshots_deleted = del_snaps.rowcount

    # ── 7. Prune stale object refs for this repo ─────────────────────────────
    # An object_id that appeared only in orphaned snapshots (not in any live
    # snapshot) no longer has a logical reference from this repo.
    stale_oids = orphaned_object_ids - live_object_ids
    if stale_oids:
        del_refs = await session.execute(
            delete(MusehubObjectRef).where(
                MusehubObjectRef.repo_id == repo_id,
                MusehubObjectRef.object_id.in_(list(stale_oids)),
            )
        )
        assert isinstance(del_refs, CursorResult)
        result.object_refs_deleted = del_refs.rowcount

    # Single commit for steps 5-7 (see "Transactions" in the docstring). It
    # must precede step 9 so ref removal is durable before bytes are deleted.
    await session.commit()

    logger.info(
        "GC: repo %s — pruned %d stale object refs",
        repo_id, result.object_refs_deleted,
    )

    # ── 8. Find globally orphaned objects (zero remaining refs) ──────────────
    # Only the stale candidates are checked — objects still referenced by
    # other repos have surviving ref rows and are excluded here.
    globally_orphaned: set[str] = set()
    if stale_oids:
        still_referenced_result = await session.execute(
            select(MusehubObjectRef.object_id).where(
                MusehubObjectRef.object_id.in_(list(stale_oids))
            ).distinct()
        )
        still_referenced = {row[0] for row in still_referenced_result.fetchall()}
        globally_orphaned = stale_oids - still_referenced

    # ── 9. Delete globally orphaned objects from DB + storage ─────────────────
    if globally_orphaned:
        logger.info(
            "GC: repo %s — %d globally orphaned objects to delete from storage",
            repo_id, len(globally_orphaned),
        )
        for oid in globally_orphaned:
            try:
                await backend.delete(oid)
            except Exception as exc:
                # Best-effort: one failed storage delete must not stop the run.
                msg = f"storage delete failed for {oid}: {exc}"
                logger.warning("GC: %s", msg)
                result.errors.append(msg)

        del_objs = await session.execute(
            delete(MusehubObject).where(
                MusehubObject.object_id.in_(list(globally_orphaned))
            )
        )
        assert isinstance(del_objs, CursorResult)
        result.objects_deleted = del_objs.rowcount
        await session.commit()

        logger.info(
            "GC: repo %s — deleted %d objects from DB",
            repo_id, result.objects_deleted,
        )

    logger.info(
        "GC complete: repo %s — commits=%d snapshots=%d refs=%d objects=%d errors=%d",
        repo_id,
        result.commits_deleted,
        result.snapshots_deleted,
        result.object_refs_deleted,
        result.objects_deleted,
        len(result.errors),
    )
    return result
282
283
async def gc_fetch_mpack_cache(session: AsyncSession, repo_id: str) -> int:
    """Remove a repo's expired fetch-mpack cache entries and their stored blobs.

    Called by the gc worker handler after every normal GC run. Only rows
    whose ``expires_at`` is at or before the current UTC time are selected,
    so fresh entries are never touched.

    Args:
        session: Async SQLAlchemy session used for the lookup and the delete.
        repo_id: Repo whose cache entries are examined.

    Returns:
        The number of expired cache rows selected for deletion (0 when there
        are none).
    """
    from datetime import datetime, timezone

    cutoff = datetime.now(tz=timezone.utc)

    lookup = await session.execute(
        select(MusehubFetchMPackCache.cache_id, MusehubFetchMPackCache.mpack_id).where(
            MusehubFetchMPackCache.repo_id == repo_id,
            MusehubFetchMPackCache.expires_at <= cutoff,
        )
    )
    stale_rows = lookup.all()
    if not stale_rows:
        return 0

    # Storage first, best-effort: a failed blob delete is logged and the row
    # is still removed below.
    storage = get_backend()
    for _, mpack_id in stale_rows:
        try:
            await storage.delete(mpack_id)
        except Exception as exc:
            # Log only a short prefix of the id.
            shown_id = mpack_id[:20] if mpack_id else "?"
            logger.warning(
                "gc_fetch_mpack_cache: R2 delete failed mpack_id=%s: %s",
                shown_id, exc,
            )

    stale_cache_ids = [cache_id for cache_id, _ in stale_rows]
    # NOTE(review): no commit is issued here — presumably the caller commits
    # the session; confirm against the gc worker handler.
    await session.execute(
        delete(MusehubFetchMPackCache).where(
            MusehubFetchMPackCache.cache_id.in_(stale_cache_ids)
        )
    )

    removed = len(stale_cache_ids)
    logger.info(
        "gc_fetch_mpack_cache: repo=%s deleted=%d expired cache rows",
        repo_id, removed,
    )
    return removed
File History 1 commit
sha256:f58d788df3ccdda8f8987b428418db655a38582309239b99d7b9715ea6dff618 feat(#92): phase 5 — GC expired fetch mpack cache entries (… Sonnet 4.6 patch 4 days ago