gabriel / musehub public
test_fetch_mpack_prebuild.py python
381 lines 16.1 KB
Raw
sha256:f58d788df3ccdda8f8987b428418db655a38582309239b99d7b9715ea6dff618 feat(#92): phase 5 — GC expired fetch mpack cache entries (… Sonnet 4.6 patch 4 days ago
1 """TDD — fetch.mpack.prebuild job handler and wire_fetch_mpack cache (issue #92 Phases 2–5).
2
3 Test IDs:
FMC_07 Unit: mock wire_fetch_mpack, confirm all uncached tips are built in one combined
call, and confirm existing fresh entries are skipped
6 FMC_08 Integration: insert a job row, run the handler, verify
7 MusehubFetchMPackCache row exists with correct repo_id/tip/mpack_id
8 FMC_13 Unit: cache hit returns presigned URL without entering blob-load path
9 FMC_14 Unit: cache miss builds and writes cache row
10 FMC_18 Integration: enqueue_push_intel inserts fetch.mpack.prebuild with branch tips
11 FMC_20 Unit: gc_fetch_mpack_cache deletes expired rows + R2 objects; fresh rows untouched
12 """
13 from __future__ import annotations
14
15 import hashlib
16 from datetime import datetime, timedelta, timezone
17 from unittest.mock import AsyncMock, patch
18
19 import pytest
20 from sqlalchemy import select
21 from sqlalchemy.ext.asyncio import AsyncSession
22
23 from musehub.core.genesis import compute_job_id
24 from musehub.db.musehub_jobs_models import MusehubBackgroundJob
25 from musehub.db.musehub_repo_models import MusehubFetchMPackCache
26 from musehub.services.musehub_gc import gc_fetch_mpack_cache
27 from musehub.services.musehub_jobs import enqueue_push_intel
28 from musehub.services.musehub_wire_fetch import process_fetch_mpack_prebuild_job, wire_fetch_mpack
29 from tests.factories import create_branch, create_repo
30
31
def _now() -> datetime:
    """Current instant as a timezone-aware datetime pinned to UTC."""
    utc = timezone.utc
    return datetime.now(tz=utc)
34
35
def _fake_commit_id(seed: str) -> str:
    """Derive a deterministic ``sha256:``-prefixed fake commit ID from *seed*."""
    digest = hashlib.sha256(seed.encode()).hexdigest()
    return f"sha256:{digest}"
38
39
def _fake_mpack_id(seed: str) -> str:
    """Derive a deterministic ``sha256:``-prefixed fake mpack ID from *seed*.

    The seed is namespaced with ``mpack-`` before hashing, so the result differs
    from a fake commit ID built from the same seed.
    """
    namespaced = "mpack-" + seed
    digest = hashlib.sha256(namespaced.encode()).hexdigest()
    return f"sha256:{digest}"
42
43
async def _insert_job(
    session: AsyncSession,
    repo_id: str,
    tip_commit_ids: list[str],
) -> str:
    """Stage a pending ``fetch.mpack.prebuild`` job row and return its job ID.

    The job ID comes from ``compute_job_id`` over the repo, job type and creation
    timestamp. The row is only flushed, not committed, so the caller decides when
    the transaction ends.
    """
    job_type = "fetch.mpack.prebuild"
    created = _now()
    job_id = compute_job_id(repo_id, job_type, created.isoformat())
    job_row = MusehubBackgroundJob(
        job_id=job_id,
        repo_id=repo_id,
        job_type=job_type,
        payload={"tip_commit_ids": tip_commit_ids},
        status="pending",
        created_at=created,
        attempt=0,
    )
    session.add(job_row)
    await session.flush()
    return job_id
62
63
64 # ── FMC_07 ────────────────────────────────────────────────────────────────────
65
@pytest.mark.tier2
async def test_fmc_07_builds_uncached_tips_in_one_combined_mpack(db_session: AsyncSession) -> None:
    """FMC_07a: every uncached tip goes into ONE combined wire_fetch_mpack build.

    The handler must call wire_fetch_mpack once with want=[all uncached tips] rather
    than once per tip. Per-tip cache rows are written inside wire_fetch_mpack
    (exercised by FMC_14), so the build is mocked here. Only the handler's
    orchestration and its reported counts are checked.
    """
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    tips = [_fake_commit_id("tip-a"), _fake_commit_id("tip-b")]
    build_result = {
        "mpack_id": _fake_mpack_id("combined"),
        "mpack_url": "https://r2.example/c",
        "commit_count": 2,
        "blob_count": 5,
    }

    job_id = await _insert_job(db_session, repo.repo_id, list(tips))
    await db_session.commit()

    build_patch = patch(
        "musehub.services.musehub_wire_fetch.wire_fetch_mpack",
        new_callable=AsyncMock,
        return_value=build_result,
    )
    with build_patch as mock_build:
        result = await process_fetch_mpack_prebuild_job(db_session, job_id)
        await db_session.commit()

    # Exactly one build, whose want-list is both uncached tips in payload order.
    assert mock_build.call_count == 1
    assert mock_build.call_args.kwargs["want"] == tips
    counts = (result["tips_requested"], result["tips_built"], result["tips_skipped"])
    assert counts == (2, 2, 0)
94
95
@pytest.mark.tier2
async def test_fmc_07b_skips_tips_with_fresh_cache(db_session: AsyncSession) -> None:
    """FMC_07b: tips with a non-expired cache entry are skipped without calling wire_fetch_mpack.

    A fresh cache row is seeded for one of two tips. The handler must build only the
    other tip, report one built and one skipped, and leave the seeded row's mpack_id
    unchanged.
    """
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    tip_cached = _fake_commit_id("tip-cached")
    tip_new = _fake_commit_id("tip-new")
    existing_mpack = _fake_mpack_id("existing")
    new_mpack = _fake_mpack_id("new")

    # Pre-populate a fresh cache entry (expires in 7 days) for tip_cached.
    cache_id = hashlib.sha256((repo.repo_id + tip_cached).encode()).hexdigest()
    db_session.add(MusehubFetchMPackCache(
        cache_id=cache_id,
        repo_id=repo.repo_id,
        tip_commit_id=tip_cached,
        mpack_id=existing_mpack,
        created_at=_now(),
        expires_at=_now() + timedelta(days=7),
    ))
    job_id = await _insert_job(db_session, repo.repo_id, [tip_cached, tip_new])
    await db_session.commit()

    with patch(
        "musehub.services.musehub_wire_fetch.wire_fetch_mpack",
        new_callable=AsyncMock,
        return_value={"mpack_id": new_mpack, "mpack_url": "https://r2.example/new", "commit_count": 1, "blob_count": 1},
    ) as mock_build:
        result = await process_fetch_mpack_prebuild_job(db_session, job_id)
        await db_session.commit()

    # Only the new tip should have triggered a build. `want` is read from the keyword
    # arguments, the same way FMC_07a already requires it to be passed. An
    # `or`-chained fallback to a positional index would raise KeyError/IndexError
    # instead of failing cleanly.
    assert mock_build.call_count == 1
    assert mock_build.call_args.kwargs["want"] == [tip_new]
    assert result["tips_built"] == 1
    assert result["tips_skipped"] == 1

    # The pre-existing cache entry must be unchanged.
    cached_row = (await db_session.execute(
        select(MusehubFetchMPackCache)
        .where(MusehubFetchMPackCache.repo_id == repo.repo_id)
        .where(MusehubFetchMPackCache.tip_commit_id == tip_cached)
    )).scalar_one()
    assert cached_row.mpack_id == existing_mpack
139
140
@pytest.mark.tier2
async def test_fmc_07c_empty_payload_is_a_noop(db_session: AsyncSession) -> None:
    """FMC_07c: a job whose payload lists no tip_commit_ids does nothing.

    The handler reports zero requested/built tips, and wire_fetch_mpack is never
    invoked.
    """
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    job_id = await _insert_job(db_session, repo.repo_id, [])
    await db_session.commit()

    build_patch = patch(
        "musehub.services.musehub_wire_fetch.wire_fetch_mpack",
        new_callable=AsyncMock,
    )
    with build_patch as mock_build:
        result = await process_fetch_mpack_prebuild_job(db_session, job_id)

    mock_build.assert_not_called()
    assert result["tips_requested"] == 0
    assert result["tips_built"] == 0
157
158
159 # ── FMC_08 ────────────────────────────────────────────────────────────────────
160
@pytest.mark.tier2
async def test_fmc_08_cache_row_has_correct_fields(db_session: AsyncSession) -> None:
    """FMC_08: the prebuild handler, driving a REAL wire_fetch_mpack, writes a correct cache row.

    Cache rows are written inside wire_fetch_mpack, so the build itself is not
    mocked. The test therefore covers the whole handler -> build -> cache-write
    chain. Only the expensive externals are stubbed, following the FMC_14 pattern:
    - the storage backend
    - the DAG walk
    - the mpack bytes
    - blob_id, so the resulting mpack ID is known in advance
    """
    from types import SimpleNamespace

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    tip = _fake_commit_id("integration-tip")
    built_mpack = _fake_mpack_id("integration")

    job_id = await _insert_job(db_session, repo.repo_id, [tip])
    await db_session.commit()

    mock_backend = AsyncMock()
    mock_backend.put_mpack.return_value = None
    mock_backend.presign_mpack_get.return_value = "https://r2.example/int"
    mock_backend.delete.return_value = None
    fake_proxy = SimpleNamespace(commit_id=tip, snapshot_id=None, parent_ids=[])

    backend_patch = patch("musehub.services.musehub_wire_fetch.get_backend", return_value=mock_backend)
    walk_patch = patch(
        "musehub.services.musehub_wire_fetch._walk_commit_delta",
        new_callable=AsyncMock,
        return_value={tip: fake_proxy},
    )
    mpack_bytes_patch = patch("muse.core.mpack.build_wire_mpack", return_value=b"MUSE\x00fake-mpack")
    blob_id_patch = patch("musehub.services.musehub_wire_fetch.blob_id", return_value=built_mpack)
    with backend_patch, walk_patch, mpack_bytes_patch, blob_id_patch:
        result = await process_fetch_mpack_prebuild_job(db_session, job_id)
        await db_session.commit()

    assert result["tips_built"] == 1

    row_query = (
        select(MusehubFetchMPackCache)
        .where(MusehubFetchMPackCache.repo_id == repo.repo_id)
        .where(MusehubFetchMPackCache.tip_commit_id == tip)
    )
    row = (await db_session.execute(row_query)).scalar_one()

    assert row.repo_id == repo.repo_id
    assert row.tip_commit_id == tip
    assert row.mpack_id == built_mpack
    assert row.expires_at > _now()
203
204
205 # ── FMC_13 ────────────────────────────────────────────────────────────────────
206
@pytest.mark.tier2
async def test_fmc_13_cache_hit_returns_presigned_url_without_blob_load(db_session: AsyncSession) -> None:
    """FMC_13: a fresh cache row short-circuits wire_fetch_mpack.

    The cached mpack's presigned URL and ID are returned directly, and the DAG walk
    (_walk_commit_delta) is never called.
    """
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    tip = _fake_commit_id("fmc13-tip")
    cached_mpack = _fake_mpack_id("fmc13-tip")
    expected_url = "https://r2.example/cached-fmc13"

    # Seed a cache row that stays valid for another week.
    seeded_row = MusehubFetchMPackCache(
        cache_id=hashlib.sha256((repo.repo_id + tip).encode()).hexdigest(),
        repo_id=repo.repo_id,
        tip_commit_id=tip,
        mpack_id=cached_mpack,
        created_at=_now(),
        expires_at=_now() + timedelta(days=7),
    )
    db_session.add(seeded_row)
    await db_session.commit()

    mock_backend = AsyncMock()
    mock_backend.presign_mpack_get.return_value = expected_url

    backend_patch = patch("musehub.services.musehub_wire_fetch.get_backend", return_value=mock_backend)
    walk_patch = patch("musehub.services.musehub_wire_fetch._walk_commit_delta", new_callable=AsyncMock)
    with backend_patch, walk_patch as mock_walk:
        result = await wire_fetch_mpack(db_session, repo.repo_id, want=[tip], have=[])

    # A cache hit must never pay for the DAG walk.
    mock_walk.assert_not_called()
    # What comes back is the presigned URL (and ID) of the cached mpack.
    assert result["mpack_url"] == expected_url
    assert result["mpack_id"] == cached_mpack
239
240
241 # ── FMC_14 ────────────────────────────────────────────────────────────────────
242
@pytest.mark.tier2
async def test_fmc_14_cache_miss_builds_and_writes_cache_row(db_session: AsyncSession) -> None:
    """FMC_14: on a cold cache miss, wire_fetch_mpack builds the mpack and records a cache row.

    The storage backend, DAG walk, mpack bytes and blob_id are stubbed, so no real
    commit, snapshot or object rows are needed.
    """
    from types import SimpleNamespace

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    tip = _fake_commit_id("fmc14-tip")
    built_mpack = _fake_mpack_id("fmc14-tip")
    built_url = "https://r2.example/built-fmc14"

    # Storage stub. No cache row is seeded, so this is a cold miss.
    mock_backend = AsyncMock()
    mock_backend.put_mpack.return_value = None
    mock_backend.presign_mpack_get.return_value = built_url
    mock_backend.delete.return_value = None

    # The DAG walk yields a single proxy commit with no snapshot. wire_fetch_mpack
    # can then build an empty-but-valid mpack without real commit/snapshot/object
    # rows.
    fake_proxy = SimpleNamespace(commit_id=tip, snapshot_id=None, parent_ids=[])

    backend_patch = patch("musehub.services.musehub_wire_fetch.get_backend", return_value=mock_backend)
    walk_patch = patch(
        "musehub.services.musehub_wire_fetch._walk_commit_delta",
        new_callable=AsyncMock,
        return_value={tip: fake_proxy},
    )
    mpack_bytes_patch = patch("muse.core.mpack.build_wire_mpack", return_value=b"MUSE\x00fake-mpack")
    blob_id_patch = patch("musehub.services.musehub_wire_fetch.blob_id", return_value=built_mpack)
    with backend_patch, walk_patch, mpack_bytes_patch, blob_id_patch:
        result = await wire_fetch_mpack(db_session, repo.repo_id, want=[tip], have=[])
        await db_session.commit()

    assert result["mpack_url"] == built_url
    assert result["mpack_id"] == built_mpack

    # A cache row for the built tip must now exist and still be fresh.
    row_query = (
        select(MusehubFetchMPackCache)
        .where(MusehubFetchMPackCache.repo_id == repo.repo_id)
        .where(MusehubFetchMPackCache.tip_commit_id == tip)
    )
    row = (await db_session.execute(row_query)).scalar_one()
    assert row.mpack_id == built_mpack
    assert row.expires_at > _now()
283
284
285 # ── FMC_18 ────────────────────────────────────────────────────────────────────
286
@pytest.mark.tier2
async def test_fmc_18_enqueue_push_intel_creates_prebuild_job_with_branch_tips(
    db_session: AsyncSession,
) -> None:
    """FMC_18: a push to main enqueues fetch.mpack.prebuild covering every branch tip.

    Two branches exist (main and dev). After enqueue_push_intel runs for main,
    exactly one pending prebuild job must exist, and its payload must contain both
    branch heads.
    """
    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    main_tip = _fake_commit_id("branch-main")
    dev_tip = _fake_commit_id("branch-dev")
    for branch_name, branch_head in (("main", main_tip), ("dev", dev_tip)):
        await create_branch(db_session, repo.repo_id, name=branch_name, head_commit_id=branch_head)

    await enqueue_push_intel(db_session, repo.repo_id, head=main_tip, domain_id=None, branch="main")
    await db_session.commit()

    job_query = (
        select(MusehubBackgroundJob)
        .where(MusehubBackgroundJob.repo_id == repo.repo_id)
        .where(MusehubBackgroundJob.job_type == "fetch.mpack.prebuild")
        .where(MusehubBackgroundJob.status == "pending")
    )
    job_row = (await db_session.execute(job_query)).scalar_one()

    payload_tips = set(job_row.payload.get("tip_commit_ids", []))
    assert main_tip in payload_tips, f"main tip {main_tip[:20]} missing from payload"
    assert dev_tip in payload_tips, f"dev tip {dev_tip[:20]} missing from payload"
318
319
320 # ── FMC_20 ────────────────────────────────────────────────────────────────────
321
@pytest.mark.tier2
async def test_fmc_20_gc_deletes_expired_rows_and_r2_objects_leaves_fresh_untouched(
    db_session: AsyncSession,
) -> None:
    """FMC_20: gc_fetch_mpack_cache deletes expired rows + R2 objects; fresh rows survive.

    Seeds two cache rows that expired a day ago and one that expires in six days. It
    then runs the GC for this repo against a mocked storage backend and checks the
    returned count, the per-mpack backend delete calls, and which rows remain.
    """
    from collections import Counter

    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    tip_expired_a = _fake_commit_id("expired-a")
    tip_expired_b = _fake_commit_id("expired-b")
    tip_fresh = _fake_commit_id("fresh")

    mpack_expired_a = _fake_mpack_id("expired-a")
    mpack_expired_b = _fake_mpack_id("expired-b")
    mpack_fresh = _fake_mpack_id("fresh")

    past = _now() - timedelta(days=1)
    future = _now() + timedelta(days=6)

    for tip, mpack, exp in [
        (tip_expired_a, mpack_expired_a, past),
        (tip_expired_b, mpack_expired_b, past),
        (tip_fresh, mpack_fresh, future),
    ]:
        cache_id = hashlib.sha256((repo.repo_id + tip).encode()).hexdigest()
        db_session.add(MusehubFetchMPackCache(
            cache_id=cache_id,
            repo_id=repo.repo_id,
            tip_commit_id=tip,
            mpack_id=mpack,
            created_at=_now(),
            expires_at=exp,
        ))
    await db_session.commit()

    mock_backend = AsyncMock()
    mock_backend.delete.return_value = None

    with patch("musehub.services.musehub_gc.get_backend", return_value=mock_backend):
        n_deleted = await gc_fetch_mpack_cache(db_session, repo.repo_id)
        await db_session.commit()

    assert n_deleted == 2

    # R2 delete called exactly once for each expired mpack — in any order. The calls
    # are counted per mpack ID because a set would collapse a duplicate delete and
    # hide it.
    delete_counts = Counter(call.args[0] for call in mock_backend.delete.call_args_list)
    assert delete_counts[mpack_expired_a] == 1, delete_counts
    assert delete_counts[mpack_expired_b] == 1, delete_counts
    assert mpack_fresh not in delete_counts, delete_counts

    # Expired rows gone from DB.
    remaining = (await db_session.execute(
        select(MusehubFetchMPackCache)
        .where(MusehubFetchMPackCache.repo_id == repo.repo_id)
    )).scalars().all()
    remaining_tips = {r.tip_commit_id for r in remaining}
    assert tip_expired_a not in remaining_tips
    assert tip_expired_b not in remaining_tips

    # Fresh row survives.
    assert tip_fresh in remaining_tips
File History 4 commits
sha256:f58d788df3ccdda8f8987b428418db655a38582309239b99d7b9715ea6dff618 feat(#92): phase 5 — GC expired fetch mpack cache entries (… Sonnet 4.6 patch 4 days ago
sha256:d50f9cf9829dfbe35721a23b81ad256c729ddf9dd565a0a9e56d27847e255632 feat(#92): phase 4 — enqueue fetch.mpack.prebuild on push (… Sonnet 4.6 patch 4 days ago
sha256:1c5b7a0aba79472f4b10e52326dc010bdab1a498c9e195593d0707860478a034 feat(#92): phase 3 — cache lookup in wire_fetch_mpack (FMC_… Sonnet 4.6 patch 4 days ago
sha256:0e447fc3f6b7887d5d9e86b557c659ef7d0b05e2e09ddb0cb551ada240e48a51 feat(phase2): fetch.mpack.prebuild job handler + worker dis… Sonnet 4.6 patch 4 days ago