gabriel / musehub public
test_fetch_mpack_prebuild.py python
275 lines 11.3 KB
Raw
sha256:f58d788df3ccdda8f8987b428418db655a38582309239b99d7b9715ea6dff618 feat(#92): phase 5 — GC expired fetch mpack cache entries (… Sonnet 4.6 patch 7 days ago
1 """TDD — fetch.mpack.prebuild job handler and wire_fetch_mpack cache (issue #92 Phases 2–3).
2
3 Test IDs:
4 FMC_07 Unit: mock wire_fetch_mpack, confirm cache rows written for each tip,
5 confirm existing fresh entries are skipped, and confirm a job with an empty payload is a no-op
6 FMC_08 Integration: insert a job row, run the handler, verify
7 MusehubFetchMPackCache row exists with correct repo_id/tip/mpack_id
8 FMC_13 Unit: cache hit returns presigned URL without entering blob-load path
9 FMC_14 Unit: cache miss builds and writes cache row
10 """
11 from __future__ import annotations
12
13 import hashlib
14 from datetime import datetime, timedelta, timezone
15 from unittest.mock import AsyncMock, patch
16
17 import pytest
18 from sqlalchemy import select
19 from sqlalchemy.ext.asyncio import AsyncSession
20
21 from musehub.core.genesis import compute_job_id
22 from musehub.db.musehub_jobs_models import MusehubBackgroundJob
23 from musehub.db.musehub_repo_models import MusehubFetchMPackCache
24 from musehub.services.musehub_wire_fetch import process_fetch_mpack_prebuild_job, wire_fetch_mpack
25 from tests.factories import create_repo
26
27
def _now() -> datetime:
    """Return the current instant as a timezone-aware datetime in UTC."""
    utc = timezone.utc
    return datetime.now(utc)
30
31
def _fake_commit_id(seed: str) -> str:
    """Build a deterministic ``sha256:``-prefixed hex id from *seed*.

    Used to fabricate commit ids for tests without real commit rows.
    """
    digest = hashlib.sha256(seed.encode()).hexdigest()
    return f"sha256:{digest}"
34
35
def _fake_mpack_id(seed: str) -> str:
    """Build a deterministic ``sha256:``-prefixed hex mpack id from *seed*.

    The seed is prefixed with ``"mpack-"`` before hashing, so for the same
    seed this differs from the id ``_fake_commit_id`` would produce.
    """
    namespaced = "mpack-" + seed
    digest = hashlib.sha256(namespaced.encode()).hexdigest()
    return "sha256:" + digest
38
39
async def _insert_job(
    session: AsyncSession,
    repo_id: str,
    tip_commit_ids: list[str],
) -> str:
    """Stage a pending ``fetch.mpack.prebuild`` job for *repo_id* and return its id.

    The job id comes from ``compute_job_id`` over the repo id, the job type
    and the creation timestamp. The payload carries *tip_commit_ids*. The row
    is only flushed, not committed; callers commit when they are ready.
    """
    created = _now()
    job_type = "fetch.mpack.prebuild"
    job_id = compute_job_id(repo_id, job_type, created.isoformat())

    job = MusehubBackgroundJob(
        job_id=job_id,
        repo_id=repo_id,
        job_type=job_type,
        payload={"tip_commit_ids": tip_commit_ids},
        status="pending",
        created_at=created,
        attempt=0,
    )
    session.add(job)
    # Flush so the row is visible to the handler within the same session.
    await session.flush()
    return job_id
58
59
60 # ── FMC_07 ────────────────────────────────────────────────────────────────────
61
@pytest.mark.tier2
async def test_fmc_07_builds_each_tip_and_writes_cache(db_session: AsyncSession) -> None:
    """FMC_07a: each requested tip triggers one wire_fetch_mpack build and gets its own cache row."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    seeds = ("tip-a", "tip-b")
    tip_a, tip_b = (_fake_commit_id(s) for s in seeds)
    mpack_a, mpack_b = (_fake_mpack_id(s) for s in seeds)

    job_id = await _insert_job(db_session, repo.repo_id, [tip_a, tip_b])
    await db_session.commit()

    # The mock hands these out in call order; the per-tip assertions below
    # rely on the handler building tips in payload order.
    build_results = [
        {"mpack_id": mpack_a, "mpack_url": "https://r2.example/a", "commit_count": 1, "blob_count": 2},
        {"mpack_id": mpack_b, "mpack_url": "https://r2.example/b", "commit_count": 1, "blob_count": 3},
    ]
    build_patch = patch(
        "musehub.services.musehub_wire_fetch.wire_fetch_mpack",
        new_callable=AsyncMock,
        side_effect=build_results,
    )
    with build_patch as mock_build:
        result = await process_fetch_mpack_prebuild_job(db_session, job_id)
        await db_session.commit()

    assert mock_build.call_count == 2
    expected_counts = {"tips_requested": 2, "tips_built": 2, "tips_skipped": 0}
    for key, value in expected_counts.items():
        assert result[key] == value

    stmt = (
        select(MusehubFetchMPackCache)
        .where(MusehubFetchMPackCache.repo_id == repo.repo_id)
        .order_by(MusehubFetchMPackCache.tip_commit_id)
    )
    rows = (await db_session.execute(stmt)).scalars().all()
    assert len(rows) == 2

    by_tip = {row.tip_commit_id: row.mpack_id for row in rows}
    assert by_tip[tip_a] == mpack_a
    assert by_tip[tip_b] == mpack_b
101
102
@pytest.mark.tier2
async def test_fmc_07b_skips_tips_with_fresh_cache(db_session: AsyncSession) -> None:
    """FMC_07b: tips with a non-expired cache entry are skipped without calling wire_fetch_mpack.

    One tip is pre-seeded with a cache row that expires in 7 days; the other
    has no row. The handler must build only the uncached tip, report one
    built / one skipped, and leave the pre-existing row untouched.
    """
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    tip_cached = _fake_commit_id("tip-cached")
    tip_new = _fake_commit_id("tip-new")
    existing_mpack = _fake_mpack_id("existing")
    new_mpack = _fake_mpack_id("new")

    # Pre-populate a fresh (non-expired) cache entry for tip_cached. The
    # cache_id is derived from repo_id + tip so it is deterministic per test.
    # A single timestamp keeps created_at/expires_at exactly 7 days apart.
    now = _now()
    cache_id = hashlib.sha256((repo.repo_id + tip_cached).encode()).hexdigest()
    db_session.add(MusehubFetchMPackCache(
        cache_id=cache_id,
        repo_id=repo.repo_id,
        tip_commit_id=tip_cached,
        mpack_id=existing_mpack,
        created_at=now,
        expires_at=now + timedelta(days=7),
    ))
    job_id = await _insert_job(db_session, repo.repo_id, [tip_cached, tip_new])
    await db_session.commit()

    with patch(
        "musehub.services.musehub_wire_fetch.wire_fetch_mpack",
        new_callable=AsyncMock,
        return_value={"mpack_id": new_mpack, "mpack_url": "https://r2.example/new", "commit_count": 1, "blob_count": 1},
    ) as mock_build:
        result = await process_fetch_mpack_prebuild_job(db_session, job_id)
        await db_session.commit()

    # Only the new tip should have triggered a build.
    assert mock_build.call_count == 1
    build_call = mock_build.call_args
    # ``want`` may arrive by keyword or as the third positional argument
    # (session, repo_id, want, ...). Look it up safely: indexing
    # ``call_args[1]["want"]`` raises KeyError for the positional form
    # before an ``or`` fallback could ever be evaluated.
    if "want" in build_call.kwargs:
        want = build_call.kwargs["want"]
    else:
        assert len(build_call.args) > 2, (
            f"wire_fetch_mpack called without a 'want' argument: {build_call!r}"
        )
        want = build_call.args[2]
    assert want == [tip_new]

    assert result["tips_requested"] == 2
    assert result["tips_built"] == 1
    assert result["tips_skipped"] == 1

    # The pre-existing cache entry must be unchanged.
    cached_row = (await db_session.execute(
        select(MusehubFetchMPackCache)
        .where(MusehubFetchMPackCache.repo_id == repo.repo_id)
        .where(MusehubFetchMPackCache.tip_commit_id == tip_cached)
    )).scalar_one()
    assert cached_row.mpack_id == existing_mpack
146
147
@pytest.mark.tier2
async def test_fmc_07c_empty_payload_is_a_noop(db_session: AsyncSession) -> None:
    """FMC_07c: a job whose payload lists no tips reports zero work and never builds anything."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    job_id = await _insert_job(db_session, repo.repo_id, tip_commit_ids=[])
    await db_session.commit()

    target = "musehub.services.musehub_wire_fetch.wire_fetch_mpack"
    with patch(target, new_callable=AsyncMock) as mock_build:
        result = await process_fetch_mpack_prebuild_job(db_session, job_id)

    # With nothing requested the builder must stay untouched.
    mock_build.assert_not_called()
    assert result["tips_requested"] == 0
    assert result["tips_built"] == 0
164
165
166 # ── FMC_08 ────────────────────────────────────────────────────────────────────
167
@pytest.mark.tier2
async def test_fmc_08_cache_row_has_correct_fields(db_session: AsyncSession) -> None:
    """FMC_08: running the handler persists a cache row keyed by repo and tip, pointing at the built mpack."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    tip = _fake_commit_id("integration-tip")
    mpack = _fake_mpack_id("integration")

    job_id = await _insert_job(db_session, repo.repo_id, [tip])
    await db_session.commit()

    canned_build = {"mpack_id": mpack, "mpack_url": "https://r2.example/int", "commit_count": 5, "blob_count": 10}
    build_patch = patch(
        "musehub.services.musehub_wire_fetch.wire_fetch_mpack",
        new_callable=AsyncMock,
        return_value=canned_build,
    )
    with build_patch:
        await process_fetch_mpack_prebuild_job(db_session, job_id)
        await db_session.commit()

    # Multiple criteria passed to where() are ANDed, matching chained where() calls.
    query = select(MusehubFetchMPackCache).where(
        MusehubFetchMPackCache.repo_id == repo.repo_id,
        MusehubFetchMPackCache.tip_commit_id == tip,
    )
    row = (await db_session.execute(query)).scalar_one()

    assert (row.repo_id, row.tip_commit_id, row.mpack_id) == (repo.repo_id, tip, mpack)
    # A freshly written entry must not already be expired.
    assert row.expires_at > _now()
196
197
198 # ── FMC_13 ────────────────────────────────────────────────────────────────────
199
@pytest.mark.tier2
async def test_fmc_13_cache_hit_returns_presigned_url_without_blob_load(db_session: AsyncSession) -> None:
    """FMC_13: a fresh cache row short-circuits wire_fetch_mpack.

    The presigned URL for the cached mpack is returned directly, and the
    expensive DAG walk (``_walk_commit_delta``) is never invoked.
    """
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    tip = _fake_commit_id("fmc13-tip")
    cached_mpack = _fake_mpack_id("fmc13-tip")
    expected_url = "https://r2.example/cached-fmc13"

    # Seed a cache row that stays valid for a week, guaranteeing a hit.
    db_session.add(MusehubFetchMPackCache(
        cache_id=hashlib.sha256((repo.repo_id + tip).encode()).hexdigest(),
        repo_id=repo.repo_id,
        tip_commit_id=tip,
        mpack_id=cached_mpack,
        created_at=_now(),
        expires_at=_now() + timedelta(days=7),
    ))
    await db_session.commit()

    # Backend stub whose presign_mpack_get resolves to the expected URL.
    fake_backend = AsyncMock()
    fake_backend.presign_mpack_get.return_value = expected_url

    backend_patch = patch("musehub.services.musehub_wire_fetch.get_backend", return_value=fake_backend)
    walk_patch = patch("musehub.services.musehub_wire_fetch._walk_commit_delta", new_callable=AsyncMock)
    with backend_patch, walk_patch as mock_walk:
        result = await wire_fetch_mpack(db_session, repo.repo_id, want=[tip], have=[])

    # The hit path must never reach the DAG walk ...
    mock_walk.assert_not_called()
    # ... and must hand back the cached mpack together with its presigned URL.
    assert result["mpack_url"] == expected_url
    assert result["mpack_id"] == cached_mpack
232
233
234 # ── FMC_14 ────────────────────────────────────────────────────────────────────
235
@pytest.mark.tier2
async def test_fmc_14_cache_miss_builds_and_writes_cache_row(db_session: AsyncSession) -> None:
    """FMC_14: with no cache row present, wire_fetch_mpack builds an mpack and records it in the cache."""
    from types import SimpleNamespace

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    tip = _fake_commit_id("fmc14-tip")
    built_mpack = _fake_mpack_id("fmc14-tip")
    built_url = "https://r2.example/built-fmc14"

    # Cold miss: nothing is seeded. Stub the storage backend calls the
    # build path makes so no real object store is touched.
    fake_backend = AsyncMock()
    fake_backend.put_mpack.return_value = None
    fake_backend.presign_mpack_get.return_value = built_url
    fake_backend.delete.return_value = None

    # A single proxy commit with no snapshot lets the build proceed to an
    # empty-but-valid mpack without real commit/snapshot/object rows.
    proxy_commit = SimpleNamespace(commit_id=tip, snapshot_id=None, parent_ids=[])

    backend_patch = patch("musehub.services.musehub_wire_fetch.get_backend", return_value=fake_backend)
    walk_patch = patch(
        "musehub.services.musehub_wire_fetch._walk_commit_delta",
        new_callable=AsyncMock,
        return_value={tip: proxy_commit},
    )
    # NOTE(review): patched at its defining module; this only takes effect if
    # musehub_wire_fetch looks build_wire_mpack up at call time (e.g. a
    # function-local import) - TODO confirm.
    build_patch = patch("muse.core.mpack.build_wire_mpack", return_value=b"MUSE\x00fake-mpack")
    blob_id_patch = patch("musehub.services.musehub_wire_fetch.blob_id", return_value=built_mpack)

    with backend_patch, walk_patch, build_patch, blob_id_patch:
        result = await wire_fetch_mpack(db_session, repo.repo_id, want=[tip], have=[])
        await db_session.commit()

    assert result["mpack_url"] == built_url
    assert result["mpack_id"] == built_mpack

    # The miss must leave behind a non-expired cache row for the tip.
    query = select(MusehubFetchMPackCache).where(
        MusehubFetchMPackCache.repo_id == repo.repo_id,
        MusehubFetchMPackCache.tip_commit_id == tip,
    )
    row = (await db_session.execute(query)).scalar_one()
    assert row.mpack_id == built_mpack
    assert row.expires_at > _now()
File History 4 commits
sha256:f58d788df3ccdda8f8987b428418db655a38582309239b99d7b9715ea6dff618 feat(#92): phase 5 — GC expired fetch mpack cache entries (… Sonnet 4.6 patch 7 days ago
sha256:d50f9cf9829dfbe35721a23b81ad256c729ddf9dd565a0a9e56d27847e255632 feat(#92): phase 4 — enqueue fetch.mpack.prebuild on push (… Sonnet 4.6 patch 7 days ago
sha256:1c5b7a0aba79472f4b10e52326dc010bdab1a498c9e195593d0707860478a034 feat(#92): phase 3 — cache lookup in wire_fetch_mpack (FMC_… Sonnet 4.6 patch 7 days ago
sha256:0e447fc3f6b7887d5d9e86b557c659ef7d0b05e2e09ddb0cb551ada240e48a51 feat(phase2): fetch.mpack.prebuild job handler + worker dis… Sonnet 4.6 patch 7 days ago