gabriel / musehub public
test_fetch_mpack_prebuild.py python
311 lines 12.8 KB
Raw
sha256:f58d788df3ccdda8f8987b428418db655a38582309239b99d7b9715ea6dff618 feat(#92): phase 5 — GC expired fetch mpack cache entries (… Sonnet 4.6 patch 4 days ago
1 """TDD — fetch.mpack.prebuild job handler and wire_fetch_mpack cache (issue #92 Phases 2–3).
2
Test IDs:
  FMC_07  Unit: mock wire_fetch_mpack, confirm cache rows are written for each tip,
          confirm existing fresh entries are skipped
  FMC_08  Integration: insert a job row, run the handler, verify a
          MusehubFetchMPackCache row exists with the correct repo_id/tip/mpack_id
  FMC_13  Unit: cache hit returns the presigned URL without entering the blob-load path
  FMC_14  Unit: cache miss builds the mpack and writes a cache row
  FMC_18  Integration: enqueue_push_intel enqueues a fetch.mpack.prebuild job whose
          payload contains the branch tip commit IDs
10 """
11 from __future__ import annotations
12
13 import hashlib
14 from datetime import datetime, timedelta, timezone
15 from unittest.mock import AsyncMock, patch
16
17 import pytest
18 from sqlalchemy import select
19 from sqlalchemy.ext.asyncio import AsyncSession
20
21 from musehub.core.genesis import compute_job_id
22 from musehub.db.musehub_jobs_models import MusehubBackgroundJob
23 from musehub.db.musehub_repo_models import MusehubFetchMPackCache
24 from musehub.services.musehub_jobs import enqueue_push_intel
25 from musehub.services.musehub_wire_fetch import process_fetch_mpack_prebuild_job, wire_fetch_mpack
26 from tests.factories import create_branch, create_repo
27
28
29 def _now() -> datetime:
30 return datetime.now(tz=timezone.utc)
31
32
33 def _fake_commit_id(seed: str) -> str:
34 return "sha256:" + hashlib.sha256(seed.encode()).hexdigest()
35
36
37 def _fake_mpack_id(seed: str) -> str:
38 return "sha256:" + hashlib.sha256(f"mpack-{seed}".encode()).hexdigest()
39
40
async def _insert_job(
    session: AsyncSession,
    repo_id: str,
    tip_commit_ids: list[str],
) -> str:
    """Stage a pending ``fetch.mpack.prebuild`` job row and return its job ID.

    The row is added to *session* and flushed but not committed; the caller
    decides when to commit.
    """
    job_type = "fetch.mpack.prebuild"
    created = _now()
    # The job ID is derived from the repo, the job type and the creation timestamp.
    job_id = compute_job_id(repo_id, job_type, created.isoformat())
    job = MusehubBackgroundJob(
        job_id=job_id,
        repo_id=repo_id,
        job_type=job_type,
        payload={"tip_commit_ids": tip_commit_ids},
        status="pending",
        created_at=created,
        attempt=0,
    )
    session.add(job)
    await session.flush()
    return job_id
59
60
61 # ── FMC_07 ────────────────────────────────────────────────────────────────────
62
@pytest.mark.tier2
async def test_fmc_07_builds_each_tip_and_writes_cache(db_session: AsyncSession) -> None:
    """FMC_07a: two uncached tips yield two builder calls and two cache rows."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    tip_a, tip_b = _fake_commit_id("tip-a"), _fake_commit_id("tip-b")
    mpack_a, mpack_b = _fake_mpack_id("tip-a"), _fake_mpack_id("tip-b")

    job_id = await _insert_job(db_session, repo.repo_id, [tip_a, tip_b])
    await db_session.commit()

    # One canned builder result per tip, consumed by the mock in call order.
    build_results = [
        {"mpack_id": mpack_a, "mpack_url": "https://r2.example/a", "commit_count": 1, "blob_count": 2},
        {"mpack_id": mpack_b, "mpack_url": "https://r2.example/b", "commit_count": 1, "blob_count": 3},
    ]
    builder_patch = patch(
        "musehub.services.musehub_wire_fetch.wire_fetch_mpack",
        new_callable=AsyncMock,
        side_effect=build_results,
    )

    with builder_patch as mock_build:
        result = await process_fetch_mpack_prebuild_job(db_session, job_id)
        await db_session.commit()

    assert mock_build.call_count == 2
    assert result["tips_requested"] == 2
    assert result["tips_built"] == 2
    assert result["tips_skipped"] == 0

    stmt = (
        select(MusehubFetchMPackCache)
        .where(MusehubFetchMPackCache.repo_id == repo.repo_id)
        .order_by(MusehubFetchMPackCache.tip_commit_id)
    )
    cache_rows = (await db_session.execute(stmt)).scalars().all()
    assert len(cache_rows) == 2

    mpack_by_tip = {row.tip_commit_id: row.mpack_id for row in cache_rows}
    assert mpack_by_tip[tip_a] == mpack_a
    assert mpack_by_tip[tip_b] == mpack_b
102
103
@pytest.mark.tier2
async def test_fmc_07b_skips_tips_with_fresh_cache(db_session: AsyncSession) -> None:
    """FMC_07b: tips with a non-expired cache entry are skipped without calling wire_fetch_mpack.

    One tip is pre-populated with a cache row expiring in seven days and a
    second tip has none. The handler must build only the second tip and leave
    the pre-existing row untouched.
    """
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    tip_cached = _fake_commit_id("tip-cached")
    tip_new = _fake_commit_id("tip-new")
    existing_mpack = _fake_mpack_id("existing")
    new_mpack = _fake_mpack_id("new")

    # Pre-populate a fresh cache entry for tip_cached.
    created = _now()
    cache_id = hashlib.sha256((repo.repo_id + tip_cached).encode()).hexdigest()
    db_session.add(MusehubFetchMPackCache(
        cache_id=cache_id,
        repo_id=repo.repo_id,
        tip_commit_id=tip_cached,
        mpack_id=existing_mpack,
        created_at=created,
        expires_at=created + timedelta(days=7),
    ))
    job_id = await _insert_job(db_session, repo.repo_id, [tip_cached, tip_new])
    await db_session.commit()

    with patch(
        "musehub.services.musehub_wire_fetch.wire_fetch_mpack",
        new_callable=AsyncMock,
        return_value={"mpack_id": new_mpack, "mpack_url": "https://r2.example/new", "commit_count": 1, "blob_count": 1},
    ) as mock_build:
        result = await process_fetch_mpack_prebuild_job(db_session, job_id)
        await db_session.commit()

    # Only the new tip should have triggered a build.
    assert mock_build.call_count == 1

    # `want` may arrive by keyword or as the third positional argument. Resolve
    # it without indexing blindly, so a mismatch surfaces as an assertion
    # failure rather than a KeyError/IndexError raised inside the assert.
    call = mock_build.call_args
    if "want" in call.kwargs:
        built_want = call.kwargs["want"]
    elif len(call.args) > 2:
        built_want = call.args[2]
    else:
        built_want = None
    assert built_want == [tip_new], f"unexpected wire_fetch_mpack call: {call!r}"

    assert result["tips_built"] == 1
    assert result["tips_skipped"] == 1

    # The pre-existing cache entry must be unchanged.
    cached_row = (await db_session.execute(
        select(MusehubFetchMPackCache)
        .where(MusehubFetchMPackCache.repo_id == repo.repo_id)
        .where(MusehubFetchMPackCache.tip_commit_id == tip_cached)
    )).scalar_one()
    assert cached_row.mpack_id == existing_mpack
147
148
@pytest.mark.tier2
async def test_fmc_07c_empty_payload_is_a_noop(db_session: AsyncSession) -> None:
    """FMC_07c: an empty ``tip_commit_ids`` payload builds nothing and reports zero counts."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    job_id = await _insert_job(db_session, repo.repo_id, [])
    await db_session.commit()

    builder_patch = patch(
        "musehub.services.musehub_wire_fetch.wire_fetch_mpack",
        new_callable=AsyncMock,
    )
    with builder_patch as mock_build:
        result = await process_fetch_mpack_prebuild_job(db_session, job_id)

    # The builder must never be reached when there is nothing to build.
    assert mock_build.call_count == 0
    assert result["tips_requested"] == 0
    assert result["tips_built"] == 0
165
166
167 # ── FMC_08 ────────────────────────────────────────────────────────────────────
168
@pytest.mark.tier2
async def test_fmc_08_cache_row_has_correct_fields(db_session: AsyncSession) -> None:
    """FMC_08: the handler leaves one cache row carrying the job's repo_id, tip and mpack_id."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    tip = _fake_commit_id("integration-tip")
    mpack = _fake_mpack_id("integration")

    job_id = await _insert_job(db_session, repo.repo_id, [tip])
    await db_session.commit()

    canned_build = {
        "mpack_id": mpack,
        "mpack_url": "https://r2.example/int",
        "commit_count": 5,
        "blob_count": 10,
    }
    with patch(
        "musehub.services.musehub_wire_fetch.wire_fetch_mpack",
        new_callable=AsyncMock,
        return_value=canned_build,
    ):
        await process_fetch_mpack_prebuild_job(db_session, job_id)
        await db_session.commit()

    stmt = (
        select(MusehubFetchMPackCache)
        .where(MusehubFetchMPackCache.repo_id == repo.repo_id)
        .where(MusehubFetchMPackCache.tip_commit_id == tip)
    )
    # scalar_one() also enforces that exactly one matching row exists.
    cache_row = (await db_session.execute(stmt)).scalar_one()

    assert cache_row.repo_id == repo.repo_id
    assert cache_row.tip_commit_id == tip
    assert cache_row.mpack_id == mpack
    assert cache_row.expires_at > _now()
197
198
199 # ── FMC_13 ────────────────────────────────────────────────────────────────────
200
@pytest.mark.tier2
async def test_fmc_13_cache_hit_returns_presigned_url_without_blob_load(db_session: AsyncSession) -> None:
    """FMC_13: with a fresh cache row present, wire_fetch_mpack returns the presigned URL and never walks the DAG."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    tip = _fake_commit_id("fmc13-tip")
    cached_mpack = _fake_mpack_id("fmc13-tip")
    expected_url = "https://r2.example/cached-fmc13"

    # Seed a cache entry that does not expire for another week.
    fresh_entry = MusehubFetchMPackCache(
        cache_id=hashlib.sha256((repo.repo_id + tip).encode()).hexdigest(),
        repo_id=repo.repo_id,
        tip_commit_id=tip,
        mpack_id=cached_mpack,
        created_at=_now(),
        expires_at=_now() + timedelta(days=7),
    )
    db_session.add(fresh_entry)
    await db_session.commit()

    backend = AsyncMock()
    backend.presign_mpack_get.return_value = expected_url

    with patch("musehub.services.musehub_wire_fetch.get_backend", return_value=backend):
        with patch(
            "musehub.services.musehub_wire_fetch._walk_commit_delta",
            new_callable=AsyncMock,
        ) as mock_walk:
            result = await wire_fetch_mpack(db_session, repo.repo_id, want=[tip], have=[])

    # A cache hit must short-circuit before the commit-delta walk.
    mock_walk.assert_not_called()
    # The response carries the presigned URL and the ID of the cached mpack.
    assert result["mpack_url"] == expected_url
    assert result["mpack_id"] == cached_mpack
233
234
235 # ── FMC_14 ────────────────────────────────────────────────────────────────────
236
@pytest.mark.tier2
async def test_fmc_14_cache_miss_builds_and_writes_cache_row(db_session: AsyncSession) -> None:
    """FMC_14: with no cache row present, wire_fetch_mpack builds an mpack and persists a cache row."""
    from types import SimpleNamespace

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    tip = _fake_commit_id("fmc14-tip")
    built_mpack = _fake_mpack_id("fmc14-tip")
    built_url = "https://r2.example/built-fmc14"

    # Cold miss: nothing is inserted into the cache table beforehand.
    backend = AsyncMock()
    backend.put_mpack.return_value = None
    backend.presign_mpack_get.return_value = built_url
    backend.delete.return_value = None

    # The patched _walk_commit_delta yields a single stand-in commit without a
    # snapshot, so the build can proceed without real commit/snapshot/object
    # rows in the DB.
    stub_commit = SimpleNamespace(commit_id=tip, snapshot_id=None, parent_ids=[])
    module = "musehub.services.musehub_wire_fetch"

    with patch(f"{module}.get_backend", return_value=backend):
        with patch(f"{module}._walk_commit_delta", new_callable=AsyncMock, return_value={tip: stub_commit}):
            with patch("muse.core.mpack.build_wire_mpack", return_value=b"MUSE\x00fake-mpack"):
                with patch(f"{module}.blob_id", return_value=built_mpack):
                    result = await wire_fetch_mpack(db_session, repo.repo_id, want=[tip], have=[])
                    await db_session.commit()

    assert result["mpack_url"] == built_url
    assert result["mpack_id"] == built_mpack

    # A cache row for this tip must now exist.
    stmt = (
        select(MusehubFetchMPackCache)
        .where(MusehubFetchMPackCache.repo_id == repo.repo_id)
        .where(MusehubFetchMPackCache.tip_commit_id == tip)
    )
    cache_row = (await db_session.execute(stmt)).scalar_one()
    assert cache_row.mpack_id == built_mpack
    assert cache_row.expires_at > _now()
277
278
279 # ── FMC_18 ────────────────────────────────────────────────────────────────────
280
@pytest.mark.tier2
async def test_fmc_18_enqueue_push_intel_creates_prebuild_job_with_branch_tips(
    db_session: AsyncSession,
) -> None:
    """FMC_18: enqueue_push_intel leaves one pending fetch.mpack.prebuild job listing both branch tips."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    tip_a = _fake_commit_id("branch-main")
    tip_b = _fake_commit_id("branch-dev")
    # Create the branches in a fixed order: main first, then dev.
    for branch_name, head in (("main", tip_a), ("dev", tip_b)):
        await create_branch(db_session, repo.repo_id, name=branch_name, head_commit_id=head)

    await enqueue_push_intel(
        db_session,
        repo.repo_id,
        head=tip_a,
        domain_id=None,
        branch="main",
    )
    await db_session.commit()

    stmt = (
        select(MusehubBackgroundJob)
        .where(MusehubBackgroundJob.repo_id == repo.repo_id)
        .where(MusehubBackgroundJob.job_type == "fetch.mpack.prebuild")
        .where(MusehubBackgroundJob.status == "pending")
    )
    # scalar_one() also enforces that exactly one such job was enqueued.
    job_row = (await db_session.execute(stmt)).scalar_one()

    payload_tips = set(job_row.payload.get("tip_commit_ids", []))
    assert tip_a in payload_tips, f"main tip {tip_a[:20]} missing from payload"
    assert tip_b in payload_tips, f"dev tip {tip_b[:20]} missing from payload"
File History 4 commits
sha256:f58d788df3ccdda8f8987b428418db655a38582309239b99d7b9715ea6dff618 feat(#92): phase 5 — GC expired fetch mpack cache entries (… Sonnet 4.6 patch 4 days ago
sha256:d50f9cf9829dfbe35721a23b81ad256c729ddf9dd565a0a9e56d27847e255632 feat(#92): phase 4 — enqueue fetch.mpack.prebuild on push (… Sonnet 4.6 patch 4 days ago
sha256:1c5b7a0aba79472f4b10e52326dc010bdab1a498c9e195593d0707860478a034 feat(#92): phase 3 — cache lookup in wire_fetch_mpack (FMC_… Sonnet 4.6 patch 4 days ago
sha256:0e447fc3f6b7887d5d9e86b557c659ef7d0b05e2e09ddb0cb551ada240e48a51 feat(phase2): fetch.mpack.prebuild job handler + worker dis… Sonnet 4.6 patch 4 days ago