"""Phase 3 — Failing test for Bug D (issue #76). Bug D: process_mpack_index_job reads the mpack, computes byte offsets, and upserts mpack_index rows and musehub_objects.storage_uri — but never calls _upsert_object_refs. So even after the job completes, the repo's object_refs table stays empty. These tests must be RED before the Bug D fix lands. They become the regression guard once Bug D is fixed. Tests ----- MPIJ-1 process_mpack_index_job writes mpack_index byte ranges (baseline — PASS). MPIJ-2 process_mpack_index_job writes object_refs for the job's repo (Bug D — FAIL). """ from __future__ import annotations import hashlib from unittest.mock import AsyncMock, MagicMock, patch import pytest from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from muse.core.mpack import build_wire_mpack from muse.core.types import blob_id, fake_id from musehub.core.genesis import compute_identity_id from musehub.db.musehub_jobs_models import MusehubBackgroundJob from musehub.db.musehub_repo_models import MusehubMPackIndex, MusehubObjectRef from musehub.services.musehub_repository import create_repo from musehub.services.musehub_wire_push import process_mpack_index_job # --------------------------------------------------------------------------- # Shared fixtures # --------------------------------------------------------------------------- _OWNER = "gabriel" _IDENTITY_ID = compute_identity_id(b"gabriel") _BLOB_A_CONTENT = b"blob-alpha-mpij-test" _BLOB_B_CONTENT = b"blob-beta-mpij-test" _BLOB_C_CONTENT = b"blob-gamma-mpij-test" def _sha256_id(data: bytes) -> str: return f"sha256:{hashlib.sha256(data).hexdigest()}" _BLOB_A_OID = _sha256_id(_BLOB_A_CONTENT) _BLOB_B_OID = _sha256_id(_BLOB_B_CONTENT) _BLOB_C_OID = _sha256_id(_BLOB_C_CONTENT) _MPACK_BYTES = build_wire_mpack({ "blobs": [ {"object_id": _BLOB_A_OID, "content": _BLOB_A_CONTENT}, {"object_id": _BLOB_B_OID, "content": _BLOB_B_CONTENT}, {"object_id": _BLOB_C_OID, "content": _BLOB_C_CONTENT}, ], "commits": [], "snapshots": [], "tags": [], }) _MPACK_KEY = blob_id(_MPACK_BYTES) async def _make_repo(session: AsyncSession, name: str): r = await create_repo( session, name=name, owner=_OWNER, owner_user_id=_IDENTITY_ID, visibility="public", initialize=False, ) await session.commit() return r async def _make_mpack_index_job( session: AsyncSession, repo_id: str, mpack_key: str ) -> MusehubBackgroundJob: job = MusehubBackgroundJob( job_id=fake_id(f"job-{repo_id[:8]}"), repo_id=repo_id, job_type="mpack.index", status="running", payload={"mpack_key": mpack_key, "head": fake_id("head"), "branch": "main"}, ) session.add(job) await session.flush() return job async def _run_mpack_index_job( session: AsyncSession, job_id: str, mpack_bytes: bytes ) -> dict: backend = MagicMock() backend.get_mpack = AsyncMock(return_value=mpack_bytes) with patch("musehub.storage.backends.get_backend", return_value=backend): return await process_mpack_index_job(session, job_id) async def _object_ref_count(session: AsyncSession, repo_id: str) -> int: return (await session.execute( select(func.count()).where(MusehubObjectRef.repo_id == repo_id) )).scalar_one() async def _mpack_index_byte_range_count(session: AsyncSession, mpack_id: str) -> int: return (await session.execute( select(func.count()).where( MusehubMPackIndex.mpack_id == mpack_id, MusehubMPackIndex.entity_type == "object", MusehubMPackIndex.byte_offset.is_not(None), ) )).scalar_one() # --------------------------------------------------------------------------- # MPIJ-1 Baseline: job writes mpack_index byte ranges # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_MPIJ1_job_writes_mpack_index_byte_ranges(db_session: AsyncSession) -> None: """Baseline: process_mpack_index_job writes byte_offset/byte_length for all blobs.""" repo = await _make_repo(db_session, "mpij-repo-1") job = await _make_mpack_index_job(db_session, repo.repo_id, _MPACK_KEY) await _run_mpack_index_job(db_session, job.job_id, _MPACK_BYTES) count = await _mpack_index_byte_range_count(db_session, _MPACK_KEY) assert count == 3, f"Expected 3 mpack_index byte-range rows from job, got {count}" # --------------------------------------------------------------------------- # MPIJ-2 Bug D: job does NOT write object_refs for the repo # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_MPIJ2_job_writes_object_refs_for_repo(db_session: AsyncSession) -> None: """Bug D: process_mpack_index_job must write object_refs for job_row.repo_id. The job upserts mpack_index byte ranges and musehub_objects.storage_uri — but never calls _upsert_object_refs. With no object_refs, wire fetch has no object-to-mpack mapping for this repo. Fix: add _upsert_object_refs(session, repo_id, all_blob_oids) to process_mpack_index_job after the mpack_index upsert step. """ repo = await _make_repo(db_session, "mpij-repo-2") job = await _make_mpack_index_job(db_session, repo.repo_id, _MPACK_KEY) await _run_mpack_index_job(db_session, job.job_id, _MPACK_BYTES) count = await _object_ref_count(db_session, repo.repo_id) assert count == 3, ( f"Bug D: expected 3 object_refs for repo after mpack.index job, got {count}. " f"process_mpack_index_job must call _upsert_object_refs(session, repo_id, all_blob_oids)." )