gabriel / musehub public
test_mpack_phase2.py python
312 lines 11.7 KB
Raw
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 2 days ago
1 """TDD — Phase 2: remove per-object MinIO writes from background job (issue #69).
2
3 After Phase 1 the fetch path reads objects from the covering mpack.
4 Phase 2 removes the bottleneck: the background job's parallel MinIO PUTs
5 that took ~25 s of the ~40 s XL job latency.
6
7 After this change:
8 - process_mpack_index_job must NOT call backend.put(oid, data) for any object
9 - MusehubObject rows must have storage_uri='mpack://{mpack_key}' (not a MinIO URI)
10 - MPackIndex rows must exist for all objects (required by Phase 1 fetch)
11 - wire_fetch_mpack must still serve correct data via the mpack path
12
13 Test IDs:
14 P2-1 process_mpack_index_job makes zero per-object backend.put() calls
15 P2-2 MusehubObject.storage_uri set to mpack URI after background job
16 P2-3 full round-trip: push-style mpack → bg job → fetch mpack → correct objects served
17 """
18 from __future__ import annotations
19
20 import hashlib
21 from collections.abc import Mapping
22 from datetime import datetime, timezone
23 from unittest.mock import AsyncMock, MagicMock, call
24
25 import msgpack
26 import pytest
27 import zstandard
28 from sqlalchemy.dialects.postgresql import insert as pg_insert
29 from sqlalchemy.ext.asyncio import AsyncSession
30 from sqlalchemy import select
31
32 from muse.core.types import blob_id, fake_id
33 from musehub.db import musehub_repo_models as db
34 from musehub.db.musehub_jobs_models import MusehubBackgroundJob
35 from musehub.services.musehub_wire import wire_fetch_mpack
36 try:
37 from musehub.services.musehub_wire import process_mpack_index_job
38 _PROCESS_JOB_MISSING = False
39 except ImportError:
40 process_mpack_index_job = None # type: ignore[assignment]
41 _PROCESS_JOB_MISSING = True
42 from tests.factories import create_repo
43
44
def _now() -> datetime:
    """Return the current moment as a timezone-aware UTC ``datetime``."""
    utc_now = datetime.now(timezone.utc)
    return utc_now
47
48
def _mpack_id(raw: bytes) -> str:
    """Derive the content-addressed mpack key for *raw*.

    The key is the literal prefix ``sha256:`` followed by the hexadecimal
    SHA-256 digest of the bytes.
    """
    digest = hashlib.sha256(raw).hexdigest()
    return "sha256:" + digest
51
52
def _build_push_mpack(objects: Mapping[str, bytes]) -> bytes:
    """Serialise *objects* into a push-style mpack payload.

    Each ``object_id -> raw bytes`` pair becomes one blob entry whose content
    is zstd-compressed and tagged ``"encoding": "zstd"``. The payload carries
    empty commit, snapshot and branch-head sections and is encoded with
    msgpack.
    """
    compressor = zstandard.ZstdCompressor()
    blobs = []
    for object_id, raw in objects.items():
        blobs.append(
            {
                "object_id": object_id,
                "encoding": "zstd",
                "content": compressor.compress(raw),
            }
        )
    payload = {"commits": [], "snapshots": [], "blobs": blobs, "branch_heads": {}}
    return msgpack.packb(payload, use_bin_type=True)
64
65
class _FakeBackend:
    """In-memory stand-in for the storage backend used by these tests.

    Every per-object ``put()`` is recorded in ``put_calls`` so a test can
    assert that none were made. Mpacks live in the dict handed to the
    constructor; it is stored by reference (not copied), so ``put_mpack()``
    writes are visible to the caller through that same dict.
    """

    # Class-level flag; always True for this fake.
    supports_presign: bool = True

    def __init__(self, mpack_store: dict[str, bytes]) -> None:
        # Annotated as a dict rather than a read-only Mapping because
        # put_mpack() assigns into it.
        self._mpacks = mpack_store
        self._objects: dict[str, bytes] = {}
        self.put_calls: list[str] = []

    async def put(self, oid: str, data: bytes) -> str:
        """Record the call, keep *data* in memory and return a ``mem://`` URI."""
        self.put_calls.append(oid)
        self._objects[oid] = data
        return f"mem://{oid}"

    async def get(self, oid: str) -> bytes | None:
        """Return the bytes stored via ``put()`` for *oid*, or None."""
        return self._objects.get(oid)

    async def get_mpack(self, mpack_id: str) -> bytes | None:
        """Return the mpack stored under *mpack_id*, or None."""
        return self._mpacks.get(mpack_id)

    async def put_mpack(self, mpack_id: str, data: bytes) -> None:
        """Store *data* under *mpack_id* in the shared mpack dict."""
        self._mpacks[mpack_id] = data

    async def exists(self, oid: str) -> bool:
        """Report whether a per-object entry exists for *oid*."""
        return oid in self._objects

    async def delete(self, oid: str) -> None:
        """Drop the per-object entry for *oid*; a missing entry is ignored."""
        self._objects.pop(oid, None)

    async def presign_get(self, oid: str, ttl: int) -> str:
        """Return a fixed fake URL for *oid*; *ttl* is ignored."""
        return f"https://minio.test/{oid}"

    async def presign_mpack_get(self, mpack_id: str, ttl: int) -> str:
        """Return a fixed fake URL for *mpack_id*; *ttl* is ignored."""
        return f"https://minio.test/mpacks/{mpack_id}"

    async def quarantine_mpack(self, mpack_key: str) -> None:
        """Do nothing; present so callers can invoke it on the fake."""
        pass

    def uri_for(self, oid: str) -> str:
        """Return the ``mem://`` URI that ``put()`` would report for *oid*."""
        return f"mem://{oid}"
107
108
async def _make_job(
    session: AsyncSession,
    repo_id: str,
    mpack_key: str,
    n_objects: int,
) -> str:
    """Insert a pending ``mpack.index`` background job row and return its job_id.

    Args:
        session: Async session used to add and commit the row.
        repo_id: Repository the job belongs to.
        mpack_key: Key of the pushed mpack, stored in the job payload.
        n_objects: Value recorded as ``declared_objects_count`` in the payload.

    Returns:
        The job_id computed by ``compute_job_id`` for the new row.
    """
    # Function-scope import, kept exactly as the helper originally had it.
    from musehub.core.genesis import compute_job_id

    # Use the module's shared UTC clock helper; the same timestamp feeds both
    # the job id and created_at so the two always agree.
    created = _now()
    job_id = compute_job_id(repo_id, "mpack.index", created.isoformat())
    session.add(
        MusehubBackgroundJob(
            job_id=job_id,
            repo_id=repo_id,
            job_type="mpack.index",
            payload={
                "mpack_key": mpack_key,
                "branch": "main",
                "head": "",
                "pusher_id": "gabriel",
                "declared_objects_count": n_objects,
                "declared_commits_count": 0,
            },
            status="pending",
            created_at=created,
            attempt=0,
        )
    )
    await session.commit()
    return job_id
137
138
139 # ── P2-1: no per-object backend.put() calls ───────────────────────────────────
140
@pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available")
@pytest.mark.asyncio
async def test_p2_1_no_per_object_put_calls(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """process_mpack_index_job must make zero per-object backend.put() calls.

    After Phase 2, objects are served from the covering mpack. Writing
    per-object MinIO keys is the bottleneck (~25s of ~40s XL job). Removing
    them eliminates the wait without breaking fetch (Phase 1 serves from mpack).

    The test also checks that the job really indexed both objects, so the
    zero-put assertion cannot pass merely because the job did nothing.
    """
    raw_a = b"object content alpha"
    raw_b = b"object content beta"
    oid_a = blob_id(raw_a)
    oid_b = blob_id(raw_b)

    mpack_bytes = _build_push_mpack({oid_a: raw_a, oid_b: raw_b})
    mpack_key = _mpack_id(mpack_bytes)

    mpack_store: dict[str, bytes] = {mpack_key: mpack_bytes}
    backend = _FakeBackend(mpack_store)
    # Route every patched get_backend lookup to the recording fake.
    monkeypatch.setattr("musehub.storage.backends.get_backend", lambda: backend)
    monkeypatch.setattr("musehub.storage.get_backend", lambda: backend)
    monkeypatch.setattr("musehub.services.musehub_wire_fetch.get_backend", lambda: backend)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    job_id = await _make_job(db_session, repo.repo_id, mpack_key, n_objects=2)

    await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    # Guard against a vacuous pass: if the job indexed nothing (for example
    # because it never saw the fake backend), put_calls would be empty too.
    for oid in (oid_a, oid_b):
        row = await db_session.get(db.MusehubObject, oid)
        assert row is not None, (
            f"MusehubObject row missing for {oid[:20]}; the job did not index the mpack, "
            f"so the zero-put check would be meaningless."
        )

    # No per-object puts at all — Phase 2 removes them.
    assert backend.put_calls == [], (
        f"process_mpack_index_job called backend.put() for objects: {backend.put_calls}. "
        f"Phase 2 must not write per-object MinIO keys."
    )
177
178
179 # ── P2-2: storage_uri set to mpack URI ────────────────────────────────────────
180
@pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available")
@pytest.mark.asyncio
async def test_p2_2_storage_uri_is_mpack_uri(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """P2-2: the indexed object's storage_uri must point at the covering mpack.

    Before Phase 2 the row carried backend.uri_for(oid), a per-object MinIO
    URI written after the PUT. With the PUT gone, the URI has to name where
    the bytes actually live: an ``mpack://`` URI containing the mpack key.
    """
    content = b"phase2 storage uri test"
    oid = blob_id(content)

    pack = _build_push_mpack({oid: content})
    mpack_key = _mpack_id(pack)

    backend = _FakeBackend({mpack_key: pack})
    # Same three patch targets, applied in the same order, all resolving to
    # the fake backend.
    for target in (
        "musehub.storage.backends.get_backend",
        "musehub.storage.get_backend",
        "musehub.services.musehub_wire_fetch.get_backend",
    ):
        monkeypatch.setattr(target, lambda: backend)

    repository = await create_repo(db_session, owner="gabriel", visibility="public")
    job_id = await _make_job(db_session, repository.repo_id, mpack_key, n_objects=1)

    await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    row = await db_session.get(db.MusehubObject, oid)
    assert row is not None, f"MusehubObject row missing for {oid[:20]}"

    # Scheme first, then the specific mpack the object was pushed in.
    assert row.storage_uri.startswith("mpack://"), (
        f"storage_uri should be mpack://... but got {row.storage_uri!r}. "
        f"Per-object MinIO key no longer exists after Phase 2."
    )
    assert mpack_key in row.storage_uri, (
        f"storage_uri {row.storage_uri!r} does not reference mpack_key {mpack_key[:20]}"
    )
218
219
220 # ── P2-3: full round-trip — push → bg job → fetch → correct objects ───────────
221
@pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available")
@pytest.mark.asyncio
async def test_p2_3_full_roundtrip_no_per_object_keys(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """P2-3: push mpack → background job (no per-object PUT) → fetch returns the bytes.

    End-to-end Phase 2 check: the pushed mpack sits in the fake backend, the
    background job indexes it without writing per-object keys, and
    wire_fetch_mpack then assembles a fetch mpack carrying the original
    content.
    """
    raw_content = b"roundtrip content for phase2"
    oid = blob_id(raw_content)

    pushed = _build_push_mpack({oid: raw_content})
    pushed_key = _mpack_id(pushed)

    backend = _FakeBackend({pushed_key: pushed})
    for target in (
        "musehub.storage.backends.get_backend",
        "musehub.storage.get_backend",
        "musehub.services.musehub_wire_fetch.get_backend",
    ):
        monkeypatch.setattr(target, lambda: backend)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    job_id = await _make_job(db_session, repo.repo_id, pushed_key, n_objects=1)

    await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    # The job must not have written any per-object key.
    assert backend.put_calls == [], "Phase 2: no per-object puts expected"
    assert oid not in backend._objects, "per-object key must not exist after Phase 2 job"

    # Give wire_fetch_mpack something to serve: a snapshot whose manifest
    # references the object, and a commit pointing at that snapshot.
    snap_id = fake_id("snap-p2-roundtrip")
    db_session.add(
        db.MusehubSnapshot(
            snapshot_id=snap_id,
            manifest_blob=msgpack.packb({"file.txt": oid}, use_bin_type=True),
            directories=[],
            entry_count=1,
            created_at=_now(),
        )
    )
    commit_id = fake_id("commit-p2-roundtrip")
    db_session.add(
        db.MusehubCommit(
            commit_id=commit_id,
            branch="main",
            parent_ids=[],
            message="p2 roundtrip",
            author="gabriel",
            timestamp=_now(),
            snapshot_id=snap_id,
        )
    )
    graph_stmt = (
        pg_insert(db.MusehubCommitGraph)
        .values(commit_id=commit_id, parent_ids=[], generation=0, snapshot_id=snap_id)
        .on_conflict_do_nothing()
    )
    await db_session.execute(graph_stmt)
    ref_stmt = (
        pg_insert(db.MusehubCommitRef)
        .values(repo_id=repo.repo_id, commit_id=commit_id)
        .on_conflict_do_nothing()
    )
    await db_session.execute(ref_stmt)
    await db_session.commit()

    # No per-object key exists, so the content can only come from the mpack.
    result = await wire_fetch_mpack(db_session, repo.repo_id, want=[commit_id], have=[])

    assert result["mpack_url"] is not None, "fetch returned up-to-date but should have data"
    served_count = result.get("blob_count", result.get("object_count", 0))
    assert served_count == 1

    # Pull the assembled fetch mpack out of the fake backend and decode it.
    fetch_raw = backend._mpacks.get(result["mpack_id"])
    assert fetch_raw is not None, "assembled fetch mpack not found in backend"
    if fetch_raw[:4] != b"MUSE":
        # No MUSE magic prefix: treat the bytes as plain msgpack.
        payload = msgpack.unpackb(fetch_raw, raw=False)
    else:
        from muse.core.mpack import parse_wire_mpack as _parse_wm

        payload = _parse_wm(fetch_raw)

    entries = payload.get("blobs") or payload.get("objects") or []
    obj_map = {}
    for entry in entries:
        obj_map[entry["object_id"]] = bytes(entry["content"])

    assert oid in obj_map, f"object {oid[:20]} missing from fetch mpack"
    assert obj_map[oid] == raw_content, (
        "object content from Phase 2 mpack-only path must match original bytes"
    )

    # The fake backend records put() calls only (it keeps no log of get()),
    # so this checks that the fetch did not write a per-object key either.
    assert oid not in backend.put_calls
File History 3 commits
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 2 days ago
sha256:6b1949fc2797ca4c1936a637a4cbfec828ef56cf52398a2e74ca3c4f494e728f fix: use wire_bytes not mpack_bytes_raw in compute_object_b… Sonnet 4.6 patch 10 days ago
sha256:4aed3d8601c8dd3ed37074de35f11f4a9699a0a4b99d43727048fd3f8e6fd13d chore: doc sweep, ignore wrangler build state, misc fixes Sonnet 4.6 minor 13 days ago