test_mpack_phase2.py
file-level
1
files
1
commits
0
hotspots
0
π§ dead
0
π₯ blast risk
| 1 | """TDD — Phase 2: remove per-object MinIO writes from background job (issue #69). |
| 2 | |
| 3 | After Phase 1 the fetch path reads objects from the covering mpack. |
| 4 | Phase 2 removes the bottleneck: the background job's parallel MinIO PUTs |
| 5 | that took ~25 s of the ~40 s XL job latency. |
| 6 | |
| 7 | After this change: |
| 8 | - process_mpack_index_job must NOT call backend.put(oid, data) for any object |
| 9 | - MusehubObject rows must have storage_uri='mpack://{mpack_key}' (not a MinIO URI) |
| 10 | - MPackIndex rows must exist for all objects (required by Phase 1 fetch) |
| 11 | - wire_fetch_mpack must still serve correct data via the mpack path |
| 12 | |
| 13 | Test IDs: |
| 14 | P2-1 process_mpack_index_job makes zero per-object backend.put() calls |
| 15 | P2-2 MusehubObject.storage_uri set to mpack URI after background job |
| 16 | P2-3 full round-trip: push-style mpack → bg job → fetch mpack → correct objects served |
| 17 | """ |
| 18 | from __future__ import annotations |
| 19 | |
| 20 | import hashlib |
| 21 | from collections.abc import Mapping |
| 22 | from datetime import datetime, timezone |
| 23 | from unittest.mock import AsyncMock, MagicMock, call |
| 24 | |
| 25 | import msgpack |
| 26 | import pytest |
| 27 | import zstandard |
| 28 | from sqlalchemy.dialects.postgresql import insert as pg_insert |
| 29 | from sqlalchemy.ext.asyncio import AsyncSession |
| 30 | from sqlalchemy import select |
| 31 | |
| 32 | from muse.core.types import blob_id, fake_id |
| 33 | from musehub.db import musehub_repo_models as db |
| 34 | from musehub.db.musehub_jobs_models import MusehubBackgroundJob |
| 35 | from musehub.services.musehub_wire import wire_fetch_mpack |
| 36 | try: |
| 37 | from musehub.services.musehub_wire import process_mpack_index_job |
| 38 | _PROCESS_JOB_MISSING = False |
| 39 | except ImportError: |
| 40 | process_mpack_index_job = None # type: ignore[assignment] |
| 41 | _PROCESS_JOB_MISSING = True |
| 42 | from tests.factories import create_repo |
| 43 | |
| 44 | |
def _now() -> datetime:
    """Return the current time as a timezone-aware UTC ``datetime``."""
    return datetime.now(tz=timezone.utc)
| 47 | |
| 48 | |
def _mpack_id(raw: bytes) -> str:
    """Return the ``sha256:<hex digest>`` identifier computed over *raw*."""
    return "sha256:" + hashlib.sha256(raw).hexdigest()
| 51 | |
| 52 | |
def _build_push_mpack(objects: Mapping[str, bytes]) -> bytes:
    """Build a realistic push mpack with zstd-compressed objects.

    Args:
        objects: Mapping of object id to the raw (uncompressed) object bytes.

    Returns:
        The msgpack-encoded payload. Every object becomes one entry under
        ``"blobs"`` with ``encoding="zstd"``; ``"commits"``, ``"snapshots"``
        and ``"branch_heads"`` are present but empty.
    """
    cctx = zstandard.ZstdCompressor()
    entries = [
        {"object_id": oid, "encoding": "zstd", "content": cctx.compress(data)}
        for oid, data in objects.items()
    ]
    return msgpack.packb(
        {"commits": [], "snapshots": [], "blobs": entries, "branch_heads": {}},
        use_bin_type=True,
    )
| 64 | |
| 65 | |
class _FakeBackend:
    """Records all put() calls so tests can assert they were NOT made.

    In-memory stand-in for the storage backend. Per-object data lives in a
    private dict, while mpacks live in the caller-supplied ``mpack_store``
    (kept by reference, so the caller sees mpacks written via ``put_mpack``).
    Only ``put()`` calls are recorded; ``get()`` calls are not.
    """

    # Class-level flag; always True for this fake.
    supports_presign: bool = True

    def __init__(self, mpack_store: dict[str, bytes]) -> None:
        # Annotated as a dict (not a read-only Mapping) because put_mpack()
        # assigns into it.
        self._mpacks = mpack_store
        self._objects: dict[str, bytes] = {}
        # Object ids passed to put(), in call order.
        self.put_calls: list[str] = []

    async def put(self, oid: str, data: bytes) -> str:
        """Record the call, store *data* under *oid*, return its ``mem://`` URI."""
        self.put_calls.append(oid)
        self._objects[oid] = data
        return f"mem://{oid}"

    async def get(self, oid: str) -> bytes | None:
        """Return the bytes stored for *oid*, or ``None`` when absent."""
        return self._objects.get(oid)

    async def get_mpack(self, mpack_id: str) -> bytes | None:
        """Return the mpack stored under *mpack_id*, or ``None`` when absent."""
        return self._mpacks.get(mpack_id)

    async def put_mpack(self, mpack_id: str, data: bytes) -> None:
        """Store *data* under *mpack_id* in the shared mpack store."""
        self._mpacks[mpack_id] = data

    async def exists(self, oid: str) -> bool:
        """Return whether a per-object entry exists for *oid*."""
        return oid in self._objects

    async def delete(self, oid: str) -> None:
        """Remove the per-object entry for *oid*; a missing entry is ignored."""
        self._objects.pop(oid, None)

    async def presign_get(self, oid: str, ttl: int) -> str:
        """Return a fixed fake URL for *oid*; *ttl* is ignored."""
        return f"https://minio.test/{oid}"

    async def presign_mpack_get(self, mpack_id: str, ttl: int) -> str:
        """Return a fixed fake URL for *mpack_id*; *ttl* is ignored."""
        return f"https://minio.test/mpacks/{mpack_id}"

    async def quarantine_mpack(self, mpack_key: str) -> None:
        """No-op in this fake."""
        pass

    def uri_for(self, oid: str) -> str:
        """Return the ``mem://`` URI for *oid* without storing anything."""
        return f"mem://{oid}"
| 107 | |
| 108 | |
async def _make_job(
    session: AsyncSession,
    repo_id: str,
    mpack_key: str,
    n_objects: int,
) -> str:
    """Insert a mpack.index background job row and return its job_id.

    Args:
        session: Session the row is added to; it is committed before returning.
        repo_id: Repository the job belongs to.
        mpack_key: Stored in the payload as ``"mpack_key"``.
        n_objects: Stored in the payload as ``"declared_objects_count"``.

    Returns:
        The job id produced by ``compute_job_id`` for this repo, the
        ``"mpack.index"`` job type and the creation timestamp.
    """
    from musehub.core.genesis import compute_job_id

    # One timestamp feeds both the job id and created_at.
    now = datetime.now(tz=timezone.utc)
    job_id = compute_job_id(repo_id, "mpack.index", now.isoformat())
    session.add(
        MusehubBackgroundJob(
            job_id=job_id,
            repo_id=repo_id,
            job_type="mpack.index",
            payload={
                "mpack_key": mpack_key,
                "branch": "main",
                "head": "",
                "pusher_id": "gabriel",
                "declared_objects_count": n_objects,
                "declared_commits_count": 0,
            },
            status="pending",
            created_at=now,
            attempt=0,
        )
    )
    await session.commit()
    return job_id
| 137 | |
| 138 | |
| 139 | # ── P2-1: no per-object backend.put() calls ─────────────────────────────────── |
| 140 | |
@pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available")
@pytest.mark.asyncio
async def test_p2_1_no_per_object_put_calls(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """process_mpack_index_job must make zero per-object backend.put() calls.

    After Phase 2, objects are served from the covering mpack. Writing
    per-object MinIO keys is the bottleneck (~25s of ~40s XL job). Removing
    them eliminates the wait without breaking fetch (Phase 1 serves from mpack).
    """
    raw_a = b"object content alpha"
    raw_b = b"object content beta"
    oid_a = blob_id(raw_a)
    oid_b = blob_id(raw_b)

    mpack_bytes = _build_push_mpack({oid_a: raw_a, oid_b: raw_b})
    mpack_key = _mpack_id(mpack_bytes)

    mpack_store: dict[str, bytes] = {mpack_key: mpack_bytes}
    backend = _FakeBackend(mpack_store)
    # Patch every import path through which get_backend is looked up here.
    monkeypatch.setattr("musehub.storage.backends.get_backend", lambda: backend)
    monkeypatch.setattr("musehub.storage.get_backend", lambda: backend)
    monkeypatch.setattr("musehub.services.musehub_wire_fetch.get_backend", lambda: backend)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    job_id = await _make_job(db_session, repo.repo_id, mpack_key, n_objects=2)

    await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    # No per-object puts at all — Phase 2 removes them.
    assert backend.put_calls == [], (
        f"process_mpack_index_job called backend.put() for objects: {backend.put_calls}. "
        "Phase 2 must not write per-object MinIO keys."
    )
| 177 | |
| 178 | |
| 179 | # ── P2-2: storage_uri set to mpack URI ──────────────────────────────────────── |
| 180 | |
@pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available")
@pytest.mark.asyncio
async def test_p2_2_storage_uri_is_mpack_uri(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """After process_mpack_index_job, MusehubObject.storage_uri must be mpack://... URI.

    Previously it was set to backend.uri_for(oid) (a MinIO object URI) after
    the per-object PUT. After Phase 2 there is no PUT, so the URI must reflect
    the actual storage location: the covering mpack.
    """
    raw = b"phase2 storage uri test"
    oid = blob_id(raw)

    mpack_bytes = _build_push_mpack({oid: raw})
    mpack_key = _mpack_id(mpack_bytes)

    backend = _FakeBackend({mpack_key: mpack_bytes})
    # Patch every import path through which get_backend is looked up here.
    monkeypatch.setattr("musehub.storage.backends.get_backend", lambda: backend)
    monkeypatch.setattr("musehub.storage.get_backend", lambda: backend)
    monkeypatch.setattr("musehub.services.musehub_wire_fetch.get_backend", lambda: backend)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    job_id = await _make_job(db_session, repo.repo_id, mpack_key, n_objects=1)

    await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    row = await db_session.get(db.MusehubObject, oid)
    assert row is not None, f"MusehubObject row missing for {oid[:20]}"
    assert row.storage_uri.startswith("mpack://"), (
        f"storage_uri should be mpack://... but got {row.storage_uri!r}. "
        "Per-object MinIO key no longer exists after Phase 2."
    )
    assert mpack_key in row.storage_uri, (
        f"storage_uri {row.storage_uri!r} does not reference mpack_key {mpack_key[:20]}"
    )
| 218 | |
| 219 | |
| 220 | # ── P2-3: full round-trip — push → bg job → fetch → correct objects ─────────── |
| 221 | |
@pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available")
@pytest.mark.asyncio
async def test_p2_3_full_roundtrip_no_per_object_keys(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Full round-trip: mpack-in → background job (no MinIO PUT) → wire_fetch_mpack returns correct bytes.

    This is the end-to-end Phase 2 test: push mpack stored in MinIO,
    background job indexes without per-object puts, fetch serves from mpack.
    """
    raw_content = b"roundtrip content for phase2"
    oid = blob_id(raw_content)

    mpack_bytes = _build_push_mpack({oid: raw_content})
    mpack_key = _mpack_id(mpack_bytes)

    backend = _FakeBackend({mpack_key: mpack_bytes})
    # Patch every import path through which get_backend is looked up here.
    monkeypatch.setattr("musehub.storage.backends.get_backend", lambda: backend)
    monkeypatch.setattr("musehub.storage.get_backend", lambda: backend)
    monkeypatch.setattr("musehub.services.musehub_wire_fetch.get_backend", lambda: backend)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    job_id = await _make_job(db_session, repo.repo_id, mpack_key, n_objects=1)

    await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    # Confirm no per-object MinIO key was written.
    assert backend.put_calls == [], "Phase 2: no per-object puts expected"
    assert oid not in backend._objects, "per-object key must not exist after Phase 2 job"

    # Now set up a commit/snapshot referencing this object so wire_fetch_mpack
    # has something to serve.
    snap_id = fake_id("snap-p2-roundtrip")
    snap = db.MusehubSnapshot(
        snapshot_id=snap_id,
        manifest_blob=msgpack.packb({"file.txt": oid}, use_bin_type=True),
        directories=[],
        entry_count=1,
        created_at=_now(),
    )
    db_session.add(snap)
    commit_id = fake_id("commit-p2-roundtrip")
    commit = db.MusehubCommit(
        commit_id=commit_id,
        branch="main",
        parent_ids=[],
        message="p2 roundtrip",
        author="gabriel",
        timestamp=_now(),
        snapshot_id=snap_id,
    )
    db_session.add(commit)
    await db_session.execute(
        pg_insert(db.MusehubCommitGraph)
        .values(commit_id=commit_id, parent_ids=[], generation=0, snapshot_id=snap_id)
        .on_conflict_do_nothing()
    )
    await db_session.execute(
        pg_insert(db.MusehubCommitRef)
        .values(repo_id=repo.repo_id, commit_id=commit_id)
        .on_conflict_do_nothing()
    )
    await db_session.commit()

    # wire_fetch_mpack must serve the object from the mpack, not per-object GET.
    result = await wire_fetch_mpack(
        db_session, repo.repo_id, want=[commit_id], have=[]
    )

    assert result["mpack_url"] is not None, "fetch returned up-to-date but should have data"
    # The count is accepted under either "blob_count" or "object_count".
    assert result.get("blob_count", result.get("object_count", 0)) == 1

    # Verify the assembled fetch mpack contains the correct bytes.
    fetch_mpack_id = result["mpack_id"]
    fetch_raw = backend._mpacks.get(fetch_mpack_id)
    assert fetch_raw is not None, "assembled fetch mpack not found in backend"
    # A b"MUSE" prefix selects the wire-mpack parser; anything else is decoded
    # as plain msgpack.
    if fetch_raw[:4] == b"MUSE":
        from muse.core.mpack import parse_wire_mpack as _parse_wm
        payload = _parse_wm(fetch_raw)
    else:
        payload = msgpack.unpackb(fetch_raw, raw=False)
    # Entries are read from "blobs", falling back to "objects".
    served_entries = payload.get("blobs") or payload.get("objects") or []
    obj_map = {o["object_id"]: bytes(o["content"]) for o in served_entries}

    assert oid in obj_map, f"object {oid[:20]} missing from fetch mpack"
    assert obj_map[oid] == raw_content, (
        "object content from Phase 2 mpack-only path must match original bytes"
    )

    # The fetch must not have recorded a per-object PUT for this object either.
    # (_FakeBackend records only put() calls, so per-object GETs cannot be
    # asserted here.)
    assert oid not in backend.put_calls