gabriel / musehub public

test_gc_background_tasks.py file-level

at sha256:3 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:0 fix: fall back to any indexed mpack in read_object_bytes when push mpac… · gabriel · Jun 17, 2026
"""Section 41 — GC & Background Tasks: 7-layer test suite.

Covers:
- musehub/services/musehub_gc.py::GCResult, run_gc

Background jobs (symbol indexing, GC) are now dispatched via enqueue_job
and executed by the job runner — there are no standalone _build_symbol_index_async
or _run_gc_async coroutines in wire.py.
"""
10 from __future__ import annotations
11
12 import asyncio
13 import secrets
14 import time
15 from datetime import UTC, datetime
16
17 import msgpack
18 import pytest
19 from sqlalchemy import select
20 from sqlalchemy.ext.asyncio import AsyncSession
21
22 from musehub.core.genesis import compute_identity_id, compute_repo_id
23 from musehub.db.musehub_repo_models import (
24 MusehubBranch,
25 MusehubCommit,
26 MusehubCommitRef,
27 MusehubRepo,
28 MusehubSnapshot,
29 MusehubSnapshotRef,
30 )
31 from musehub.services.musehub_gc import GCResult, run_gc
32
33
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
37
38 def _uid() -> str:
39 return secrets.token_hex(16)
40
41
42 def _hex(n: int = 32) -> str:
43 return secrets.token_hex(n)
44
45
async def _mk_repo(session: AsyncSession, suffix: str = "") -> MusehubRepo:
    """Create a ``MusehubRepo`` row for ``testuser`` and flush it.

    Args:
        session: Session the new row is added to and flushed through.
        suffix: Text appended to the ``gc-test`` name/slug prefix so that
            several repos in one test can be told apart.

    Returns:
        The newly added repo, whose slug ends in a random 8-character tag.
    """
    unique_slug = f"gc-test{suffix}-{_uid()[:8]}"
    now = datetime.now(tz=UTC)
    owner_identity = compute_identity_id(b"testuser")
    # The repo id is derived from owner, slug, domain and creation timestamp.
    derived_repo_id = compute_repo_id(
        owner_identity, unique_slug, "code", now.isoformat()
    )
    new_repo = MusehubRepo(
        repo_id=derived_repo_id,
        name=f"gc-test{suffix}",
        owner="testuser",
        slug=unique_slug,
        owner_user_id=owner_identity,
        created_at=now,
        updated_at=now,
    )
    session.add(new_repo)
    await session.flush()
    return new_repo
62
63
async def _mk_branch(
    session: AsyncSession,
    repo_id: str,
    head_commit_id: str | None = None,
    name: str = "main",
) -> MusehubBranch:
    """Create a ``MusehubBranch`` row with a random ``branch_id`` and flush it.

    Args:
        session: Session the new row is added to and flushed through.
        repo_id: Repo the branch belongs to.
        head_commit_id: Commit the branch points at; ``None`` leaves the
            branch without a head.
        name: Branch name.

    Returns:
        The newly added branch.
    """
    new_branch = MusehubBranch(
        branch_id=_uid(),
        repo_id=repo_id,
        name=name,
        head_commit_id=head_commit_id,
    )
    session.add(new_branch)
    await session.flush()
    return new_branch
79
80
async def _mk_commit(
    session: AsyncSession,
    repo_id: str,
    commit_id: str | None = None,
    parent_ids: list[str] | None = None,
    snapshot_id: str | None = None,
) -> MusehubCommit:
    """Create a ``MusehubCommit`` plus its ``MusehubCommitRef`` and flush both.

    Args:
        session: Session the new rows are added to and flushed through.
        repo_id: Repo the commit ref is attached to.
        commit_id: Explicit commit id; a random hex id is generated when
            this is ``None`` or empty.
        parent_ids: Parent commit ids; treated as an empty list when
            ``None`` or empty.
        snapshot_id: Optional snapshot the commit points at.

    Returns:
        The newly added commit.
    """
    resolved_id = commit_id if commit_id else _hex()
    parents = parent_ids if parent_ids else []
    new_commit = MusehubCommit(
        commit_id=resolved_id,
        branch="main",
        parent_ids=parents,
        message="test commit",
        author="testuser",
        timestamp=datetime.now(UTC),
        snapshot_id=snapshot_id,
    )
    session.add(new_commit)
    # The ref row is what ties the commit to this particular repo.
    repo_link = MusehubCommitRef(repo_id=repo_id, commit_id=new_commit.commit_id)
    session.add(repo_link)
    await session.flush()
    return new_commit
101
102
async def _mk_snapshot(
    session: AsyncSession, repo_id: str, snapshot_id: str | None = None
) -> MusehubSnapshot:
    """Create an empty ``MusehubSnapshot`` plus its ``MusehubSnapshotRef``.

    Args:
        session: Session the new rows are added to and flushed through.
        repo_id: Repo the snapshot ref is attached to.
        snapshot_id: Explicit snapshot id; a random hex id is generated when
            this is ``None`` or empty.

    Returns:
        The newly added snapshot, whose manifest is a msgpack-encoded empty
        mapping with ``entry_count`` 0.
    """
    resolved_id = snapshot_id if snapshot_id else _hex()
    empty_manifest = msgpack.packb({}, use_bin_type=True)
    new_snapshot = MusehubSnapshot(
        snapshot_id=resolved_id,
        manifest_blob=empty_manifest,
        entry_count=0,
    )
    session.add(new_snapshot)
    # The ref row is what ties the snapshot to this particular repo.
    repo_link = MusehubSnapshotRef(
        repo_id=repo_id, snapshot_id=new_snapshot.snapshot_id
    )
    session.add(repo_link)
    await session.flush()
    return new_snapshot
115
116
async def _count_commits(session: AsyncSession, repo_id: str) -> int:
    """Return how many commits are linked to ``repo_id`` through commit refs."""
    # Join commits to their refs so only commits attached to this repo count.
    stmt = (
        select(MusehubCommit)
        .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
        .where(MusehubCommitRef.repo_id == repo_id)
    )
    rows = (await session.execute(stmt)).scalars().all()
    return len(rows)
124
125
# ─────────────────────────────────────────────────────────────────────────────
# LAYER 1 — UNIT
# ─────────────────────────────────────────────────────────────────────────────
129
130
class TestGCResultUnit:
    """Unit: default values and explicit construction of ``GCResult``."""

    def test_default_counts_are_zero(self) -> None:
        """Every counter is zero when only ``repo_id`` is supplied."""
        fresh = GCResult(repo_id="abc")
        assert fresh.commits_deleted == 0
        assert fresh.snapshots_deleted == 0
        assert fresh.reachable_commit_count == 0

    def test_default_errors_is_empty_list(self) -> None:
        """``errors`` defaults to an empty list."""
        fresh = GCResult(repo_id="abc")
        assert fresh.errors == []

    def test_errors_is_independent_per_instance(self) -> None:
        """Appending to one instance's ``errors`` must not leak into another."""
        first, second = GCResult(repo_id="a"), GCResult(repo_id="b")
        first.errors.append("x")
        assert second.errors == []

    def test_fields_set_correctly(self) -> None:
        """Keyword arguments are stored on the matching attributes."""
        populated = GCResult(
            repo_id="x",
            commits_deleted=3,
            snapshots_deleted=1,
            reachable_commit_count=5,
        )
        assert populated.repo_id == "x"
        assert populated.commits_deleted == 3
        assert populated.snapshots_deleted == 1
        assert populated.reachable_commit_count == 5
156
157
class TestRunGcNoBranches:
    """Unit: ``run_gc`` on repos that offer no branch head to start from."""

    async def test_no_branches_returns_zero_deletions(self, db_session: AsyncSession) -> None:
        """A repo without branches reports zero deleted and zero reachable."""
        bare_repo = await _mk_repo(db_session)
        outcome = await run_gc(db_session, bare_repo.repo_id)
        assert outcome.commits_deleted == 0
        assert outcome.reachable_commit_count == 0

    async def test_branch_with_null_head_is_ignored(self, db_session: AsyncSession) -> None:
        """A branch whose head is ``None`` leads to no deletions."""
        bare_repo = await _mk_repo(db_session)
        await _mk_branch(db_session, bare_repo.repo_id, head_commit_id=None)
        outcome = await run_gc(db_session, bare_repo.repo_id)
        assert outcome.commits_deleted == 0

    async def test_unknown_repo_id_returns_empty_result(self, db_session: AsyncSession) -> None:
        """A repo id that was never created yields an all-zero result."""
        missing_repo_id = "nonexistent-repo-id"
        outcome = await run_gc(db_session, missing_repo_id)
        assert outcome.commits_deleted == 0
        assert outcome.reachable_commit_count == 0
177
178
179
180
# ─────────────────────────────────────────────────────────────────────────────
# LAYER 2 — INTEGRATION
# ─────────────────────────────────────────────────────────────────────────────
184
185
class TestRunGcIntegration:
    """Integration: ``run_gc`` against rows created through ``db_session``.

    Each test builds a small commit graph with the module helpers, runs
    ``run_gc`` on one repo and checks the counters on the returned result.
    """

    async def test_clean_repo_deletes_nothing(self, db_session: AsyncSession) -> None:
        """A lone commit that is the branch head is reachable and kept."""
        repo = await _mk_repo(db_session)
        commit = await _mk_commit(db_session, repo.repo_id)
        await _mk_branch(db_session, repo.repo_id, head_commit_id=commit.commit_id)
        result = await run_gc(db_session, repo.repo_id)
        assert result.commits_deleted == 0
        assert result.reachable_commit_count == 1

    async def test_orphaned_commit_is_deleted(self, db_session: AsyncSession) -> None:
        """One commit outside every branch is deleted; the head is kept."""
        repo = await _mk_repo(db_session)
        # reachable commit
        head = await _mk_commit(db_session, repo.repo_id)
        await _mk_branch(db_session, repo.repo_id, head_commit_id=head.commit_id)
        # orphaned commit — not on any branch
        await _mk_commit(db_session, repo.repo_id)
        await db_session.commit()

        result = await run_gc(db_session, repo.repo_id)
        assert result.commits_deleted == 1
        assert result.reachable_commit_count == 1

    async def test_orphaned_chain_all_deleted(self, db_session: AsyncSession) -> None:
        """Three linked commits that no branch reaches are all deleted."""
        repo = await _mk_repo(db_session)
        head = await _mk_commit(db_session, repo.repo_id)
        await _mk_branch(db_session, repo.repo_id, head_commit_id=head.commit_id)
        # chain of 3 orphaned commits
        c1 = await _mk_commit(db_session, repo.repo_id)
        c2 = await _mk_commit(db_session, repo.repo_id, parent_ids=[c1.commit_id])
        # Tip of the orphaned chain; nothing refers to it afterwards, so the
        # returned row is not bound to a name.
        await _mk_commit(db_session, repo.repo_id, parent_ids=[c2.commit_id])
        await db_session.commit()

        result = await run_gc(db_session, repo.repo_id)
        assert result.commits_deleted == 3
        assert result.reachable_commit_count == 1

    async def test_reachable_commit_chain_untouched(self, db_session: AsyncSession) -> None:
        """A three-commit chain ending at the branch head is fully kept."""
        repo = await _mk_repo(db_session)
        c1 = await _mk_commit(db_session, repo.repo_id)
        c2 = await _mk_commit(db_session, repo.repo_id, parent_ids=[c1.commit_id])
        c3 = await _mk_commit(db_session, repo.repo_id, parent_ids=[c2.commit_id])
        await _mk_branch(db_session, repo.repo_id, head_commit_id=c3.commit_id)
        await db_session.commit()

        result = await run_gc(db_session, repo.repo_id)
        assert result.commits_deleted == 0
        assert result.reachable_commit_count == 3

    async def test_gc_scoped_to_repo(self, db_session: AsyncSession) -> None:
        """Orphaned commits in repo_a are not touched when GC runs on repo_b."""
        repo_a = await _mk_repo(db_session, "-a")
        repo_b = await _mk_repo(db_session, "-b")
        # repo_b: clean
        head_b = await _mk_commit(db_session, repo_b.repo_id)
        await _mk_branch(db_session, repo_b.repo_id, head_commit_id=head_b.commit_id)
        # repo_a: orphaned commit
        await _mk_commit(db_session, repo_a.repo_id)
        await db_session.commit()

        result = await run_gc(db_session, repo_b.repo_id)
        assert result.commits_deleted == 0
        # repo_a's orphan still exists
        assert await _count_commits(db_session, repo_a.repo_id) == 1

    async def test_orphaned_snapshot_deleted_with_commit(self, db_session: AsyncSession) -> None:
        """A snapshot referenced only by an orphaned commit is deleted with it."""
        repo = await _mk_repo(db_session)
        snap = await _mk_snapshot(db_session, repo.repo_id)
        head = await _mk_commit(db_session, repo.repo_id)
        await _mk_branch(db_session, repo.repo_id, head_commit_id=head.commit_id)
        # orphaned commit references a snapshot
        await _mk_commit(db_session, repo.repo_id, snapshot_id=snap.snapshot_id)
        await db_session.commit()

        result = await run_gc(db_session, repo.repo_id)
        assert result.commits_deleted == 1
        assert result.snapshots_deleted == 1
264
265
# ─────────────────────────────────────────────────────────────────────────────
# LAYER 3 — E2E
# ─────────────────────────────────────────────────────────────────────────────
269
270
class TestBackgroundTaskE2E:
    """E2E: ``run_gc`` runs start to finish against the test database."""

    async def test_run_gc_completes_on_real_db(self, db_session: AsyncSession) -> None:
        """A committed repo with only a branch head yields zero deletions."""
        target = await _mk_repo(db_session)
        tip = await _mk_commit(db_session, target.repo_id)
        await _mk_branch(db_session, target.repo_id, head_commit_id=tip.commit_id)
        await db_session.commit()

        outcome = await run_gc(db_session, target.repo_id)
        assert outcome.commits_deleted == 0

    async def test_run_gc_with_orphan_completes(self, db_session: AsyncSession) -> None:
        """A committed repo with one stray commit yields exactly one deletion."""
        target = await _mk_repo(db_session)
        tip = await _mk_commit(db_session, target.repo_id)
        await _mk_branch(db_session, target.repo_id, head_commit_id=tip.commit_id)
        # Extra commit that no branch points at.
        await _mk_commit(db_session, target.repo_id)
        await db_session.commit()

        outcome = await run_gc(db_session, target.repo_id)
        assert outcome.commits_deleted == 1
290
291
# ─────────────────────────────────────────────────────────────────────────────
# LAYER 4 — STRESS
# ─────────────────────────────────────────────────────────────────────────────
295
296
class TestGCStress:
    """Stress: ``run_gc`` on larger commit graphs and repeated invocations."""

    async def test_gc_100_commit_linear_chain_all_reachable(self, db_session: AsyncSession) -> None:
        """A 100-commit linear history under one branch is fully reachable."""
        target = await _mk_repo(db_session)
        previous_id: str | None = None
        chain = []
        for _ in range(100):
            # The first commit has no parent; every later one points at its predecessor.
            lineage = [previous_id] if previous_id else []
            node = await _mk_commit(db_session, target.repo_id, parent_ids=lineage)
            chain.append(node)
            previous_id = node.commit_id
        tip = chain[-1]
        await _mk_branch(db_session, target.repo_id, head_commit_id=tip.commit_id)
        await db_session.commit()

        outcome = await run_gc(db_session, target.repo_id)
        assert outcome.commits_deleted == 0
        assert outcome.reachable_commit_count == 100

    async def test_gc_50_orphaned_commits_all_deleted(self, db_session: AsyncSession) -> None:
        """Fifty parentless commits outside every branch are all deleted."""
        target = await _mk_repo(db_session)
        tip = await _mk_commit(db_session, target.repo_id)
        await _mk_branch(db_session, target.repo_id, head_commit_id=tip.commit_id)
        stray_total = 50
        for _ in range(stray_total):
            await _mk_commit(db_session, target.repo_id)
        await db_session.commit()

        outcome = await run_gc(db_session, target.repo_id)
        assert outcome.commits_deleted == 50

    async def test_gc_diamond_merge_topology(self, db_session: AsyncSession) -> None:
        """Diamond: base → left + right → merge; all 4 commits are reachable."""
        target = await _mk_repo(db_session)
        root = await _mk_commit(db_session, target.repo_id)
        left_arm = await _mk_commit(db_session, target.repo_id, parent_ids=[root.commit_id])
        right_arm = await _mk_commit(db_session, target.repo_id, parent_ids=[root.commit_id])
        # The merge commit has both arms as parents.
        merge_parents = [left_arm.commit_id, right_arm.commit_id]
        joined = await _mk_commit(db_session, target.repo_id, parent_ids=merge_parents)
        await _mk_branch(db_session, target.repo_id, head_commit_id=joined.commit_id)
        await db_session.commit()

        outcome = await run_gc(db_session, target.repo_id)
        assert outcome.commits_deleted == 0
        assert outcome.reachable_commit_count == 4

    async def test_sequential_gc_calls_idempotent(self, db_session: AsyncSession) -> None:
        """Two sequential GC calls on the same repo both complete without error."""
        target = await _mk_repo(db_session)
        tip = await _mk_commit(db_session, target.repo_id)
        await _mk_branch(db_session, target.repo_id, head_commit_id=tip.commit_id)
        await db_session.commit()

        initial_pass = await run_gc(db_session, target.repo_id)
        repeat_pass = await run_gc(db_session, target.repo_id)
        assert initial_pass.commits_deleted == 0
        assert repeat_pass.commits_deleted == 0
353
354
# ─────────────────────────────────────────────────────────────────────────────
# LAYER 5 — DATA INTEGRITY
# ─────────────────────────────────────────────────────────────────────────────
358
359
class TestGCDataIntegrity:
    """Data Integrity: reachable rows survive GC and the counters add up."""

    async def test_reachable_commits_still_in_db_after_gc(self, db_session: AsyncSession) -> None:
        """Two reachable commits remain linked to the repo after an orphan is collected."""
        target = await _mk_repo(db_session)
        older = await _mk_commit(db_session, target.repo_id)
        newer = await _mk_commit(db_session, target.repo_id, parent_ids=[older.commit_id])
        await _mk_branch(db_session, target.repo_id, head_commit_id=newer.commit_id)
        # Extra commit that no branch points at.
        await _mk_commit(db_session, target.repo_id)
        await db_session.commit()

        await run_gc(db_session, target.repo_id)
        survivors = await _count_commits(db_session, target.repo_id)
        assert survivors == 2

    async def test_orphaned_commits_gone_after_gc(self, db_session: AsyncSession) -> None:
        """The orphaned commit can no longer be loaded by id after GC."""
        target = await _mk_repo(db_session)
        tip = await _mk_commit(db_session, target.repo_id)
        await _mk_branch(db_session, target.repo_id, head_commit_id=tip.commit_id)
        stray = await _mk_commit(db_session, target.repo_id)
        # Remember the id before committing and running GC.
        stray_id = stray.commit_id
        await db_session.commit()

        await run_gc(db_session, target.repo_id)
        lookup = await db_session.get(MusehubCommit, stray_id)
        assert lookup is None

    async def test_reachable_count_plus_deleted_equals_total(self, db_session: AsyncSession) -> None:
        """Reachable plus deleted equals the six commits that were created."""
        target = await _mk_repo(db_session)
        tip = await _mk_commit(db_session, target.repo_id)
        await _mk_branch(db_session, target.repo_id, head_commit_id=tip.commit_id)
        for _ in range(5):
            await _mk_commit(db_session, target.repo_id)
        await db_session.commit()

        outcome = await run_gc(db_session, target.repo_id)
        accounted_for = outcome.reachable_commit_count + outcome.commits_deleted
        assert accounted_for == 6

    async def test_shared_snapshot_not_deleted_when_reachable(self, db_session: AsyncSession) -> None:
        """Snapshot referenced by both a reachable and orphaned commit must not be deleted."""
        target = await _mk_repo(db_session)
        shared = await _mk_snapshot(db_session, target.repo_id)
        tip = await _mk_commit(db_session, target.repo_id, snapshot_id=shared.snapshot_id)
        await _mk_branch(db_session, target.repo_id, head_commit_id=tip.commit_id)
        # Orphaned commit pointing at the very same snapshot.
        await _mk_commit(db_session, target.repo_id, snapshot_id=shared.snapshot_id)
        await db_session.commit()

        outcome = await run_gc(db_session, target.repo_id)
        assert outcome.commits_deleted == 1
        assert outcome.snapshots_deleted == 0
        # The snapshot row must still be loadable.
        kept = await db_session.get(MusehubSnapshot, shared.snapshot_id)
        assert kept is not None

    async def test_gc_idempotent_second_run_deletes_nothing(self, db_session: AsyncSession) -> None:
        """The first run removes the orphan; the second run finds nothing to remove."""
        target = await _mk_repo(db_session)
        tip = await _mk_commit(db_session, target.repo_id)
        await _mk_branch(db_session, target.repo_id, head_commit_id=tip.commit_id)
        # Extra commit that no branch points at.
        await _mk_commit(db_session, target.repo_id)
        await db_session.commit()

        initial_pass = await run_gc(db_session, target.repo_id)
        assert initial_pass.commits_deleted == 1

        repeat_pass = await run_gc(db_session, target.repo_id)
        assert repeat_pass.commits_deleted == 0

    async def test_multi_branch_all_heads_reachable(self, db_session: AsyncSession) -> None:
        """Commits pointed to by multiple branches are all preserved."""
        target = await _mk_repo(db_session)
        branch_names = ("main", "dev", "feat")
        # One independent commit per branch, created before any branch row.
        heads = [await _mk_commit(db_session, target.repo_id) for _ in branch_names]
        for branch_name, head in zip(branch_names, heads):
            await _mk_branch(
                db_session,
                target.repo_id,
                head_commit_id=head.commit_id,
                name=branch_name,
            )
        await db_session.commit()

        outcome = await run_gc(db_session, target.repo_id)
        assert outcome.commits_deleted == 0
        assert outcome.reachable_commit_count == 3
441
442
# ─────────────────────────────────────────────────────────────────────────────
# LAYER 6 — SECURITY
# ─────────────────────────────────────────────────────────────────────────────
446
447
class TestGCBackgroundSecurity:
    """Security: isolation, error suppression, no cross-repo contamination."""

    async def test_gc_cannot_delete_commits_in_other_repo(self, db_session: AsyncSession) -> None:
        """GC on one repo leaves an orphaned commit of another repo in place."""
        bystander = await _mk_repo(db_session, "-sec-a")
        collected = await _mk_repo(db_session, "-sec-b")
        # The repo that gets collected is clean: its only commit is the branch head.
        collected_head = await _mk_commit(db_session, collected.repo_id)
        await _mk_branch(
            db_session, collected.repo_id, head_commit_id=collected_head.commit_id
        )
        # The bystander repo holds a commit that no branch points at.
        stray = await _mk_commit(db_session, bystander.repo_id)
        stray_id = stray.commit_id
        await db_session.commit()

        await run_gc(db_session, collected.repo_id)

        # The bystander's commit must still be loadable.
        lookup = await db_session.get(MusehubCommit, stray_id)
        assert lookup is not None

    async def test_gc_invalid_repo_id_no_exception(self, db_session: AsyncSession) -> None:
        """An id that matches no repo returns a result with zero deletions."""
        bogus_id = "totally-invalid-id"
        outcome = await run_gc(db_session, bogus_id)
        assert outcome.commits_deleted == 0

    async def test_gc_large_repo_id_no_exception(self, db_session: AsyncSession) -> None:
        """A 1000-character repo id returns a result with zero deletions."""
        oversized_id = "x" * 1000
        outcome = await run_gc(db_session, oversized_id)
        assert outcome.commits_deleted == 0

    async def test_gc_does_not_expose_internal_state_via_result(self, db_session: AsyncSession) -> None:
        """GCResult contains no raw SQL or stack traces."""
        target = await _mk_repo(db_session)
        await db_session.commit()
        outcome = await run_gc(db_session, target.repo_id)
        rendered = str(outcome)
        for forbidden in ("SELECT", "Traceback"):
            assert forbidden not in rendered
484
485
# ─────────────────────────────────────────────────────────────────────────────
# LAYER 7 — PERFORMANCE
# ─────────────────────────────────────────────────────────────────────────────
489
490
class TestGCPerformance:
    """Performance: wall-clock budgets for ``run_gc`` and ``GCResult``.

    Only the ``run_gc`` call (or the construction loop) is timed; fixture
    setup happens before the clock starts.
    """

    async def test_gc_clean_repo_under_100ms(self, db_session: AsyncSession) -> None:
        """GC on a one-commit repo finishes within 0.1 s."""
        target = await _mk_repo(db_session)
        tip = await _mk_commit(db_session, target.repo_id)
        await _mk_branch(db_session, target.repo_id, head_commit_id=tip.commit_id)
        await db_session.commit()

        started = time.perf_counter()
        await run_gc(db_session, target.repo_id)
        took = time.perf_counter() - started
        assert took < 0.1, f"GC on clean repo took {took:.3f}s"

    async def test_gc_100_commit_chain_under_2s(self, db_session: AsyncSession) -> None:
        """GC on a 100-commit linear chain finishes within 2 s."""
        target = await _mk_repo(db_session)
        previous_id: str | None = None
        chain = []
        for _ in range(100):
            # The first commit has no parent; every later one points at its predecessor.
            lineage = [previous_id] if previous_id else []
            node = await _mk_commit(db_session, target.repo_id, parent_ids=lineage)
            chain.append(node)
            previous_id = node.commit_id
        tip = chain[-1]
        await _mk_branch(db_session, target.repo_id, head_commit_id=tip.commit_id)
        await db_session.commit()

        started = time.perf_counter()
        await run_gc(db_session, target.repo_id)
        took = time.perf_counter() - started
        assert took < 2.0, f"GC on 100-commit chain took {took:.3f}s"

    async def test_gc_50_orphans_under_1s(self, db_session: AsyncSession) -> None:
        """GC on a repo with 50 stray commits finishes within 1 s."""
        target = await _mk_repo(db_session)
        tip = await _mk_commit(db_session, target.repo_id)
        await _mk_branch(db_session, target.repo_id, head_commit_id=tip.commit_id)
        for _ in range(50):
            await _mk_commit(db_session, target.repo_id)
        await db_session.commit()

        started = time.perf_counter()
        await run_gc(db_session, target.repo_id)
        took = time.perf_counter() - started
        assert took < 1.0, f"GC on 50 orphans took {took:.3f}s"

    async def test_gc_result_construction_is_negligible(self) -> None:
        """Building 10,000 default ``GCResult`` objects takes under 0.1 s."""
        started = time.perf_counter()
        for _ in range(10_000):
            GCResult(repo_id="x")
        took = time.perf_counter() - started
        assert took < 0.1, f"10K GCResult() took {took:.3f}s"

    async def test_gc_result_construction_fast(self) -> None:
        """GCResult construction overhead is negligible."""
        started = time.perf_counter()
        for _ in range(10_000):
            GCResult(repo_id="x", reachable_commit_count=5)
        took = time.perf_counter() - started
        assert took < 0.1, f"10K GCResult() took {took:.3f}s"