gabriel / musehub public
test_mpack_validation_phase2.py python
490 lines 18.8 KB
Raw
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor ⚠ breaking 21 days ago
1 """TDD — Phase 2: mpack content validation in the background job.
2
3 Phase 2 invariants (issue #49):
4 2a. Decompression size guard (zip bomb) — MPackValidationError when cumulative
5 decompressed bytes exceeds settings.mpack_max_decompressed_bytes.
6 2b. Object count mismatch — MPackValidationError when actual count differs
7 from declared_objects_count (stored in job payload) by more than 10.
8 2c. Per-object sha256 verification — MPackValidationError when
9 sha256(decompressed_content) != object_id.
10 2d. Quarantine state — on any validation failure: no objects written to MinIO,
11 no commits/snapshots written to DB, job status becomes 'quarantined'.
12
13 All validation runs before any DB or MinIO writes so a failed job leaves no
14 partial state to clean up.
15 """
16 from __future__ import annotations
17
18 import copy
19 import datetime
20 import hashlib
21 import pathlib
22
23 import msgpack
24 import pytest
25 import pytest_asyncio
26
27 pytestmark = pytest.mark.skip(reason="muse wire protocol in flux")
28 from httpx import AsyncClient, ASGITransport
29 from sqlalchemy import select
30 from sqlalchemy.ext.asyncio import AsyncSession
31 from unittest.mock import patch
32
33 from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request
34 from musehub.db.musehub_jobs_models import MusehubBackgroundJob
35 from musehub.db.musehub_repo_models import MusehubCommit
36 from musehub.db.database import get_db
37 from musehub.main import app
38
39 from muse.core.object_store import write_object
40 from muse.core.mpack import build_mpack
41 from muse.core.paths import muse_dir
42 from muse.core.snapshot import compute_commit_id, compute_snapshot_id
43 from muse.core.commits import CommitRecord, write_commit
44 from muse.core.refs import write_branch_ref
45 from muse.core.snapshots import SnapshotRecord, write_snapshot
46 from muse.core.types import blob_id
47 from musehub.types.json_types import JSONObject
48
49
50 _AUTH_CTX = MSignContext(
51 handle="gabriel",
52 identity_id="sha256:" + "0" * 64,
53 is_agent=False,
54 is_admin=True,
55 )
56
57 _N_FILES = 8
58 _N_COMMITS = 4
59 _FILES_CHANGED = 2
60 _BLOB_SIZE = 128
61
62
63 # ── fixtures ────────────────────────────────────────────────────────────────
64
65 @pytest_asyncio.fixture()
66 async def client(db_session: AsyncSession) -> None:
67 async def _override_get_db() -> None:
68 yield db_session
69
70 app.dependency_overrides[get_db] = _override_get_db
71 app.dependency_overrides[require_signed_request] = lambda: _AUTH_CTX
72 app.dependency_overrides[optional_signed_request] = lambda: _AUTH_CTX
73
74 async with AsyncClient(
75 transport=ASGITransport(app=app),
76 base_url="https://localhost:1337",
77 ) as c:
78 yield c
79
80 app.dependency_overrides.clear()
81
82
83 @pytest_asyncio.fixture()
84 async def repo(client: AsyncClient) -> None:
85 resp = await client.post(
86 "/api/repos",
87 json={"owner": "gabriel", "name": "phase2-validation-test", "visibility": "public", "initialize": False},
88 )
89 assert resp.status_code in (200, 201), resp.text
90 data = resp.json()
91 yield data["slug"]
92 await client.delete(f"/api/repos/{data['repoId']}")
93
94
95 def _make_repo(tmp: pathlib.Path) -> tuple[pathlib.Path, str, dict]:
96 tmp.mkdir(parents=True, exist_ok=True)
97 dot = muse_dir(tmp)
98 dot.mkdir()
99 (dot / "repo.json").write_text('{"repo_id":"phase2-test","owner":"gabriel"}')
100 for d in ("commits", "snapshots", "objects"):
101 (dot / d).mkdir()
102 (dot / "refs" / "heads").mkdir(parents=True)
103 (dot / "HEAD").write_text("ref: refs/heads/main\n")
104 (dot / "config.toml").write_text("")
105
106 blob_ids: list[str] = []
107 for i in range(_N_FILES):
108 data = f"base-{i:04d}".encode() + b"x" * _BLOB_SIZE
109 oid = blob_id(data)
110 write_object(tmp, oid, data)
111 blob_ids.append(oid)
112
113 base_manifest = {f"src/file_{i:04d}.py": blob_ids[i] for i in range(_N_FILES)}
114 parent = None
115 tip = ""
116 ts = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
117
118 for i in range(_N_COMMITS):
119 manifest = dict(base_manifest)
120 for j in range(_FILES_CHANGED):
121 idx = (i * _FILES_CHANGED + j) % _N_FILES
122 raw = f"c{i:04d}-f{j}".encode() + b"y" * _BLOB_SIZE
123 oid = blob_id(raw)
124 write_object(tmp, oid, raw)
125 manifest[f"src/file_{idx:04d}.py"] = oid
126
127 sid = compute_snapshot_id(manifest)
128 write_snapshot(tmp, SnapshotRecord(snapshot_id=sid, manifest=manifest))
129 msg = f"commit-{i:05d}"
130 cid = compute_commit_id(
131 parent_ids=[parent] if parent else [],
132 snapshot_id=sid,
133 message=msg,
134 committed_at_iso=ts.isoformat(),
135 author="gabriel",
136 )
137 write_commit(tmp, CommitRecord(
138 commit_id=cid, branch="main",
139 snapshot_id=sid, message=msg, committed_at=ts,
140 parent_commit_id=parent, parent2_commit_id=None,
141 author="gabriel", metadata={}, structured_delta=None,
142 sem_ver_bump="none", breaking_changes=[],
143 agent_id="", model_id="", toolchain_id="",
144 prompt_hash="", signature="", signer_key_id="",
145 ))
146 parent = cid
147 tip = cid
148 ts += datetime.timedelta(seconds=60)
149
150 write_branch_ref(tmp, "main", tip)
151 mpack = build_mpack(tmp, [tip], have=[])
152 return tmp, tip, mpack
153
154
155 async def _push_mpack(client: AsyncClient, repo_slug: str, mpack: bytes, head: str, db_session: AsyncSession) -> str:
156 """Upload mpack to MinIO and create a mpack.index job row. Returns job_id.
157
158 Does NOT call push/unpack-mpack so validation tests start with a clean DB
159 (no commits/snapshots pre-written by the synchronous route).
160 """
161 import httpx as _httpx
162 from datetime import datetime, timezone
163 from sqlalchemy import select as _select
164 from musehub.db.musehub_repo_models import MusehubRepo as _Repo
165 from musehub.core.genesis import compute_job_id as _compute_job_id
166
167 repo_row = (await db_session.execute(
168 _select(_Repo).where(_Repo.slug == repo_slug)
169 )).scalar_one()
170 repo_id = repo_row.repo_id
171
172 wire_bytes = msgpack.packb(mpack, use_bin_type=True)
173 mpack_key = "sha256:" + hashlib.sha256(wire_bytes).hexdigest()
174 n_objects = len(mpack.get("objects") or [])
175
176 pr = await client.post(
177 f"/gabriel/{repo_slug}/push/mpack-presign",
178 content=msgpack.packb({"mpack_key": mpack_key, "size_bytes": len(wire_bytes)}, use_bin_type=True),
179 headers={"Content-Type": "application/x-msgpack"},
180 )
181 assert pr.status_code == 200, pr.text
182 upload_url = pr.json().get("upload_url") or pr.json().get("uploadUrl")
183
184 async with _httpx.AsyncClient() as raw:
185 put = await raw.put(upload_url, content=wire_bytes)
186 assert put.status_code in (200, 204)
187
188 now = datetime.now(tz=timezone.utc)
189 job_id = _compute_job_id(repo_id, "mpack.index", now.isoformat())
190 db_session.add(MusehubBackgroundJob(
191 job_id=job_id,
192 repo_id=repo_id,
193 job_type="mpack.index",
194 payload={
195 "mpack_key": mpack_key,
196 "pusher_id": "sha256:" + "0" * 64,
197 "branch": "main",
198 "head": head,
199 "force": False,
200 "declared_objects_count": n_objects,
201 },
202 status="pending",
203 created_at=now,
204 attempt=0,
205 ))
206 await db_session.flush()
207 return job_id
208
209
210 def _make_tampered_mpack(mpack: JSONObject) -> JSONObject:
211 """Return a copy of mpack with one object's content replaced.
212
213 The object_id is preserved — sha256(new_content) != object_id, which
214 must trigger MPackValidationError (2c).
215 """
216 bad = copy.deepcopy(mpack)
217 raw_objects = bad.get("objects") or []
218 if not raw_objects:
219 return bad
220 obj = raw_objects[0]
221 content = obj.get("content") or b""
222 if obj.get("encoding") == "zstd" and content:
223 import zstandard
224 content = zstandard.ZstdDecompressor().decompress(content)
225 obj["content"] = b"TAMPERED_" + content[:32]
226 obj.pop("encoding", None)
227 return bad
228
229
230 # ── Phase 2 tests ───────────────────────────────────────────────────────────
231
232 @pytest.mark.asyncio
233 async def test_hash_mismatch_caught_at_unpack_mpack(
234 client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
235 ) -> None:
236 """Mpack-level integrity gate: bytes that don't hash to mpack_key → 422.
237
238 The per-mpack sha256 check lives in wire_push_unpack_mpack, not in the
239 background job. If a client claims mpack_key=sha256:X but the bytes stored
240 in MinIO hash to Y, unpack-mpack rejects with 422 "integrity failure".
241
242 Per-object hash checking was intentionally removed: sha256(wire_bytes) ==
243 mpack_key already authenticates every byte in the mpack — no per-item
244 re-hashing needed.
245 """
246 _, head, mpack = _make_repo(tmp_path / "repo")
247 wire_bytes = msgpack.packb(mpack, use_bin_type=True)
248 real_key = "sha256:" + hashlib.sha256(wire_bytes).hexdigest()
249
250 # PUT the real bytes under the real key
251 from musehub.storage.backends import get_backend as _get_backend
252 backend = _get_backend()
253 await backend.put_mpack(real_key, wire_bytes)
254
255 # Now claim a DIFFERENT key — the hash check will catch the mismatch
256 wrong_key = "sha256:" + "a" * 64
257 await backend.put_mpack(wrong_key, wire_bytes)
258
259 resp = await client.post(
260 f"/gabriel/{repo}/push/unpack-mpack",
261 content=msgpack.packb(
262 {"mpack_key": wrong_key, "branch": "main", "head": head},
263 use_bin_type=True,
264 ),
265 headers={"Content-Type": "application/x-msgpack"},
266 )
267 assert resp.status_code == 422, resp.text
268 assert "integrity" in resp.text.lower() or "sha256" in resp.text.lower()
269
270
271 @pytest.mark.asyncio
272 async def test_zip_bomb_raises_mpack_validation_error(
273 client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
274 ) -> None:
275 """Decompression size guard triggers on mpacks that expand beyond the limit.
276
277 Setting mpack_max_decompressed_bytes to 1 byte forces the guard on the
278 first object. The error fires before any DB or MinIO writes.
279 """
280 _, head, mpack = _make_repo(tmp_path / "repo")
281 job_id = await _push_mpack(client, repo, mpack, head, db_session)
282
283 from musehub.config import settings as _real_settings
284
285 class _TinyLimitSettings:
286 mpack_max_decompressed_bytes = 1
287 def __getattr__(self, name: str) -> None:
288 return getattr(_real_settings, name)
289
290 from musehub.services.musehub_wire import process_mpack_index_job, MPackValidationError
291 with patch("musehub.services.musehub_wire.settings", _TinyLimitSettings()):
292 with pytest.raises(MPackValidationError, match="decompressed size"):
293 await process_mpack_index_job(db_session, job_id)
294
295
296 @pytest.mark.asyncio
297 async def test_count_mismatch_raises_mpack_validation_error(
298 client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
299 ) -> None:
300 """Object count mismatch: declared count > 10 off from actual.
301
302 When the job payload has declared_objects_count and the actual mpack object
303 count differs by more than 10, MPackValidationError is raised before any
304 DB or MinIO writes.
305 """
306 _, head, mpack = _make_repo(tmp_path / "repo")
307 job_id = await _push_mpack(client, repo, mpack, head, db_session)
308
309 # Overwrite the declared count in the job payload with a wildly wrong value
310 job_row = (await db_session.execute(
311 select(MusehubBackgroundJob).where(MusehubBackgroundJob.job_id == job_id)
312 )).scalar_one()
313 actual_objects = len(mpack.get("objects") or [])
314 new_payload = dict(job_row.payload)
315 new_payload["declared_objects_count"] = actual_objects + 100
316 job_row.payload = new_payload
317 await db_session.flush()
318
319 from musehub.services.musehub_wire import process_mpack_index_job, MPackValidationError
320 with pytest.raises(MPackValidationError, match="object count mismatch"):
321 await process_mpack_index_job(db_session, job_id)
322
323
324 @pytest.mark.asyncio
325 async def test_validation_failure_writes_no_objects_to_minio(
326 client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
327 ) -> None:
328 """After a validation failure, backend.put is never called for mpack objects.
329
330 The zip bomb guard fires during decompression — before any backend.put()
331 calls. We verify this by spying on backend.put: no call must occur while
332 the job raises MPackValidationError.
333 """
334 _, head, mpack = _make_repo(tmp_path / "repo")
335 job_id = await _push_mpack(client, repo, mpack, head, db_session)
336
337 from musehub.storage.backends import get_backend as _get_backend
338 from musehub.config import settings as _real_settings
339 backend = _get_backend()
340
341 class _TinyLimitSettings:
342 mpack_max_decompressed_bytes = 1
343 def __getattr__(self, name: str) -> None:
344 return getattr(_real_settings, name)
345
346 put_calls: list[str] = []
347 _real_put = backend.put
348
349 async def _spy_put(oid: str, data: bytes) -> None:
350 put_calls.append(oid)
351 return await _real_put(oid, data)
352
353 from musehub.services.musehub_wire import process_mpack_index_job, MPackValidationError
354 with patch.object(backend, "put", side_effect=_spy_put):
355 with patch("musehub.services.musehub_wire.settings", _TinyLimitSettings()):
356 with pytest.raises(MPackValidationError):
357 await process_mpack_index_job(db_session, job_id)
358
359 assert not put_calls, (
360 f"backend.put was called for {len(put_calls)} objects despite validation failure: "
361 f"{[oid[:16] for oid in put_calls[:3]]}"
362 )
363
364
365 @pytest.mark.asyncio
366 async def test_validation_failure_writes_no_commits_to_db(
367 client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
368 ) -> None:
369 """After a validation failure, no commits are inserted to the DB.
370
371 Validation runs before all DB writes — snapshots and commits are not
372 inserted when MPackValidationError fires.
373 """
374 _, head, mpack = _make_repo(tmp_path / "repo")
375 job_id = await _push_mpack(client, repo, mpack, head, db_session)
376 raw_commits = mpack.get("commits") or []
377 all_cids = [c["commit_id"] for c in raw_commits if "commit_id" in c]
378 assert all_cids, "mpack must contain commits"
379
380 from musehub.config import settings as _real_settings
381
382 class _TinyLimitSettings:
383 mpack_max_decompressed_bytes = 1
384 def __getattr__(self, name: str) -> None:
385 return getattr(_real_settings, name)
386
387 from musehub.services.musehub_wire import process_mpack_index_job, MPackValidationError
388 with patch("musehub.services.musehub_wire.settings", _TinyLimitSettings()):
389 with pytest.raises(MPackValidationError):
390 await process_mpack_index_job(db_session, job_id)
391
392 rows = (await db_session.execute(
393 select(MusehubCommit).where(MusehubCommit.commit_id.in_(all_cids))
394 )).scalars().all()
395 assert not rows, (
396 f"{len(rows)} commit rows were inserted despite validation failure: "
397 f"{[r.commit_id[:16] for r in rows[:3]]}"
398 )
399
400
401 @pytest.mark.asyncio
402 async def test_quarantine_job_sets_status_and_reason(
403 client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
404 ) -> None:
405 """quarantine_job sets status='quarantined' and stores the reason.
406
407 After a MPackValidationError the worker calls quarantine_job. The job
408 row must reflect status='quarantined' and quarantine_reason must contain
409 the validation error message.
410 """
411 _, head, mpack = _make_repo(tmp_path / "repo")
412 job_id = await _push_mpack(client, repo, mpack, head, db_session)
413
414 from musehub.services.musehub_jobs import quarantine_job
415
416 reason = "object sha256:deadbeef content does not match declared id"
417 await quarantine_job(db_session, job_id, reason)
418 await db_session.commit()
419
420 db_session.expire_all()
421 job_row = (await db_session.execute(
422 select(MusehubBackgroundJob).where(MusehubBackgroundJob.job_id == job_id)
423 )).scalar_one()
424
425 assert job_row.status == "quarantined", (
426 f"expected status 'quarantined' after quarantine_job, got '{job_row.status}'"
427 )
428 assert job_row.quarantine_reason is not None, "quarantine_reason must be set"
429 assert reason[:50] in job_row.quarantine_reason, (
430 f"quarantine_reason {job_row.quarantine_reason!r} does not contain the error message"
431 )
432
433
434 @pytest.mark.asyncio
435 async def test_process_mpack_index_job_sets_quarantined_on_validation_error(
436 client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
437 ) -> None:
438 """On MPackValidationError, the job row status is set to 'quarantined'.
439
440 process_mpack_index_job marks the job quarantined in-session before
441 raising. The caller must commit to persist the status. This avoids a
442 two-call pattern (raise, then separately quarantine_job) in the worker.
443 """
444 _, head, mpack = _make_repo(tmp_path / "repo")
445 job_id = await _push_mpack(client, repo, mpack, head, db_session)
446
447 from musehub.config import settings as _real_settings
448
449 class _TinyLimitSettings:
450 mpack_max_decompressed_bytes = 1
451 def __getattr__(self, name: str) -> None:
452 return getattr(_real_settings, name)
453
454 from musehub.services.musehub_wire import process_mpack_index_job, MPackValidationError
455 with patch("musehub.services.musehub_wire.settings", _TinyLimitSettings()):
456 with pytest.raises(MPackValidationError):
457 await process_mpack_index_job(db_session, job_id)
458
459 await db_session.commit()
460 db_session.expire_all()
461
462 job_row = (await db_session.execute(
463 select(MusehubBackgroundJob).where(MusehubBackgroundJob.job_id == job_id)
464 )).scalar_one()
465 assert job_row.status == "quarantined", (
466 f"expected job status 'quarantined' after MPackValidationError, got '{job_row.status}'"
467 )
468 assert job_row.quarantine_reason is not None, (
469 "quarantine_reason must be set when process_mpack_index_job catches a validation error"
470 )
471
472
473 @pytest.mark.asyncio
474 async def test_valid_mpack_passes_all_validation_checks(
475 client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
476 ) -> None:
477 """Regression: a well-formed mpack passes all Phase 2 checks and is indexed.
478
479 All objects have correct sha256 IDs, decompressed size is within limit,
480 and count matches — process_mpack_index_job must complete normally.
481 """
482 _, head, mpack = _make_repo(tmp_path / "repo")
483 job_id = await _push_mpack(client, repo, mpack, head, db_session)
484
485 from musehub.services.musehub_wire import process_mpack_index_job
486 result = await process_mpack_index_job(db_session, job_id)
487 await db_session.commit()
488
489 assert result["commits_written"] > 0, "expected commits written for a valid mpack"
490 assert result["objects_written"] > 0, "expected objects written for a valid mpack"
File History 1 commit
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor 21 days ago