"""TDD — Phase 3: make push fully synchronous (issue #69).

After Phase 2 the background job no longer does per-object MinIO writes.
Phase 3 removes the background job entirely: wire_push_unpack_mpack processes
the mpack inline for ALL sizes, makes objects immediately fetchable, and does
not enqueue a mpack.index background job.

After this change:
  - wire_push_unpack_mpack must NOT enqueue a mpack.index MusehubBackgroundJob
  - MusehubObject.storage_uri must be mpack://{mpack_key} immediately after push
  - wire_fetch_mpack must succeed immediately after push (no FetchNotIndexedError)
  - The large-mpack threshold (mpack_content_cache_max_bytes) is removed as a gate

Test IDs:
  P3-1  no mpack.index background job enqueued after wire_push_unpack_mpack
  P3-2  MusehubObject.storage_uri is mpack:// URI immediately after push
  P3-3  full round-trip: push → immediate fetch → correct objects served
"""
from __future__ import annotations

import hashlib
from collections.abc import Mapping, MutableMapping
from datetime import datetime, timezone

import msgpack
import pytest
import zstandard
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession

from muse.core.types import blob_id, fake_id
from musehub.db import musehub_repo_models as db
from musehub.db.musehub_jobs_models import MusehubBackgroundJob
from musehub.services.musehub_wire import (
    wire_push_unpack_mpack,
    wire_fetch_mpack,
)
from tests.factories import create_repo

# Module-wide skip marker; kept below the imports so the import block is contiguous.
pytestmark = pytest.mark.skip(reason="muse wire protocol in flux")

# Dotted path of the backend factory that every test replaces with a fake.
_GET_BACKEND_TARGET = "musehub.services.musehub_wire.get_backend"


def _now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)


def _mpack_id(raw: bytes) -> str:
    """Return the ``sha256:``-prefixed hex digest of *raw*, used as the mpack key."""
    return "sha256:" + hashlib.sha256(raw).hexdigest()


def _build_push_mpack(objects: Mapping[str, bytes]) -> bytes:
    """Build a push mpack with zstd-compressed objects and no commits/snapshots."""
    cctx = zstandard.ZstdCompressor()
    entries = [
        {"object_id": oid, "encoding": "zstd", "content": cctx.compress(data)}
        for oid, data in objects.items()
    ]
    return msgpack.packb(
        {"commits": [], "snapshots": [], "objects": entries, "branch_heads": {}},
        use_bin_type=True,
    )


class _FakeBackend:
    """Backend that stores mpacks and records put() calls.

    Everything lives in memory: mpacks in the mapping handed to the
    constructor, individual objects in a private dict.
    """

    # Class-level flag; always True for this fake.
    supports_presign: bool = True

    def __init__(self, mpack_store: MutableMapping[str, bytes]) -> None:
        # The store is mutated by put_mpack(), so it must be a mutable mapping.
        # It is kept by reference, not copied.
        self._mpacks = mpack_store
        self._objects: dict[str, bytes] = {}
        # Object ids passed to put(), in call order.
        self.put_calls: list[str] = []

    async def put(self, oid: str, data: bytes) -> str:
        """Record the call, store *data* under *oid*, and return a ``mem://`` URI."""
        self.put_calls.append(oid)
        self._objects[oid] = data
        return f"mem://{oid}"

    async def get(self, oid: str) -> bytes | None:
        """Return the stored bytes for *oid*, or ``None`` when absent."""
        return self._objects.get(oid)

    async def get_mpack(self, mpack_id: str) -> bytes | None:
        """Return the stored mpack bytes for *mpack_id*, or ``None`` when absent."""
        return self._mpacks.get(mpack_id)

    async def put_mpack(self, mpack_id: str, data: bytes) -> None:
        """Store *data* as the mpack for *mpack_id*, replacing any previous entry."""
        self._mpacks[mpack_id] = data

    async def exists(self, oid: str) -> bool:
        """Return whether an object was stored under *oid*."""
        return oid in self._objects

    async def delete(self, oid: str) -> None:
        """Remove the object stored under *oid*; a missing id is ignored."""
        self._objects.pop(oid, None)

    async def presign_get(self, oid: str, ttl: int) -> str:
        """Return a fixed fake URL for *oid*; *ttl* is ignored."""
        return f"https://minio.test/{oid}"

    async def presign_mpack_get(self, mpack_id: str, ttl: int) -> str:
        """Return a fixed fake URL for *mpack_id*; *ttl* is ignored."""
        return f"https://minio.test/mpacks/{mpack_id}"

    async def quarantine_mpack(self, mpack_key: str) -> None:
        """No-op in this fake."""
        pass

    def uri_for(self, oid: str) -> str:
        """Return the ``mem://`` URI for *oid*."""
        return f"mem://{oid}"


def _install_fake_backend(
    monkeypatch: pytest.MonkeyPatch, mpack_bytes: bytes
) -> tuple[_FakeBackend, str]:
    """Seed a fake backend with *mpack_bytes* and patch it into the wire service.

    Returns the backend and the key under which the mpack was stored.
    """
    mpack_key = _mpack_id(mpack_bytes)
    backend = _FakeBackend({mpack_key: mpack_bytes})
    monkeypatch.setattr(_GET_BACKEND_TARGET, lambda: backend)
    return backend, mpack_key


# ── P3-1: no mpack.index background job enqueued ─────────────────────────────


@pytest.mark.asyncio
async def test_p3_1_no_mpack_index_job_enqueued(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """wire_push_unpack_mpack must not enqueue a mpack.index background job.

    Phase 3 removes the background job entirely: the push path processes
    everything inline. A mpack.index job row in DB is a Phase 3 regression.
    """
    raw = b"phase3 object alpha"
    oid = blob_id(raw)
    mpack_bytes = _build_push_mpack({oid: raw})
    _backend, mpack_key = _install_fake_backend(monkeypatch, mpack_bytes)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    await wire_push_unpack_mpack(
        db_session,
        repo.repo_id,
        mpack_key,
        pusher_id="gabriel",
        branch="main",
        head_commit_id="",
        commits_count=0,
        objects_count=1,
    )
    await db_session.commit()

    job_rows = (
        await db_session.execute(
            select(MusehubBackgroundJob)
            .where(MusehubBackgroundJob.repo_id == repo.repo_id)
            .where(MusehubBackgroundJob.job_type == "mpack.index")
        )
    ).scalars().all()

    assert job_rows == [], (
        f"Phase 3: wire_push_unpack_mpack enqueued {len(job_rows)} mpack.index job(s). "
        f"The background job must be removed — push is now fully synchronous."
    )


# ── P3-2: storage_uri is mpack:// immediately after push ─────────────────────


@pytest.mark.asyncio
async def test_p3_2_storage_uri_is_mpack_uri_after_push(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """MusehubObject.storage_uri must be mpack://... immediately after push.

    Previously small mpacks set storage_uri='pending' (to be promoted by the
    background job). After Phase 3 there is no job, so the URI must be set
    to the covering mpack key on the push path itself.
    """
    raw = b"phase3 storage uri content"
    oid = blob_id(raw)
    mpack_bytes = _build_push_mpack({oid: raw})
    _backend, mpack_key = _install_fake_backend(monkeypatch, mpack_bytes)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    await wire_push_unpack_mpack(
        db_session,
        repo.repo_id,
        mpack_key,
        pusher_id="gabriel",
        branch="main",
        head_commit_id="",
        commits_count=0,
        objects_count=1,
    )
    await db_session.commit()

    row = await db_session.get(db.MusehubObject, oid)
    assert row is not None, f"MusehubObject row missing for {oid[:20]}"
    assert row.storage_uri.startswith("mpack://"), (
        f"storage_uri should be mpack://... immediately after push but got {row.storage_uri!r}. "
        f"Phase 3 sets the mpack URI inline without a background job."
    )
    assert mpack_key in row.storage_uri, (
        f"storage_uri {row.storage_uri!r} does not reference mpack_key"
    )


# ── P3-3: full round-trip — push → immediate fetch ───────────────────────────


@pytest.mark.asyncio
async def test_p3_3_immediate_fetch_after_push(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Full round-trip: push → wire_fetch_mpack succeeds immediately (no job wait).

    This is the end-to-end Phase 3 test: after wire_push_unpack_mpack returns,
    wire_fetch_mpack must be able to serve the object without a
    FetchNotIndexedError.
    """
    raw_content = b"phase3 roundtrip content"
    oid = blob_id(raw_content)

    # Build a mpack with a commit referencing this object via a snapshot.
    cctx = zstandard.ZstdCompressor()
    snap_id = fake_id("snap-p3-roundtrip")
    commit_id = fake_id("commit-p3-roundtrip")
    mpack_bytes = msgpack.packb(
        {
            "commits": [
                {
                    "commit_id": commit_id,
                    "parent_commit_id": None,
                    "parent2_commit_id": None,
                    "branch": "main",
                    "message": "p3 roundtrip",
                    "author": "gabriel",
                    "committed_at": _now().isoformat(),
                    "snapshot_id": snap_id,
                    "agent_id": "",
                    "model_id": "",
                    "toolchain_id": "",
                    "signature": "",
                    "signer_key_id": "",
                    "sem_ver_bump": "patch",
                    "breaking_changes": [],
                    "prompt_hash": "",
                }
            ],
            "snapshots": [
                {
                    "snapshot_id": snap_id,
                    "parent_snapshot_id": None,
                    "delta_upsert": {"file.txt": oid},
                    "delta_remove": [],
                }
            ],
            "objects": [
                {
                    "object_id": oid,
                    "encoding": "zstd",
                    "content": cctx.compress(raw_content),
                }
            ],
            "branch_heads": {"main": commit_id},
        },
        use_bin_type=True,
    )
    backend, mpack_key = _install_fake_backend(monkeypatch, mpack_bytes)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    await wire_push_unpack_mpack(
        db_session,
        repo.repo_id,
        mpack_key,
        pusher_id="gabriel",
        branch="main",
        head_commit_id=commit_id,
        commits_count=1,
        objects_count=1,
    )
    await db_session.commit()

    # Immediately fetch — no background job has run, no wait.
    result = await wire_fetch_mpack(
        db_session, repo.repo_id, want=[commit_id], have=[]
    )

    assert result["mpack_url"] is not None, "fetch returned up-to-date but should have data"
    assert result["object_count"] == 1

    # Verify the assembled fetch mpack contains correct bytes.
    fetch_mpack_id = result["mpack_id"]
    fetch_raw = backend._mpacks.get(fetch_mpack_id)
    assert fetch_raw is not None, "assembled fetch mpack not found in backend"

    # A payload starting with b"MUSE" goes through the wire-format parser;
    # anything else is decoded as plain msgpack.
    if fetch_raw.startswith(b"MUSE"):
        from muse.core.mpack import parse_wire_mpack as _parse_wm

        payload = _parse_wm(fetch_raw)
    else:
        payload = msgpack.unpackb(fetch_raw, raw=False)

    # NOTE(review): content is compared byte-for-byte with the uncompressed
    # original, so this assumes the fetch payload carries decoded content —
    # confirm against wire_fetch_mpack.
    obj_map = {o["object_id"]: bytes(o["content"]) for o in payload["objects"]}
    assert oid in obj_map, f"object {oid[:20]} missing from fetch mpack"
    assert obj_map[oid] == raw_content, (
        "object content from Phase 3 synchronous push must match original bytes"
    )