test_fetch_mpack_native_phase1.py
python
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠ breaking
20 days ago
| 1 | """TDD — Phase 1: mpack-native object serving in wire_fetch_mpack (issue #69). |
| 2 | |
| 3 | Problem: the current fetch path calls backend.get(oid) once per object. |
| 4 | For 25,000 objects that is 25,000 individual MinIO GETs. This only works |
| 5 | because the push background job writes per-object MinIO keys — a 40s |
| 6 | synchronous cost that creates an availability window after every large push. |
| 7 | |
| 8 | Goal: serve objects by downloading the covering mpack(s) and extracting |
| 9 | in-memory. One mpack GET replaces N object GETs. Per-object keys are |
| 10 | never written; the mpack is the source of truth. |
| 11 | |
| 12 | Test IDs: |
| 13 | FN-1 content_cache hit → no mpack or per-object GET needed |
| 14 | FN-2 object stored only in mpack (no per-object key) → extracted correctly |
| 15 | FN-3 objects across two distinct mpacks → each mpack fetched exactly once |
| 16 | FN-4 object in MPackIndex but mpack is missing from storage → falls back to per-object GET |
| 17 | FN-5 object absent from MPackIndex → FetchNotIndexedError (unchanged gate) |
| 18 | FN-6 mixed: content_cache + mpack + legacy per-object → all three sources merged |
| | FN-7 zstd-compressed objects in the covering mpack → decompressed before being served |
| 19 | """ |
| 20 | from __future__ import annotations |
| 21 | |
| 22 | import hashlib |
| 23 | from collections.abc import Mapping |
| 24 | from datetime import datetime, timezone |
| 25 | from unittest.mock import AsyncMock, call |
| 26 | |
| 27 | import msgpack |
| 28 | import pytest |
| 29 | from sqlalchemy.dialects.postgresql import insert as pg_insert |
| 30 | from sqlalchemy.ext.asyncio import AsyncSession |
| 31 | |
| 32 | from muse.core.types import blob_id, fake_id |
| 33 | from musehub.db import musehub_repo_models as db |
| 34 | from musehub.services.musehub_wire import FetchNotIndexedError, wire_fetch_mpack |
| 35 | from tests.factories import create_repo |
| 36 | |
| 37 | |
| 38 | # ── fixtures & helpers ──────────────────────────────────────────────────────── |
| 39 | |
| 40 | def _now() -> datetime: |
| 41 | return datetime.now(tz=timezone.utc) |
| 42 | |
| 43 | |
def _make_mpack_bytes(objects: Mapping[str, bytes]) -> bytes:
    """Serialize *objects* as an uncompressed wire mpack payload.

    Every ``oid -> data`` pair becomes one entry in the ``"blobs"`` list; the
    commit, snapshot and branch-head sections are emitted empty.
    """
    blob_entries = [
        {"object_id": object_id, "content": content}
        for object_id, content in objects.items()
    ]
    envelope = {
        "commits": [],
        "snapshots": [],
        "blobs": blob_entries,
        "branch_heads": {},
    }
    return msgpack.packb(envelope, use_bin_type=True)
| 55 | |
| 56 | |
def _make_mpack_bytes_zstd(objects: Mapping[str, bytes]) -> bytes:
    """Serialize *objects* as a wire mpack payload whose blobs are zstd-compressed.

    Each blob entry carries ``encoding='zstd'`` and the compressed bytes as
    ``content``. The ``object_id`` is passed through untouched, so callers that
    derive it from the *uncompressed* bytes keep a content-addressed ID.
    """
    import zstandard as _zstd

    compressor = _zstd.ZstdCompressor()
    blob_entries = [
        {
            "object_id": object_id,
            "encoding": "zstd",
            "content": compressor.compress(content),
        }
        for object_id, content in objects.items()
    ]
    envelope = {
        "commits": [],
        "snapshots": [],
        "blobs": blob_entries,
        "branch_heads": {},
    }
    return msgpack.packb(envelope, use_bin_type=True)
| 82 | |
| 83 | |
| 84 | def _mpack_key(payload: bytes) -> str: |
| 85 | return "sha256:" + hashlib.sha256(payload).hexdigest() |
| 86 | |
| 87 | |
| 88 | class _FakeBackend: |
| 89 | """In-memory backend with separate per-object and mpack namespaces. |
| 90 | |
| 91 | Tracks how many times get() and get_mpack() are called so tests can |
| 92 | assert that the mpack-native path is taken instead of per-object GETs. |
| 93 | """ |
| 94 | |
| 95 | def __init__(self) -> None: |
| 96 | self._objects: dict[str, bytes] = {} |
| 97 | self._mpacks: dict[str, bytes] = {} |
| 98 | self.get_calls: list[str] = [] |
| 99 | self.get_mpack_calls: list[str] = [] |
| 100 | |
| 101 | async def put(self, oid: str, data: bytes) -> str: |
| 102 | self._objects[oid] = data |
| 103 | return f"mem://{oid}" |
| 104 | |
| 105 | async def get(self, oid: str) -> bytes | None: |
| 106 | self.get_calls.append(oid) |
| 107 | return self._objects.get(oid) |
| 108 | |
| 109 | async def get_mpack(self, mpack_id: str) -> bytes | None: |
| 110 | self.get_mpack_calls.append(mpack_id) |
| 111 | return self._mpacks.get(mpack_id) |
| 112 | |
| 113 | async def put_mpack(self, mpack_id: str, data: bytes) -> None: |
| 114 | self._mpacks[mpack_id] = data |
| 115 | |
| 116 | async def exists(self, oid: str) -> bool: |
| 117 | return oid in self._objects |
| 118 | |
| 119 | async def delete(self, oid: str) -> None: |
| 120 | self._objects.pop(oid, None) |
| 121 | |
| 122 | async def presign_get(self, oid: str, ttl: int) -> str: |
| 123 | return f"https://minio.test/{oid}?ttl={ttl}" |
| 124 | |
| 125 | async def presign_mpack_get(self, mpack_id: str, ttl: int) -> str: |
| 126 | return f"https://minio.test/mpacks/{mpack_id}?ttl={ttl}" |
| 127 | |
| 128 | def uri_for(self, oid: str) -> str: |
| 129 | return f"mem://{oid}" |
| 130 | |
| 131 | supports_presign: bool = True |
| 132 | |
| 133 | |
async def _make_commit(
    session: AsyncSession,
    repo_id: str,
    *,
    manifest: dict[str, str],
    seed: str,
    parent_ids: list[str] | None = None,
    generation: int = 0,
) -> db.MusehubCommit:
    """Persist a snapshot and a commit on ``main`` for *repo_id* and return the commit.

    Snapshot and commit IDs are derived from *seed* via ``fake_id``. Besides the
    two ORM rows, the repo-scoped snapshot/commit ref rows and a commit-graph
    row are inserted (conflicts ignored), then the session is committed.

    Args:
        session: Async session the rows are written through.
        repo_id: Repository the ref rows are attached to.
        manifest: Mapping stored msgpack-encoded as the snapshot's manifest blob.
        seed: Suffix used to derive the snapshot ID, commit ID and message.
        parent_ids: Parent commit IDs; ``None`` or empty is stored as ``[]``.
        generation: Generation number written to the commit-graph row.
    """
    parents = parent_ids or []
    snapshot_id = fake_id(f"snap-{seed}")
    commit_id = fake_id(f"commit-{seed}")

    # Snapshot row plus its repo-scoped reference.
    snapshot = db.MusehubSnapshot(
        snapshot_id=snapshot_id,
        directories=[],
        manifest_blob=msgpack.packb(manifest, use_bin_type=True),
        entry_count=len(manifest),
        created_at=_now(),
    )
    session.add(snapshot)
    snapshot_ref_stmt = (
        pg_insert(db.MusehubSnapshotRef)
        .values(repo_id=repo_id, snapshot_id=snapshot_id)
        .on_conflict_do_nothing()
    )
    await session.execute(snapshot_ref_stmt)

    # Commit row plus its repo-scoped reference.
    commit = db.MusehubCommit(
        commit_id=commit_id,
        branch="main",
        parent_ids=parents,
        message=f"commit {seed}",
        author="gabriel",
        timestamp=_now(),
        snapshot_id=snapshot_id,
    )
    session.add(commit)
    commit_ref_stmt = (
        pg_insert(db.MusehubCommitRef)
        .values(repo_id=repo_id, commit_id=commit_id)
        .on_conflict_do_nothing()
    )
    await session.execute(commit_ref_stmt)

    # Commit-graph row mirroring the commit's parents and snapshot.
    graph_stmt = (
        pg_insert(db.MusehubCommitGraph)
        .values(
            commit_id=commit_id,
            parent_ids=parents,
            generation=generation,
            snapshot_id=snapshot_id,
        )
        .on_conflict_do_nothing()
    )
    await session.execute(graph_stmt)

    await session.commit()
    return commit
| 185 | |
| 186 | |
async def _index_object_in_mpack(
    session: AsyncSession,
    oid: str,
    mpack_id: str,
) -> None:
    """Record that object *oid* lives inside mpack *mpack_id*.

    Inserts a ``MusehubObject`` row whose ``storage_uri`` is an ``mpack://``
    URI (i.e. not a per-object key) and a ``MusehubMPackIndex`` row mapping
    the object to the mpack. Both inserts ignore conflicts, so an existing
    object row is left untouched. The session is committed before returning.
    """
    object_stmt = (
        pg_insert(db.MusehubObject)
        .values(
            object_id=oid,
            path="file.dat",
            size_bytes=8,
            storage_uri=f"mpack://{mpack_id}",  # mpack URI, not per-object
        )
        .on_conflict_do_nothing(index_elements=["object_id"])
    )
    await session.execute(object_stmt)

    index_stmt = (
        pg_insert(db.MusehubMPackIndex)
        .values(
            entity_id=oid,
            mpack_id=mpack_id,
            entity_type="object",
            created_at=_now(),
        )
        .on_conflict_do_nothing(index_elements=["entity_id", "mpack_id"])
    )
    await session.execute(index_stmt)

    await session.commit()
| 209 | |
| 210 | |
| 211 | # ── FN-1: content_cache hit ─────────────────────────────────────────────────── |
| 212 | |
@pytest.mark.asyncio
async def test_fn1_content_cache_served_without_mpack_get(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """FN-1: an object held in content_cache is served with zero storage GETs."""
    backend = _FakeBackend()
    # Point get_backend in both wire modules at the fake.
    for target in (
        "musehub.services.musehub_wire.get_backend",
        "musehub.services.musehub_wire_fetch.get_backend",
    ):
        monkeypatch.setattr(target, lambda: backend)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    cached_bytes = b"cached content"
    object_id = blob_id(cached_bytes)

    # Object row carrying its bytes inline in content_cache.
    object_stmt = (
        pg_insert(db.MusehubObject)
        .values(
            object_id=object_id,
            path="cached.txt",
            size_bytes=len(cached_bytes),
            storage_uri=f"mem://{object_id}",
            content_cache=cached_bytes,
        )
        .on_conflict_do_nothing(index_elements=["object_id"])
    )
    await db_session.execute(object_stmt)
    ref_stmt = (
        pg_insert(db.MusehubObjectRef)
        .values(repo_id=repo.repo_id, object_id=object_id)
        .on_conflict_do_nothing()
    )
    await db_session.execute(ref_stmt)

    # The object is also indexed in a stored mpack so the coverage check passes.
    covering_mpack = _make_mpack_bytes({object_id: cached_bytes})
    covering_mpack_id = _mpack_key(covering_mpack)
    await backend.put_mpack(covering_mpack_id, covering_mpack)
    await _index_object_in_mpack(db_session, object_id, covering_mpack_id)

    commit = await _make_commit(
        db_session, repo.repo_id, manifest={"cached.txt": object_id}, seed="fn1"
    )

    result = await wire_fetch_mpack(
        db_session, repo.repo_id, want=[commit.commit_id], have=[]
    )

    assert result["blob_count"] == 1
    assert backend.get_calls == [], "content_cache hit must not trigger per-object GET"
    assert backend.get_mpack_calls == [], "content_cache hit must not trigger mpack GET"
| 259 | |
| 260 | |
| 261 | # ── FN-2: object stored only in mpack ──────────────────────────────────────── |
| 262 | |
@pytest.mark.asyncio
async def test_fn2_object_served_from_mpack_no_per_object_key(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """FN-2: an object that exists only inside a stored mpack is extracted correctly.

    No per-object key is ever written to the backend, so the only way the
    fetch can succeed is by downloading the covering mpack and pulling the
    object out of it.
    """
    backend = _FakeBackend()
    monkeypatch.setattr("musehub.services.musehub_wire.get_backend", lambda: backend)
    monkeypatch.setattr("musehub.services.musehub_wire_fetch.get_backend", lambda: backend)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    raw = b"mpack-only content"
    oid = blob_id(raw)

    # Store the covering mpack and index the object in it; backend.put() is
    # deliberately never called for this oid.
    mpack_bytes = _make_mpack_bytes({oid: raw})
    mpack_id = _mpack_key(mpack_bytes)
    await backend.put_mpack(mpack_id, mpack_bytes)
    await _index_object_in_mpack(db_session, oid, mpack_id)
    await db_session.execute(
        pg_insert(db.MusehubObjectRef)
        .values(repo_id=repo.repo_id, object_id=oid)
        .on_conflict_do_nothing()
    )
    await db_session.commit()

    commit = await _make_commit(
        db_session, repo.repo_id, manifest={"f.txt": oid}, seed="fn2"
    )

    result = await wire_fetch_mpack(
        db_session, repo.repo_id, want=[commit.commit_id], have=[]
    )

    assert result["blob_count"] == 1
    assert result["mpack_url"] is not None, "should return a presigned mpack URL"
    # Per-object key was never written — get() must not have been called
    assert backend.get_calls == [], "must not fall back to per-object GET when mpack covers the object"
    assert mpack_id in backend.get_mpack_calls, "must fetch the covering mpack"

    # Verify the assembled fetch mpack contains the correct bytes.
    from muse.core.mpack import parse_wire_mpack
    fetch_mpack_id = result["mpack_id"]
    fetch_raw = backend._mpacks.get(fetch_mpack_id)
    assert fetch_raw is not None
    payload = parse_wire_mpack(fetch_raw)
    # Normalise content to ``bytes`` exactly as FN-6 / FN-7 do, so all three
    # tests compare the assembled blobs the same way.
    obj_map = {o["object_id"]: bytes(o["content"]) for o in payload.get("blobs", [])}
    assert obj_map.get(oid) == raw
| 309 | |
| 310 | |
| 311 | # ── FN-3: objects across two mpacks → each fetched exactly once ─────────────── |
| 312 | |
@pytest.mark.asyncio
async def test_fn3_two_mpacks_each_fetched_once(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """FN-3: two objects in two different mpacks cause exactly one GET per mpack."""
    backend = _FakeBackend()
    # Point get_backend in both wire modules at the fake.
    for target in (
        "musehub.services.musehub_wire.get_backend",
        "musehub.services.musehub_wire_fetch.get_backend",
    ):
        monkeypatch.setattr(target, lambda: backend)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    raw_a, raw_b = b"object from mpack A", b"object from mpack B"
    oid_a, oid_b = blob_id(raw_a), blob_id(raw_b)

    # One single-object mpack per blob.
    mpack_a_bytes = _make_mpack_bytes({oid_a: raw_a})
    mpack_b_bytes = _make_mpack_bytes({oid_b: raw_b})
    mpack_a_id = _mpack_key(mpack_a_bytes)
    mpack_b_id = _mpack_key(mpack_b_bytes)

    await backend.put_mpack(mpack_a_id, mpack_a_bytes)
    await backend.put_mpack(mpack_b_id, mpack_b_bytes)
    await _index_object_in_mpack(db_session, oid_a, mpack_a_id)
    await _index_object_in_mpack(db_session, oid_b, mpack_b_id)
    for object_id in (oid_a, oid_b):
        ref_stmt = (
            pg_insert(db.MusehubObjectRef)
            .values(repo_id=repo.repo_id, object_id=object_id)
            .on_conflict_do_nothing()
        )
        await db_session.execute(ref_stmt)
    await db_session.commit()

    commit = await _make_commit(
        db_session,
        repo.repo_id,
        manifest={"a.txt": oid_a, "b.txt": oid_b},
        seed="fn3",
    )

    result = await wire_fetch_mpack(
        db_session, repo.repo_id, want=[commit.commit_id], have=[]
    )

    mpack_gets = backend.get_mpack_calls
    assert result["blob_count"] == 2
    assert backend.get_calls == [], "no per-object GETs"
    assert sorted(mpack_gets) == sorted([mpack_a_id, mpack_b_id])
    assert mpack_gets.count(mpack_a_id) == 1, "mpack A fetched exactly once"
    assert mpack_gets.count(mpack_b_id) == 1, "mpack B fetched exactly once"
| 361 | |
| 362 | |
| 363 | # ── FN-4: mpack missing from storage → per-object fallback ─────────────────── |
| 364 | |
@pytest.mark.asyncio
async def test_fn4_missing_mpack_falls_back_to_per_object_get(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """FN-4: an indexed mpack that is absent from storage triggers the backend.get() fallback."""
    backend = _FakeBackend()
    # Point get_backend in both wire modules at the fake.
    for target in (
        "musehub.services.musehub_wire.get_backend",
        "musehub.services.musehub_wire_fetch.get_backend",
    ):
        monkeypatch.setattr(target, lambda: backend)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    content = b"fallback content"
    object_id = blob_id(content)

    # The index row names a mpack that is never uploaded to the backend.
    ghost_mpack_id = fake_id("ghost-mpack")
    await _index_object_in_mpack(db_session, object_id, ghost_mpack_id)
    ref_stmt = (
        pg_insert(db.MusehubObjectRef)
        .values(repo_id=repo.repo_id, object_id=object_id)
        .on_conflict_do_nothing()
    )
    await db_session.execute(ref_stmt)
    await db_session.commit()
    # The legacy per-object key, by contrast, is present.
    await backend.put(object_id, content)

    commit = await _make_commit(
        db_session, repo.repo_id, manifest={"f.txt": object_id}, seed="fn4"
    )

    result = await wire_fetch_mpack(
        db_session, repo.repo_id, want=[commit.commit_id], have=[]
    )

    assert result["blob_count"] == 1
    assert ghost_mpack_id in backend.get_mpack_calls, "attempted mpack GET"
    assert object_id in backend.get_calls, "fell back to per-object GET after mpack miss"
| 401 | |
| 402 | |
| 403 | # ── FN-5: object absent from MPackIndex → FetchNotIndexedError ─────────────── |
| 404 | |
@pytest.mark.asyncio
async def test_fn5_unindexed_object_raises_fetch_not_indexed_error(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """FN-5: fetching an object with no MPackIndex row raises FetchNotIndexedError."""
    backend = _FakeBackend()
    # Point get_backend in both wire modules at the fake.
    for target in (
        "musehub.services.musehub_wire.get_backend",
        "musehub.services.musehub_wire_fetch.get_backend",
    ):
        monkeypatch.setattr(target, lambda: backend)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    content = b"not indexed yet"
    object_id = blob_id(content)

    # Only the object row and its repo ref exist: no MPackIndex entry is
    # written and nothing is stored in the backend for this object.
    object_stmt = (
        pg_insert(db.MusehubObject)
        .values(
            object_id=object_id,
            path="f.txt",
            size_bytes=len(content),
            storage_uri=f"mem://{object_id}",
        )
        .on_conflict_do_nothing(index_elements=["object_id"])
    )
    await db_session.execute(object_stmt)
    ref_stmt = (
        pg_insert(db.MusehubObjectRef)
        .values(repo_id=repo.repo_id, object_id=object_id)
        .on_conflict_do_nothing()
    )
    await db_session.execute(ref_stmt)
    await db_session.commit()

    commit = await _make_commit(
        db_session, repo.repo_id, manifest={"f.txt": object_id}, seed="fn5"
    )

    with pytest.raises(FetchNotIndexedError):
        await wire_fetch_mpack(
            db_session, repo.repo_id, want=[commit.commit_id], have=[]
        )
| 439 | |
| 440 | |
| 441 | # ── FN-6: mixed sources — cache + mpack + legacy ───────────────────────────── |
| 442 | |
@pytest.mark.asyncio
async def test_fn6_mixed_sources_all_merged(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """FN-6: one fetch merges objects from content_cache, a stored mpack and a legacy key.

    Three objects are set up, each reachable through a different source:

    * ``oid_cached`` — bytes held in the ``content_cache`` column.
    * ``oid_mpack``  — bytes only inside a mpack stored in the backend.
    * ``oid_legacy`` — indexed under a mpack that is missing from storage, with
      the bytes available under the per-object key.

    The assembled fetch mpack must contain all three with the right bytes, and
    each object must have been read through its expected source only.
    """
    backend = _FakeBackend()
    monkeypatch.setattr("musehub.services.musehub_wire.get_backend", lambda: backend)
    monkeypatch.setattr("musehub.services.musehub_wire_fetch.get_backend", lambda: backend)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    raw_cached = b"in content_cache"
    raw_mpack = b"in mpack storage"
    raw_legacy = b"in per-object storage"

    oid_cached = blob_id(raw_cached)
    oid_mpack = blob_id(raw_mpack)
    oid_legacy = blob_id(raw_legacy)

    # oid_cached: stored in content_cache column
    await db_session.execute(
        pg_insert(db.MusehubObject)
        .values(
            object_id=oid_cached, path="cached.txt",
            size_bytes=len(raw_cached), storage_uri=f"mem://{oid_cached}",
            content_cache=raw_cached,
        )
        .on_conflict_do_nothing(index_elements=["object_id"])
    )
    # oid_mpack: stored only in mpack, indexed via MPackIndex
    mpack_bytes = _make_mpack_bytes({oid_mpack: raw_mpack})
    mpack_id = _mpack_key(mpack_bytes)
    await backend.put_mpack(mpack_id, mpack_bytes)
    await _index_object_in_mpack(db_session, oid_mpack, mpack_id)

    # oid_legacy: indexed under a "ghost" mpack id that is never uploaded, so
    # get_mpack() misses; the per-object key written here is the only copy.
    ghost_id = fake_id("ghost")
    await _index_object_in_mpack(db_session, oid_legacy, ghost_id)
    await backend.put(oid_legacy, raw_legacy)

    for oid in (oid_cached, oid_mpack, oid_legacy):
        await db_session.execute(
            pg_insert(db.MusehubObjectRef)
            .values(repo_id=repo.repo_id, object_id=oid)
            .on_conflict_do_nothing()
        )

    # Also need MPackIndex for oid_cached so the coverage check passes
    cache_mpack_bytes = _make_mpack_bytes({oid_cached: raw_cached})
    cache_mpack_id = _mpack_key(cache_mpack_bytes)
    await backend.put_mpack(cache_mpack_id, cache_mpack_bytes)
    await _index_object_in_mpack(db_session, oid_cached, cache_mpack_id)

    await db_session.commit()

    commit = await _make_commit(
        db_session, repo.repo_id,
        manifest={"cached.txt": oid_cached, "mpack.txt": oid_mpack, "legacy.txt": oid_legacy},
        seed="fn6",
    )

    result = await wire_fetch_mpack(
        db_session, repo.repo_id, want=[commit.commit_id], have=[]
    )

    assert result["blob_count"] == 3

    # Verify assembled mpack contains all three objects with correct bytes
    from muse.core.mpack import parse_wire_mpack
    fetch_raw = backend._mpacks.get(result["mpack_id"])
    assert fetch_raw is not None
    payload = parse_wire_mpack(fetch_raw)
    obj_map = {o["object_id"]: bytes(o["content"]) for o in payload.get("blobs", [])}

    assert obj_map[oid_cached] == raw_cached
    assert obj_map[oid_mpack] == raw_mpack
    assert obj_map[oid_legacy] == raw_legacy

    # content_cache object must not trigger any GET — neither the per-object
    # key nor the mpack that covers it (same contract FN-1 pins in isolation).
    assert oid_cached not in backend.get_calls
    assert cache_mpack_id not in backend.get_mpack_calls, (
        "content_cache hit must not trigger mpack GET"
    )
    # mpack object must not trigger per-object GET
    assert oid_mpack not in backend.get_calls
    # legacy fallback fired for oid_legacy
    assert oid_legacy in backend.get_calls
| 528 | |
| 529 | |
| 530 | # ── FN-7: zstd-compressed objects in mpack → decompressed before serving ────── |
| 531 | |
@pytest.mark.asyncio
async def test_fn7_zstd_compressed_mpack_objects_decompressed(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """FN-7: blobs stored with encoding='zstd' are decompressed before being served.

    Regression guard: _extract_from_mpack once passed the compressed bytes
    straight through. The muse client then computed sha256(compressed) where
    sha256(decompressed) was expected, its integrity check failed, apply_mpack
    skipped every object, HEAD never advanced and muse log showed 0 commits.
    """
    backend = _FakeBackend()
    # Point get_backend in both wire modules at the fake.
    for target in (
        "musehub.services.musehub_wire.get_backend",
        "musehub.services.musehub_wire_fetch.get_backend",
    ):
        monkeypatch.setattr(target, lambda: backend)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    # The object ID is derived from the plain (uncompressed) bytes.
    raw = b"decompressed object content for fn7"
    oid = blob_id(raw)

    # The stored (push) mpack holds the blob zstd-compressed.
    stored_mpack = _make_mpack_bytes_zstd({oid: raw})
    stored_mpack_id = _mpack_key(stored_mpack)
    await backend.put_mpack(stored_mpack_id, stored_mpack)
    await _index_object_in_mpack(db_session, oid, stored_mpack_id)
    ref_stmt = (
        pg_insert(db.MusehubObjectRef)
        .values(repo_id=repo.repo_id, object_id=oid)
        .on_conflict_do_nothing()
    )
    await db_session.execute(ref_stmt)
    await db_session.commit()

    commit = await _make_commit(
        db_session, repo.repo_id, manifest={"file.txt": oid}, seed="fn7"
    )

    result = await wire_fetch_mpack(
        db_session, repo.repo_id, want=[commit.commit_id], have=[]
    )

    assert result["blob_count"] == 1, "expected 1 object in fetch mpack"

    # Read back the assembled fetch mpack: its blob must be the plain bytes,
    # otherwise sha256(content) would not match oid on the client.
    from muse.core.mpack import parse_wire_mpack
    assembled = backend._mpacks.get(result["mpack_id"])
    assert assembled is not None, "fetch mpack not stored in backend"
    parsed = parse_wire_mpack(assembled)
    served = {
        entry["object_id"]: bytes(entry["content"])
        for entry in parsed.get("blobs", [])
    }

    assert oid in served, f"object {oid[:20]} missing from assembled fetch mpack"
    assert served[oid] == raw, (
        "object content in assembled mpack must be DECOMPRESSED bytes matching oid; "
        "got compressed bytes — integrity check would fail on client"
    )
File History
1 commit
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠
20 days ago