gabriel / musehub public
test_process_mpack_index_job.py python
205 lines 8.4 KB
Raw
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 1 day ago
1 """TDD — process_mpack_index_job: resurrect background byte-range indexer.
2
3 The function was deleted but the worker still dispatches to it. Without it:
4 - mpack.index jobs fail with 'unknown job_type'
5 - MusehubMPackIndex rows have byte_offset=NULL
6 - Every blob fetch downloads the entire mpack instead of a byte-range GET
7 - Page loads take 30-35 seconds instead of <1 second
8
9 Tests:
10 PIJ-1 process_mpack_index_job is importable from musehub_wire_push
11 PIJ-2 Worker dispatch handles mpack.index job_type (no 'unknown job_type')
12 PIJ-3 Job with missing mpack_key raises ValueError
13 PIJ-4 After job runs, MusehubMPackIndex rows have byte_offset populated
14 PIJ-5 After job runs, byte_offset + byte_length correctly locates bytes in mpack
15 """
16 from __future__ import annotations
17
18 import datetime
19 import pytest
20 from sqlalchemy import select
21 from sqlalchemy.ext.asyncio import AsyncSession
22
23 from muse.core.types import fake_id, blob_id
24 from musehub.core.genesis import compute_identity_id, compute_repo_id, compute_job_id
25 from musehub.db.musehub_jobs_models import MusehubBackgroundJob
26 from musehub.db.musehub_repo_models import MusehubBranch, MusehubMPackIndex, MusehubRepo
27
28
29 # ---------------------------------------------------------------------------
30 # PIJ-1 importable
31 # ---------------------------------------------------------------------------
32
def test_PIJ1_process_mpack_index_job_importable() -> None:
    """PIJ-1: ``process_mpack_index_job`` is importable from musehub_wire_push and callable."""
    from musehub.services.musehub_wire_push import (
        process_mpack_index_job as indexer,
    )

    assert callable(indexer)
36
37
38 # ---------------------------------------------------------------------------
39 # PIJ-2 worker dispatch handles mpack.index
40 # ---------------------------------------------------------------------------
41
def test_PIJ2_worker_handles_mpack_index_job_type() -> None:
    """Worker must have an elif branch for mpack.index — not fall through to get_provider.

    The worker module's source is parsed and scanned for ``"mpack.index"`` as a
    string literal in code (an ``if``/``elif`` comparison, ``case`` pattern,
    dispatch-table key or constant). A plain substring search over the source
    text would also pass when the job type is only mentioned in a comment,
    which dispatches nothing.
    """
    import ast
    import inspect

    import musehub.worker as w

    tree = ast.parse(inspect.getsource(w))
    string_literals = {
        node.value
        for node in ast.walk(tree)
        if isinstance(node, ast.Constant) and isinstance(node.value, str)
    }
    assert "mpack.index" in string_literals, (
        "Worker must have an explicit handler for mpack.index job_type"
    )
50
51
52 # ---------------------------------------------------------------------------
53 # PIJ-3 missing mpack_key raises
54 # ---------------------------------------------------------------------------
55
@pytest.mark.asyncio
async def test_PIJ3_missing_mpack_key_raises(db_session: AsyncSession) -> None:
    """PIJ-3: a job whose payload lacks ``mpack_key`` is rejected with ValueError.

    Seeds a repo plus a pending ``mpack.index`` job with an empty payload and
    expects ``process_mpack_index_job`` to raise a ValueError whose message
    mentions ``mpack_key``.
    """
    from musehub.services.musehub_wire_push import process_mpack_index_job

    created = datetime.datetime.now(tz=datetime.timezone.utc)
    stamp = created.isoformat()
    owner = compute_identity_id(b"gabriel")
    repo_pk = compute_repo_id(owner, "pij-test", "code", stamp)

    repo = MusehubRepo(
        repo_id=repo_pk,
        name="pij-test",
        owner="gabriel",
        slug="pij-test",
        visibility="public",
        owner_user_id=owner,
        created_at=created,
        updated_at=created,
    )
    db_session.add(repo)
    await db_session.flush()

    job_pk = compute_job_id(repo_pk, "mpack.index", stamp)
    bad_job = MusehubBackgroundJob(
        job_id=job_pk,
        repo_id=repo_pk,
        job_type="mpack.index",
        payload={},  # deliberately no "mpack_key"
        status="pending",
        created_at=created,
        attempt=0,
    )
    db_session.add(bad_job)
    await db_session.flush()

    with pytest.raises(ValueError, match="mpack_key"):
        await process_mpack_index_job(db_session, job_pk)
78
79
80 # ---------------------------------------------------------------------------
81 # PIJ-4 + PIJ-5 byte_offset populated after job runs
82 # ---------------------------------------------------------------------------
83
@pytest.mark.asyncio
async def test_PIJ4_byte_offset_populated_after_job(
    db_session: AsyncSession,
) -> None:
    """After process_mpack_index_job runs, every object index row has byte_offset set.

    Builds a real single-blob wire mpack, seeds a repo plus a pending
    ``mpack.index`` job pointing at it, runs the job with a mocked storage
    backend whose ``get_mpack`` returns the wire bytes, then inspects the
    ``MusehubMPackIndex`` object rows written for that mpack.
    """
    from unittest.mock import AsyncMock, MagicMock, patch

    from muse.core.mpack import build_wire_mpack
    from musehub.services.musehub_wire_push import process_mpack_index_job

    # Build a real wire mpack with one blob
    content = b"hello byte range world"
    oid = blob_id(content)
    wire = build_wire_mpack({
        "blobs": [{"object_id": oid, "content": content}],
        "commits": [], "snapshots": [],
    })
    mpack_key = blob_id(wire)

    now = datetime.datetime.now(tz=datetime.timezone.utc)
    owner_id = compute_identity_id(b"gabriel")
    repo_id = compute_repo_id(owner_id, f"pij-byte-{now.timestamp()}", "code", now.isoformat())
    db_session.add(MusehubRepo(
        repo_id=repo_id, name="pij-byte", owner="gabriel", slug="pij-byte",
        visibility="public", owner_user_id=owner_id, created_at=now, updated_at=now,
    ))
    await db_session.flush()

    job_id = compute_job_id(repo_id, "mpack.index", now.isoformat())
    db_session.add(MusehubBackgroundJob(
        job_id=job_id, repo_id=repo_id, job_type="mpack.index",
        payload={"mpack_key": mpack_key, "head": "", "branch": "main"},
        status="pending", created_at=now, attempt=0,
    ))
    await db_session.flush()

    # Mock backend to return our wire bytes.
    mock_backend = MagicMock()
    mock_backend.get_mpack = AsyncMock(return_value=wire)
    mock_backend.put = AsyncMock(return_value=f"s3://test/{oid}")

    # NOTE(review): this patches the attribute on musehub.storage.backends, so it
    # only takes effect if process_mpack_index_job resolves get_backend through
    # that module at call time (e.g. a function-local import). A module-level
    # `from ... import get_backend` in musehub_wire_push would bypass it — confirm.
    with patch("musehub.storage.backends.get_backend", return_value=mock_backend):
        result = await process_mpack_index_job(db_session, job_id)

    assert result.get("mpack_index_written", 0) > 0, (
        "process_mpack_index_job must write MusehubMPackIndex rows"
    )

    idx_rows = (await db_session.execute(
        select(MusehubMPackIndex).where(
            MusehubMPackIndex.mpack_id == mpack_key,
            MusehubMPackIndex.entity_type == "object",
        )
    )).scalars().all()

    assert idx_rows, "MusehubMPackIndex must have rows for the mpack"

    # Every object row needs an offset: any row left NULL forces a full-mpack
    # download for that blob, which is exactly the regression under test.
    missing_offset = [r.entity_id for r in idx_rows if r.byte_offset is None]
    assert not missing_offset, (
        "MusehubMPackIndex rows must have byte_offset populated after process_mpack_index_job. "
        "Without byte_offset, every blob fetch downloads the entire mpack. "
        f"Rows missing byte_offset: {missing_offset}"
    )
146
147
@pytest.mark.asyncio
async def test_PIJ5_byte_range_locates_correct_bytes(
    db_session: AsyncSession,
) -> None:
    """byte_offset + byte_length must locate the blob bytes within the mpack.

    Runs the indexer over a real single-blob wire mpack (served by a mocked
    backend), then checks three things about the blob's index row:

    - the range lies inside the mpack,
    - it is strictly smaller than the whole mpack, so it is a real byte-range
      GET and not a full download,
    - it contains the blob's raw content.
    """
    from unittest.mock import AsyncMock, MagicMock, patch

    from muse.core.mpack import build_wire_mpack
    from musehub.services.musehub_wire_push import process_mpack_index_job

    content = b"exact bytes for range test " * 10
    oid = blob_id(content)
    wire = build_wire_mpack({
        "blobs": [{"object_id": oid, "content": content}],
        "commits": [], "snapshots": [],
    })
    mpack_key = blob_id(wire)

    now = datetime.datetime.now(tz=datetime.timezone.utc)
    owner_id = compute_identity_id(b"gabriel")
    repo_id = compute_repo_id(owner_id, f"pij-range-{now.timestamp()}", "code", now.isoformat())
    db_session.add(MusehubRepo(
        repo_id=repo_id, name="pij-range", owner="gabriel", slug="pij-range",
        visibility="public", owner_user_id=owner_id, created_at=now, updated_at=now,
    ))
    await db_session.flush()

    job_id = compute_job_id(repo_id, "mpack.index", now.isoformat())
    db_session.add(MusehubBackgroundJob(
        job_id=job_id, repo_id=repo_id, job_type="mpack.index",
        payload={"mpack_key": mpack_key, "head": "", "branch": "main"},
        status="pending", created_at=now, attempt=0,
    ))
    await db_session.flush()

    mock_backend = MagicMock()
    mock_backend.get_mpack = AsyncMock(return_value=wire)
    mock_backend.put = AsyncMock(return_value=f"s3://test/{oid}")

    # NOTE(review): only effective if process_mpack_index_job resolves
    # get_backend through musehub.storage.backends at call time — confirm.
    with patch("musehub.storage.backends.get_backend", return_value=mock_backend):
        await process_mpack_index_job(db_session, job_id)

    row = (await db_session.execute(
        select(MusehubMPackIndex).where(
            MusehubMPackIndex.entity_id == oid,
            MusehubMPackIndex.mpack_id == mpack_key,
        )
    )).scalar_one_or_none()

    assert row is not None, f"No MPackIndex row for oid={oid[:16]}"
    assert row.byte_offset is not None, "byte_offset must be set"
    assert row.byte_length is not None, "byte_length must be set"

    start = row.byte_offset
    end = row.byte_offset + row.byte_length

    # Python slicing silently clamps out-of-range indices, so bounds must be
    # checked explicitly; a range past EOF would break a real byte-range GET.
    assert start >= 0 and row.byte_length > 0, (
        f"invalid byte_range[{start}:{end}]"
    )
    assert end <= len(wire), (
        f"byte_range[{start}:{end}] extends past the end of the {len(wire)}-byte mpack"
    )
    # A range spanning the whole mpack would contain the blob trivially, but it
    # is a full download — the exact behaviour the index exists to avoid.
    assert row.byte_length < len(wire), (
        f"byte_range[{start}:{end}] covers the entire mpack; expected a sub-range"
    )

    # The byte range must locate the raw blob content within the wire mpack.
    # Containment rather than equality is checked, presumably to tolerate
    # record framing around the content.
    extracted = wire[start:end]
    assert content in extracted, (
        f"byte_range[{start}:{end}] "
        f"does not contain blob content. extracted={extracted[:20]!r}"
    )
File History 1 commit
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 1 day ago