gabriel / musehub public
test_fetch_mpack_prebuild.py python
193 lines 7.3 KB
Raw
sha256:f58d788df3ccdda8f8987b428418db655a38582309239b99d7b9715ea6dff618 feat(#92): phase 5 — GC expired fetch mpack cache entries (… Sonnet 4.6 patch 6 days ago
1 """TDD — fetch.mpack.prebuild job handler (issue #92 Phase 2).
2
3 Test IDs:
4 FMC_07 Unit: mock wire_fetch_mpack, confirm cache rows written for each tip,
5 confirm existing fresh entries are skipped
6 FMC_08 Integration: insert a job row, run the handler, verify
7 MusehubFetchMPackCache row exists with correct repo_id/tip/mpack_id
8 """
9 from __future__ import annotations
10
11 import hashlib
12 from datetime import datetime, timedelta, timezone
13 from unittest.mock import AsyncMock, patch
14
15 import pytest
16 from sqlalchemy import select
17 from sqlalchemy.ext.asyncio import AsyncSession
18
19 from musehub.core.genesis import compute_job_id
20 from musehub.db.musehub_jobs_models import MusehubBackgroundJob
21 from musehub.db.musehub_repo_models import MusehubFetchMPackCache
22 from musehub.services.musehub_wire_fetch import process_fetch_mpack_prebuild_job
23 from tests.factories import create_repo
24
25
26 def _now() -> datetime:
27 return datetime.now(tz=timezone.utc)
28
29
30 def _fake_commit_id(seed: str) -> str:
31 return "sha256:" + hashlib.sha256(seed.encode()).hexdigest()
32
33
34 def _fake_mpack_id(seed: str) -> str:
35 return "sha256:" + hashlib.sha256(f"mpack-{seed}".encode()).hexdigest()
36
37
38 async def _insert_job(
39 session: AsyncSession,
40 repo_id: str,
41 tip_commit_ids: list[str],
42 ) -> str:
43 now = _now()
44 job_id = compute_job_id(repo_id, "fetch.mpack.prebuild", now.isoformat())
45 session.add(MusehubBackgroundJob(
46 job_id=job_id,
47 repo_id=repo_id,
48 job_type="fetch.mpack.prebuild",
49 payload={"tip_commit_ids": tip_commit_ids},
50 status="pending",
51 created_at=now,
52 attempt=0,
53 ))
54 await session.flush()
55 return job_id
56
57
58 # ── FMC_07 ────────────────────────────────────────────────────────────────────
59
60 @pytest.mark.tier2
61 async def test_fmc_07_builds_each_tip_and_writes_cache(db_session: AsyncSession) -> None:
62 """FMC_07a: handler calls wire_fetch_mpack once per tip and writes cache rows."""
63 repo = await create_repo(db_session, owner="gabriel", visibility="public")
64 tip_a = _fake_commit_id("tip-a")
65 tip_b = _fake_commit_id("tip-b")
66 mpack_a = _fake_mpack_id("tip-a")
67 mpack_b = _fake_mpack_id("tip-b")
68
69 job_id = await _insert_job(db_session, repo.repo_id, [tip_a, tip_b])
70 await db_session.commit()
71
72 side_effects = [
73 {"mpack_id": mpack_a, "mpack_url": "https://r2.example/a", "commit_count": 1, "blob_count": 2},
74 {"mpack_id": mpack_b, "mpack_url": "https://r2.example/b", "commit_count": 1, "blob_count": 3},
75 ]
76
77 with patch(
78 "musehub.services.musehub_wire_fetch.wire_fetch_mpack",
79 new_callable=AsyncMock,
80 side_effect=side_effects,
81 ) as mock_build:
82 result = await process_fetch_mpack_prebuild_job(db_session, job_id)
83 await db_session.commit()
84
85 assert mock_build.call_count == 2
86 assert result["tips_requested"] == 2
87 assert result["tips_built"] == 2
88 assert result["tips_skipped"] == 0
89
90 rows = (await db_session.execute(
91 select(MusehubFetchMPackCache)
92 .where(MusehubFetchMPackCache.repo_id == repo.repo_id)
93 .order_by(MusehubFetchMPackCache.tip_commit_id)
94 )).scalars().all()
95 assert len(rows) == 2
96 mpack_ids = {r.tip_commit_id: r.mpack_id for r in rows}
97 assert mpack_ids[tip_a] == mpack_a
98 assert mpack_ids[tip_b] == mpack_b
99
100
101 @pytest.mark.tier2
102 async def test_fmc_07b_skips_tips_with_fresh_cache(db_session: AsyncSession) -> None:
103 """FMC_07b: tips with a non-expired cache entry are skipped without calling wire_fetch_mpack."""
104 repo = await create_repo(db_session, owner="gabriel", visibility="public")
105 tip_cached = _fake_commit_id("tip-cached")
106 tip_new = _fake_commit_id("tip-new")
107 existing_mpack = _fake_mpack_id("existing")
108 new_mpack = _fake_mpack_id("new")
109
110 # Pre-populate a fresh cache entry for tip_cached.
111 cache_id = hashlib.sha256((repo.repo_id + tip_cached).encode()).hexdigest()
112 db_session.add(MusehubFetchMPackCache(
113 cache_id=cache_id,
114 repo_id=repo.repo_id,
115 tip_commit_id=tip_cached,
116 mpack_id=existing_mpack,
117 created_at=_now(),
118 expires_at=_now() + timedelta(days=7),
119 ))
120 job_id = await _insert_job(db_session, repo.repo_id, [tip_cached, tip_new])
121 await db_session.commit()
122
123 with patch(
124 "musehub.services.musehub_wire_fetch.wire_fetch_mpack",
125 new_callable=AsyncMock,
126 return_value={"mpack_id": new_mpack, "mpack_url": "https://r2.example/new", "commit_count": 1, "blob_count": 1},
127 ) as mock_build:
128 result = await process_fetch_mpack_prebuild_job(db_session, job_id)
129 await db_session.commit()
130
131 # Only the new tip should have triggered a build.
132 assert mock_build.call_count == 1
133 assert mock_build.call_args[1]["want"] == [tip_new] or mock_build.call_args[0][2] == [tip_new]
134 assert result["tips_built"] == 1
135 assert result["tips_skipped"] == 1
136
137 # The pre-existing cache entry must be unchanged.
138 cached_row = (await db_session.execute(
139 select(MusehubFetchMPackCache)
140 .where(MusehubFetchMPackCache.repo_id == repo.repo_id)
141 .where(MusehubFetchMPackCache.tip_commit_id == tip_cached)
142 )).scalar_one()
143 assert cached_row.mpack_id == existing_mpack
144
145
146 @pytest.mark.tier2
147 async def test_fmc_07c_empty_payload_is_a_noop(db_session: AsyncSession) -> None:
148 """FMC_07c: job with no tip_commit_ids returns zeros without calling wire_fetch_mpack."""
149 repo = await create_repo(db_session, owner="gabriel", visibility="public")
150 job_id = await _insert_job(db_session, repo.repo_id, [])
151 await db_session.commit()
152
153 with patch(
154 "musehub.services.musehub_wire_fetch.wire_fetch_mpack",
155 new_callable=AsyncMock,
156 ) as mock_build:
157 result = await process_fetch_mpack_prebuild_job(db_session, job_id)
158
159 assert mock_build.call_count == 0
160 assert result["tips_requested"] == 0
161 assert result["tips_built"] == 0
162
163
164 # ── FMC_08 ────────────────────────────────────────────────────────────────────
165
166 @pytest.mark.tier2
167 async def test_fmc_08_cache_row_has_correct_fields(db_session: AsyncSession) -> None:
168 """FMC_08: after the handler runs, cache row has matching repo_id, tip, and mpack_id."""
169 repo = await create_repo(db_session, owner="gabriel", visibility="public")
170 tip = _fake_commit_id("integration-tip")
171 mpack = _fake_mpack_id("integration")
172
173 job_id = await _insert_job(db_session, repo.repo_id, [tip])
174 await db_session.commit()
175
176 with patch(
177 "musehub.services.musehub_wire_fetch.wire_fetch_mpack",
178 new_callable=AsyncMock,
179 return_value={"mpack_id": mpack, "mpack_url": "https://r2.example/int", "commit_count": 5, "blob_count": 10},
180 ):
181 await process_fetch_mpack_prebuild_job(db_session, job_id)
182 await db_session.commit()
183
184 row = (await db_session.execute(
185 select(MusehubFetchMPackCache)
186 .where(MusehubFetchMPackCache.repo_id == repo.repo_id)
187 .where(MusehubFetchMPackCache.tip_commit_id == tip)
188 )).scalar_one()
189
190 assert row.repo_id == repo.repo_id
191 assert row.tip_commit_id == tip
192 assert row.mpack_id == mpack
193 assert row.expires_at > _now()
File History 4 commits
sha256:f58d788df3ccdda8f8987b428418db655a38582309239b99d7b9715ea6dff618 feat(#92): phase 5 — GC expired fetch mpack cache entries (… Sonnet 4.6 patch 6 days ago
sha256:d50f9cf9829dfbe35721a23b81ad256c729ddf9dd565a0a9e56d27847e255632 feat(#92): phase 4 — enqueue fetch.mpack.prebuild on push (… Sonnet 4.6 patch 6 days ago
sha256:1c5b7a0aba79472f4b10e52326dc010bdab1a498c9e195593d0707860478a034 feat(#92): phase 3 — cache lookup in wire_fetch_mpack (FMC_… Sonnet 4.6 patch 6 days ago
sha256:0e447fc3f6b7887d5d9e86b557c659ef7d0b05e2e09ddb0cb551ada240e48a51 feat(phase2): fetch.mpack.prebuild job handler + worker dis… Sonnet 4.6 patch 6 days ago