gabriel / musehub public
test_mpack_index_job.py python
410 lines 15.2 KB
Raw
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor ⚠ breaking 21 days ago
1 """TDD — mpack.index job: object writes, object refs, idempotency.
2
3 The one principle: Content-addressing is a proof, not a label.
4 sha256(wire_bytes) == mpack_key already authenticated every byte in the
5 mpack — including every object's content and its declared ID.
6
7 Phase 1 invariants:
8 1. After job runs: every object blob is in MinIO under its sha256 key.
9 2. After job runs: musehub_objects has one row per object.
10 3. After job runs: musehub_object_refs has rows for every (repo_id, object_id).
11 4. No per-object sha256 re-verification — that is checking the work of the hash.
12 5. Ghost guard passes on stream push immediately after job completes.
13 6. Job is idempotent — running it twice produces identical DB state.
14 7. Return dict includes objects_written, object_refs_written, elapsed_ms.
15 """
from __future__ import annotations

import datetime
import hashlib
import pathlib
import typing
from collections.abc import AsyncIterator

import msgpack
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request
from musehub.db.musehub_repo_models import MusehubObject, MusehubObjectRef, MusehubRepo
from musehub.db.database import get_db
from musehub.main import app
from musehub.types.json_types import JSONObject

from muse.core.object_store import write_object
from muse.core.mpack import build_mpack
from muse.core.paths import muse_dir
from muse.core.snapshot import compute_commit_id, compute_snapshot_id
from muse.core.commits import CommitRecord, write_commit
from muse.core.refs import write_branch_ref
from muse.core.snapshots import SnapshotRecord, write_snapshot
from muse.core.types import blob_id

pytestmark = pytest.mark.skip(reason="muse wire protocol in flux")
45
46
# Identity injected in place of real request signing: a non-agent admin
# ("gabriel") with a placeholder all-zero identity hash.
_AUTH_CTX = MSignContext(
    is_admin=True,
    is_agent=False,
    handle="gabriel",
    identity_id="sha256:" + "0" * 64,
)

# Shape of the synthetic repo built by _make_repo.
# Small enough to be fast, large enough to exercise batching.
_N_FILES = 30        # files in the base manifest
_N_COMMITS = 15      # linear commits on main
_FILES_CHANGED = 3   # files rewritten in each commit
_BLOB_SIZE = 256     # padding bytes appended to every blob's unique prefix
59
60
61 # ── fixtures ───────────────────────────────────────────────────────────────
62
@pytest_asyncio.fixture()
async def client(db_session: AsyncSession) -> AsyncIterator[AsyncClient]:
    """Yield an in-process HTTP client wired to the test database session.

    Installs three dependency overrides on ``app``:
      * ``get_db`` -> yields ``db_session``, so requests share the test session;
      * ``require_signed_request`` / ``optional_signed_request`` -> return
        ``_AUTH_CTX``, so routes see an authenticated admin without real signing.

    All overrides are cleared on teardown, even if creating or closing the
    client raises.
    """
    async def _override_get_db() -> AsyncIterator[AsyncSession]:
        yield db_session

    app.dependency_overrides[get_db] = _override_get_db
    app.dependency_overrides[require_signed_request] = lambda: _AUTH_CTX
    app.dependency_overrides[optional_signed_request] = lambda: _AUTH_CTX
    try:
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="https://localhost:1337",
        ) as c:
            yield c
    finally:
        # Guarantee the global app is restored for the next test.
        app.dependency_overrides.clear()
79
80
@pytest_asyncio.fixture()
async def repo(client: AsyncClient) -> AsyncIterator[str]:
    """Create an uninitialized public repo ``mpack-index-test`` owned by gabriel.

    Yields the repo slug returned by the API. On teardown the repo is deleted
    via ``DELETE /api/repos/{repoId}``. The delete is best-effort: its
    response status is not checked.
    """
    resp = await client.post(
        "/api/repos",
        json={"owner": "gabriel", "name": "mpack-index-test", "visibility": "public", "initialize": False},
    )
    assert resp.status_code in (200, 201), resp.text
    data = resp.json()
    yield data["slug"]
    await client.delete(f"/api/repos/{data['repoId']}")
91
92
def _make_repo(tmp: pathlib.Path) -> tuple[pathlib.Path, str, dict]:
    """Build a local Muse repo with a linear history on ``main`` and pack it.

    History shape:
      * ``_N_FILES`` base blobs, one per ``src/file_NNNN.py``;
      * ``_N_COMMITS`` commits, each rewriting ``_FILES_CHANGED`` files
        (rotating through the file set) on top of the *base* manifest;
      * timestamps start at 2026-01-01T00:00Z and advance 60 s per commit.

    Returns ``(repo_path, head_commit_id, mpack)``, where ``mpack`` is
    ``build_mpack`` for the head with an empty ``have`` list.
    """
    tmp.mkdir(parents=True, exist_ok=True)
    dot = muse_dir(tmp)
    dot.mkdir()
    (dot / "repo.json").write_text('{"repo_id":"idx-test","owner":"gabriel"}')
    for sub in ("commits", "snapshots", "objects"):
        (dot / sub).mkdir()
    (dot / "refs" / "heads").mkdir(parents=True)
    (dot / "HEAD").write_text("ref: refs/heads/main\n")
    (dot / "config.toml").write_text("")

    def _store(payload: bytes) -> str:
        # Content-address one blob, write it to the object store, return its id.
        oid = blob_id(payload)
        write_object(tmp, oid, payload)
        return oid

    base_manifest = {
        f"src/file_{n:04d}.py": _store(f"base-{n:04d}".encode() + b"x" * _BLOB_SIZE)
        for n in range(_N_FILES)
    }

    parent: str | None = None
    tip = ""
    stamp = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)

    for c in range(_N_COMMITS):
        manifest = dict(base_manifest)
        for j in range(_FILES_CHANGED):
            slot = (c * _FILES_CHANGED + j) % _N_FILES
            manifest[f"src/file_{slot:04d}.py"] = _store(
                f"c{c:04d}-f{j}".encode() + b"y" * _BLOB_SIZE
            )

        snapshot_id = compute_snapshot_id(manifest)
        write_snapshot(tmp, SnapshotRecord(snapshot_id=snapshot_id, manifest=manifest))

        message = f"commit-{c:05d}"
        commit_id = compute_commit_id(
            parent_ids=[parent] if parent else [],
            snapshot_id=snapshot_id,
            message=message,
            committed_at_iso=stamp.isoformat(),
            author="gabriel",
        )
        write_commit(tmp, CommitRecord(
            commit_id=commit_id, branch="main",
            snapshot_id=snapshot_id, message=message, committed_at=stamp,
            parent_commit_id=parent, parent2_commit_id=None,
            author="gabriel", metadata={}, structured_delta=None,
            sem_ver_bump="none", breaking_changes=[],
            agent_id="", model_id="", toolchain_id="",
            prompt_hash="", signature="", signer_key_id="",
        ))
        parent = tip = commit_id
        stamp += datetime.timedelta(seconds=60)

    write_branch_ref(tmp, "main", tip)
    return tmp, tip, build_mpack(tmp, [tip], have=[])
152
153
async def _push_mpack(
    client: AsyncClient,
    repo_slug: str,
    mpack: JSONObject,
    head: str,
    db_session: AsyncSession,
) -> str:
    """Upload *mpack* to object storage and create an ``mpack.index`` job row.

    Steps:
      1. msgpack-encode the mpack; its key is ``"sha256:" + sha256(wire_bytes)``.
      2. Request a presigned upload URL from ``push/mpack-presign`` and PUT the
         wire bytes there.
      3. Insert a ``pending`` MusehubBackgroundJob row (flushed, not committed).

    Does NOT call push/unpack-mpack (that route is now fully synchronous and
    does not return a job_id). Creates the job row directly so tests can call
    process_mpack_index_job in isolation.

    Returns the new job_id.
    """
    from musehub.core.genesis import compute_job_id as _compute_job_id
    from musehub.db.musehub_jobs_models import MusehubBackgroundJob

    repo_row = (await db_session.execute(
        select(MusehubRepo).where(MusehubRepo.slug == repo_slug)
    )).scalar_one()
    repo_id = repo_row.repo_id

    wire_bytes = msgpack.packb(mpack, use_bin_type=True)
    mpack_key = "sha256:" + hashlib.sha256(wire_bytes).hexdigest()
    n_objects = len(mpack.get("objects") or [])

    presign_resp = await client.post(
        f"/gabriel/{repo_slug}/push/mpack-presign",
        content=msgpack.packb(
            {"mpack_key": mpack_key, "size_bytes": len(wire_bytes)},
            use_bin_type=True,
        ),
        headers={"Content-Type": "application/x-msgpack"},
    )
    assert presign_resp.status_code == 200, presign_resp.text
    presign = presign_resp.json()
    # Accept either snake_case or camelCase key in the presign response.
    upload_url = presign.get("upload_url") or presign.get("uploadUrl")
    assert upload_url, f"presign response has no upload URL: {presign}"

    # A separate client: the presigned URL presumably targets object storage
    # (MinIO) directly rather than the in-process ASGI app.
    async with AsyncClient() as raw:
        put = await raw.put(upload_url, content=wire_bytes)
    assert put.status_code in (200, 204), (
        f"mpack upload failed: {put.status_code} {put.text}"
    )

    now = datetime.datetime.now(tz=datetime.timezone.utc)
    job_id = _compute_job_id(repo_id, "mpack.index", now.isoformat())
    db_session.add(MusehubBackgroundJob(
        job_id=job_id,
        repo_id=repo_id,
        job_type="mpack.index",
        payload={
            "mpack_key": mpack_key,
            "pusher_id": "sha256:" + "0" * 64,
            "branch": "main",
            "head": head,
            "force": False,
            "declared_objects_count": n_objects,
        },
        status="pending",
        created_at=now,
        attempt=0,
    ))
    await db_session.flush()
    return job_id
216
217
218 # ── Phase 1 tests ──────────────────────────────────────────────────────────
219
@pytest.mark.asyncio
async def test_objects_written_to_minio_after_job(
    client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """After mpack.index runs: every object is addressable via the stored mpack.

    This is the load-bearing test. Fetch/clone/pull are all blocked until
    every object can be located in storage.

    Phase 2: the job no longer writes each blob to MinIO individually. Every
    object instead gets a musehub_objects row whose ``storage_uri`` is an
    ``mpack://`` URI. The test name predates that change and is kept as a
    stable test id.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    raw_objects = mpack.get("objects") or []
    assert raw_objects, "mpack must contain objects"

    job_id = await _push_mpack(client, repo, mpack, head, db_session)

    from musehub.services.musehub_wire import process_mpack_index_job
    await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    all_oids = [obj["object_id"] for obj in raw_objects]
    rows = (await db_session.execute(
        select(MusehubObject).where(MusehubObject.object_id.in_(all_oids))
    )).scalars().all()
    row_map = {r.object_id: r for r in rows}

    missing = [oid for oid in all_oids if oid not in row_map]
    assert not missing, (
        f"{len(missing)}/{len(raw_objects)} objects have no musehub_objects row after job: "
        f"{missing[:3]}"
    )
    bad_uri = [oid for oid in all_oids if not (row_map[oid].storage_uri or "").startswith("mpack://")]
    assert not bad_uri, (
        f"{len(bad_uri)} objects have unexpected storage_uri (expected mpack://...): "
        + str({oid: row_map[oid].storage_uri for oid in bad_uri[:3]})
    )
257
258
@pytest.mark.asyncio
async def test_musehub_objects_rows_after_job(
    client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """Each packed object ends up with exactly one musehub_objects row.

    Also checks that the job's own ``objects_written`` counter agrees with
    the number of objects carried by the mpack.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)

    from musehub.services.musehub_wire import process_mpack_index_job
    summary = await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    expected_ids = [entry["object_id"] for entry in (mpack.get("objects") or [])]
    lookup = select(MusehubObject).where(MusehubObject.object_id.in_(expected_ids))
    found = (await db_session.execute(lookup)).scalars().all()

    assert len(found) == len(expected_ids), (
        f"expected {len(expected_ids)} musehub_objects rows, got {len(found)}"
    )
    assert summary["objects_written"] == len(expected_ids), summary
282
283
@pytest.mark.asyncio
async def test_object_refs_after_job(
    client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """Once mpack.index has run, musehub_object_refs ties every object to this repo.

    The repo's ref-row count must equal the number of objects in the mpack,
    and the job must report the same number as ``object_refs_written``.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)

    from musehub.services.musehub_wire import process_mpack_index_job
    summary = await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    repo_id = (await db_session.execute(
        select(MusehubRepo).where(MusehubRepo.slug == repo)
    )).scalar_one().repo_id

    count_stmt = (
        select(func.count())
        .select_from(MusehubObjectRef)
        .where(MusehubObjectRef.repo_id == repo_id)
    )
    ref_count = (await db_session.execute(count_stmt)).scalar_one()

    expected = len(mpack.get("objects") or [])
    assert ref_count == expected, f"expected {expected} object_refs rows, got {ref_count}"
    assert summary["object_refs_written"] == expected, summary
311
312
@pytest.mark.asyncio
async def test_no_per_object_sha256_verification(
    client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """mpack.index must not call hashlib.sha256 on individual object content.

    sha256(wire_bytes) == mpack_key already authenticated every byte in the
    mpack. Re-hashing each object is checking the work of the hash.

    The mpack-level sha256 is called once on the sync path (unpack-mpack).
    After that: trust the math.

    Detection: ``musehub_wire.hashlib`` is swapped for a wrapping mock whose
    ``sha256`` records the size of any bytes/bytearray/memoryview buffer
    passed to the constructor. Limitation: data fed later through
    ``.update()`` is not observed.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)

    sha256_calls_on_large_data: list[int] = []
    _real_sha256 = hashlib.sha256

    # Every blob from _make_repo is a short unique prefix plus _BLOB_SIZE
    # padding bytes, so object content is always >= _BLOB_SIZE bytes. Short
    # inputs (e.g. identifiers) fall below the threshold. Anything at or above
    # it is object-sized content, or the whole mpack, which the sync path has
    # already verified and the job must not re-hash either.
    _OBJECT_SIZE_THRESHOLD = _BLOB_SIZE

    def _spy_sha256(
        data: bytes | bytearray | memoryview = b"",
        *args: typing.Any,
        **kwargs: typing.Any,
    ) -> typing.Any:
        # memoryview covers zero-copy slices of the pack buffer, which a
        # bytes/bytearray-only check would miss.
        if isinstance(data, (bytes, bytearray, memoryview)):
            size = memoryview(data).nbytes
            if size >= _OBJECT_SIZE_THRESHOLD:
                sha256_calls_on_large_data.append(size)
        return _real_sha256(data, *args, **kwargs)

    import musehub.services.musehub_wire as _wire_mod
    from unittest.mock import patch

    with patch.object(_wire_mod, "hashlib", wraps=_wire_mod.hashlib) as mock_hl:
        mock_hl.sha256 = _spy_sha256
        await _wire_mod.process_mpack_index_job(db_session, job_id)
        await db_session.commit()

    assert not sha256_calls_on_large_data, (
        f"mpack.index called sha256 on object-sized data "
        f"({len(sha256_calls_on_large_data)} times, sizes: {sha256_calls_on_large_data[:5]}) — "
        "this is checking the work of the mpack hash. Trust the math."
    )
355
356
@pytest.mark.asyncio
async def test_job_is_idempotent(
    client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """Running mpack.index twice produces identical DB state to running it once.

    Worker crashes, retries, and duplicate enqueues must all be safe. Both
    musehub_objects rows (for this mpack's object ids) and musehub_object_refs
    rows (for this repo) are counted after each run and must not change.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)

    from musehub.services.musehub_wire import process_mpack_index_job

    raw_objects = mpack.get("objects") or []
    all_oids = [obj["object_id"] for obj in raw_objects]

    # Select the scalar column rather than the ORM row, so later commits
    # cannot expire it and force an implicit (async-unsafe) reload.
    repo_id = (await db_session.execute(
        select(MusehubRepo.repo_id).where(MusehubRepo.slug == repo)
    )).scalar_one()

    async def _row_counts() -> tuple[int, int]:
        """Return (musehub_objects rows for all_oids, object_refs rows for repo_id)."""
        n_objects = (await db_session.execute(
            select(func.count()).select_from(MusehubObject).where(
                MusehubObject.object_id.in_(all_oids)
            )
        )).scalar_one()
        n_refs = (await db_session.execute(
            select(func.count()).select_from(MusehubObjectRef).where(
                MusehubObjectRef.repo_id == repo_id
            )
        )).scalar_one()
        return n_objects, n_refs

    await process_mpack_index_job(db_session, job_id)
    await db_session.commit()
    after_first = await _row_counts()

    await process_mpack_index_job(db_session, job_id)
    await db_session.commit()
    after_second = await _row_counts()

    assert after_second == after_first, (
        f"(objects, object_refs) changed between runs: {after_first} -> {after_second} "
        "(idempotency failure)"
    )

    obj_count = after_second[1]
    assert obj_count == len(raw_objects), (
        f"after two runs: expected {len(raw_objects)} object_refs, got {obj_count} "
        "(idempotency failure — rows were doubled)"
    )
391
392
@pytest.mark.asyncio
async def test_return_dict_includes_counts_and_timing(
    client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """Return dict contains objects_written, object_refs_written, elapsed_ms.

    Counts must be positive for a non-empty mpack. ``elapsed_ms`` only has to
    be a non-negative number: a millisecond timer can legitimately read 0 on a
    fast run, so asserting ``> 0`` would be flaky.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)

    from musehub.services.musehub_wire import process_mpack_index_job
    result = await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    for key in ("objects_written", "object_refs_written", "elapsed_ms"):
        assert key in result, result

    assert result["objects_written"] > 0, result
    assert result["object_refs_written"] > 0, result
    assert isinstance(result["elapsed_ms"], (int, float)), result
    assert result["elapsed_ms"] >= 0, result
File History 1 commit
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor 21 days ago