"""TDD — Phase 1: mpack-native object serving in wire_fetch_mpack (issue #69). Problem: the current fetch path calls backend.get(oid) once per object. For 25,000 objects that is 25,000 individual MinIO GETs. This only works because the push background job writes per-object MinIO keys — a 40s synchronous cost that creates an availability window after every large push. Goal: serve objects by downloading the covering mpack(s) and extracting in-memory. One mpack GET replaces N object GETs. Per-object keys are never written; the mpack is the source of truth. Test IDs: FN-1 content_cache hit → no mpack or per-object GET needed FN-2 object stored only in mpack (no per-object key) → extracted correctly FN-3 objects across two distinct mpacks → each mpack fetched exactly once FN-4 object in MPackIndex but mpack is missing from storage → falls back to per-object GET FN-5 object absent from MPackIndex → FetchNotIndexedError (unchanged gate) FN-6 mixed: content_cache + mpack + legacy per-object → all three sources merged """ from __future__ import annotations import hashlib from collections.abc import Mapping from datetime import datetime, timezone from unittest.mock import AsyncMock, call import msgpack import pytest from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import blob_id, fake_id from musehub.db import musehub_repo_models as db from musehub.services.musehub_wire import FetchNotIndexedError, wire_fetch_mpack from tests.factories import create_repo # ── fixtures & helpers ──────────────────────────────────────────────────────── def _now() -> datetime: return datetime.now(tz=timezone.utc) def _make_mpack_bytes(objects: Mapping[str, bytes]) -> bytes: """Encode objects into a wire mpack payload (uncompressed).""" return msgpack.packb( { "commits": [], "snapshots": [], "blobs": [{"object_id": oid, "content": data} for oid, data in objects.items()], "branch_heads": {}, }, use_bin_type=True, ) 
def _make_mpack_bytes_zstd(objects: Mapping[str, bytes]) -> bytes:
    """Encode objects into a wire mpack payload with zstd-compressed content.

    This mimics the real push mpack format: each object entry has
    ``encoding='zstd'`` and ``content=compressed_bytes``, but ``object_id`` is
    the sha256 of the *uncompressed* bytes (content-addressed).
    """
    import zstandard as _zstd

    cctx = _zstd.ZstdCompressor()
    # Key order matches the original hand-built dicts so the packed bytes
    # (and hence the mpack key) are unchanged.
    entries = [
        {"object_id": oid, "encoding": "zstd", "content": cctx.compress(data)}
        for oid, data in objects.items()
    ]
    return msgpack.packb(
        {
            "commits": [],
            "snapshots": [],
            "blobs": entries,
            "branch_heads": {},
        },
        use_bin_type=True,
    )


def _mpack_key(payload: bytes) -> str:
    """Return the content-addressed mpack id ``"sha256:<hex>"`` for *payload*."""
    return "sha256:" + hashlib.sha256(payload).hexdigest()


class _FakeBackend:
    """In-memory backend with separate per-object and mpack namespaces.

    Every get() and get_mpack() call is recorded (in ``get_calls`` and
    ``get_mpack_calls``) so tests can assert that the mpack-native path is
    taken instead of per-object GETs.
    """

    supports_presign: bool = True

    def __init__(self) -> None:
        self._objects: dict[str, bytes] = {}
        self._mpacks: dict[str, bytes] = {}
        self.get_calls: list[str] = []
        self.get_mpack_calls: list[str] = []

    async def put(self, oid: str, data: bytes) -> str:
        self._objects[oid] = data
        return f"mem://{oid}"

    async def get(self, oid: str) -> bytes | None:
        self.get_calls.append(oid)
        return self._objects.get(oid)

    async def get_mpack(self, mpack_id: str) -> bytes | None:
        self.get_mpack_calls.append(mpack_id)
        return self._mpacks.get(mpack_id)

    async def put_mpack(self, mpack_id: str, data: bytes) -> None:
        self._mpacks[mpack_id] = data

    async def exists(self, oid: str) -> bool:
        return oid in self._objects

    async def delete(self, oid: str) -> None:
        self._objects.pop(oid, None)

    async def presign_get(self, oid: str, ttl: int) -> str:
        return f"https://minio.test/{oid}?ttl={ttl}"

    async def presign_mpack_get(self, mpack_id: str, ttl: int) -> str:
        return f"https://minio.test/mpacks/{mpack_id}?ttl={ttl}"

    def uri_for(self, oid: str) -> str:
        return f"mem://{oid}"


def _install_backend(monkeypatch: pytest.MonkeyPatch) -> _FakeBackend:
    """Create a fresh _FakeBackend and patch it in as ``get_backend``.

    The fetch path may resolve ``get_backend`` through either
    ``musehub_wire`` or ``musehub_wire_fetch``, so both bindings are patched.
    """
    backend = _FakeBackend()
    for module in ("musehub.services.musehub_wire", "musehub.services.musehub_wire_fetch"):
        monkeypatch.setattr(f"{module}.get_backend", lambda: backend)
    return backend


async def _link_object_to_repo(session: AsyncSession, repo_id: str, oid: str) -> None:
    """Insert a MusehubObjectRef row (repo_id, oid), ignoring duplicates.

    Does not commit; callers commit when their setup is complete.
    """
    await session.execute(
        pg_insert(db.MusehubObjectRef)
        .values(repo_id=repo_id, object_id=oid)
        .on_conflict_do_nothing()
    )


def _fetched_blobs(backend: _FakeBackend, mpack_id: str) -> dict[str, bytes]:
    """Return ``object_id -> content`` from the fetch mpack stored under *mpack_id*.

    Reads ``backend._mpacks`` directly, so ``get_mpack_calls`` is not
    affected. Content is normalised with ``bytes()`` so comparisons with
    ``bytes`` literals behave the same regardless of the buffer type the
    parser returns.
    """
    from muse.core.mpack import parse_wire_mpack

    fetch_raw = backend._mpacks.get(mpack_id)
    assert fetch_raw is not None, "fetch mpack not stored in backend"
    payload = parse_wire_mpack(fetch_raw)
    return {o["object_id"]: bytes(o["content"]) for o in payload.get("blobs", [])}


async def _make_commit(
    session: AsyncSession,
    repo_id: str,
    *,
    manifest: dict[str, str],
    seed: str,
    parent_ids: list[str] | None = None,
    generation: int = 0,
) -> db.MusehubCommit:
    """Create a snapshot + commit (with ref and graph rows) for *manifest*.

    Snapshot and commit ids are derived from *seed* via ``fake_id``. The
    session is committed before returning the commit row.
    """
    parents = parent_ids or []

    snap_id = fake_id(f"snap-{seed}")
    snap = db.MusehubSnapshot(
        snapshot_id=snap_id,
        directories=[],
        manifest_blob=msgpack.packb(manifest, use_bin_type=True),
        entry_count=len(manifest),
        created_at=_now(),
    )
    session.add(snap)
    await session.execute(
        pg_insert(db.MusehubSnapshotRef)
        .values(repo_id=repo_id, snapshot_id=snap_id)
        .on_conflict_do_nothing()
    )

    commit_id = fake_id(f"commit-{seed}")
    commit = db.MusehubCommit(
        commit_id=commit_id,
        branch="main",
        parent_ids=parents,
        message=f"commit {seed}",
        author="gabriel",
        timestamp=_now(),
        snapshot_id=snap_id,
    )
    session.add(commit)
    await session.execute(
        pg_insert(db.MusehubCommitRef)
        .values(repo_id=repo_id, commit_id=commit_id)
        .on_conflict_do_nothing()
    )
    await session.execute(
        pg_insert(db.MusehubCommitGraph)
        .values(
            commit_id=commit_id,
            parent_ids=parents,
            generation=generation,
            snapshot_id=snap_id,
        )
        .on_conflict_do_nothing()
    )
    await session.commit()
    return commit


async def _index_object_in_mpack(
    session: AsyncSession,
    oid: str,
    mpack_id: str,
) -> None:
    """Record *oid* as living in *mpack_id* (no per-object MinIO key) and commit.

    Writes two rows:
      * a MusehubObject row whose ``storage_uri`` is ``mpack://<mpack_id>``
        (path and size are fixed placeholders). If a row for *oid* already
        exists — e.g. one carrying ``content_cache`` — it is left untouched.
      * a MusehubMPackIndex row mapping ``oid -> mpack_id``.
    """
    await session.execute(
        pg_insert(db.MusehubObject)
        .values(
            object_id=oid,
            path="file.dat",
            size_bytes=8,
            storage_uri=f"mpack://{mpack_id}",  # mpack URI, not per-object
        )
        .on_conflict_do_nothing(index_elements=["object_id"])
    )
    await session.execute(
        pg_insert(db.MusehubMPackIndex)
        .values(entity_id=oid, mpack_id=mpack_id, entity_type="object", created_at=_now())
        .on_conflict_do_nothing(index_elements=["entity_id", "mpack_id"])
    )
    await session.commit()


# ── FN-1: content_cache hit ───────────────────────────────────────────────────


@pytest.mark.asyncio
async def test_fn1_content_cache_served_without_mpack_get(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Objects in content_cache are served from DB; no mpack or per-object GET fired."""
    backend = _install_backend(monkeypatch)
    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    raw = b"cached content"
    oid = blob_id(raw)
    await db_session.execute(
        pg_insert(db.MusehubObject)
        .values(
            object_id=oid,
            path="cached.txt",
            size_bytes=len(raw),
            storage_uri=f"mem://{oid}",
            content_cache=raw,
        )
        .on_conflict_do_nothing(index_elements=["object_id"])
    )
    await _link_object_to_repo(db_session, repo.repo_id, oid)

    # Also add MPackIndex so the coverage check passes
    mpack_bytes = _make_mpack_bytes({oid: raw})
    mpack_id = _mpack_key(mpack_bytes)
    await backend.put_mpack(mpack_id, mpack_bytes)
    await _index_object_in_mpack(db_session, oid, mpack_id)

    commit = await _make_commit(
        db_session, repo.repo_id, manifest={"cached.txt": oid}, seed="fn1"
    )

    result = await wire_fetch_mpack(
        db_session, repo.repo_id, want=[commit.commit_id], have=[]
    )

    assert result["blob_count"] == 1
    assert backend.get_calls == [], "content_cache hit must not trigger per-object GET"
    assert backend.get_mpack_calls == [], "content_cache hit must not trigger mpack GET"


# ── FN-2: object stored only in mpack ────────────────────────────────────────


@pytest.mark.asyncio
async def test_fn2_object_served_from_mpack_no_per_object_key(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Object exists only in mpack storage (no per-object key) → correctly extracted."""
    backend = _install_backend(monkeypatch)
    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    raw = b"mpack-only content"
    oid = blob_id(raw)
    mpack_bytes = _make_mpack_bytes({oid: raw})
    mpack_id = _mpack_key(mpack_bytes)
    await backend.put_mpack(mpack_id, mpack_bytes)
    await _index_object_in_mpack(db_session, oid, mpack_id)
    await _link_object_to_repo(db_session, repo.repo_id, oid)
    await db_session.commit()

    commit = await _make_commit(
        db_session, repo.repo_id, manifest={"f.txt": oid}, seed="fn2"
    )

    result = await wire_fetch_mpack(
        db_session, repo.repo_id, want=[commit.commit_id], have=[]
    )

    assert result["blob_count"] == 1
    assert result["mpack_url"] is not None, "should return a presigned mpack URL"
    # Per-object key was never written — get() must not have been called
    assert backend.get_calls == [], "must not fall back to per-object GET when mpack covers the object"
    assert mpack_id in backend.get_mpack_calls, "must fetch the covering mpack"

    # Verify the assembled fetch mpack contains the correct bytes.
    obj_map = _fetched_blobs(backend, result["mpack_id"])
    assert oid in obj_map, "object missing from assembled fetch mpack"
    assert obj_map[oid] == raw


# ── FN-3: objects across two mpacks → each fetched exactly once ───────────────


@pytest.mark.asyncio
async def test_fn3_two_mpacks_each_fetched_once(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Objects span two mpacks. Each mpack is downloaded exactly once (no redundant GETs)."""
    backend = _install_backend(monkeypatch)
    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    raw_a = b"object from mpack A"
    raw_b = b"object from mpack B"
    oid_a = blob_id(raw_a)
    oid_b = blob_id(raw_b)

    mpack_a_bytes = _make_mpack_bytes({oid_a: raw_a})
    mpack_b_bytes = _make_mpack_bytes({oid_b: raw_b})
    mpack_a_id = _mpack_key(mpack_a_bytes)
    mpack_b_id = _mpack_key(mpack_b_bytes)

    await backend.put_mpack(mpack_a_id, mpack_a_bytes)
    await backend.put_mpack(mpack_b_id, mpack_b_bytes)
    await _index_object_in_mpack(db_session, oid_a, mpack_a_id)
    await _index_object_in_mpack(db_session, oid_b, mpack_b_id)

    for oid in (oid_a, oid_b):
        await _link_object_to_repo(db_session, repo.repo_id, oid)
    await db_session.commit()

    commit = await _make_commit(
        db_session,
        repo.repo_id,
        manifest={"a.txt": oid_a, "b.txt": oid_b},
        seed="fn3",
    )

    result = await wire_fetch_mpack(
        db_session, repo.repo_id, want=[commit.commit_id], have=[]
    )

    assert result["blob_count"] == 2
    assert backend.get_calls == [], "no per-object GETs"
    assert sorted(backend.get_mpack_calls) == sorted([mpack_a_id, mpack_b_id])
    assert backend.get_mpack_calls.count(mpack_a_id) == 1, "mpack A fetched exactly once"
    assert backend.get_mpack_calls.count(mpack_b_id) == 1, "mpack B fetched exactly once"

    # Both objects must be extracted from their respective mpacks intact.
    obj_map = _fetched_blobs(backend, result["mpack_id"])
    assert obj_map.get(oid_a) == raw_a
    assert obj_map.get(oid_b) == raw_b


# ── FN-4: mpack missing from storage → per-object fallback ───────────────────


@pytest.mark.asyncio
async def test_fn4_missing_mpack_falls_back_to_per_object_get(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """MPackIndex points at a mpack that is missing from storage → falls back to backend.get()."""
    backend = _install_backend(monkeypatch)
    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    raw = b"fallback content"
    oid = blob_id(raw)

    # MPackIndex row exists but mpack is NOT in backend._mpacks
    ghost_mpack_id = fake_id("ghost-mpack")
    await _index_object_in_mpack(db_session, oid, ghost_mpack_id)
    await _link_object_to_repo(db_session, repo.repo_id, oid)
    await db_session.commit()

    # Per-object key IS available (legacy fallback)
    await backend.put(oid, raw)

    commit = await _make_commit(
        db_session, repo.repo_id, manifest={"f.txt": oid}, seed="fn4"
    )

    result = await wire_fetch_mpack(
        db_session, repo.repo_id, want=[commit.commit_id], have=[]
    )

    assert result["blob_count"] == 1
    assert ghost_mpack_id in backend.get_mpack_calls, "attempted mpack GET"
    assert oid in backend.get_calls, "fell back to per-object GET after mpack miss"
    assert _fetched_blobs(backend, result["mpack_id"]).get(oid) == raw, (
        "fallback object bytes must be served unchanged"
    )


# ── FN-5: object absent from MPackIndex → FetchNotIndexedError ───────────────


@pytest.mark.asyncio
async def test_fn5_unindexed_object_raises_fetch_not_indexed_error(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Object not present in MPackIndex at all → FetchNotIndexedError (background job pending)."""
    _install_backend(monkeypatch)
    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    raw = b"not indexed yet"
    oid = blob_id(raw)

    # Object row exists in DB but NO MPackIndex entry and NO per-object key
    await db_session.execute(
        pg_insert(db.MusehubObject)
        .values(object_id=oid, path="f.txt", size_bytes=len(raw), storage_uri=f"mem://{oid}")
        .on_conflict_do_nothing(index_elements=["object_id"])
    )
    await _link_object_to_repo(db_session, repo.repo_id, oid)
    await db_session.commit()

    commit = await _make_commit(
        db_session, repo.repo_id, manifest={"f.txt": oid}, seed="fn5"
    )

    with pytest.raises(FetchNotIndexedError):
        await wire_fetch_mpack(
            db_session, repo.repo_id, want=[commit.commit_id], have=[]
        )


# ── FN-6: mixed sources — cache + mpack + legacy ─────────────────────────────


@pytest.mark.asyncio
async def test_fn6_mixed_sources_all_merged(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Three objects: one from content_cache, one from mpack, one from legacy per-object key."""
    backend = _install_backend(monkeypatch)
    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    raw_cached = b"in content_cache"
    raw_mpack = b"in mpack storage"
    raw_legacy = b"in per-object storage"
    oid_cached = blob_id(raw_cached)
    oid_mpack = blob_id(raw_mpack)
    oid_legacy = blob_id(raw_legacy)

    # oid_cached: stored in content_cache column
    await db_session.execute(
        pg_insert(db.MusehubObject)
        .values(
            object_id=oid_cached,
            path="cached.txt",
            size_bytes=len(raw_cached),
            storage_uri=f"mem://{oid_cached}",
            content_cache=raw_cached,
        )
        .on_conflict_do_nothing(index_elements=["object_id"])
    )

    # oid_mpack: stored only in mpack, indexed via MPackIndex
    mpack_bytes = _make_mpack_bytes({oid_mpack: raw_mpack})
    mpack_id = _mpack_key(mpack_bytes)
    await backend.put_mpack(mpack_id, mpack_bytes)
    await _index_object_in_mpack(db_session, oid_mpack, mpack_id)

    # oid_legacy: indexed in MPackIndex against a mpack that does NOT exist in
    # storage, so the fetch must fall back to the per-object key written here.
    ghost_id = fake_id("ghost")
    await _index_object_in_mpack(db_session, oid_legacy, ghost_id)
    await backend.put(oid_legacy, raw_legacy)

    for oid in (oid_cached, oid_mpack, oid_legacy):
        await _link_object_to_repo(db_session, repo.repo_id, oid)

    # Also need MPackIndex for oid_cached so the coverage check passes
    cache_mpack_bytes = _make_mpack_bytes({oid_cached: raw_cached})
    cache_mpack_id = _mpack_key(cache_mpack_bytes)
    await backend.put_mpack(cache_mpack_id, cache_mpack_bytes)
    await _index_object_in_mpack(db_session, oid_cached, cache_mpack_id)

    await db_session.commit()

    commit = await _make_commit(
        db_session,
        repo.repo_id,
        manifest={"cached.txt": oid_cached, "mpack.txt": oid_mpack, "legacy.txt": oid_legacy},
        seed="fn6",
    )

    result = await wire_fetch_mpack(
        db_session, repo.repo_id, want=[commit.commit_id], have=[]
    )

    assert result["blob_count"] == 3

    # Verify assembled mpack contains all three objects with correct bytes
    obj_map = _fetched_blobs(backend, result["mpack_id"])
    assert obj_map[oid_cached] == raw_cached
    assert obj_map[oid_mpack] == raw_mpack
    assert obj_map[oid_legacy] == raw_legacy

    # content_cache object must not trigger any GET (per-object or mpack, as in FN-1)
    assert oid_cached not in backend.get_calls
    assert cache_mpack_id not in backend.get_mpack_calls
    # mpack object must not trigger per-object GET
    assert oid_mpack not in backend.get_calls
    # legacy fallback fired for oid_legacy
    assert oid_legacy in backend.get_calls


# ── FN-7: zstd-compressed objects in mpack → decompressed before serving ──────


@pytest.mark.asyncio
async def test_fn7_zstd_compressed_mpack_objects_decompressed(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Objects stored with encoding='zstd' in the covering mpack must be
    decompressed before being placed in the assembled fetch mpack.

    Regression test for the bug where _extract_from_mpack served raw
    compressed bytes, causing the muse client's sha256 integrity check to fail:
        expected sha256(decompressed) but got sha256(compressed)
    This caused apply_mpack to skip all objects, leaving HEAD unadvanced and
    muse log returning 0 commits.
    """
    backend = _install_backend(monkeypatch)
    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    # Object ID is sha256 of DECOMPRESSED bytes (content-addressed).
    raw = b"decompressed object content for fn7"
    oid = blob_id(raw)

    # Push mpack stores the object with encoding='zstd'.
    mpack_bytes_zstd = _make_mpack_bytes_zstd({oid: raw})
    mpack_id = _mpack_key(mpack_bytes_zstd)
    await backend.put_mpack(mpack_id, mpack_bytes_zstd)
    await _index_object_in_mpack(db_session, oid, mpack_id)
    await _link_object_to_repo(db_session, repo.repo_id, oid)
    await db_session.commit()

    commit = await _make_commit(
        db_session, repo.repo_id, manifest={"file.txt": oid}, seed="fn7"
    )

    result = await wire_fetch_mpack(
        db_session, repo.repo_id, want=[commit.commit_id], have=[]
    )

    assert result["blob_count"] == 1, "expected 1 object in fetch mpack"

    # The assembled fetch mpack must contain the DECOMPRESSED bytes.
    # If still compressed, the client's sha256(content) != oid integrity check fails.
    obj_map = _fetched_blobs(backend, result["mpack_id"])
    assert oid in obj_map, f"object {oid[:20]} missing from assembled fetch mpack"
    assert obj_map[oid] == raw, (
        "object content in assembled mpack must be DECOMPRESSED bytes matching oid; "
        "got compressed bytes — integrity check would fail on client"
    )