test_mpack_phase3.py
python
sha256:3ff9c9863a9891bdcde71b4a43228f66d0493e38b7cc1d09fe9eb7de774046b2
feat: add repair-commit wire endpoint (API parity with repa…
Opus 4.8
minor
⚠ breaking
1 day ago
| 1 | """TDD — Phase 3: make push fully synchronous (issue #69). |
| 2 | |
| 3 | After Phase 2 the background job no longer does per-object MinIO writes. |
| 4 | Phase 3 removes the background job entirely: wire_push_unpack_mpack processes |
| 5 | the mpack inline for ALL sizes, makes objects immediately fetchable, and does |
| 6 | not enqueue a mpack.index background job. |
| 7 | |
| 8 | After this change: |
| 9 | - wire_push_unpack_mpack must NOT enqueue a mpack.index MusehubBackgroundJob |
| 10 | - MusehubObject.storage_uri must be mpack://{mpack_key} immediately after push |
| 11 | - wire_fetch_mpack must succeed immediately after push (no FetchNotIndexedError) |
| 12 | - The large-mpack threshold (mpack_content_cache_max_bytes) is removed as a gate |
| 13 | |
| 14 | Test IDs: |
| 15 | P3-1 no mpack.index background job enqueued after wire_push_unpack_mpack |
| 16 | P3-2 MusehubObject.storage_uri is mpack:// URI immediately after push |
| 17 | P3-3 full round-trip: push → immediate fetch → correct objects served |
| 18 | """ |
| 19 | from __future__ import annotations |
| 20 | |
| 21 | import hashlib |
| 22 | from collections.abc import Mapping |
| 23 | from datetime import datetime, timezone |
| 24 | |
| 25 | import msgpack |
| 26 | import pytest |
| 27 | import zstandard |
| 28 | |
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession

from muse.core.types import blob_id, fake_id
from musehub.db import musehub_repo_models as db
from musehub.db.musehub_jobs_models import MusehubBackgroundJob
from musehub.services.musehub_wire import (
    wire_fetch_mpack,
    wire_push_unpack_mpack,
)
from tests.factories import create_repo

# Module-level pytest marker: every test in this file is skipped while the
# muse wire protocol is being reworked. It is declared after all imports so
# the imports stay at the top of the module (flake8/ruff E402); pytest only
# reads ``pytestmark`` once the module has finished importing, so the
# placement does not change behavior.
pytestmark = pytest.mark.skip(reason="muse wire protocol in flux")
| 42 | |
| 43 | |
def _now() -> datetime:
    """Return the current time as a timezone-aware datetime pinned to UTC."""
    utc = timezone.utc
    return datetime.now(utc)
| 46 | |
| 47 | |
def _mpack_id(raw: bytes) -> str:
    """Return the content address of *raw* in the form ``sha256:<hex digest>``."""
    digest = hashlib.sha256(raw).hexdigest()
    return f"sha256:{digest}"
| 50 | |
| 51 | |
def _build_push_mpack(objects: Mapping[str, bytes]) -> bytes:
    """Serialise *objects* into a minimal push mpack.

    Every object is stored as a ``zstd``-encoded entry. The commit and
    snapshot lists are empty and no branch heads are set.
    """
    compressor = zstandard.ZstdCompressor()
    object_entries = [
        dict(object_id=object_id, encoding="zstd", content=compressor.compress(payload))
        for object_id, payload in objects.items()
    ]
    envelope = {
        "commits": [],
        "snapshots": [],
        "objects": object_entries,
        "branch_heads": {},
    }
    return msgpack.packb(envelope, use_bin_type=True)
| 63 | |
| 64 | |
class _FakeBackend:
    """In-memory stand-in for the storage backend returned by ``get_backend``.

    Mpacks live in the dict passed to the constructor, which is kept by
    reference. Tests can therefore pre-seed pushed mpacks and later read back
    mpacks written via ``put_mpack``. Individual objects live in a private
    dict, and every ``put()`` call is recorded in ``put_calls``.
    """

    # Class-level flag; the presign_* methods below return fake
    # https://minio.test URLs.
    supports_presign: bool = True

    def __init__(self, mpack_store: dict[str, bytes]) -> None:
        # Must be mutable: put_mpack() assigns into it. A read-only Mapping
        # annotation would contradict that. Not copied, so the caller's dict
        # and the backend share state.
        self._mpacks = mpack_store
        self._objects: dict[str, bytes] = {}
        self.put_calls: list[str] = []

    async def put(self, oid: str, data: bytes) -> str:
        """Store *data* under *oid*, record the call, and return a ``mem://`` URI."""
        self.put_calls.append(oid)
        self._objects[oid] = data
        return f"mem://{oid}"

    async def get(self, oid: str) -> bytes | None:
        """Return the stored object bytes, or ``None`` if *oid* is unknown."""
        return self._objects.get(oid)

    async def get_mpack(self, mpack_id: str) -> bytes | None:
        """Return the stored mpack bytes, or ``None`` if *mpack_id* is unknown."""
        return self._mpacks.get(mpack_id)

    async def put_mpack(self, mpack_id: str, data: bytes) -> None:
        """Store (or overwrite) an mpack in the shared mpack store."""
        self._mpacks[mpack_id] = data

    async def exists(self, oid: str) -> bool:
        """Return whether an object was stored under *oid* and not deleted."""
        return oid in self._objects

    async def delete(self, oid: str) -> None:
        """Remove *oid* if present; deleting a missing object is a no-op."""
        self._objects.pop(oid, None)

    async def presign_get(self, oid: str, ttl: int) -> str:
        """Return a fake presigned object URL; *ttl* is ignored."""
        return f"https://minio.test/{oid}"

    async def presign_mpack_get(self, mpack_id: str, ttl: int) -> str:
        """Return a fake presigned mpack URL; *ttl* is ignored."""
        return f"https://minio.test/mpacks/{mpack_id}"

    async def quarantine_mpack(self, mpack_key: str) -> None:
        """Accept a quarantine request and do nothing."""

    def uri_for(self, oid: str) -> str:
        """Return the ``mem://`` URI that ``put`` would report for *oid*."""
        return f"mem://{oid}"
| 106 | |
| 107 | |
| 108 | # ── P3-1: no mpack.index background job enqueued ───────────────────────────── |
| 109 | |
@pytest.mark.asyncio
async def test_p3_1_no_mpack_index_job_enqueued(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Pushing an mpack must not leave an ``mpack.index`` background job behind.

    In Phase 3 the push path does all of its work inline. Any ``mpack.index``
    job row recorded for the repo after ``wire_push_unpack_mpack`` is a
    regression.
    """
    content = b"phase3 object alpha"
    object_id = blob_id(content)

    packed = _build_push_mpack({object_id: content})
    pack_key = _mpack_id(packed)

    fake = _FakeBackend({pack_key: packed})
    monkeypatch.setattr("musehub.services.musehub_wire.get_backend", lambda: fake)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    await wire_push_unpack_mpack(
        db_session,
        repo.repo_id,
        pack_key,
        pusher_id="gabriel",
        branch="main",
        head_commit_id="",
        commits_count=0,
        objects_count=1,
    )
    await db_session.commit()

    # Both criteria are ANDed, exactly like two chained .where() calls.
    index_jobs_query = select(MusehubBackgroundJob).where(
        MusehubBackgroundJob.repo_id == repo.repo_id,
        MusehubBackgroundJob.job_type == "mpack.index",
    )
    leftover = (await db_session.execute(index_jobs_query)).scalars().all()

    assert leftover == [], (
        f"Phase 3: wire_push_unpack_mpack enqueued {len(leftover)} mpack.index job(s). "
        "The background job must be removed — push is now fully synchronous."
    )
| 147 | |
| 148 | |
| 149 | # ── P3-2: storage_uri is mpack:// immediately after push ───────────────────── |
| 150 | |
@pytest.mark.asyncio
async def test_p3_2_storage_uri_is_mpack_uri_after_push(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """The pushed object's ``storage_uri`` must point at its mpack right away.

    Before Phase 3, small mpacks left ``storage_uri='pending'`` for the
    background job to promote. With no job, the push path itself must
    record an ``mpack://`` URI that references the covering mpack key.
    """
    content = b"phase3 storage uri content"
    object_id = blob_id(content)

    packed = _build_push_mpack({object_id: content})
    mpack_key = _mpack_id(packed)

    fake = _FakeBackend({mpack_key: packed})
    monkeypatch.setattr("musehub.services.musehub_wire.get_backend", lambda: fake)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    await wire_push_unpack_mpack(
        db_session,
        repo.repo_id,
        mpack_key,
        pusher_id="gabriel",
        branch="main",
        head_commit_id="",
        commits_count=0,
        objects_count=1,
    )
    await db_session.commit()

    stored = await db_session.get(db.MusehubObject, object_id)
    assert stored is not None, f"MusehubObject row missing for {object_id[:20]}"

    uri = stored.storage_uri
    assert uri.startswith("mpack://"), (
        f"storage_uri should be mpack://... immediately after push but got {uri!r}. "
        "Phase 3 sets the mpack URI inline without a background job."
    )
    assert mpack_key in uri, f"storage_uri {uri!r} does not reference mpack_key"
| 188 | |
| 189 | |
| 190 | # ── P3-3: full round-trip — push → immediate fetch ─────────────────────────── |
| 191 | |
@pytest.mark.asyncio
async def test_p3_3_immediate_fetch_after_push(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Full round-trip: push → wire_fetch_mpack succeeds immediately (no job wait).

    This is the end-to-end Phase 3 test. After wire_push_unpack_mpack returns,
    wire_fetch_mpack must be able to serve the object without a
    FetchNotIndexedError, and the served bytes must equal the pushed content.
    """
    raw_content = b"phase3 roundtrip content"
    oid = blob_id(raw_content)

    # Build an mpack with one commit whose snapshot references the object,
    # plus the object itself (zstd-encoded) and a ``main`` branch head.
    cctx = zstandard.ZstdCompressor()
    snap_id = fake_id("snap-p3-roundtrip")
    commit_id = fake_id("commit-p3-roundtrip")

    mpack_bytes = msgpack.packb(
        {
            "commits": [
                {
                    "commit_id": commit_id,
                    "parent_commit_id": None,
                    "parent2_commit_id": None,
                    "branch": "main",
                    "message": "p3 roundtrip",
                    "author": "gabriel",
                    "committed_at": _now().isoformat(),
                    "snapshot_id": snap_id,
                    "agent_id": "",
                    "model_id": "",
                    "toolchain_id": "",
                    "signature": "",
                    "signer_key_id": "",
                    "sem_ver_bump": "patch",
                    "breaking_changes": [],
                    "prompt_hash": "",
                }
            ],
            "snapshots": [
                {
                    "snapshot_id": snap_id,
                    "parent_snapshot_id": None,
                    "delta_upsert": {"file.txt": oid},
                    "delta_remove": [],
                }
            ],
            "objects": [
                {
                    "object_id": oid,
                    "encoding": "zstd",
                    "content": cctx.compress(raw_content),
                }
            ],
            "branch_heads": {"main": commit_id},
        },
        use_bin_type=True,
    )
    mpack_key = _mpack_id(mpack_bytes)

    backend = _FakeBackend({mpack_key: mpack_bytes})
    monkeypatch.setattr("musehub.services.musehub_wire.get_backend", lambda: backend)

    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    await wire_push_unpack_mpack(
        db_session,
        repo.repo_id,
        mpack_key,
        pusher_id="gabriel",
        branch="main",
        head_commit_id=commit_id,
        commits_count=1,
        objects_count=1,
    )
    await db_session.commit()

    # Fetch immediately: no background job has run and nothing waits.
    result = await wire_fetch_mpack(db_session, repo.repo_id, want=[commit_id], have=[])

    assert result["mpack_url"] is not None, "fetch returned up-to-date but should have data"
    assert result["object_count"] == 1

    # The assembled fetch mpack is expected to have been written to the fake
    # backend's mpack store; read it back and check the served bytes.
    fetch_mpack_id = result["mpack_id"]
    fetch_raw = backend._mpacks.get(fetch_mpack_id)
    assert fetch_raw is not None, "assembled fetch mpack not found in backend"
    if fetch_raw.startswith(b"MUSE"):
        # Framed wire format; imported lazily because only this branch needs it.
        from muse.core.mpack import parse_wire_mpack as _parse_wm
        payload = _parse_wm(fetch_raw)
    else:
        payload = msgpack.unpackb(fetch_raw, raw=False)

    # Honour each entry's declared encoding. The object was pushed
    # zstd-compressed, and the fetch mpack may carry it the same way, so the
    # content has to be decoded before it is compared with the original bytes.
    dctx = zstandard.ZstdDecompressor()
    obj_map: dict[str, bytes] = {}
    for entry in payload["objects"]:
        content = bytes(entry["content"])
        if entry.get("encoding") == "zstd":
            content = dctx.decompress(content)
        obj_map[entry["object_id"]] = content

    assert oid in obj_map, f"object {oid[:20]} missing from fetch mpack"
    assert obj_map[oid] == raw_content, (
        "object content from Phase 3 synchronous push must match original bytes"
    )
File History
1 commit
sha256:3ff9c9863a9891bdcde71b4a43228f66d0493e38b7cc1d09fe9eb7de774046b2
feat: add repair-commit wire endpoint (API parity with repa…
Opus 4.8
minor
⚠
1 day ago