"""TDD — Phase 2: remove per-object MinIO writes from background job (issue #69).

After Phase 1 the fetch path reads objects from the covering mpack.  Phase 2
removes the bottleneck: the background job's parallel MinIO PUTs that took
~25 s of the ~40 s XL job latency.

After this change:
  - process_mpack_index_job must NOT call backend.put(oid, data) for any object
  - MusehubObject rows must have storage_uri='mpack://{mpack_key}' (not a MinIO URI)
  - MPackIndex rows must exist for all objects (required by Phase 1 fetch)
  - wire_fetch_mpack must still serve correct data via the mpack path

Test IDs:
  P2-1  process_mpack_index_job makes zero per-object backend.put() calls
  P2-2  MusehubObject.storage_uri set to mpack URI after background job
  P2-3  full round-trip: push-style mpack → bg job → fetch mpack → correct objects served
"""
from __future__ import annotations

import hashlib
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, call

import msgpack
import pytest
import zstandard
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

from muse.core.types import blob_id, fake_id
from musehub.db import musehub_repo_models as db
from musehub.db.musehub_jobs_models import MusehubBackgroundJob
from musehub.services.musehub_wire import (
    process_mpack_index_job,
    wire_fetch_mpack,
)
from tests.factories import create_repo


def _now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)


def _mpack_id(raw: bytes) -> str:
    """Return the ``sha256:<hex>`` key for the serialized mpack bytes *raw*."""
    return "sha256:" + hashlib.sha256(raw).hexdigest()


def _build_push_mpack(objects: dict[str, bytes]) -> bytes:
    """Build a realistic push mpack with zstd-compressed objects.

    Args:
        objects: Mapping of object id to the object's raw (uncompressed) bytes.

    Returns:
        msgpack-encoded bytes with empty ``commits``/``snapshots``/
        ``branch_heads`` and one ``objects`` entry per input item.
    """
    cctx = zstandard.ZstdCompressor()
    entries = [
        {"object_id": oid, "encoding": "zstd", "content": cctx.compress(data)}
        for oid, data in objects.items()
    ]
    return msgpack.packb(
        {"commits": [], "snapshots": [], "objects": entries, "branch_heads": {}},
        use_bin_type=True,
    )


class _FakeBackend:
    """Records all put() calls so tests can assert they were NOT made.

    In-memory stand-in installed in place of
    ``musehub.services.musehub_wire.get_backend()``.  Mpacks live in the dict
    passed to the constructor (shared with the caller, so tests can pre-seed
    and inspect it); per-object blobs live in a private dict.

    NOTE(review): the method set presumably mirrors the real storage backend
    interface — confirm against the backend definition when it changes.
    Only ``put()`` calls are recorded; ``get()`` calls are not tracked.
    """

    # Class-level flag; presumably read by the code under test to decide
    # whether presigned URLs can be issued — TODO confirm.
    supports_presign: bool = True

    def __init__(self, mpack_store: dict[str, bytes]) -> None:
        self._mpacks = mpack_store
        self._objects: dict[str, bytes] = {}
        # Object ids passed to put(), in call order.
        self.put_calls: list[str] = []

    async def put(self, oid: str, data: bytes) -> str:
        """Store a per-object blob, record the call, and return its fake URI."""
        self.put_calls.append(oid)
        self._objects[oid] = data
        return f"mem://{oid}"

    async def get(self, oid: str) -> bytes | None:
        """Return the per-object blob for *oid*, or ``None`` if never put."""
        return self._objects.get(oid)

    async def get_mpack(self, mpack_id: str) -> bytes | None:
        """Return the stored mpack bytes for *mpack_id*, or ``None``."""
        return self._mpacks.get(mpack_id)

    async def put_mpack(self, mpack_id: str, data: bytes) -> None:
        """Store mpack bytes under *mpack_id* (not counted in ``put_calls``)."""
        self._mpacks[mpack_id] = data

    async def exists(self, oid: str) -> bool:
        """Return whether a per-object blob exists for *oid*."""
        return oid in self._objects

    async def delete(self, oid: str) -> None:
        """Remove the per-object blob for *oid*; a missing key is ignored."""
        self._objects.pop(oid, None)

    async def presign_get(self, oid: str, ttl: int) -> str:
        """Return a fake presigned URL for a per-object blob (*ttl* is unused)."""
        return f"https://minio.test/{oid}"

    async def presign_mpack_get(self, mpack_id: str, ttl: int) -> str:
        """Return a fake presigned URL for an mpack (*ttl* is unused)."""
        return f"https://minio.test/mpacks/{mpack_id}"

    async def quarantine_mpack(self, mpack_key: str) -> None:
        """No-op in this fake."""
        pass

    def uri_for(self, oid: str) -> str:
        """Return the fake per-object URI, matching what ``put()`` returns."""
        return f"mem://{oid}"


async def _make_job(
    session: AsyncSession,
    repo_id: str,
    mpack_key: str,
    n_objects: int,
) -> str:
    """Insert a mpack.index background job row and return its job_id.

    Args:
        session: Session used to add and commit the job row.
        repo_id: Repository the job belongs to.
        mpack_key: Key of the pushed mpack the job should index.
        n_objects: Value stored as ``declared_objects_count`` in the payload.

    Returns:
        The job id computed from the repo id, job type and creation timestamp.
    """
    # Imported locally, as in the original test module.
    from musehub.core.genesis import compute_job_id

    # One timestamp feeds both the job id and created_at so they stay in sync.
    now = _now()
    job_id = compute_job_id(repo_id, "mpack.index", now.isoformat())
    session.add(MusehubBackgroundJob(
        job_id=job_id,
        repo_id=repo_id,
        job_type="mpack.index",
        payload={
            "mpack_key": mpack_key,
            "branch": "main",
            "head": "",
            "pusher_id": "gabriel",
            "declared_objects_count": n_objects,
            "declared_commits_count": 0,
        },
        status="pending",
        created_at=now,
        attempt=0,
    ))
    await session.commit()
    return job_id


# ── P2-1: no per-object backend.put() calls ───────────────────────────────────


@pytest.mark.asyncio
async def test_p2_1_no_per_object_put_calls(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """process_mpack_index_job must make zero per-object backend.put() calls.

    After Phase 2, objects are served from the covering mpack.  Writing
    per-object MinIO keys is the bottleneck (~25s of ~40s XL job).  Removing
    them eliminates the wait without breaking fetch (Phase 1 serves from mpack).
    """
    raw_a = b"object content alpha"
    raw_b = b"object content beta"
    oid_a = blob_id(raw_a)
    oid_b = blob_id(raw_b)

    mpack_bytes = _build_push_mpack({oid_a: raw_a, oid_b: raw_b})
    mpack_key = _mpack_id(mpack_bytes)

    # Pre-seed the fake backend with the pushed mpack the job will index.
    mpack_store: dict[str, bytes] = {mpack_key: mpack_bytes}
    backend = _FakeBackend(mpack_store)
    monkeypatch.setattr("musehub.services.musehub_wire.get_backend", lambda: backend)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    job_id = await _make_job(db_session, repo.repo_id, mpack_key, n_objects=2)

    await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    # No per-object puts at all — Phase 2 removes them.
    assert backend.put_calls == [], (
        f"process_mpack_index_job called backend.put() for objects: {backend.put_calls}. "
        f"Phase 2 must not write per-object MinIO keys."
    )


# ── P2-2: storage_uri set to mpack URI ────────────────────────────────────────


@pytest.mark.asyncio
async def test_p2_2_storage_uri_is_mpack_uri(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """After process_mpack_index_job, MusehubObject.storage_uri must be an mpack://... URI.

    Previously it was set to backend.uri_for(oid) (a MinIO object URI) after the
    per-object PUT.  After Phase 2 there is no PUT, so the URI must reflect the
    actual storage location: the covering mpack.
    """
    raw = b"phase2 storage uri test"
    oid = blob_id(raw)

    mpack_bytes = _build_push_mpack({oid: raw})
    mpack_key = _mpack_id(mpack_bytes)

    backend = _FakeBackend({mpack_key: mpack_bytes})
    monkeypatch.setattr("musehub.services.musehub_wire.get_backend", lambda: backend)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    job_id = await _make_job(db_session, repo.repo_id, mpack_key, n_objects=1)

    await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    row = await db_session.get(db.MusehubObject, oid)
    assert row is not None, f"MusehubObject row missing for {oid[:20]}"
    assert row.storage_uri.startswith("mpack://"), (
        f"storage_uri should be mpack://... but got {row.storage_uri!r}. "
        f"Per-object MinIO key no longer exists after Phase 2."
    )
    assert mpack_key in row.storage_uri, (
        f"storage_uri {row.storage_uri!r} does not reference mpack_key {mpack_key[:20]}"
    )


# ── P2-3: full round-trip — push → bg job → fetch → correct objects ───────────


@pytest.mark.asyncio
async def test_p2_3_full_roundtrip_no_per_object_keys(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Full round-trip: mpack-in → background job (no MinIO PUT) → wire_fetch_mpack returns correct bytes.

    This is the end-to-end Phase 2 test: push mpack stored in MinIO, background
    job indexes without per-object puts, fetch serves from mpack.
    """
    raw_content = b"roundtrip content for phase2"
    oid = blob_id(raw_content)

    mpack_bytes = _build_push_mpack({oid: raw_content})
    mpack_key = _mpack_id(mpack_bytes)

    backend = _FakeBackend({mpack_key: mpack_bytes})
    monkeypatch.setattr("musehub.services.musehub_wire.get_backend", lambda: backend)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    job_id = await _make_job(db_session, repo.repo_id, mpack_key, n_objects=1)

    await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    # Confirm no per-object MinIO key was written.
    assert backend.put_calls == [], "Phase 2: no per-object puts expected"
    assert oid not in backend._objects, "per-object key must not exist after Phase 2 job"

    # Now set up a commit/snapshot referencing this object so wire_fetch_mpack
    # has something to serve.
    snap_id = fake_id("snap-p2-roundtrip")
    snap = db.MusehubSnapshot(
        snapshot_id=snap_id,
        manifest_blob=msgpack.packb({"file.txt": oid}, use_bin_type=True),
        directories=[],
        entry_count=1,
        created_at=_now(),
    )
    db_session.add(snap)

    commit_id = fake_id("commit-p2-roundtrip")
    commit = db.MusehubCommit(
        commit_id=commit_id,
        branch="main",
        parent_ids=[],
        message="p2 roundtrip",
        author="gabriel",
        timestamp=_now(),
        snapshot_id=snap_id,
    )
    db_session.add(commit)

    # Graph and ref rows are inserted with ON CONFLICT DO NOTHING so an
    # already-present row does not fail the test setup.
    await db_session.execute(
        pg_insert(db.MusehubCommitGraph)
        .values(commit_id=commit_id, parent_ids=[], generation=0, snapshot_id=snap_id)
        .on_conflict_do_nothing()
    )
    await db_session.execute(
        pg_insert(db.MusehubCommitRef)
        .values(repo_id=repo.repo_id, commit_id=commit_id)
        .on_conflict_do_nothing()
    )
    await db_session.commit()

    # wire_fetch_mpack must serve the object from the mpack, not per-object GET.
    result = await wire_fetch_mpack(
        db_session, repo.repo_id, want=[commit_id], have=[]
    )

    assert result["mpack_url"] is not None, "fetch returned up-to-date but should have data"
    assert result["object_count"] == 1

    # Verify the assembled fetch mpack contains the correct bytes.
    fetch_mpack_id = result["mpack_id"]
    fetch_raw = backend._mpacks.get(fetch_mpack_id)
    assert fetch_raw is not None, "assembled fetch mpack not found in backend"

    # A leading b"MUSE" magic is decoded with parse_wire_mpack; anything else
    # is treated as plain msgpack.
    if fetch_raw[:4] == b"MUSE":
        from muse.core.mpack import parse_wire_mpack as _parse_wm

        payload = _parse_wm(fetch_raw)
    else:
        payload = msgpack.unpackb(fetch_raw, raw=False)

    obj_map = {o["object_id"]: bytes(o["content"]) for o in payload["objects"]}
    assert oid in obj_map, f"object {oid[:20]} missing from fetch mpack"
    assert obj_map[oid] == raw_content, (
        "object content from Phase 2 mpack-only path must match original bytes"
    )

    # The fetch path must not have written a per-object key for this object
    # either.  (_FakeBackend records put() calls only; get() calls are not
    # tracked, so this checks writes, not reads.)
    assert oid not in backend.put_calls, (
        "wire_fetch_mpack must not write a per-object key"
    )