gabriel / musehub public
test_mpack_phase3.py python
286 lines 10.2 KB
Raw
sha256:3ff9c9863a9891bdcde71b4a43228f66d0493e38b7cc1d09fe9eb7de774046b2 feat: add repair-commit wire endpoint (API parity with repa… Opus 4.8 minor ⚠ breaking 1 day ago
1 """TDD — Phase 3: make push fully synchronous (issue #69).
2
3 After Phase 2 the background job no longer does per-object MinIO writes.
4 Phase 3 removes the background job entirely: wire_push_unpack_mpack processes
5 the mpack inline for ALL sizes, makes objects immediately fetchable, and does
6 not enqueue a mpack.index background job.
7
8 After this change:
9 - wire_push_unpack_mpack must NOT enqueue a mpack.index MusehubBackgroundJob
10 - MusehubObject.storage_uri must be mpack://{mpack_key} immediately after push
11 - wire_fetch_mpack must succeed immediately after push (no FetchNotIndexedError)
12 - The large-mpack threshold (mpack_content_cache_max_bytes) is removed as a gate
13
14 Test IDs:
15 P3-1 no mpack.index background job enqueued after wire_push_unpack_mpack
16 P3-2 MusehubObject.storage_uri is mpack:// URI immediately after push
17 P3-3 full round-trip: push → immediate fetch → correct objects served
18 """
19 from __future__ import annotations
20
21 import hashlib
22 from collections.abc import Mapping
23 from datetime import datetime, timezone
24
25 import msgpack
26 import pytest
27 import zstandard
28
29 pytestmark = pytest.mark.skip(reason="muse wire protocol in flux")
30 from sqlalchemy.dialects.postgresql import insert as pg_insert
31 from sqlalchemy.ext.asyncio import AsyncSession
32 from sqlalchemy import select
33
34 from muse.core.types import blob_id, fake_id
35 from musehub.db import musehub_repo_models as db
36 from musehub.db.musehub_jobs_models import MusehubBackgroundJob
37 from musehub.services.musehub_wire import (
38 wire_push_unpack_mpack,
39 wire_fetch_mpack,
40 )
41 from tests.factories import create_repo
42
43
def _now() -> datetime:
    """Return the current instant as a timezone-aware ``datetime`` in UTC."""
    utc = timezone.utc
    return datetime.now(utc)
46
47
def _mpack_id(raw: bytes) -> str:
    """Return the ``sha256:``-prefixed hex digest of *raw*."""
    digest = hashlib.sha256(raw).hexdigest()
    return f"sha256:{digest}"
50
51
def _build_push_mpack(objects: Mapping[str, bytes]) -> bytes:
    """Serialise *objects* into a msgpack push payload that carries only objects.

    Each value is zstd-compressed and emitted as an ``objects`` entry tagged
    ``encoding="zstd"``; ``commits``, ``snapshots`` and ``branch_heads`` are
    left empty.
    """
    compressor = zstandard.ZstdCompressor()

    packed_objects = []
    for object_id, payload in objects.items():
        packed_objects.append(
            {
                "object_id": object_id,
                "encoding": "zstd",
                "content": compressor.compress(payload),
            }
        )

    # Key order is kept as-is: callers hash the packed bytes to derive a key.
    envelope = {
        "commits": [],
        "snapshots": [],
        "objects": packed_objects,
        "branch_heads": {},
    }
    return msgpack.packb(envelope, use_bin_type=True)
63
64
class _FakeBackend:
    """In-memory storage backend that holds mpacks and records ``put()`` calls.

    Objects and mpacks live in two separate dicts. Every object id passed to
    ``put()`` is appended to ``put_calls`` so tests can assert on per-object
    writes.
    """

    # Class-level flag; always True for this fake.
    supports_presign: bool = True

    def __init__(self, mpack_store: Mapping[str, bytes]) -> None:
        """Seed the backend with the mpacks in *mpack_store*.

        The seed is copied into a private dict: the parameter is typed as a
        read-only ``Mapping`` while ``put_mpack`` needs item assignment, so
        keeping the caller's object would fail for immutable mappings and
        would mutate the caller's data for mutable ones.
        """
        self._mpacks: dict[str, bytes] = dict(mpack_store)
        self._objects: dict[str, bytes] = {}
        self.put_calls: list[str] = []

    async def put(self, oid: str, data: bytes) -> str:
        """Store *data* under *oid*, record the call, and return its ``mem://`` URI."""
        self.put_calls.append(oid)
        self._objects[oid] = data
        return f"mem://{oid}"

    async def get(self, oid: str) -> bytes | None:
        """Return the bytes stored under *oid*, or ``None`` when absent."""
        return self._objects.get(oid)

    async def get_mpack(self, mpack_id: str) -> bytes | None:
        """Return the mpack stored under *mpack_id*, or ``None`` when absent."""
        return self._mpacks.get(mpack_id)

    async def put_mpack(self, mpack_id: str, data: bytes) -> None:
        """Store *data* as the mpack *mpack_id*, replacing any previous value."""
        self._mpacks[mpack_id] = data

    async def exists(self, oid: str) -> bool:
        """Report whether an object is stored under *oid*."""
        return oid in self._objects

    async def delete(self, oid: str) -> None:
        """Remove the object *oid*; a missing id is ignored."""
        self._objects.pop(oid, None)

    async def presign_get(self, oid: str, ttl: int) -> str:
        """Return a fixed fake download URL for *oid*; *ttl* is unused."""
        return f"https://minio.test/{oid}"

    async def presign_mpack_get(self, mpack_id: str, ttl: int) -> str:
        """Return a fixed fake download URL for the mpack; *ttl* is unused."""
        return f"https://minio.test/mpacks/{mpack_id}"

    async def quarantine_mpack(self, mpack_key: str) -> None:
        """Do nothing; quarantine is a no-op in this fake."""
        pass

    def uri_for(self, oid: str) -> str:
        """Return the ``mem://`` URI that ``put()`` reports for *oid*."""
        return f"mem://{oid}"
106
107
108 # ── P3-1: no mpack.index background job enqueued ─────────────────────────────
109
@pytest.mark.asyncio
async def test_p3_1_no_mpack_index_job_enqueued(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """P3-1: a push must not leave a ``mpack.index`` background job row behind.

    Pushes a one-object mpack through ``wire_push_unpack_mpack`` against a
    fake backend, commits, and then queries ``MusehubBackgroundJob`` for rows
    of type ``mpack.index`` belonging to the repo. Any such row is treated as
    a Phase 3 regression.
    """
    payload = b"phase3 object alpha"
    object_id = blob_id(payload)
    packed = _build_push_mpack({object_id: payload})
    key = _mpack_id(packed)

    fake_backend = _FakeBackend({key: packed})

    def _get_fake_backend() -> _FakeBackend:
        return fake_backend

    # Route the wire service's backend lookup to the in-memory fake.
    monkeypatch.setattr(
        "musehub.services.musehub_wire.get_backend", _get_fake_backend
    )

    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    await wire_push_unpack_mpack(
        db_session,
        repo.repo_id,
        key,
        pusher_id="gabriel",
        branch="main",
        head_commit_id="",
        commits_count=0,
        objects_count=1,
    )
    await db_session.commit()

    query = (
        select(MusehubBackgroundJob)
        .where(MusehubBackgroundJob.repo_id == repo.repo_id)
        .where(MusehubBackgroundJob.job_type == "mpack.index")
    )
    outcome = await db_session.execute(query)
    job_rows = outcome.scalars().all()

    assert job_rows == [], (
        f"Phase 3: wire_push_unpack_mpack enqueued {len(job_rows)} mpack.index job(s). "
        f"The background job must be removed — push is now fully synchronous."
    )
147
148
149 # ── P3-2: storage_uri is mpack:// immediately after push ─────────────────────
150
@pytest.mark.asyncio
async def test_p3_2_storage_uri_is_mpack_uri_after_push(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """P3-2: the object row carries an ``mpack://`` URI as soon as push returns.

    Pushes a one-object mpack, commits, then loads the ``MusehubObject`` row
    for that object and checks that ``storage_uri`` starts with ``mpack://``
    and contains the pushed mpack key. With no background job left to promote
    a placeholder URI, the push path itself has to write the final value.
    """
    payload = b"phase3 storage uri content"
    oid = blob_id(payload)
    packed = _build_push_mpack({oid: payload})
    mpack_key = _mpack_id(packed)

    fake_backend = _FakeBackend({mpack_key: packed})

    def _get_fake_backend() -> _FakeBackend:
        return fake_backend

    # Route the wire service's backend lookup to the in-memory fake.
    monkeypatch.setattr(
        "musehub.services.musehub_wire.get_backend", _get_fake_backend
    )

    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    await wire_push_unpack_mpack(
        db_session,
        repo.repo_id,
        mpack_key,
        pusher_id="gabriel",
        branch="main",
        head_commit_id="",
        commits_count=0,
        objects_count=1,
    )
    await db_session.commit()

    row = await db_session.get(db.MusehubObject, oid)
    assert row is not None, f"MusehubObject row missing for {oid[:20]}"

    uri_has_mpack_scheme = row.storage_uri.startswith("mpack://")
    assert uri_has_mpack_scheme, (
        f"storage_uri should be mpack://... immediately after push but got {row.storage_uri!r}. "
        f"Phase 3 sets the mpack URI inline without a background job."
    )
    assert mpack_key in row.storage_uri, (
        f"storage_uri {row.storage_uri!r} does not reference mpack_key"
    )
188
189
190 # ── P3-3: full round-trip — push → immediate fetch ───────────────────────────
191
@pytest.mark.asyncio
async def test_p3_3_immediate_fetch_after_push(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """P3-3: push followed directly by fetch serves the pushed object.

    Builds a mpack holding one commit, one snapshot mapping ``file.txt`` to a
    single object, and that object zstd-compressed. After
    ``wire_push_unpack_mpack`` and a commit, ``wire_fetch_mpack`` is called
    with no intermediate step; the assembled fetch mpack is then read back
    from the fake backend and its object content compared to the original.
    """
    raw_content = b"phase3 roundtrip content"
    oid = blob_id(raw_content)
    snap_id = fake_id("snap-p3-roundtrip")
    commit_id = fake_id("commit-p3-roundtrip")
    compressor = zstandard.ZstdCompressor()

    # Key order inside each record is preserved: the mpack key is a hash of
    # the packed bytes.
    commit_record = {
        "commit_id": commit_id,
        "parent_commit_id": None,
        "parent2_commit_id": None,
        "branch": "main",
        "message": "p3 roundtrip",
        "author": "gabriel",
        "committed_at": _now().isoformat(),
        "snapshot_id": snap_id,
        "agent_id": "",
        "model_id": "",
        "toolchain_id": "",
        "signature": "",
        "signer_key_id": "",
        "sem_ver_bump": "patch",
        "breaking_changes": [],
        "prompt_hash": "",
    }
    snapshot_record = {
        "snapshot_id": snap_id,
        "parent_snapshot_id": None,
        "delta_upsert": {"file.txt": oid},
        "delta_remove": [],
    }
    object_record = {
        "object_id": oid,
        "encoding": "zstd",
        "content": compressor.compress(raw_content),
    }
    mpack_bytes = msgpack.packb(
        {
            "commits": [commit_record],
            "snapshots": [snapshot_record],
            "objects": [object_record],
            "branch_heads": {"main": commit_id},
        },
        use_bin_type=True,
    )
    mpack_key = _mpack_id(mpack_bytes)

    backend = _FakeBackend({mpack_key: mpack_bytes})

    def _get_fake_backend() -> _FakeBackend:
        return backend

    # Route the wire service's backend lookup to the in-memory fake.
    monkeypatch.setattr(
        "musehub.services.musehub_wire.get_backend", _get_fake_backend
    )

    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    await wire_push_unpack_mpack(
        db_session,
        repo.repo_id,
        mpack_key,
        pusher_id="gabriel",
        branch="main",
        head_commit_id=commit_id,
        commits_count=1,
        objects_count=1,
    )
    await db_session.commit()

    # Fetch straight away: nothing else runs between push and fetch.
    result = await wire_fetch_mpack(
        db_session, repo.repo_id, want=[commit_id], have=[]
    )

    assert result["mpack_url"] is not None, "fetch returned up-to-date but should have data"
    assert result["object_count"] == 1

    # Read the assembled fetch mpack back out of the fake backend.
    fetch_mpack_id = result["mpack_id"]
    fetch_raw = backend._mpacks.get(fetch_mpack_id)
    assert fetch_raw is not None, "assembled fetch mpack not found in backend"

    # A leading b"MUSE" marker selects the wire-mpack parser; anything else
    # is decoded as plain msgpack.
    if fetch_raw[:4] != b"MUSE":
        payload = msgpack.unpackb(fetch_raw, raw=False)
    else:
        from muse.core.mpack import parse_wire_mpack as _parse_wm
        payload = _parse_wm(fetch_raw)

    obj_map = {}
    for entry in payload["objects"]:
        obj_map[entry["object_id"]] = bytes(entry["content"])

    assert oid in obj_map, f"object {oid[:20]} missing from fetch mpack"
    assert obj_map[oid] == raw_content, (
        "object content from Phase 3 synchronous push must match original bytes"
    )
File History 1 commit
sha256:3ff9c9863a9891bdcde71b4a43228f66d0493e38b7cc1d09fe9eb7de774046b2 feat: add repair-commit wire endpoint (API parity with repa… Opus 4.8 minor 1 day ago