gabriel / musehub public
test_mpack_fetch_phase3.py python
357 lines 12.7 KB
Raw
sha256:7d6dd8f4a89e2d1fef2d84f6e65feaff51385d382f466766b7f690a22ec18e32 fix: fall back to DB ancestry check when mpack-only fast-fo… Sonnet 4.6 patch 7 days ago
"""TDD — Phase 3: mpack-based fetch returns presigned mpack URLs (issue #63).

PF-1  wire_fetch_mpack for a repo with a populated mpack index returns
      mpack_urls (presigned GET URLs for the pushed mpacks) instead of
      fetching each object individually from MinIO.

PF-2  The number of mpack_urls equals the number of distinct mpacks that
      cover the needed objects — not the number of objects.

PF-3  wire_fetch_mpack still returns correct commit and snapshot metadata
      alongside the mpack URLs so the client can reconstruct the repo.

PF-4  wire_fetch_mpack falls back to the legacy inline-bytes path when the
      mpack index has no rows for the requested objects (repos pushed before
      the Phase 1 migration, or an empty delta).
"""
from __future__ import annotations

import datetime
import hashlib

import msgpack
import pytest
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from muse.core.types import blob_id
from musehub.db import musehub_repo_models as db
from musehub.db.database import Base as _Base
from musehub.core.genesis import compute_identity_id
from musehub.services.musehub_repository import create_repo

# Skip every test in this module while the muse wire protocol is being
# reworked. Declared after the import block so all imports stay at the top of
# the module (an assignment between imports makes the later ones E402).
pytestmark = pytest.mark.skip(reason="muse wire protocol in flux")
30 from musehub.db import musehub_repo_models as db
31 from musehub.db.database import Base as _Base
32 from musehub.core.genesis import compute_identity_id
33 from musehub.services.musehub_repository import create_repo
34
35
36 # ---------------------------------------------------------------------------
37 # Helpers — reuse push helpers from Phase 1/2
38 # ---------------------------------------------------------------------------
39
40 def _make_mpack(
41 objects: dict[str, bytes],
42 commits: list[dict] | None = None,
43 snapshots: list[dict] | None = None,
44 branch_heads: dict[str, str] | None = None,
45 ) -> tuple[bytes, str]:
46 mpack = {
47 "commits": commits or [],
48 "snapshots": snapshots or [],
49 "objects": [
50 {"object_id": oid, "content": data}
51 for oid, data in objects.items()
52 ],
53 "branch_heads": branch_heads or {},
54 }
55 wire_bytes = msgpack.packb(mpack, use_bin_type=True)
56 mpack_key = "sha256:" + hashlib.sha256(wire_bytes).hexdigest()
57 return wire_bytes, mpack_key
58
59
60 async def _store_mpack(wire_bytes: bytes, mpack_key: str) -> None:
61 import musehub.storage.backends as _backends_mod
62 backend = _backends_mod.get_backend()
63 await backend.put_mpack(mpack_key, wire_bytes)
64
65
66 async def _push_and_index(
67 session: AsyncSession,
68 repo_id: str,
69 objects: dict[str, bytes],
70 commits: list[dict] | None = None,
71 snapshots: list[dict] | None = None,
72 branch_heads: dict[str, str] | None = None,
73 ) -> str:
74 """Push an mpack and run process_mpack_index_job. Returns mpack_key."""
75 from musehub.core.genesis import compute_job_id
76 from musehub.db.musehub_jobs_models import MusehubBackgroundJob
77 from musehub.services.musehub_wire import process_mpack_index_job
78
79 wire_bytes, mpack_key = _make_mpack(objects, commits, snapshots, branch_heads)
80 await _store_mpack(wire_bytes, mpack_key)
81
82 now = datetime.datetime.now(datetime.timezone.utc)
83 job_id = compute_job_id(repo_id, "mpack.index", now.isoformat())
84 session.add(MusehubBackgroundJob(
85 job_id=job_id,
86 repo_id=repo_id,
87 job_type="mpack.index",
88 payload={
89 "mpack_key": mpack_key,
90 "branch": "main",
91 "head": (commits or [{}])[-1].get("commit_id", ""),
92 "pusher_id": "",
93 "declared_objects_count": len(objects),
94 "declared_commits_count": len(commits or []),
95 },
96 status="pending",
97 created_at=now,
98 attempt=0,
99 ))
100 await session.commit()
101 await process_mpack_index_job(session, job_id)
102 await session.commit()
103 return mpack_key
104
105
106 def _make_commit_chain(n: int, seed: str) -> tuple[list[dict], list[dict], str, dict[str, str]]:
107 """Return (commits, snapshots, tip_commit_id, objects_dict)."""
108 objects: dict[str, bytes] = {}
109 commits = []
110 snapshots = []
111 parent_id = None
112
113 for i in range(n):
114 oid = blob_id(f"{seed}-obj-{i}".encode())
115 objects[oid] = f"{seed}-obj-{i}".encode()
116 snap_id = blob_id(f"{seed}-snap-{i}".encode())
117 snapshots.append({
118 "snapshot_id": snap_id,
119 "parent_snapshot_id": None,
120 "delta_upsert": {f"file_{i}.txt": oid},
121 "delta_remove": [],
122 })
123 cid = blob_id(f"{seed}-commit-{i}-p={parent_id}".encode())
124 commits.append({
125 "commit_id": cid,
126 "branch": "main",
127 "message": f"commit {i}",
128 "author": "gabriel",
129 "committed_at": datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc).isoformat(),
130 "parent_commit_id": parent_id,
131 "parent2_commit_id": None,
132 "snapshot_id": snap_id,
133 "agent_id": "",
134 "model_id": "",
135 "toolchain_id": "",
136 "sem_ver_bump": "none",
137 "breaking_changes": [],
138 "signature": "",
139 "signer_key_id": "",
140 "signer_public_key": "",
141 "prompt_hash": "",
142 })
143 parent_id = cid
144
145 return commits, snapshots, parent_id, objects # type: ignore[return-value]
146
147
148 # ---------------------------------------------------------------------------
149 # PF-1
150 # ---------------------------------------------------------------------------
151
152 @pytest.mark.asyncio
153 async def test_pf1_fetch_returns_mpack_urls(db_session: AsyncSession) -> None:
154 """wire_fetch_mpack returns mpack_urls when mpack index is populated."""
155 repo = await create_repo(
156 db_session,
157 name="pf-test-1",
158 owner="gabriel",
159 owner_user_id=compute_identity_id(b"gabriel"),
160 visibility="public",
161 initialize=False,
162 )
163
164 commits, snapshots, tip, objects = _make_commit_chain(5, seed="pf1")
165 mpack_key = await _push_and_index(
166 db_session, repo.repo_id, objects, commits, snapshots,
167 branch_heads={"main": tip},
168 )
169
170 from musehub.services.musehub_wire import wire_fetch_mpack
171
172 result = await wire_fetch_mpack(
173 db_session, repo.repo_id,
174 want=[tip], have=[],
175 ttl_seconds=3600,
176 )
177
178 assert "mpack_urls" in result, "result missing mpack_urls key"
179 assert len(result["mpack_urls"]) > 0, "mpack_urls is empty — expected at least one presigned URL"
180 assert result.get("mpack_fetch"), "mpack_fetch flag not set"
181
182
183 # ---------------------------------------------------------------------------
184 # PF-2
185 # ---------------------------------------------------------------------------
186
187 @pytest.mark.asyncio
188 async def test_pf2_mpack_urls_count_equals_distinct_mpacks(db_session: AsyncSession) -> None:
189 """mpack_urls count == number of distinct mpacks, not number of objects."""
190 repo = await create_repo(
191 db_session,
192 name="pf-test-2",
193 owner="gabriel",
194 owner_user_id=compute_identity_id(b"gabriel"),
195 visibility="public",
196 initialize=False,
197 )
198
199 # Two pushes → two distinct mpacks
200 commits_a, snaps_a, tip_a, objs_a = _make_commit_chain(3, seed="pf2a")
201 key_a = await _push_and_index(
202 db_session, repo.repo_id, objs_a, commits_a, snaps_a,
203 branch_heads={"main": tip_a},
204 )
205
206 commits_b, snaps_b, tip_b, objs_b = _make_commit_chain(3, seed="pf2b")
207 # b's commits build on a's tip
208 commits_b[0]["parent_commit_id"] = tip_a
209 key_b = await _push_and_index(
210 db_session, repo.repo_id, objs_b, commits_b, snaps_b,
211 branch_heads={"main": tip_b},
212 )
213
214 from musehub.services.musehub_wire import wire_fetch_mpack
215
216 result = await wire_fetch_mpack(
217 db_session, repo.repo_id,
218 want=[tip_b], have=[],
219 ttl_seconds=3600,
220 )
221
222 assert result.get("mpack_fetch"), "mpack_fetch flag not set"
223 # 2 distinct mpacks cover all objects — not 6 individual object GETs
224 assert len(result["mpack_urls"]) == 2, (
225 f"expected 2 mpack URLs (one per push), got {len(result['mpack_urls'])}"
226 )
227
228
229 # ---------------------------------------------------------------------------
230 # PF-3
231 # ---------------------------------------------------------------------------
232
233 @pytest.mark.asyncio
234 async def test_pf3_mpack_fetch_includes_commit_and_snapshot_metadata(db_session: AsyncSession) -> None:
235 """mpack-based response includes commits and snapshots for client reconstruction."""
236 repo = await create_repo(
237 db_session,
238 name="pf-test-3",
239 owner="gabriel",
240 owner_user_id=compute_identity_id(b"gabriel"),
241 visibility="public",
242 initialize=False,
243 )
244
245 commits, snapshots, tip, objects = _make_commit_chain(3, seed="pf3")
246 await _push_and_index(
247 db_session, repo.repo_id, objects, commits, snapshots,
248 branch_heads={"main": tip},
249 )
250
251 from musehub.services.musehub_wire import wire_fetch_mpack
252
253 result = await wire_fetch_mpack(
254 db_session, repo.repo_id,
255 want=[tip], have=[],
256 ttl_seconds=3600,
257 )
258
259 assert result.get("mpack_fetch"), "mpack_fetch flag not set"
260 assert result["commit_count"] == len(commits), (
261 f"expected {len(commits)} commits in result, got {result['commit_count']}"
262 )
263 assert len(result.get("commits", [])) == len(commits), "commits list missing or wrong length"
264 assert len(result.get("snapshots", [])) > 0, "snapshots missing from mpack-fetch result"
265
266
267 # ---------------------------------------------------------------------------
268 # PF-4
269 # ---------------------------------------------------------------------------
270
271 @pytest.mark.asyncio
272 async def test_pf4_fallback_to_inline_when_no_mpack_index(db_session: AsyncSession) -> None:
273 """Falls back to inline bytes when mpack index has no rows for this repo."""
274 repo = await create_repo(
275 db_session,
276 name="pf-test-4",
277 owner="gabriel",
278 owner_user_id=compute_identity_id(b"gabriel"),
279 visibility="public",
280 initialize=False,
281 )
282
283 # Manually insert commits + objects into DB WITHOUT going through
284 # process_mpack_index_job, so the mpack index is empty.
285 from musehub.db.musehub_repo_models import MusehubCommit, MusehubObject, MusehubObjectRef, MusehubBranch
286 from sqlalchemy.dialects.postgresql import insert as pg_insert
287 import musehub.storage.backends as _bm
288
289 now = datetime.datetime.now(datetime.timezone.utc)
290 oid = blob_id(b"pf4-obj")
291 snap_id = blob_id(b"pf4-snap")
292 cid = blob_id(b"pf4-commit")
293 obj_data = b"pf4-obj"
294
295 backend = _bm.get_backend()
296 await backend.put(oid, obj_data)
297
298 await session_add_all(db_session, [
299 MusehubObject(
300 object_id=oid, path="", size_bytes=len(obj_data),
301 storage_uri=backend.uri_for(oid),
302 ),
303 ])
304 await db_session.execute(
305 pg_insert(MusehubObjectRef).values([
306 {"repo_id": repo.repo_id, "object_id": oid}
307 ]).on_conflict_do_nothing(index_elements=["repo_id", "object_id"])
308 )
309 from musehub.db.musehub_repo_models import MusehubSnapshot
310 import msgpack as _mp
311 manifest_blob = _mp.packb({f"file.txt": oid}, use_bin_type=True)
312 await db_session.execute(
313 pg_insert(MusehubSnapshot).values([
314 {"snapshot_id": snap_id, "directories": [], "manifest_blob": manifest_blob, "entry_count": 1, "created_at": now}
315 ]).on_conflict_do_nothing(index_elements=["snapshot_id"])
316 )
317 await db_session.execute(
318 pg_insert(MusehubCommit).values([{
319 "commit_id": cid,
320 "branch": "main",
321 "message": "manual",
322 "author": "gabriel",
323 "timestamp": now,
324 "parent_ids": [],
325 "snapshot_id": snap_id,
326 "agent_id": "", "model_id": "", "toolchain_id": "",
327 "commit_branch": "main",
328 "signature": "", "signer_public_key": "", "signer_key_id": "",
329 "sem_ver_bump": "none", "breaking_changes": [], "reviewed_by": [],
330 "test_runs": 0, "prompt_hash": "", "structured_delta": None,
331 }]).on_conflict_do_nothing(index_elements=["commit_id"])
332 )
333 from musehub.core.genesis import compute_branch_id
334 db_session.add(MusehubBranch(
335 branch_id=compute_branch_id(repo.repo_id, "main"),
336 repo_id=repo.repo_id, name="main", head_commit_id=cid,
337 ))
338 await db_session.commit()
339
340 from musehub.services.musehub_wire import wire_fetch_mpack
341
342 result = await wire_fetch_mpack(
343 db_session, repo.repo_id,
344 want=[cid], have=[],
345 ttl_seconds=3600,
346 )
347
348 # No mpack index → should fall back to inline bytes (not mpack_fetch)
349 assert not result.get("mpack_fetch"), (
350 "expected fallback to inline bytes when mpack index is empty, got mpack_fetch=True"
351 )
352 assert result["mpack_bytes"] or result.get("presigned_url"), "no mpack bytes in fallback"
353
354
355 async def session_add_all(session: AsyncSession, items: list[_Base]) -> None:
356 for item in items:
357 session.add(item)
File History 1 commit
sha256:7d6dd8f4a89e2d1fef2d84f6e65feaff51385d382f466766b7f690a22ec18e32 fix: fall back to DB ancestry check when mpack-only fast-fo… Sonnet 4.6 patch 7 days ago