gabriel / musehub public
test_fetch_mpack_prebuild.py python
378 lines 15.5 KB
Raw
sha256:f58d788df3ccdda8f8987b428418db655a38582309239b99d7b9715ea6dff618 feat(#92): phase 5 — GC expired fetch mpack cache entries (… Sonnet 4.6 patch 2 days ago
1 """TDD — fetch.mpack.prebuild job handler and wire_fetch_mpack cache (issue #92 Phases 2–5).
2
3 Test IDs:
4 FMC_07 Unit: mock wire_fetch_mpack, confirm cache rows written for each tip,
5 confirm existing fresh entries are skipped
6 FMC_08 Integration: insert a job row, run the handler, verify
7 MusehubFetchMPackCache row exists with correct repo_id/tip/mpack_id
8 FMC_13 Unit: cache hit returns presigned URL without entering blob-load path
9 FMC_14 Unit: cache miss builds and writes cache row
10 FMC_18 Integration: enqueue_push_intel inserts fetch.mpack.prebuild with branch tips
11 FMC_20 Unit: gc_fetch_mpack_cache deletes expired rows + R2 objects; fresh rows untouched
12 """
13 from __future__ import annotations
14
15 import hashlib
16 from datetime import datetime, timedelta, timezone
17 from unittest.mock import AsyncMock, patch
18
19 import pytest
20 from sqlalchemy import select
21 from sqlalchemy.ext.asyncio import AsyncSession
22
23 from musehub.core.genesis import compute_job_id
24 from musehub.db.musehub_jobs_models import MusehubBackgroundJob
25 from musehub.db.musehub_repo_models import MusehubFetchMPackCache
26 from musehub.services.musehub_gc import gc_fetch_mpack_cache
27 from musehub.services.musehub_jobs import enqueue_push_intel
28 from musehub.services.musehub_wire_fetch import process_fetch_mpack_prebuild_job, wire_fetch_mpack
29 from tests.factories import create_branch, create_repo
30
31
def _now() -> datetime:
    """Return the current moment as a timezone-aware datetime in UTC."""
    utc = timezone.utc
    return datetime.now(utc)
34
35
def _fake_commit_id(seed: str) -> str:
    """Build a deterministic commit-ID-shaped string from *seed*.

    The result is ``"sha256:"`` followed by the SHA-256 hex digest of the
    seed's encoded bytes, so equal seeds always yield equal IDs.
    """
    digest = hashlib.sha256(seed.encode()).hexdigest()
    return f"sha256:{digest}"
38
39
def _fake_mpack_id(seed: str) -> str:
    """Build a deterministic mpack-ID-shaped string from *seed*.

    The seed is prefixed with ``"mpack-"`` before hashing, so the result
    differs from the SHA-256 of the bare seed.
    """
    material = f"mpack-{seed}".encode()
    hex_digest = hashlib.sha256(material).hexdigest()
    return "sha256:" + hex_digest
42
43
async def _insert_job(
    session: AsyncSession,
    repo_id: str,
    tip_commit_ids: list[str],
) -> str:
    """Add a pending ``fetch.mpack.prebuild`` job row and return its job ID.

    The row is added to *session* and flushed but not committed; the caller
    decides when to commit. The job ID is produced by ``compute_job_id`` from
    the repo ID, the job type, and the ISO-formatted creation timestamp.
    """
    job_type = "fetch.mpack.prebuild"
    created = _now()
    job_id = compute_job_id(repo_id, job_type, created.isoformat())

    job = MusehubBackgroundJob(
        job_id=job_id,
        repo_id=repo_id,
        job_type=job_type,
        payload={"tip_commit_ids": tip_commit_ids},
        status="pending",
        created_at=created,
        attempt=0,
    )
    session.add(job)
    await session.flush()
    return job_id
62
63
64 # ── FMC_07 ────────────────────────────────────────────────────────────────────
65
@pytest.mark.tier2
async def test_fmc_07_builds_each_tip_and_writes_cache(db_session: AsyncSession) -> None:
    """FMC_07a: the handler builds once per requested tip and caches each result."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    tip_a, tip_b = _fake_commit_id("tip-a"), _fake_commit_id("tip-b")
    mpack_a, mpack_b = _fake_mpack_id("tip-a"), _fake_mpack_id("tip-b")

    job_id = await _insert_job(db_session, repo.repo_id, [tip_a, tip_b])
    await db_session.commit()

    # One canned build result per tip, consumed in call order by the mock.
    build_results = [
        {"mpack_id": mpack_a, "mpack_url": "https://r2.example/a", "commit_count": 1, "blob_count": 2},
        {"mpack_id": mpack_b, "mpack_url": "https://r2.example/b", "commit_count": 1, "blob_count": 3},
    ]
    build_patch = patch(
        "musehub.services.musehub_wire_fetch.wire_fetch_mpack",
        new_callable=AsyncMock,
        side_effect=build_results,
    )

    with build_patch as mock_build:
        result = await process_fetch_mpack_prebuild_job(db_session, job_id)
        await db_session.commit()

    assert mock_build.call_count == 2
    assert result["tips_requested"] == 2
    assert result["tips_built"] == 2
    assert result["tips_skipped"] == 0

    stmt = (
        select(MusehubFetchMPackCache)
        .where(MusehubFetchMPackCache.repo_id == repo.repo_id)
        .order_by(MusehubFetchMPackCache.tip_commit_id)
    )
    rows = (await db_session.execute(stmt)).scalars().all()
    assert len(rows) == 2

    # Each tip must map to the mpack the mocked build returned for it.
    mpack_by_tip = {row.tip_commit_id: row.mpack_id for row in rows}
    for tip, expected_mpack in ((tip_a, mpack_a), (tip_b, mpack_b)):
        assert mpack_by_tip[tip] == expected_mpack
105
106
@pytest.mark.tier2
async def test_fmc_07b_skips_tips_with_fresh_cache(db_session: AsyncSession) -> None:
    """FMC_07b: tips with a non-expired cache entry are skipped without calling wire_fetch_mpack.

    Two tips are requested: one already has a fresh cache row, the other does
    not. The handler must build only the uncached tip and leave the existing
    row's ``mpack_id`` untouched.
    """
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    tip_cached = _fake_commit_id("tip-cached")
    tip_new = _fake_commit_id("tip-new")
    existing_mpack = _fake_mpack_id("existing")
    new_mpack = _fake_mpack_id("new")

    # Pre-populate a fresh cache entry for tip_cached.
    cache_id = hashlib.sha256((repo.repo_id + tip_cached).encode()).hexdigest()
    db_session.add(MusehubFetchMPackCache(
        cache_id=cache_id,
        repo_id=repo.repo_id,
        tip_commit_id=tip_cached,
        mpack_id=existing_mpack,
        created_at=_now(),
        expires_at=_now() + timedelta(days=7),
    ))
    job_id = await _insert_job(db_session, repo.repo_id, [tip_cached, tip_new])
    await db_session.commit()

    with patch(
        "musehub.services.musehub_wire_fetch.wire_fetch_mpack",
        new_callable=AsyncMock,
        return_value={"mpack_id": new_mpack, "mpack_url": "https://r2.example/new", "commit_count": 1, "blob_count": 1},
    ) as mock_build:
        result = await process_fetch_mpack_prebuild_job(db_session, job_id)
        await db_session.commit()

    # Only the new tip should have triggered a build.
    assert mock_build.call_count == 1

    # ``want`` may arrive by keyword or as the third positional argument.
    # Pick whichever form was used *before* comparing: indexing the kwargs
    # dict unconditionally would raise KeyError on a positional call and the
    # positional fallback would never be evaluated.
    build_call = mock_build.call_args
    if "want" in build_call.kwargs:
        requested_want = build_call.kwargs["want"]
    else:
        requested_want = build_call.args[2]
    assert requested_want == [tip_new]

    assert result["tips_built"] == 1
    assert result["tips_skipped"] == 1

    # The pre-existing cache entry must be unchanged.
    cached_row = (await db_session.execute(
        select(MusehubFetchMPackCache)
        .where(MusehubFetchMPackCache.repo_id == repo.repo_id)
        .where(MusehubFetchMPackCache.tip_commit_id == tip_cached)
    )).scalar_one()
    assert cached_row.mpack_id == existing_mpack
150
151
@pytest.mark.tier2
async def test_fmc_07c_empty_payload_is_a_noop(db_session: AsyncSession) -> None:
    """FMC_07c: an empty tip list produces zero counts and no build calls."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    no_tips: list[str] = []
    job_id = await _insert_job(db_session, repo.repo_id, no_tips)
    await db_session.commit()

    build_patch = patch(
        "musehub.services.musehub_wire_fetch.wire_fetch_mpack",
        new_callable=AsyncMock,
    )
    with build_patch as mock_build:
        result = await process_fetch_mpack_prebuild_job(db_session, job_id)

    # Nothing was requested, so nothing may have been built.
    assert mock_build.call_count == 0
    assert result["tips_requested"] == 0
    assert result["tips_built"] == 0
168
169
170 # ── FMC_08 ────────────────────────────────────────────────────────────────────
171
@pytest.mark.tier2
async def test_fmc_08_cache_row_has_correct_fields(db_session: AsyncSession) -> None:
    """FMC_08: running the handler leaves a cache row with the expected repo, tip and mpack."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    tip = _fake_commit_id("integration-tip")
    mpack = _fake_mpack_id("integration")

    job_id = await _insert_job(db_session, repo.repo_id, [tip])
    await db_session.commit()

    build_result = {
        "mpack_id": mpack,
        "mpack_url": "https://r2.example/int",
        "commit_count": 5,
        "blob_count": 10,
    }
    build_patch = patch(
        "musehub.services.musehub_wire_fetch.wire_fetch_mpack",
        new_callable=AsyncMock,
        return_value=build_result,
    )
    with build_patch:
        await process_fetch_mpack_prebuild_job(db_session, job_id)
        await db_session.commit()

    # scalar_one() also enforces that exactly one row matches.
    stmt = (
        select(MusehubFetchMPackCache)
        .where(MusehubFetchMPackCache.repo_id == repo.repo_id)
        .where(MusehubFetchMPackCache.tip_commit_id == tip)
    )
    row = (await db_session.execute(stmt)).scalar_one()

    assert row.repo_id == repo.repo_id
    assert row.tip_commit_id == tip
    assert row.mpack_id == mpack
    # The entry must not already be expired at the time it is read back.
    assert row.expires_at > _now()
200
201
202 # ── FMC_13 ────────────────────────────────────────────────────────────────────
203
@pytest.mark.tier2
async def test_fmc_13_cache_hit_returns_presigned_url_without_blob_load(db_session: AsyncSession) -> None:
    """FMC_13: a fresh cache row short-circuits wire_fetch_mpack before the commit walk."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    tip = _fake_commit_id("fmc13-tip")
    cached_mpack = _fake_mpack_id("fmc13-tip")
    expected_url = "https://r2.example/cached-fmc13"

    # Seed a cache entry that stays valid for another week.
    fresh_entry = MusehubFetchMPackCache(
        cache_id=hashlib.sha256((repo.repo_id + tip).encode()).hexdigest(),
        repo_id=repo.repo_id,
        tip_commit_id=tip,
        mpack_id=cached_mpack,
        created_at=_now(),
        expires_at=_now() + timedelta(days=7),
    )
    db_session.add(fresh_entry)
    await db_session.commit()

    mock_backend = AsyncMock()
    mock_backend.presign_mpack_get.return_value = expected_url

    backend_patch = patch("musehub.services.musehub_wire_fetch.get_backend", return_value=mock_backend)
    walk_patch = patch("musehub.services.musehub_wire_fetch._walk_commit_delta", new_callable=AsyncMock)

    with backend_patch, walk_patch as mock_walk:
        result = await wire_fetch_mpack(db_session, repo.repo_id, want=[tip], have=[])

    # The patched commit walk must not have been invoked on a cache hit.
    mock_walk.assert_not_called()
    # The response carries the backend's presigned URL and the cached mpack ID.
    assert result["mpack_url"] == expected_url
    assert result["mpack_id"] == cached_mpack
236
237
238 # ── FMC_14 ────────────────────────────────────────────────────────────────────
239
@pytest.mark.tier2
async def test_fmc_14_cache_miss_builds_and_writes_cache_row(db_session: AsyncSession) -> None:
    """FMC_14: with no cache row present, wire_fetch_mpack builds an mpack and caches it."""
    from types import SimpleNamespace

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    tip = _fake_commit_id("fmc14-tip")
    built_mpack = _fake_mpack_id("fmc14-tip")
    built_url = "https://r2.example/built-fmc14"

    # Cold miss: no cache row is inserted before the call.
    mock_backend = AsyncMock()
    mock_backend.put_mpack.return_value = None
    mock_backend.presign_mpack_get.return_value = built_url
    mock_backend.delete.return_value = None

    # The patched walk yields a single snapshot-less proxy commit so the
    # build can proceed without real commit/snapshot/object rows in the DB.
    fake_proxy = SimpleNamespace(commit_id=tip, snapshot_id=None, parent_ids=[])

    backend_patch = patch("musehub.services.musehub_wire_fetch.get_backend", return_value=mock_backend)
    walk_patch = patch(
        "musehub.services.musehub_wire_fetch._walk_commit_delta",
        new_callable=AsyncMock,
        return_value={tip: fake_proxy},
    )
    build_patch = patch("muse.core.mpack.build_wire_mpack", return_value=b"MUSE\x00fake-mpack")
    blob_id_patch = patch("musehub.services.musehub_wire_fetch.blob_id", return_value=built_mpack)

    with backend_patch, walk_patch, build_patch, blob_id_patch:
        result = await wire_fetch_mpack(db_session, repo.repo_id, want=[tip], have=[])
        await db_session.commit()

    assert result["mpack_url"] == built_url
    assert result["mpack_id"] == built_mpack

    # Exactly one cache row must now exist for this repo/tip pair.
    stmt = (
        select(MusehubFetchMPackCache)
        .where(MusehubFetchMPackCache.repo_id == repo.repo_id)
        .where(MusehubFetchMPackCache.tip_commit_id == tip)
    )
    row = (await db_session.execute(stmt)).scalar_one()
    assert row.mpack_id == built_mpack
    assert row.expires_at > _now()
280
281
282 # ── FMC_18 ────────────────────────────────────────────────────────────────────
283
@pytest.mark.tier2
async def test_fmc_18_enqueue_push_intel_creates_prebuild_job_with_branch_tips(
    db_session: AsyncSession,
) -> None:
    """FMC_18: enqueue_push_intel leaves one pending fetch.mpack.prebuild job listing both branch tips."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    # Two branches, each with its own head commit.
    tip_a = _fake_commit_id("branch-main")
    tip_b = _fake_commit_id("branch-dev")
    await create_branch(db_session, repo.repo_id, name="main", head_commit_id=tip_a)
    await create_branch(db_session, repo.repo_id, name="dev", head_commit_id=tip_b)

    await enqueue_push_intel(
        db_session,
        repo.repo_id,
        head=tip_a,
        domain_id=None,
        branch="main",
    )
    await db_session.commit()

    # scalar_one() also enforces that exactly one such pending job exists.
    pending_prebuild = (
        select(MusehubBackgroundJob)
        .where(MusehubBackgroundJob.repo_id == repo.repo_id)
        .where(MusehubBackgroundJob.job_type == "fetch.mpack.prebuild")
        .where(MusehubBackgroundJob.status == "pending")
    )
    job_row = (await db_session.execute(pending_prebuild)).scalar_one()

    tip_ids = set(job_row.payload.get("tip_commit_ids", []))
    assert tip_a in tip_ids, f"main tip {tip_a[:20]} missing from payload"
    assert tip_b in tip_ids, f"dev tip {tip_b[:20]} missing from payload"
315
316
317 # ── FMC_20 ────────────────────────────────────────────────────────────────────
318
@pytest.mark.tier2
async def test_fmc_20_gc_deletes_expired_rows_and_r2_objects_leaves_fresh_untouched(
    db_session: AsyncSession,
) -> None:
    """FMC_20: gc_fetch_mpack_cache deletes expired rows + R2 objects; fresh rows survive.

    Three cache rows are seeded for one repo: two that expired a day ago and
    one that expires six days from now. After GC the two expired rows and
    their backend objects must be gone, and the fresh row must remain.
    """
    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    tip_expired_a = _fake_commit_id("expired-a")
    tip_expired_b = _fake_commit_id("expired-b")
    tip_fresh = _fake_commit_id("fresh")

    mpack_expired_a = _fake_mpack_id("expired-a")
    mpack_expired_b = _fake_mpack_id("expired-b")
    mpack_fresh = _fake_mpack_id("fresh")

    past = _now() - timedelta(days=1)
    future = _now() + timedelta(days=6)

    for tip, mpack, exp in [
        (tip_expired_a, mpack_expired_a, past),
        (tip_expired_b, mpack_expired_b, past),
        (tip_fresh, mpack_fresh, future),
    ]:
        cache_id = hashlib.sha256((repo.repo_id + tip).encode()).hexdigest()
        db_session.add(MusehubFetchMPackCache(
            cache_id=cache_id,
            repo_id=repo.repo_id,
            tip_commit_id=tip,
            mpack_id=mpack,
            created_at=_now(),
            expires_at=exp,
        ))
    await db_session.commit()

    mock_backend = AsyncMock()
    mock_backend.delete.return_value = None

    with patch("musehub.services.musehub_gc.get_backend", return_value=mock_backend):
        n_deleted = await gc_fetch_mpack_cache(db_session, repo.repo_id)
        await db_session.commit()

    assert n_deleted == 2

    # R2 delete called exactly once for each expired mpack — in any order.
    # Keep the calls as a list (not a set) so that duplicate or unexpected
    # extra deletes are caught rather than silently collapsed.
    deleted_mpack_ids = [call.args[0] for call in mock_backend.delete.call_args_list]
    assert mpack_fresh not in deleted_mpack_ids
    assert sorted(deleted_mpack_ids) == sorted([mpack_expired_a, mpack_expired_b])

    # Expired rows gone from DB.
    remaining = (await db_session.execute(
        select(MusehubFetchMPackCache)
        .where(MusehubFetchMPackCache.repo_id == repo.repo_id)
    )).scalars().all()
    remaining_tips = {r.tip_commit_id for r in remaining}
    assert tip_expired_a not in remaining_tips
    assert tip_expired_b not in remaining_tips

    # Fresh row survives.
    assert tip_fresh in remaining_tips
File History 4 commits
sha256:f58d788df3ccdda8f8987b428418db655a38582309239b99d7b9715ea6dff618 feat(#92): phase 5 — GC expired fetch mpack cache entries (… Sonnet 4.6 patch 2 days ago
sha256:d50f9cf9829dfbe35721a23b81ad256c729ddf9dd565a0a9e56d27847e255632 feat(#92): phase 4 — enqueue fetch.mpack.prebuild on push (… Sonnet 4.6 patch 2 days ago
sha256:1c5b7a0aba79472f4b10e52326dc010bdab1a498c9e195593d0707860478a034 feat(#92): phase 3 — cache lookup in wire_fetch_mpack (FMC_… Sonnet 4.6 patch 2 days ago
sha256:0e447fc3f6b7887d5d9e86b557c659ef7d0b05e2e09ddb0cb551ada240e48a51 feat(phase2): fetch.mpack.prebuild job handler + worker dis… Sonnet 4.6 patch 2 days ago