gabriel / musehub public
test_mpack_phase2.py python
306 lines 11.2 KB
Raw
sha256:009b5a222314f47640a58d75ce5a1f428f1624cf0b51384dfcdfbdfab3cc42a4 feat: migration idempotency, file attribution DAG walk, mpa… Sonnet 4.6 minor ⚠ breaking 15 days ago
1 """TDD — Phase 2: remove per-object MinIO writes from background job (issue #69).
2
3 After Phase 1 the fetch path reads objects from the covering mpack.
4 Phase 2 removes the bottleneck: the background job's parallel MinIO PUTs
5 that took ~25 s of the ~40 s XL job latency.
6
7 After this change:
8 - process_mpack_index_job must NOT call backend.put(oid, data) for any object
9 - MusehubObject rows must have storage_uri='mpack://{mpack_key}' (not a MinIO URI)
10 - MPackIndex rows must exist for all objects (required by Phase 1 fetch)
11 - wire_fetch_mpack must still serve correct data via the mpack path
12
13 Test IDs:
14 P2-1 process_mpack_index_job makes zero per-object backend.put() calls
15 P2-2 MusehubObject.storage_uri set to mpack URI after background job
16 P2-3 full round-trip: push-style mpack → bg job → fetch mpack → correct objects served
17 """
18 from __future__ import annotations
19
20 import hashlib
21 from collections.abc import Mapping
22 from datetime import datetime, timezone
23 from unittest.mock import AsyncMock, MagicMock, call
24
25 import msgpack
26 import pytest
27 import zstandard
28 from sqlalchemy.dialects.postgresql import insert as pg_insert
29 from sqlalchemy.ext.asyncio import AsyncSession
30 from sqlalchemy import select
31
32 from muse.core.types import blob_id, fake_id
33 from musehub.db import musehub_repo_models as db
34 from musehub.db.musehub_jobs_models import MusehubBackgroundJob
35 from musehub.services.musehub_wire import wire_fetch_mpack
36 try:
37 from musehub.services.musehub_wire import process_mpack_index_job
38 _PROCESS_JOB_MISSING = False
39 except ImportError:
40 process_mpack_index_job = None # type: ignore[assignment]
41 _PROCESS_JOB_MISSING = True
42 from tests.factories import create_repo
43
44
def _now() -> datetime:
    """Return the current moment as a timezone-aware ``datetime`` in UTC."""
    utc = timezone.utc
    return datetime.now(tz=utc)
47
48
def _mpack_id(raw: bytes) -> str:
    """Return the content-derived key for *raw*: ``sha256:`` plus its hex digest."""
    digest = hashlib.sha256(raw).hexdigest()
    return f"sha256:{digest}"
51
52
def _build_push_mpack(objects: Mapping[str, bytes]) -> bytes:
    """Serialise *objects* into a push-style mpack with zstd-compressed payloads.

    Each ``object_id -> raw bytes`` pair becomes one entry tagged
    ``encoding="zstd"``; the commit, snapshot and branch-head sections are
    left empty. The result is the msgpack-encoded envelope.
    """
    compressor = zstandard.ZstdCompressor()
    packed_objects = []
    for object_id, payload in objects.items():
        packed_objects.append(
            {
                "object_id": object_id,
                "encoding": "zstd",
                "content": compressor.compress(payload),
            }
        )
    # Key order is kept stable: the mpack key used by the tests is a hash of
    # these exact bytes.
    envelope = {
        "commits": [],
        "snapshots": [],
        "objects": packed_objects,
        "branch_heads": {},
    }
    return msgpack.packb(envelope, use_bin_type=True)
64
65
class _FakeBackend:
    """In-memory stand-in for the storage backend used by these tests.

    Every per-object ``put()`` is recorded in ``put_calls`` so tests can
    assert that such writes were NOT made. Mpacks live in the dict handed to
    the constructor; per-object blobs live in a private dict.
    """

    # Plain class attribute; the presign_* methods below return fixed test URLs.
    supports_presign: bool = True

    def __init__(self, mpack_store: dict[str, bytes]) -> None:
        """Create a backend backed by *mpack_store*.

        Args:
            mpack_store: ``mpack_id -> raw mpack bytes``. It is kept by
                reference (not copied) and ``put_mpack()`` writes into it, so
                it must be a mutable dict rather than a read-only mapping.
        """
        self._mpacks = mpack_store
        self._objects: dict[str, bytes] = {}
        # Object ids passed to put(), in call order.
        self.put_calls: list[str] = []

    async def put(self, oid: str, data: bytes) -> str:
        """Record the call, store *data* under *oid*, and return its ``mem://`` URI."""
        self.put_calls.append(oid)
        self._objects[oid] = data
        return f"mem://{oid}"

    async def get(self, oid: str) -> bytes | None:
        """Return the bytes stored under *oid*, or ``None`` if absent."""
        return self._objects.get(oid)

    async def get_mpack(self, mpack_id: str) -> bytes | None:
        """Return the mpack stored under *mpack_id*, or ``None`` if absent."""
        return self._mpacks.get(mpack_id)

    async def put_mpack(self, mpack_id: str, data: bytes) -> None:
        """Store *data* under *mpack_id* in the shared mpack store."""
        self._mpacks[mpack_id] = data

    async def exists(self, oid: str) -> bool:
        """Return whether a per-object blob is stored under *oid*."""
        return oid in self._objects

    async def delete(self, oid: str) -> None:
        """Remove the per-object blob for *oid*; a missing key is ignored."""
        self._objects.pop(oid, None)

    async def presign_get(self, oid: str, ttl: int) -> str:
        """Return a fixed test URL for *oid*; *ttl* is ignored."""
        return f"https://minio.test/{oid}"

    async def presign_mpack_get(self, mpack_id: str, ttl: int) -> str:
        """Return a fixed test URL for *mpack_id*; *ttl* is ignored."""
        return f"https://minio.test/mpacks/{mpack_id}"

    async def quarantine_mpack(self, mpack_key: str) -> None:
        """No-op in this fake."""
        pass

    def uri_for(self, oid: str) -> str:
        """Return the ``mem://`` URI for *oid* without storing anything."""
        return f"mem://{oid}"
107
108
async def _make_job(
    session: AsyncSession,
    repo_id: str,
    mpack_key: str,
    n_objects: int,
) -> str:
    """Persist a pending ``mpack.index`` background job and return its job_id.

    The job id is computed from the repo id, the job type and the creation
    timestamp. The payload declares *n_objects* objects and zero commits for
    the mpack stored under *mpack_key*. The session is committed before
    returning.
    """
    from musehub.core.genesis import compute_job_id

    created = datetime.now(tz=timezone.utc)
    new_job_id = compute_job_id(repo_id, "mpack.index", created.isoformat())

    job_payload = {
        "mpack_key": mpack_key,
        "branch": "main",
        "head": "",
        "pusher_id": "gabriel",
        "declared_objects_count": n_objects,
        "declared_commits_count": 0,
    }
    job_row = MusehubBackgroundJob(
        job_id=new_job_id,
        repo_id=repo_id,
        job_type="mpack.index",
        payload=job_payload,
        status="pending",
        created_at=created,
        attempt=0,
    )
    session.add(job_row)
    await session.commit()
    return new_job_id
137
138
139 # ── P2-1: no per-object backend.put() calls ───────────────────────────────────
140
@pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available")
@pytest.mark.asyncio
async def test_p2_1_no_per_object_put_calls(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """P2-1: the background job performs no per-object backend.put() at all.

    Phase 2 serves objects from the covering mpack, so the per-object MinIO
    writes (~25s of the ~40s XL job) are dropped. Fetch keeps working because
    Phase 1 already reads from the mpack.
    """
    # Two distinct blobs, keyed by their content ids, packed into one mpack.
    contents = (b"object content alpha", b"object content beta")
    objects = {blob_id(raw): raw for raw in contents}
    packed = _build_push_mpack(objects)
    mpack_key = _mpack_id(packed)

    backend = _FakeBackend({mpack_key: packed})
    monkeypatch.setattr("musehub.services.musehub_wire.get_backend", lambda: backend)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    job_id = await _make_job(
        db_session, repo.repo_id, mpack_key, n_objects=2
    )

    await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    # The fake records every put(); after Phase 2 the list must stay empty.
    assert backend.put_calls == [], (
        f"process_mpack_index_job called backend.put() for objects: {backend.put_calls}. "
        f"Phase 2 must not write per-object MinIO keys."
    )
175
176
177 # ── P2-2: storage_uri set to mpack URI ────────────────────────────────────────
178
@pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available")
@pytest.mark.asyncio
async def test_p2_2_storage_uri_is_mpack_uri(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """P2-2: the job records an ``mpack://`` storage_uri on MusehubObject rows.

    Before Phase 2 the URI came from backend.uri_for(oid), i.e. a per-object
    MinIO location written by the PUT. With the PUT gone, the URI has to name
    where the bytes actually live: the covering mpack.
    """
    payload = b"phase2 storage uri test"
    object_id = blob_id(payload)

    packed = _build_push_mpack({object_id: payload})
    mpack_key = _mpack_id(packed)
    fake_backend = _FakeBackend({mpack_key: packed})
    monkeypatch.setattr("musehub.services.musehub_wire.get_backend", lambda: fake_backend)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    job_id = await _make_job(
        db_session, repo.repo_id, mpack_key, n_objects=1
    )

    await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    stored = await db_session.get(db.MusehubObject, object_id)
    assert stored is not None, f"MusehubObject row missing for {object_id[:20]}"

    # The URI must use the mpack scheme ...
    uses_mpack_scheme = stored.storage_uri.startswith("mpack://")
    assert uses_mpack_scheme, (
        f"storage_uri should be mpack://... but got {stored.storage_uri!r}. "
        f"Per-object MinIO key no longer exists after Phase 2."
    )
    # ... and point at the mpack this job indexed.
    assert mpack_key in stored.storage_uri, (
        f"storage_uri {stored.storage_uri!r} does not reference mpack_key {mpack_key[:20]}"
    )
214
215
216 # ── P2-3: full round-trip — push → bg job → fetch → correct objects ───────────
217
@pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available")
@pytest.mark.asyncio
async def test_p2_3_full_roundtrip_no_per_object_keys(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Full round-trip: mpack-in → background job (no MinIO PUT) → wire_fetch_mpack returns correct bytes.

    This is the end-to-end Phase 2 test: push mpack stored in MinIO,
    background job indexes without per-object puts, fetch serves from mpack.
    Per-object GETs issued during the fetch are recorded by a local spy so the
    "served from the mpack" claim is actually asserted.
    """
    raw_content = b"roundtrip content for phase2"
    oid = blob_id(raw_content)

    mpack_bytes = _build_push_mpack({oid: raw_content})
    mpack_key = _mpack_id(mpack_bytes)

    backend = _FakeBackend({mpack_key: mpack_bytes})
    monkeypatch.setattr("musehub.services.musehub_wire.get_backend", lambda: backend)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    job_id = await _make_job(db_session, repo.repo_id, mpack_key, n_objects=1)

    await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    # Confirm no per-object MinIO key was written.
    assert backend.put_calls == [], "Phase 2: no per-object puts expected"
    assert oid not in backend._objects, "per-object key must not exist after Phase 2 job"

    # Now set up a commit/snapshot referencing this object so wire_fetch_mpack
    # has something to serve.
    snap_id = fake_id("snap-p2-roundtrip")
    snap = db.MusehubSnapshot(
        snapshot_id=snap_id,
        manifest_blob=msgpack.packb({"file.txt": oid}, use_bin_type=True),
        directories=[],
        entry_count=1,
        created_at=_now(),
    )
    db_session.add(snap)
    commit_id = fake_id("commit-p2-roundtrip")
    commit = db.MusehubCommit(
        commit_id=commit_id,
        branch="main",
        parent_ids=[],
        message="p2 roundtrip",
        author="gabriel",
        timestamp=_now(),
        snapshot_id=snap_id,
    )
    db_session.add(commit)
    await db_session.execute(
        pg_insert(db.MusehubCommitGraph)
        .values(commit_id=commit_id, parent_ids=[], generation=0, snapshot_id=snap_id)
        .on_conflict_do_nothing()
    )
    await db_session.execute(
        pg_insert(db.MusehubCommitRef)
        .values(repo_id=repo.repo_id, commit_id=commit_id)
        .on_conflict_do_nothing()
    )
    await db_session.commit()

    # Spy on per-object GETs for the duration of the fetch. The backend fake
    # only records put(), so without this the "no per-object GET" claim below
    # could not be checked.
    get_calls: list[str] = []
    real_get = backend.get

    # Parameter is named ``oid`` to mirror the signature of the method it wraps.
    async def _recording_get(oid: str) -> bytes | None:
        get_calls.append(oid)
        return await real_get(oid)

    monkeypatch.setattr(backend, "get", _recording_get)

    # wire_fetch_mpack must serve the object from the mpack, not per-object GET.
    result = await wire_fetch_mpack(
        db_session, repo.repo_id, want=[commit_id], have=[]
    )

    assert result["mpack_url"] is not None, "fetch returned up-to-date but should have data"
    assert result["object_count"] == 1

    # Verify the assembled fetch mpack contains the correct bytes.
    fetch_mpack_id = result["mpack_id"]
    fetch_raw = backend._mpacks.get(fetch_mpack_id)
    assert fetch_raw is not None, "assembled fetch mpack not found in backend"
    # A leading b"MUSE" magic selects the wire-mpack parser; anything else is
    # decoded as plain msgpack.
    if fetch_raw[:4] == b"MUSE":
        from muse.core.mpack import parse_wire_mpack as _parse_wm
        payload = _parse_wm(fetch_raw)
    else:
        payload = msgpack.unpackb(fetch_raw, raw=False)
    obj_map = {o["object_id"]: bytes(o["content"]) for o in payload["objects"]}

    assert oid in obj_map, f"object {oid[:20]} missing from fetch mpack"
    assert obj_map[oid] == raw_content, (
        "object content from Phase 2 mpack-only path must match original bytes"
    )

    # Per-object GET must not have been called — the mpack path served it.
    assert oid not in get_calls, (
        "wire_fetch_mpack issued a per-object GET; Phase 2 must serve from the mpack"
    )
    # The fetch must not have written a per-object key either.
    assert oid not in backend.put_calls
File History 1 commit
sha256:009b5a222314f47640a58d75ce5a1f428f1624cf0b51384dfcdfbdfab3cc42a4 feat: migration idempotency, file attribution DAG walk, mpa… Sonnet 4.6 minor 15 days ago