test_process_mpack_index_job.py
python
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa
Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As…
Human
1 day ago
| 1 | """TDD — process_mpack_index_job: resurrect background byte-range indexer. |
| 2 | |
| 3 | The function was deleted but the worker still dispatches to it. Without it: |
| 4 | - mpack.index jobs fail with 'unknown job_type' |
| 5 | - MusehubMPackIndex rows have byte_offset=NULL |
| 6 | - Every blob fetch downloads the entire mpack instead of a byte-range GET |
| 7 | - Page loads take 30-35 seconds instead of <1 second |
| 8 | |
| 9 | Tests: |
| 10 | PIJ-1 process_mpack_index_job is importable from musehub_wire_push |
| 11 | PIJ-2 Worker dispatch handles mpack.index job_type (no 'unknown job_type') |
| 12 | PIJ-3 Job with missing mpack_key raises ValueError |
| 13 | PIJ-4 After job runs, MusehubMPackIndex rows have byte_offset populated |
| 14 | PIJ-5 After job runs, byte_offset + byte_length correctly locates bytes in mpack |
| 15 | """ |
| 16 | from __future__ import annotations |
| 17 | |
| 18 | import datetime |
| 19 | import pytest |
| 20 | from sqlalchemy import select |
| 21 | from sqlalchemy.ext.asyncio import AsyncSession |
| 22 | |
| 23 | from muse.core.types import fake_id, blob_id |
| 24 | from musehub.core.genesis import compute_identity_id, compute_repo_id, compute_job_id |
| 25 | from musehub.db.musehub_jobs_models import MusehubBackgroundJob |
| 26 | from musehub.db.musehub_repo_models import MusehubBranch, MusehubMPackIndex, MusehubRepo |
| 27 | |
| 28 | |
| 29 | # --------------------------------------------------------------------------- |
| 30 | # PIJ-1 importable |
| 31 | # --------------------------------------------------------------------------- |
| 32 | |
def test_PIJ1_process_mpack_index_job_importable() -> None:
    """PIJ-1: the indexer entry point is importable from musehub_wire_push and callable."""
    from musehub.services.musehub_wire_push import (
        process_mpack_index_job as indexer_entry_point,
    )

    assert callable(indexer_entry_point)
| 36 | |
| 37 | |
| 38 | # --------------------------------------------------------------------------- |
| 39 | # PIJ-2 worker dispatch handles mpack.index |
| 40 | # --------------------------------------------------------------------------- |
| 41 | |
def test_PIJ2_worker_handles_mpack_index_job_type() -> None:
    """PIJ-2: the worker module's source must mention the ``mpack.index`` job type.

    This is a purely textual check: it reads the source of ``musehub.worker``
    and looks for the literal ``mpack.index``. It does not execute the
    dispatch path.
    """
    from inspect import getsource

    import musehub.worker as worker_module

    worker_source = getsource(worker_module)
    handler_mentioned = "mpack.index" in worker_source
    assert handler_mentioned, (
        "Worker must have an explicit handler for mpack.index job_type"
    )
| 50 | |
| 51 | |
| 52 | # --------------------------------------------------------------------------- |
| 53 | # PIJ-3 missing mpack_key raises |
| 54 | # --------------------------------------------------------------------------- |
| 55 | |
@pytest.mark.asyncio
async def test_PIJ3_missing_mpack_key_raises(db_session: AsyncSession) -> None:
    """PIJ-3: a job whose payload has no ``mpack_key`` must raise ValueError.

    Registers a repo and a pending ``mpack.index`` job with an empty payload,
    then expects ``process_mpack_index_job`` to raise a ``ValueError`` whose
    message matches ``mpack_key``.
    """
    from musehub.services.musehub_wire_push import process_mpack_index_job

    created = datetime.datetime.now(tz=datetime.timezone.utc)
    stamp = created.isoformat()

    gabriel_id = compute_identity_id(b"gabriel")
    repo_id = compute_repo_id(gabriel_id, "pij-test", "code", stamp)
    repo = MusehubRepo(
        repo_id=repo_id,
        name="pij-test",
        owner="gabriel",
        slug="pij-test",
        visibility="public",
        owner_user_id=gabriel_id,
        created_at=created,
        updated_at=created,
    )
    db_session.add(repo)
    await db_session.flush()

    job_id = compute_job_id(repo_id, "mpack.index", stamp)
    job = MusehubBackgroundJob(
        job_id=job_id,
        repo_id=repo_id,
        job_type="mpack.index",
        payload={},  # deliberately empty: mpack_key is missing
        status="pending",
        created_at=created,
        attempt=0,
    )
    db_session.add(job)
    await db_session.flush()

    with pytest.raises(ValueError, match="mpack_key"):
        await process_mpack_index_job(db_session, job_id)
| 78 | |
| 79 | |
| 80 | # --------------------------------------------------------------------------- |
| 81 | # PIJ-4 + PIJ-5 byte_offset populated after job runs |
| 82 | # --------------------------------------------------------------------------- |
| 83 | |
@pytest.mark.asyncio
async def test_PIJ4_byte_offset_populated_after_job(
    db_session: AsyncSession,
) -> None:
    """PIJ-4: after process_mpack_index_job runs, MusehubMPackIndex rows have byte_offset set.

    Builds a one-blob wire mpack, registers a repo and a pending
    ``mpack.index`` job that references it, and runs the job against a mocked
    storage backend. It then checks that the job reports written index rows
    and that at least one ``object`` row for the mpack has a non-NULL
    ``byte_offset``.
    """
    from unittest.mock import AsyncMock, MagicMock, patch

    from muse.core.mpack import build_wire_mpack
    from musehub.services.musehub_wire_push import process_mpack_index_job

    # Build a real wire mpack with one blob
    content = b"hello byte range world"
    oid = blob_id(content)
    wire = build_wire_mpack({
        "blobs": [{"object_id": oid, "content": content}],
        "commits": [],
        "snapshots": [],
    })
    mpack_key = blob_id(wire)

    now = datetime.datetime.now(tz=datetime.timezone.utc)
    owner_id = compute_identity_id(b"gabriel")
    repo_id = compute_repo_id(owner_id, f"pij-byte-{now.timestamp()}", "code", now.isoformat())
    db_session.add(MusehubRepo(
        repo_id=repo_id, name="pij-byte", owner="gabriel", slug="pij-byte",
        visibility="public", owner_user_id=owner_id, created_at=now, updated_at=now,
    ))
    await db_session.flush()

    job_id = compute_job_id(repo_id, "mpack.index", now.isoformat())
    db_session.add(MusehubBackgroundJob(
        job_id=job_id, repo_id=repo_id, job_type="mpack.index",
        payload={"mpack_key": mpack_key, "head": "", "branch": "main"},
        status="pending", created_at=now, attempt=0,
    ))
    await db_session.flush()

    # Mock backend to return our wire bytes
    mock_backend = MagicMock()
    mock_backend.get_mpack = AsyncMock(return_value=wire)
    mock_backend.put = AsyncMock(return_value=f"s3://test/{oid}")

    # NOTE(review): this patch only affects code that resolves get_backend
    # through musehub.storage.backends at call time — confirm that
    # process_mpack_index_job does so rather than holding its own reference.
    with patch("musehub.storage.backends.get_backend", return_value=mock_backend):
        result = await process_mpack_index_job(db_session, job_id)

    assert result.get("mpack_index_written", 0) > 0, (
        "process_mpack_index_job must write MusehubMPackIndex rows"
    )

    idx_rows = (await db_session.execute(
        select(MusehubMPackIndex).where(
            MusehubMPackIndex.mpack_id == mpack_key,
            MusehubMPackIndex.entity_type == "object",
        )
    )).scalars().all()

    assert idx_rows, "MusehubMPackIndex must have rows for the mpack"

    rows_with_offset = [r for r in idx_rows if r.byte_offset is not None]
    assert rows_with_offset, (
        "MusehubMPackIndex rows must have byte_offset populated after process_mpack_index_job. "
        "Without byte_offset, every blob fetch downloads the entire mpack."
    )
| 146 | |
| 147 | |
@pytest.mark.asyncio
async def test_PIJ5_byte_range_locates_correct_bytes(
    db_session: AsyncSession,
) -> None:
    """PIJ-5: byte_offset + byte_length must locate the blob bytes within the mpack.

    Runs the index job for a one-blob wire mpack against a mocked backend,
    loads the index row for that blob, slices the wire bytes with the recorded
    offset and length, and checks that the slice equals or contains the
    original blob content.
    """
    from unittest.mock import AsyncMock, MagicMock, patch

    from muse.core.mpack import build_wire_mpack
    from musehub.services.musehub_wire_push import process_mpack_index_job

    content = b"exact bytes for range test " * 10
    oid = blob_id(content)
    bundle = {
        "blobs": [{"object_id": oid, "content": content}],
        "commits": [],
        "snapshots": [],
    }
    wire = build_wire_mpack(bundle)
    mpack_key = blob_id(wire)

    now = datetime.datetime.now(tz=datetime.timezone.utc)
    owner_id = compute_identity_id(b"gabriel")
    repo_id = compute_repo_id(owner_id, f"pij-range-{now.timestamp()}", "code", now.isoformat())
    repo = MusehubRepo(
        repo_id=repo_id,
        name="pij-range",
        owner="gabriel",
        slug="pij-range",
        visibility="public",
        owner_user_id=owner_id,
        created_at=now,
        updated_at=now,
    )
    db_session.add(repo)
    await db_session.flush()

    job_id = compute_job_id(repo_id, "mpack.index", now.isoformat())
    job = MusehubBackgroundJob(
        job_id=job_id,
        repo_id=repo_id,
        job_type="mpack.index",
        payload={"mpack_key": mpack_key, "head": "", "branch": "main"},
        status="pending",
        created_at=now,
        attempt=0,
    )
    db_session.add(job)
    await db_session.flush()

    # Stand-in storage backend: get_mpack hands back the wire bytes built above.
    fake_backend = MagicMock()
    fake_backend.get_mpack = AsyncMock(return_value=wire)
    fake_backend.put = AsyncMock(return_value=f"s3://test/{oid}")

    with patch("musehub.storage.backends.get_backend", return_value=fake_backend):
        await process_mpack_index_job(db_session, job_id)

    lookup = select(MusehubMPackIndex).where(
        MusehubMPackIndex.entity_id == oid,
        MusehubMPackIndex.mpack_id == mpack_key,
    )
    outcome = await db_session.execute(lookup)
    row = outcome.scalar_one_or_none()

    assert row is not None, f"No MPackIndex row for oid={oid[:16]}"
    assert row.byte_offset is not None, "byte_offset must be set"
    assert row.byte_length is not None, "byte_length must be set"

    # The byte range must locate the raw blob content within the wire mpack
    range_start = row.byte_offset
    range_end = row.byte_offset + row.byte_length
    extracted = wire[range_start:range_end]
    assert extracted == content or content in extracted, (
        f"byte_range[{row.byte_offset}:{row.byte_offset+row.byte_length}] "
        f"does not contain blob content. extracted={extracted[:20]!r}"
    )
File History
1 commit
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa
Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As…
Human
1 day ago