"""TDD — Phase 2: remove per-object MinIO writes from background job (issue #69). After Phase 1 the fetch path reads objects from the covering mpack. Phase 2 removes the bottleneck: the background job's parallel MinIO PUTs that took ~25 s of the ~40 s XL job latency. After this change: - process_mpack_index_job must NOT call backend.put(oid, data) for any object - MusehubObject rows must have storage_uri='mpack://{mpack_key}' (not a MinIO URI) - MPackIndex rows must exist for all objects (required by Phase 1 fetch) - wire_fetch_mpack must still serve correct data via the mpack path Test IDs: P2-1 process_mpack_index_job makes zero per-object backend.put() calls P2-2 MusehubObject.storage_uri set to mpack URI after background job P2-3 full round-trip: push-style mpack → bg job → fetch mpack → correct objects served """ from __future__ import annotations import hashlib from collections.abc import Mapping from datetime import datetime, timezone from unittest.mock import AsyncMock, MagicMock, call import msgpack import pytest import zstandard from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from muse.core.types import blob_id, fake_id from musehub.db import musehub_repo_models as db from musehub.db.musehub_jobs_models import MusehubBackgroundJob from musehub.services.musehub_wire import wire_fetch_mpack try: from musehub.services.musehub_wire import process_mpack_index_job _PROCESS_JOB_MISSING = False except ImportError: process_mpack_index_job = None # type: ignore[assignment] _PROCESS_JOB_MISSING = True from tests.factories import create_repo def _now() -> datetime: return datetime.now(tz=timezone.utc) def _mpack_id(raw: bytes) -> str: return "sha256:" + hashlib.sha256(raw).hexdigest() def _build_push_mpack(objects: Mapping[str, bytes]) -> bytes: """Build a realistic push mpack with zstd-compressed objects.""" cctx = zstandard.ZstdCompressor() entries = [ {"object_id": oid, 
"encoding": "zstd", "content": cctx.compress(data)} for oid, data in objects.items() ] return msgpack.packb( {"commits": [], "snapshots": [], "blobs": entries, "branch_heads": {}}, use_bin_type=True, ) class _FakeBackend: """Records all put() calls so tests can assert they were NOT made.""" def __init__(self, mpack_store: Mapping[str, bytes]) -> None: self._mpacks = mpack_store self._objects: dict[str, bytes] = {} self.put_calls: list[str] = [] async def put(self, oid: str, data: bytes) -> str: self.put_calls.append(oid) self._objects[oid] = data return f"mem://{oid}" async def get(self, oid: str) -> bytes | None: return self._objects.get(oid) async def get_mpack(self, mpack_id: str) -> bytes | None: return self._mpacks.get(mpack_id) async def put_mpack(self, mpack_id: str, data: bytes) -> None: self._mpacks[mpack_id] = data async def exists(self, oid: str) -> bool: return oid in self._objects async def delete(self, oid: str) -> None: self._objects.pop(oid, None) async def presign_get(self, oid: str, ttl: int) -> str: return f"https://minio.test/{oid}" async def presign_mpack_get(self, mpack_id: str, ttl: int) -> str: return f"https://minio.test/mpacks/{mpack_id}" async def quarantine_mpack(self, mpack_key: str) -> None: pass def uri_for(self, oid: str) -> str: return f"mem://{oid}" supports_presign: bool = True async def _make_job( session: AsyncSession, repo_id: str, mpack_key: str, n_objects: int, ) -> str: """Insert a mpack.index background job row and return its job_id.""" from musehub.core.genesis import compute_job_id now = datetime.now(tz=timezone.utc) job_id = compute_job_id(repo_id, "mpack.index", now.isoformat()) session.add(MusehubBackgroundJob( job_id=job_id, repo_id=repo_id, job_type="mpack.index", payload={ "mpack_key": mpack_key, "branch": "main", "head": "", "pusher_id": "gabriel", "declared_objects_count": n_objects, "declared_commits_count": 0, }, status="pending", created_at=now, attempt=0, )) await session.commit() return job_id # ── P2-1: no 
# ── shared helpers ─────────────────────────────────────────────────────────────


def _install_backend(monkeypatch: pytest.MonkeyPatch, backend: _FakeBackend) -> None:
    """Make every patched ``get_backend`` lookup return *backend*.

    The same name is patched in three modules, presumably because some of
    them bind ``get_backend`` at import time (TODO confirm), so patching only
    the definition site would not reach them.
    """
    for target in (
        "musehub.storage.backends.get_backend",
        "musehub.storage.get_backend",
        "musehub.services.musehub_wire_fetch.get_backend",
    ):
        monkeypatch.setattr(target, lambda: backend)


class _GetRecordingBackend(_FakeBackend):
    """_FakeBackend that also records every per-object ``get()`` call."""

    def __init__(self, mpack_store: dict[str, bytes]) -> None:
        super().__init__(mpack_store)
        self.get_calls: list[str] = []

    async def get(self, oid: str) -> bytes | None:
        self.get_calls.append(oid)
        return await super().get(oid)


# ── P2-1: no per-object backend.put() calls ───────────────────────────────────

@pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available")
@pytest.mark.asyncio
async def test_p2_1_no_per_object_put_calls(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """process_mpack_index_job must make zero per-object backend.put() calls.

    After Phase 2, objects are served from the covering mpack. Writing
    per-object MinIO keys is the bottleneck (~25s of ~40s XL job). Removing
    them eliminates the wait without breaking fetch (Phase 1 serves from mpack).
    """
    raw_a = b"object content alpha"
    raw_b = b"object content beta"
    oid_a = blob_id(raw_a)
    oid_b = blob_id(raw_b)

    mpack_bytes = _build_push_mpack({oid_a: raw_a, oid_b: raw_b})
    mpack_key = _mpack_id(mpack_bytes)
    mpack_store: dict[str, bytes] = {mpack_key: mpack_bytes}
    backend = _FakeBackend(mpack_store)
    _install_backend(monkeypatch, backend)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    job_id = await _make_job(db_session, repo.repo_id, mpack_key, n_objects=2)

    await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    # No per-object puts at all — Phase 2 removes them.
    assert backend.put_calls == [], (
        f"process_mpack_index_job called backend.put() for objects: {backend.put_calls}. "
        f"Phase 2 must not write per-object MinIO keys."
    )


# ── P2-2: storage_uri set to mpack URI ────────────────────────────────────────

@pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available")
@pytest.mark.asyncio
async def test_p2_2_storage_uri_is_mpack_uri(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """After process_mpack_index_job, MusehubObject.storage_uri must be an mpack://... URI.

    Previously it was set to backend.uri_for(oid) (a MinIO object URI) after
    the per-object PUT. After Phase 2 there is no PUT, so the URI must reflect
    the actual storage location: the covering mpack.
    """
    raw = b"phase2 storage uri test"
    oid = blob_id(raw)

    mpack_bytes = _build_push_mpack({oid: raw})
    mpack_key = _mpack_id(mpack_bytes)
    backend = _FakeBackend({mpack_key: mpack_bytes})
    _install_backend(monkeypatch, backend)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    job_id = await _make_job(db_session, repo.repo_id, mpack_key, n_objects=1)

    await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    row = await db_session.get(db.MusehubObject, oid)
    assert row is not None, f"MusehubObject row missing for {oid[:20]}"
    assert row.storage_uri.startswith("mpack://"), (
        f"storage_uri should be mpack://... but got {row.storage_uri!r}. "
        f"Per-object MinIO key no longer exists after Phase 2."
    )
    assert mpack_key in row.storage_uri, (
        f"storage_uri {row.storage_uri!r} does not reference mpack_key {mpack_key[:20]}"
    )


# ── P2-3: full round-trip — push → bg job → fetch → correct objects ───────────

@pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available")
@pytest.mark.asyncio
async def test_p2_3_full_roundtrip_no_per_object_keys(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Full round-trip: mpack-in → background job (no MinIO PUT) → wire_fetch_mpack returns correct bytes.

    This is the end-to-end Phase 2 test: the push mpack is stored in MinIO,
    the background job indexes it without per-object puts, and fetch serves
    the object from the mpack without per-object gets.
    """
    raw_content = b"roundtrip content for phase2"
    oid = blob_id(raw_content)

    mpack_bytes = _build_push_mpack({oid: raw_content})
    mpack_key = _mpack_id(mpack_bytes)
    backend = _GetRecordingBackend({mpack_key: mpack_bytes})
    _install_backend(monkeypatch, backend)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    job_id = await _make_job(db_session, repo.repo_id, mpack_key, n_objects=1)

    await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    # Confirm no per-object MinIO key was written.
    assert backend.put_calls == [], "Phase 2: no per-object puts expected"
    assert oid not in backend._objects, "per-object key must not exist after Phase 2 job"

    # Now set up a commit/snapshot referencing this object so wire_fetch_mpack
    # has something to serve.
    snap_id = fake_id("snap-p2-roundtrip")
    snap = db.MusehubSnapshot(
        snapshot_id=snap_id,
        manifest_blob=msgpack.packb({"file.txt": oid}, use_bin_type=True),
        directories=[],
        entry_count=1,
        created_at=_now(),
    )
    db_session.add(snap)

    commit_id = fake_id("commit-p2-roundtrip")
    commit = db.MusehubCommit(
        commit_id=commit_id,
        branch="main",
        parent_ids=[],
        message="p2 roundtrip",
        author="gabriel",
        timestamp=_now(),
        snapshot_id=snap_id,
    )
    db_session.add(commit)
    await db_session.execute(
        pg_insert(db.MusehubCommitGraph)
        .values(commit_id=commit_id, parent_ids=[], generation=0, snapshot_id=snap_id)
        .on_conflict_do_nothing()
    )
    await db_session.execute(
        pg_insert(db.MusehubCommitRef)
        .values(repo_id=repo.repo_id, commit_id=commit_id)
        .on_conflict_do_nothing()
    )
    await db_session.commit()

    # Only per-object gets made by the fetch itself are relevant below.
    backend.get_calls.clear()

    # wire_fetch_mpack must serve the object from the mpack, not per-object GET.
    result = await wire_fetch_mpack(
        db_session, repo.repo_id, want=[commit_id], have=[]
    )

    assert result["mpack_url"] is not None, "fetch returned up-to-date but should have data"
    assert result.get("blob_count", result.get("object_count", 0)) == 1

    # Verify the assembled fetch mpack contains the correct bytes.
    fetch_mpack_id = result["mpack_id"]
    fetch_raw = backend._mpacks.get(fetch_mpack_id)
    assert fetch_raw is not None, "assembled fetch mpack not found in backend"

    if fetch_raw[:4] == b"MUSE":
        from muse.core.mpack import parse_wire_mpack as _parse_wm
        payload = _parse_wm(fetch_raw)
    else:
        payload = msgpack.unpackb(fetch_raw, raw=False)

    obj_map = {
        o["object_id"]: bytes(o["content"])
        for o in (payload.get("blobs") or payload.get("objects") or [])
    }
    assert oid in obj_map, f"object {oid[:20]} missing from fetch mpack"
    assert obj_map[oid] == raw_content, (
        "object content from Phase 2 mpack-only path must match original bytes"
    )

    # Per-object GET must not have been called — the mpack path served it.
    assert oid not in backend.get_calls, (
        "wire_fetch_mpack fetched the object via per-object backend.get(); "
        "Phase 2 objects must be served from the covering mpack"
    )
    # Fetch must not have written a per-object key either.
    assert oid not in backend.put_calls