"""TDD — process_mpack_index_job: resurrect background byte-range indexer. The function was deleted but the worker still dispatches to it. Without it: - mpack.index jobs fail with 'unknown job_type' - MusehubMPackIndex rows have byte_offset=NULL - Every blob fetch downloads the entire mpack instead of a byte-range GET - Page loads take 30-35 seconds instead of <1 second Tests: PIJ-1 process_mpack_index_job is importable from musehub_wire_push PIJ-2 Worker dispatch handles mpack.index job_type (no 'unknown job_type') PIJ-3 Job with missing mpack_key raises ValueError PIJ-4 After job runs, MusehubMPackIndex rows have byte_offset populated PIJ-5 After job runs, byte_offset + byte_length correctly locates bytes in mpack """ from __future__ import annotations import datetime import pytest from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import fake_id, blob_id from musehub.core.genesis import compute_identity_id, compute_repo_id, compute_job_id from musehub.db.musehub_jobs_models import MusehubBackgroundJob from musehub.db.musehub_repo_models import MusehubBranch, MusehubMPackIndex, MusehubRepo # --------------------------------------------------------------------------- # PIJ-1 importable # --------------------------------------------------------------------------- def test_PIJ1_process_mpack_index_job_importable() -> None: from musehub.services.musehub_wire_push import process_mpack_index_job # noqa: F401 assert callable(process_mpack_index_job) # --------------------------------------------------------------------------- # PIJ-2 worker dispatch handles mpack.index # --------------------------------------------------------------------------- def test_PIJ2_worker_handles_mpack_index_job_type() -> None: """Worker must have an elif branch for mpack.index — not fall through to get_provider.""" import inspect import musehub.worker as w src = inspect.getsource(w) assert 'mpack.index' in src, ( "Worker must have an explicit handler for mpack.index job_type" ) # --------------------------------------------------------------------------- # PIJ-3 missing mpack_key raises # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_PIJ3_missing_mpack_key_raises(db_session: AsyncSession) -> None: from musehub.services.musehub_wire_push import process_mpack_index_job now = datetime.datetime.now(tz=datetime.timezone.utc) owner_id = compute_identity_id(b"gabriel") repo_id = compute_repo_id(owner_id, "pij-test", "code", now.isoformat()) db_session.add(MusehubRepo( repo_id=repo_id, name="pij-test", owner="gabriel", slug="pij-test", visibility="public", owner_user_id=owner_id, created_at=now, updated_at=now, )) await db_session.flush() job_id = compute_job_id(repo_id, "mpack.index", now.isoformat()) db_session.add(MusehubBackgroundJob( job_id=job_id, repo_id=repo_id, job_type="mpack.index", payload={}, # missing mpack_key status="pending", created_at=now, attempt=0, )) await db_session.flush() with pytest.raises(ValueError, match="mpack_key"): await process_mpack_index_job(db_session, job_id) # --------------------------------------------------------------------------- # PIJ-4 + PIJ-5 byte_offset populated after job runs # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_PIJ4_byte_offset_populated_after_job( db_session: AsyncSession, ) -> None: """After process_mpack_index_job runs, MusehubMPackIndex rows have byte_offset set.""" from muse.core.mpack import build_wire_mpack from musehub.services.musehub_wire_push import process_mpack_index_job from musehub.storage.backends import get_backend from unittest.mock import AsyncMock, MagicMock, patch # Build a real wire mpack with one blob content = b"hello byte range world" oid = blob_id(content) wire = build_wire_mpack({ "blobs": [{"object_id": oid, "content": content}], "commits": [], "snapshots": [], }) mpack_key = blob_id(wire) now = datetime.datetime.now(tz=datetime.timezone.utc) owner_id = compute_identity_id(b"gabriel") repo_id = compute_repo_id(owner_id, f"pij-byte-{now.timestamp()}", "code", now.isoformat()) db_session.add(MusehubRepo( repo_id=repo_id, name="pij-byte", owner="gabriel", slug="pij-byte", visibility="public", owner_user_id=owner_id, created_at=now, updated_at=now, )) await db_session.flush() job_id = compute_job_id(repo_id, "mpack.index", now.isoformat()) db_session.add(MusehubBackgroundJob( job_id=job_id, repo_id=repo_id, job_type="mpack.index", payload={"mpack_key": mpack_key, "head": "", "branch": "main"}, status="pending", created_at=now, attempt=0, )) await db_session.flush() # Mock backend to return our wire bytes mock_backend = MagicMock() mock_backend.get_mpack = AsyncMock(return_value=wire) mock_backend.put = AsyncMock(return_value=f"s3://test/{oid}") with patch("musehub.storage.backends.get_backend", return_value=mock_backend): result = await process_mpack_index_job(db_session, job_id) assert result.get("mpack_index_written", 0) > 0, ( "process_mpack_index_job must write MusehubMPackIndex rows" ) idx_rows = (await db_session.execute( select(MusehubMPackIndex).where( MusehubMPackIndex.mpack_id == mpack_key, MusehubMPackIndex.entity_type == "object", ) )).scalars().all() assert idx_rows, "MusehubMPackIndex must have rows for the mpack" rows_with_offset = [r for r in idx_rows if r.byte_offset is not None] assert rows_with_offset, ( "MusehubMPackIndex rows must have byte_offset populated after process_mpack_index_job. " "Without byte_offset, every blob fetch downloads the entire mpack." ) @pytest.mark.asyncio async def test_PIJ5_byte_range_locates_correct_bytes( db_session: AsyncSession, ) -> None: """byte_offset + byte_length must locate the exact blob bytes within the mpack.""" from muse.core.mpack import build_wire_mpack from musehub.services.musehub_wire_push import process_mpack_index_job from unittest.mock import AsyncMock, MagicMock, patch content = b"exact bytes for range test " * 10 oid = blob_id(content) wire = build_wire_mpack({ "blobs": [{"object_id": oid, "content": content}], "commits": [], "snapshots": [], }) mpack_key = blob_id(wire) now = datetime.datetime.now(tz=datetime.timezone.utc) owner_id = compute_identity_id(b"gabriel") repo_id = compute_repo_id(owner_id, f"pij-range-{now.timestamp()}", "code", now.isoformat()) db_session.add(MusehubRepo( repo_id=repo_id, name="pij-range", owner="gabriel", slug="pij-range", visibility="public", owner_user_id=owner_id, created_at=now, updated_at=now, )) await db_session.flush() job_id = compute_job_id(repo_id, "mpack.index", now.isoformat()) db_session.add(MusehubBackgroundJob( job_id=job_id, repo_id=repo_id, job_type="mpack.index", payload={"mpack_key": mpack_key, "head": "", "branch": "main"}, status="pending", created_at=now, attempt=0, )) await db_session.flush() mock_backend = MagicMock() mock_backend.get_mpack = AsyncMock(return_value=wire) mock_backend.put = AsyncMock(return_value=f"s3://test/{oid}") with patch("musehub.storage.backends.get_backend", return_value=mock_backend): await process_mpack_index_job(db_session, job_id) row = (await db_session.execute( select(MusehubMPackIndex).where( MusehubMPackIndex.entity_id == oid, MusehubMPackIndex.mpack_id == mpack_key, ) )).scalar_one_or_none() assert row is not None, f"No MPackIndex row for oid={oid[:16]}" assert row.byte_offset is not None, "byte_offset must be set" assert row.byte_length is not None, "byte_length must be set" # The byte range must locate the raw blob content within the wire mpack extracted = wire[row.byte_offset: row.byte_offset + row.byte_length] assert content in extracted or extracted == content, ( f"byte_range[{row.byte_offset}:{row.byte_offset+row.byte_length}] " f"does not contain blob content. extracted={extracted[:20]!r}" )