"""TDD — Phase 3: mpack-based fetch returns presigned mpack URLs (issue #63). PF-1 wire_fetch_mpack for a repo with a populated mpack index returns mpack_urls (presigned GET URLs for the push mpacks) instead of fetching each object individually from MinIO. PF-2 The number of mpack_urls equals the number of distinct mpacks that cover the needed objects — not the number of objects. PF-3 wire_fetch_mpack still returns correct commit and snapshot metadata alongside the mpack URLs so the client can reconstruct the repo. PF-4 wire_fetch_mpack falls back to the legacy inline-bytes path when the mpack index has no rows for the requested objects (repos pushed before Phase 1 migration, or empty delta). """ from __future__ import annotations import datetime import hashlib import msgpack import pytest pytestmark = pytest.mark.skip(reason="muse wire protocol in flux") from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import blob_id from musehub.db import musehub_repo_models as db from musehub.db.database import Base as _Base from musehub.core.genesis import compute_identity_id from musehub.services.musehub_repository import create_repo # --------------------------------------------------------------------------- # Helpers — reuse push helpers from Phase 1/2 # --------------------------------------------------------------------------- def _make_mpack( objects: dict[str, bytes], commits: list[dict] | None = None, snapshots: list[dict] | None = None, branch_heads: dict[str, str] | None = None, ) -> tuple[bytes, str]: mpack = { "commits": commits or [], "snapshots": snapshots or [], "objects": [ {"object_id": oid, "content": data} for oid, data in objects.items() ], "branch_heads": branch_heads or {}, } wire_bytes = msgpack.packb(mpack, use_bin_type=True) mpack_key = "sha256:" + hashlib.sha256(wire_bytes).hexdigest() return wire_bytes, mpack_key async def _store_mpack(wire_bytes: bytes, mpack_key: str) -> None: import musehub.storage.backends as _backends_mod backend = _backends_mod.get_backend() await backend.put_mpack(mpack_key, wire_bytes) async def _push_and_index( session: AsyncSession, repo_id: str, objects: dict[str, bytes], commits: list[dict] | None = None, snapshots: list[dict] | None = None, branch_heads: dict[str, str] | None = None, ) -> str: """Push an mpack and run process_mpack_index_job. Returns mpack_key.""" from musehub.core.genesis import compute_job_id from musehub.db.musehub_jobs_models import MusehubBackgroundJob from musehub.services.musehub_wire import process_mpack_index_job wire_bytes, mpack_key = _make_mpack(objects, commits, snapshots, branch_heads) await _store_mpack(wire_bytes, mpack_key) now = datetime.datetime.now(datetime.timezone.utc) job_id = compute_job_id(repo_id, "mpack.index", now.isoformat()) session.add(MusehubBackgroundJob( job_id=job_id, repo_id=repo_id, job_type="mpack.index", payload={ "mpack_key": mpack_key, "branch": "main", "head": (commits or [{}])[-1].get("commit_id", ""), "pusher_id": "", "declared_objects_count": len(objects), "declared_commits_count": len(commits or []), }, status="pending", created_at=now, attempt=0, )) await session.commit() await process_mpack_index_job(session, job_id) await session.commit() return mpack_key def _make_commit_chain(n: int, seed: str) -> tuple[list[dict], list[dict], str, dict[str, str]]: """Return (commits, snapshots, tip_commit_id, objects_dict).""" objects: dict[str, bytes] = {} commits = [] snapshots = [] parent_id = None for i in range(n): oid = blob_id(f"{seed}-obj-{i}".encode()) objects[oid] = f"{seed}-obj-{i}".encode() snap_id = blob_id(f"{seed}-snap-{i}".encode()) snapshots.append({ "snapshot_id": snap_id, "parent_snapshot_id": None, "delta_upsert": {f"file_{i}.txt": oid}, "delta_remove": [], }) cid = blob_id(f"{seed}-commit-{i}-p={parent_id}".encode()) commits.append({ "commit_id": cid, "branch": "main", "message": f"commit {i}", "author": "gabriel", "committed_at": datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc).isoformat(), "parent_commit_id": parent_id, "parent2_commit_id": None, "snapshot_id": snap_id, "agent_id": "", "model_id": "", "toolchain_id": "", "sem_ver_bump": "none", "breaking_changes": [], "signature": "", "signer_key_id": "", "signer_public_key": "", "prompt_hash": "", }) parent_id = cid return commits, snapshots, parent_id, objects # type: ignore[return-value] # --------------------------------------------------------------------------- # PF-1 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_pf1_fetch_returns_mpack_urls(db_session: AsyncSession) -> None: """wire_fetch_mpack returns mpack_urls when mpack index is populated.""" repo = await create_repo( db_session, name="pf-test-1", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) commits, snapshots, tip, objects = _make_commit_chain(5, seed="pf1") mpack_key = await _push_and_index( db_session, repo.repo_id, objects, commits, snapshots, branch_heads={"main": tip}, ) from musehub.services.musehub_wire import wire_fetch_mpack result = await wire_fetch_mpack( db_session, repo.repo_id, want=[tip], have=[], ttl_seconds=3600, ) assert "mpack_urls" in result, "result missing mpack_urls key" assert len(result["mpack_urls"]) > 0, "mpack_urls is empty — expected at least one presigned URL" assert result.get("mpack_fetch"), "mpack_fetch flag not set" # --------------------------------------------------------------------------- # PF-2 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_pf2_mpack_urls_count_equals_distinct_mpacks(db_session: AsyncSession) -> None: """mpack_urls count == number of distinct mpacks, not number of objects.""" repo = await create_repo( db_session, name="pf-test-2", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) # Two pushes → two distinct mpacks commits_a, snaps_a, tip_a, objs_a = _make_commit_chain(3, seed="pf2a") key_a = await _push_and_index( db_session, repo.repo_id, objs_a, commits_a, snaps_a, branch_heads={"main": tip_a}, ) commits_b, snaps_b, tip_b, objs_b = _make_commit_chain(3, seed="pf2b") # b's commits build on a's tip commits_b[0]["parent_commit_id"] = tip_a key_b = await _push_and_index( db_session, repo.repo_id, objs_b, commits_b, snaps_b, branch_heads={"main": tip_b}, ) from musehub.services.musehub_wire import wire_fetch_mpack result = await wire_fetch_mpack( db_session, repo.repo_id, want=[tip_b], have=[], ttl_seconds=3600, ) assert result.get("mpack_fetch"), "mpack_fetch flag not set" # 2 distinct mpacks cover all objects — not 6 individual object GETs assert len(result["mpack_urls"]) == 2, ( f"expected 2 mpack URLs (one per push), got {len(result['mpack_urls'])}" ) # --------------------------------------------------------------------------- # PF-3 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_pf3_mpack_fetch_includes_commit_and_snapshot_metadata(db_session: AsyncSession) -> None: """mpack-based response includes commits and snapshots for client reconstruction.""" repo = await create_repo( db_session, name="pf-test-3", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) commits, snapshots, tip, objects = _make_commit_chain(3, seed="pf3") await _push_and_index( db_session, repo.repo_id, objects, commits, snapshots, branch_heads={"main": tip}, ) from musehub.services.musehub_wire import wire_fetch_mpack result = await wire_fetch_mpack( db_session, repo.repo_id, want=[tip], have=[], ttl_seconds=3600, ) assert result.get("mpack_fetch"), "mpack_fetch flag not set" assert result["commit_count"] == len(commits), ( f"expected {len(commits)} commits in result, got {result['commit_count']}" ) assert len(result.get("commits", [])) == len(commits), "commits list missing or wrong length" assert len(result.get("snapshots", [])) > 0, "snapshots missing from mpack-fetch result" # --------------------------------------------------------------------------- # PF-4 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_pf4_fallback_to_inline_when_no_mpack_index(db_session: AsyncSession) -> None: """Falls back to inline bytes when mpack index has no rows for this repo.""" repo = await create_repo( db_session, name="pf-test-4", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) # Manually insert commits + objects into DB WITHOUT going through # process_mpack_index_job, so the mpack index is empty. from musehub.db.musehub_repo_models import MusehubCommit, MusehubObject, MusehubObjectRef, MusehubBranch from sqlalchemy.dialects.postgresql import insert as pg_insert import musehub.storage.backends as _bm now = datetime.datetime.now(datetime.timezone.utc) oid = blob_id(b"pf4-obj") snap_id = blob_id(b"pf4-snap") cid = blob_id(b"pf4-commit") obj_data = b"pf4-obj" backend = _bm.get_backend() await backend.put(oid, obj_data) await session_add_all(db_session, [ MusehubObject( object_id=oid, path="", size_bytes=len(obj_data), storage_uri=backend.uri_for(oid), ), ]) await db_session.execute( pg_insert(MusehubObjectRef).values([ {"repo_id": repo.repo_id, "object_id": oid} ]).on_conflict_do_nothing(index_elements=["repo_id", "object_id"]) ) from musehub.db.musehub_repo_models import MusehubSnapshot import msgpack as _mp manifest_blob = _mp.packb({f"file.txt": oid}, use_bin_type=True) await db_session.execute( pg_insert(MusehubSnapshot).values([ {"snapshot_id": snap_id, "directories": [], "manifest_blob": manifest_blob, "entry_count": 1, "created_at": now} ]).on_conflict_do_nothing(index_elements=["snapshot_id"]) ) await db_session.execute( pg_insert(MusehubCommit).values([{ "commit_id": cid, "branch": "main", "message": "manual", "author": "gabriel", "timestamp": now, "parent_ids": [], "snapshot_id": snap_id, "agent_id": "", "model_id": "", "toolchain_id": "", "commit_branch": "main", "signature": "", "signer_public_key": "", "signer_key_id": "", "sem_ver_bump": "none", "breaking_changes": [], "reviewed_by": [], "test_runs": 0, "prompt_hash": "", "structured_delta": None, }]).on_conflict_do_nothing(index_elements=["commit_id"]) ) from musehub.core.genesis import compute_branch_id db_session.add(MusehubBranch( branch_id=compute_branch_id(repo.repo_id, "main"), repo_id=repo.repo_id, name="main", head_commit_id=cid, )) await db_session.commit() from musehub.services.musehub_wire import wire_fetch_mpack result = await wire_fetch_mpack( db_session, repo.repo_id, want=[cid], have=[], ttl_seconds=3600, ) # No mpack index → should fall back to inline bytes (not mpack_fetch) assert not result.get("mpack_fetch"), ( "expected fallback to inline bytes when mpack index is empty, got mpack_fetch=True" ) assert result["mpack_bytes"] or result.get("presigned_url"), "no mpack bytes in fallback" async def session_add_all(session: AsyncSession, items: list[_Base]) -> None: for item in items: session.add(item)