test_mpack_fetch_phase3.py
python
sha256:7d6dd8f4a89e2d1fef2d84f6e65feaff51385d382f466766b7f690a22ec18e32
fix: fall back to DB ancestry check when mpack-only fast-fo…
Sonnet 4.6
patch
7 days ago
| 1 | """TDD — Phase 3: mpack-based fetch returns presigned mpack URLs (issue #63). |
| 2 | |
| 3 | PF-1 wire_fetch_mpack for a repo with a populated mpack index returns |
| 4 | mpack_urls (presigned GET URLs for the push mpacks) instead of fetching |
| 5 | each object individually from MinIO. |
| 6 | |
| 7 | PF-2 The number of mpack_urls equals the number of distinct mpacks that cover |
| 8 | the needed objects — not the number of objects. |
| 9 | |
| 10 | PF-3 wire_fetch_mpack still returns correct commit and snapshot metadata |
| 11 | alongside the mpack URLs so the client can reconstruct the repo. |
| 12 | |
| 13 | PF-4 wire_fetch_mpack falls back to the legacy inline-bytes path when the |
| 14 | mpack index has no rows for the requested objects (repos pushed before |
| 15 | Phase 1 migration, or empty delta). |
| 16 | """ |
| 17 | from __future__ import annotations |
| 18 | |
| 19 | import datetime |
| 20 | import hashlib |
| 21 | |
| 22 | import msgpack |
| 23 | import pytest |
| 24 | |
| 25 | pytestmark = pytest.mark.skip(reason="muse wire protocol in flux") |
| 26 | from sqlalchemy import select |
| 27 | from sqlalchemy.ext.asyncio import AsyncSession |
| 28 | |
| 29 | from muse.core.types import blob_id |
| 30 | from musehub.db import musehub_repo_models as db |
| 31 | from musehub.db.database import Base as _Base |
| 32 | from musehub.core.genesis import compute_identity_id |
| 33 | from musehub.services.musehub_repository import create_repo |
| 34 | |
| 35 | |
| 36 | # --------------------------------------------------------------------------- |
| 37 | # Helpers — reuse push helpers from Phase 1/2 |
| 38 | # --------------------------------------------------------------------------- |
| 39 | |
def _make_mpack(
    objects: dict[str, bytes],
    commits: list[dict] | None = None,
    snapshots: list[dict] | None = None,
    branch_heads: dict[str, str] | None = None,
) -> tuple[bytes, str]:
    """Serialise a push payload to mpack wire bytes and derive its content key.

    Args:
        objects: Mapping of object id to raw object content.
        commits: Commit dicts to embed; ``None`` is encoded as an empty list.
        snapshots: Snapshot dicts to embed; ``None`` is encoded as an empty list.
        branch_heads: Branch name -> head commit id; ``None`` is encoded as ``{}``.

    Returns:
        ``(wire_bytes, mpack_key)``. ``wire_bytes`` is the msgpack encoding
        of the payload. ``mpack_key`` is ``"sha256:"`` followed by the hex
        SHA-256 digest of those bytes.
    """
    object_entries = [
        {"object_id": object_id, "content": content}
        for object_id, content in objects.items()
    ]

    # Key insertion order ends up in the encoded bytes, and therefore in the
    # key, so it stays: commits, snapshots, objects, branch_heads.
    payload = dict(
        commits=commits or [],
        snapshots=snapshots or [],
        objects=object_entries,
        branch_heads=branch_heads or {},
    )
    encoded = msgpack.packb(payload, use_bin_type=True)
    digest = hashlib.sha256(encoded).hexdigest()
    return encoded, f"sha256:{digest}"
| 58 | |
| 59 | |
async def _store_mpack(wire_bytes: bytes, mpack_key: str) -> None:
    """Write *wire_bytes* to the storage backend under *mpack_key*.

    The backend module is imported lazily, and ``get_backend()`` is called
    on every invocation rather than cached at import time.
    """
    import musehub.storage.backends as _backends_mod

    await _backends_mod.get_backend().put_mpack(mpack_key, wire_bytes)
| 64 | |
| 65 | |
async def _push_and_index(
    session: AsyncSession,
    repo_id: str,
    objects: dict[str, bytes],
    commits: list[dict] | None = None,
    snapshots: list[dict] | None = None,
    branch_heads: dict[str, str] | None = None,
) -> str:
    """Push an mpack and run process_mpack_index_job. Returns mpack_key.

    Steps:
    1. Build and store the mpack.
    2. Add a pending ``mpack.index`` background job for branch ``main``
       that points at it, and commit.
    3. Run ``process_mpack_index_job`` on that job and commit again.

    Returns:
        The ``sha256:``-prefixed key of the stored mpack.
    """
    from musehub.core.genesis import compute_job_id
    from musehub.db.musehub_jobs_models import MusehubBackgroundJob
    from musehub.services.musehub_wire import process_mpack_index_job

    wire_bytes, mpack_key = _make_mpack(objects, commits, snapshots, branch_heads)
    await _store_mpack(wire_bytes, mpack_key)

    # The job's declared head is the last commit's id, or "" when no commits
    # are pushed (or the last commit dict has no "commit_id").
    head_commit = commits[-1].get("commit_id", "") if commits else ""
    job_payload = {
        "mpack_key": mpack_key,
        "branch": "main",
        "head": head_commit,
        "pusher_id": "",
        "declared_objects_count": len(objects),
        "declared_commits_count": len(commits) if commits else 0,
    }

    created = datetime.datetime.now(datetime.timezone.utc)
    job_id = compute_job_id(repo_id, "mpack.index", created.isoformat())
    job = MusehubBackgroundJob(
        job_id=job_id,
        repo_id=repo_id,
        job_type="mpack.index",
        payload=job_payload,
        status="pending",
        created_at=created,
        attempt=0,
    )
    session.add(job)
    await session.commit()

    await process_mpack_index_job(session, job_id)
    await session.commit()
    return mpack_key
| 104 | |
| 105 | |
| 106 | def _make_commit_chain(n: int, seed: str) -> tuple[list[dict], list[dict], str, dict[str, str]]: |
| 107 | """Return (commits, snapshots, tip_commit_id, objects_dict).""" |
| 108 | objects: dict[str, bytes] = {} |
| 109 | commits = [] |
| 110 | snapshots = [] |
| 111 | parent_id = None |
| 112 | |
| 113 | for i in range(n): |
| 114 | oid = blob_id(f"{seed}-obj-{i}".encode()) |
| 115 | objects[oid] = f"{seed}-obj-{i}".encode() |
| 116 | snap_id = blob_id(f"{seed}-snap-{i}".encode()) |
| 117 | snapshots.append({ |
| 118 | "snapshot_id": snap_id, |
| 119 | "parent_snapshot_id": None, |
| 120 | "delta_upsert": {f"file_{i}.txt": oid}, |
| 121 | "delta_remove": [], |
| 122 | }) |
| 123 | cid = blob_id(f"{seed}-commit-{i}-p={parent_id}".encode()) |
| 124 | commits.append({ |
| 125 | "commit_id": cid, |
| 126 | "branch": "main", |
| 127 | "message": f"commit {i}", |
| 128 | "author": "gabriel", |
| 129 | "committed_at": datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc).isoformat(), |
| 130 | "parent_commit_id": parent_id, |
| 131 | "parent2_commit_id": None, |
| 132 | "snapshot_id": snap_id, |
| 133 | "agent_id": "", |
| 134 | "model_id": "", |
| 135 | "toolchain_id": "", |
| 136 | "sem_ver_bump": "none", |
| 137 | "breaking_changes": [], |
| 138 | "signature": "", |
| 139 | "signer_key_id": "", |
| 140 | "signer_public_key": "", |
| 141 | "prompt_hash": "", |
| 142 | }) |
| 143 | parent_id = cid |
| 144 | |
| 145 | return commits, snapshots, parent_id, objects # type: ignore[return-value] |
| 146 | |
| 147 | |
| 148 | # --------------------------------------------------------------------------- |
| 149 | # PF-1 |
| 150 | # --------------------------------------------------------------------------- |
| 151 | |
@pytest.mark.asyncio
async def test_pf1_fetch_returns_mpack_urls(db_session: AsyncSession) -> None:
    """wire_fetch_mpack returns mpack_urls when mpack index is populated.

    Pushes a five-commit chain through the mpack index job, then fetches the
    tip with an empty ``have`` set. Expects a non-empty ``mpack_urls`` list
    and a truthy ``mpack_fetch`` flag.
    """
    repo = await create_repo(
        db_session,
        name="pf-test-1",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    commits, snapshots, tip, objects = _make_commit_chain(5, seed="pf1")
    # The returned mpack key is not needed: PF-1 only checks that the fetch
    # takes the mpack path, not which URL it hands back.
    await _push_and_index(
        db_session, repo.repo_id, objects, commits, snapshots,
        branch_heads={"main": tip},
    )

    from musehub.services.musehub_wire import wire_fetch_mpack

    result = await wire_fetch_mpack(
        db_session, repo.repo_id,
        want=[tip], have=[],
        ttl_seconds=3600,
    )

    assert "mpack_urls" in result, "result missing mpack_urls key"
    assert len(result["mpack_urls"]) > 0, "mpack_urls is empty — expected at least one presigned URL"
    assert result.get("mpack_fetch"), "mpack_fetch flag not set"
| 181 | |
| 182 | |
| 183 | # --------------------------------------------------------------------------- |
| 184 | # PF-2 |
| 185 | # --------------------------------------------------------------------------- |
| 186 | |
@pytest.mark.asyncio
async def test_pf2_mpack_urls_count_equals_distinct_mpacks(db_session: AsyncSession) -> None:
    """mpack_urls count == number of distinct mpacks, not number of objects.

    Two pushes of three commits each (six objects in total) should produce
    exactly two presigned URLs when fetching the second push's tip.
    """
    repo = await create_repo(
        db_session,
        name="pf-test-2",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    # Two pushes → two distinct mpacks
    commits_a, snaps_a, tip_a, objs_a = _make_commit_chain(3, seed="pf2a")
    key_a = await _push_and_index(
        db_session, repo.repo_id, objs_a, commits_a, snaps_a,
        branch_heads={"main": tip_a},
    )

    commits_b, snaps_b, tip_b, objs_b = _make_commit_chain(3, seed="pf2b")
    # b's commits build on a's tip. Only the parent pointer is rewired; the
    # first b commit's id is not recomputed.
    commits_b[0]["parent_commit_id"] = tip_a
    key_b = await _push_and_index(
        db_session, repo.repo_id, objs_b, commits_b, snaps_b,
        branch_heads={"main": tip_b},
    )

    # The "== 2" expectation below only holds if the two pushes were stored
    # as separate mpacks, so check that precondition explicitly.
    assert key_a != key_b, "expected the two pushes to produce distinct mpack keys"

    from musehub.services.musehub_wire import wire_fetch_mpack

    result = await wire_fetch_mpack(
        db_session, repo.repo_id,
        want=[tip_b], have=[],
        ttl_seconds=3600,
    )

    assert result.get("mpack_fetch"), "mpack_fetch flag not set"
    # 2 distinct mpacks cover all objects — not 6 individual object GETs
    assert len(result["mpack_urls"]) == 2, (
        f"expected 2 mpack URLs (one per push), got {len(result['mpack_urls'])}"
    )
| 227 | |
| 228 | |
| 229 | # --------------------------------------------------------------------------- |
| 230 | # PF-3 |
| 231 | # --------------------------------------------------------------------------- |
| 232 | |
@pytest.mark.asyncio
async def test_pf3_mpack_fetch_includes_commit_and_snapshot_metadata(db_session: AsyncSession) -> None:
    """mpack-based response includes commits and snapshots for client reconstruction.

    After indexing a three-commit push, the fetch response must:
    - carry the ``mpack_fetch`` flag;
    - report a ``commit_count`` equal to the pushed chain length;
    - include a ``commits`` list of that same length;
    - include a non-empty ``snapshots`` list.
    """
    repo = await create_repo(
        db_session,
        name="pf-test-3",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    chain_commits, chain_snapshots, tip_id, chain_objects = _make_commit_chain(3, seed="pf3")
    await _push_and_index(
        db_session,
        repo.repo_id,
        chain_objects,
        chain_commits,
        chain_snapshots,
        branch_heads={"main": tip_id},
    )

    from musehub.services.musehub_wire import wire_fetch_mpack

    response = await wire_fetch_mpack(
        db_session,
        repo.repo_id,
        want=[tip_id],
        have=[],
        ttl_seconds=3600,
    )
    expected = len(chain_commits)

    assert response.get("mpack_fetch"), "mpack_fetch flag not set"
    assert response["commit_count"] == expected, (
        f"expected {expected} commits in result, got {response['commit_count']}"
    )
    returned_commits = response.get("commits", [])
    assert len(returned_commits) == expected, "commits list missing or wrong length"
    returned_snapshots = response.get("snapshots", [])
    assert len(returned_snapshots) > 0, "snapshots missing from mpack-fetch result"
| 265 | |
| 266 | |
| 267 | # --------------------------------------------------------------------------- |
| 268 | # PF-4 |
| 269 | # --------------------------------------------------------------------------- |
| 270 | |
@pytest.mark.asyncio
async def test_pf4_fallback_to_inline_when_no_mpack_index(db_session: AsyncSession) -> None:
    """Falls back to inline bytes when mpack index has no rows for this repo.

    Seeds the database directly, bypassing ``process_mpack_index_job``:
    one stored object plus its repo ref, one snapshot, one commit, and a
    ``main`` branch pointing at it. The repo therefore has history but no
    mpack-index rows. Fetching the commit must then not take the mpack path,
    and must return either inline bytes or a presigned URL.
    """
    repo = await create_repo(
        db_session,
        name="pf-test-4",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    # Manually insert commits + objects into DB WITHOUT going through
    # process_mpack_index_job, so the mpack index is empty.
    from sqlalchemy.dialects.postgresql import insert as pg_insert

    import musehub.storage.backends as _bm
    from musehub.core.genesis import compute_branch_id
    from musehub.db.musehub_repo_models import (
        MusehubBranch,
        MusehubCommit,
        MusehubObject,
        MusehubObjectRef,
        MusehubSnapshot,
    )
    from musehub.services.musehub_wire import wire_fetch_mpack

    now = datetime.datetime.now(datetime.timezone.utc)
    oid = blob_id(b"pf4-obj")
    snap_id = blob_id(b"pf4-snap")
    cid = blob_id(b"pf4-commit")
    obj_data = b"pf4-obj"

    backend = _bm.get_backend()
    await backend.put(oid, obj_data)

    await session_add_all(db_session, [
        MusehubObject(
            object_id=oid, path="", size_bytes=len(obj_data),
            storage_uri=backend.uri_for(oid),
        ),
    ])
    await db_session.execute(
        pg_insert(MusehubObjectRef).values([
            {"repo_id": repo.repo_id, "object_id": oid}
        ]).on_conflict_do_nothing(index_elements=["repo_id", "object_id"])
    )
    # The snapshot manifest is a msgpack-encoded path -> object id mapping.
    manifest_blob = msgpack.packb({"file.txt": oid}, use_bin_type=True)
    await db_session.execute(
        pg_insert(MusehubSnapshot).values([
            {"snapshot_id": snap_id, "directories": [], "manifest_blob": manifest_blob, "entry_count": 1, "created_at": now}
        ]).on_conflict_do_nothing(index_elements=["snapshot_id"])
    )
    await db_session.execute(
        pg_insert(MusehubCommit).values([{
            "commit_id": cid,
            "branch": "main",
            "message": "manual",
            "author": "gabriel",
            "timestamp": now,
            "parent_ids": [],
            "snapshot_id": snap_id,
            "agent_id": "", "model_id": "", "toolchain_id": "",
            "commit_branch": "main",
            "signature": "", "signer_public_key": "", "signer_key_id": "",
            "sem_ver_bump": "none", "breaking_changes": [], "reviewed_by": [],
            "test_runs": 0, "prompt_hash": "", "structured_delta": None,
        }]).on_conflict_do_nothing(index_elements=["commit_id"])
    )
    db_session.add(MusehubBranch(
        branch_id=compute_branch_id(repo.repo_id, "main"),
        repo_id=repo.repo_id, name="main", head_commit_id=cid,
    ))
    await db_session.commit()

    result = await wire_fetch_mpack(
        db_session, repo.repo_id,
        want=[cid], have=[],
        ttl_seconds=3600,
    )

    # No mpack index → should fall back to inline bytes (not mpack_fetch)
    assert not result.get("mpack_fetch"), (
        "expected fallback to inline bytes when mpack index is empty, got mpack_fetch=True"
    )
    # The fallback may carry the bytes inline or only a presigned URL. Use
    # .get() for both keys so a URL-only response does not raise KeyError
    # before the assertion can report the failure.
    assert result.get("mpack_bytes") or result.get("presigned_url"), "no mpack bytes in fallback"
| 353 | |
| 354 | |
async def session_add_all(session: AsyncSession, items: list[_Base]) -> None:
    """Stage each ORM instance in *items* on *session*, in list order.

    Only calls ``session.add`` per item; nothing is flushed or committed
    here, so the caller must commit. The function is ``async`` only so it
    can be awaited; it performs no awaits itself.
    """
    stage = session.add
    for orm_obj in items:
        stage(orm_obj)
File History
1 commit
sha256:7d6dd8f4a89e2d1fef2d84f6e65feaff51385d382f466766b7f690a22ec18e32
fix: fall back to DB ancestry check when mpack-only fast-fo…
Sonnet 4.6
patch
7 days ago