"""TDD — Phase 2: mpack content validation in the background job. Phase 2 invariants (issue #49): 2a. Decompression size guard (zip bomb) — MPackValidationError when cumulative decompressed bytes exceeds settings.mpack_max_decompressed_bytes. 2b. Object count mismatch — MPackValidationError when actual count differs from declared_objects_count (stored in job payload) by more than 10. 2c. Per-object sha256 verification — MPackValidationError when sha256(decompressed_content) != object_id. 2d. Quarantine state — on any validation failure: no objects written to MinIO, no commits/snapshots written to DB, job status becomes 'quarantined'. All validation runs before any DB or MinIO writes so a failed job leaves no partial state to clean up. """ from __future__ import annotations import copy import datetime import hashlib import pathlib import msgpack import pytest import pytest_asyncio pytestmark = pytest.mark.skip(reason="muse wire protocol in flux") from httpx import AsyncClient, ASGITransport from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from unittest.mock import patch from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request from musehub.db.musehub_jobs_models import MusehubBackgroundJob from musehub.db.musehub_repo_models import MusehubCommit from musehub.db.database import get_db from musehub.main import app from muse.core.object_store import write_object from muse.core.mpack import build_mpack from muse.core.paths import muse_dir from muse.core.snapshot import compute_commit_id, compute_snapshot_id from muse.core.commits import CommitRecord, write_commit from muse.core.refs import write_branch_ref from muse.core.snapshots import SnapshotRecord, write_snapshot from muse.core.types import blob_id from musehub.types.json_types import JSONObject _AUTH_CTX = MSignContext( handle="gabriel", identity_id="sha256:" + "0" * 64, is_agent=False, is_admin=True, ) _N_FILES = 8 _N_COMMITS = 4 
_FILES_CHANGED = 2
_BLOB_SIZE = 128

# Dotted path of the settings object that the tests swap out while the
# background job runs.
_WIRE_SETTINGS = "musehub.services.musehub_wire.settings"


# ── fixtures ────────────────────────────────────────────────────────────────

@pytest_asyncio.fixture()
async def client(db_session: AsyncSession) -> AsyncIterator[AsyncClient]:
    """Yield an in-process HTTP client bound to the app with test overrides.

    The DB dependency is replaced by the test session and both request-signing
    dependencies return ``_AUTH_CTX``. All overrides are cleared on teardown.
    """
    async def _override_get_db() -> AsyncIterator[AsyncSession]:
        yield db_session

    app.dependency_overrides[get_db] = _override_get_db
    app.dependency_overrides[require_signed_request] = lambda: _AUTH_CTX
    app.dependency_overrides[optional_signed_request] = lambda: _AUTH_CTX
    try:
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="https://localhost:1337",
        ) as c:
            yield c
    finally:
        # Clear even when the client context fails, so overrides cannot leak
        # into tests that run afterwards.
        app.dependency_overrides.clear()


@pytest_asyncio.fixture()
async def repo(client: AsyncClient) -> AsyncIterator[str]:
    """Create a repo through the API, yield its slug, delete it on teardown."""
    resp = await client.post(
        "/api/repos",
        json={
            "owner": "gabriel",
            "name": "phase2-validation-test",
            "visibility": "public",
            "initialize": False,
        },
    )
    assert resp.status_code in (200, 201), resp.text
    data = resp.json()
    yield data["slug"]
    await client.delete(f"/api/repos/{data['repoId']}")


def _make_repo(tmp: pathlib.Path) -> tuple[pathlib.Path, str, dict]:
    """Build a small on-disk muse repo under *tmp* and pack it.

    Writes ``_N_FILES`` base blobs, then ``_N_COMMITS`` linear commits on
    ``main``; each commit replaces ``_FILES_CHANGED`` files in the manifest.

    Returns:
        ``(tmp, tip_commit_id, mpack)`` where ``mpack`` is whatever
        ``build_mpack(tmp, [tip], have=[])`` returns (the tests use it as a
        mapping with ``"objects"`` and ``"commits"`` keys).
    """
    tmp.mkdir(parents=True, exist_ok=True)
    dot = muse_dir(tmp)
    dot.mkdir()
    (dot / "repo.json").write_text('{"repo_id":"phase2-test","owner":"gabriel"}')
    for d in ("commits", "snapshots", "objects"):
        (dot / d).mkdir()
    (dot / "refs" / "heads").mkdir(parents=True)
    (dot / "HEAD").write_text("ref: refs/heads/main\n")
    (dot / "config.toml").write_text("")

    blob_ids: list[str] = []
    for i in range(_N_FILES):
        data = f"base-{i:04d}".encode() + b"x" * _BLOB_SIZE
        oid = blob_id(data)
        write_object(tmp, oid, data)
        blob_ids.append(oid)

    base_manifest = {
        f"src/file_{i:04d}.py": oid for i, oid in enumerate(blob_ids)
    }
    parent: str | None = None
    tip = ""
    ts = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)

    for i in range(_N_COMMITS):
        manifest = dict(base_manifest)
        for j in range(_FILES_CHANGED):
            # Rotate through the files so successive commits touch different paths.
            idx = (i * _FILES_CHANGED + j) % _N_FILES
            raw = f"c{i:04d}-f{j}".encode() + b"y" * _BLOB_SIZE
            oid = blob_id(raw)
            write_object(tmp, oid, raw)
            manifest[f"src/file_{idx:04d}.py"] = oid

        sid = compute_snapshot_id(manifest)
        write_snapshot(tmp, SnapshotRecord(snapshot_id=sid, manifest=manifest))

        msg = f"commit-{i:05d}"
        cid = compute_commit_id(
            parent_ids=[parent] if parent else [],
            snapshot_id=sid,
            message=msg,
            committed_at_iso=ts.isoformat(),
            author="gabriel",
        )
        write_commit(tmp, CommitRecord(
            commit_id=cid,
            branch="main",
            snapshot_id=sid,
            message=msg,
            committed_at=ts,
            parent_commit_id=parent,
            parent2_commit_id=None,
            author="gabriel",
            metadata={},
            structured_delta=None,
            sem_ver_bump="none",
            breaking_changes=[],
            agent_id="",
            model_id="",
            toolchain_id="",
            prompt_hash="",
            signature="",
            signer_key_id="",
        ))
        parent = cid
        tip = cid
        ts += datetime.timedelta(seconds=60)

    write_branch_ref(tmp, "main", tip)
    mpack = build_mpack(tmp, [tip], have=[])
    return tmp, tip, mpack


async def _push_mpack(
    client: AsyncClient,
    repo_slug: str,
    mpack: JSONObject,
    head: str,
    db_session: AsyncSession,
) -> str:
    """Upload mpack to MinIO and create a mpack.index job row. Returns job_id.

    Does NOT call push/unpack-mpack so validation tests start with a clean DB
    (no commits/snapshots pre-written by the synchronous route).

    Args:
        client: app client used to request the presigned upload URL.
        repo_slug: slug of the target repo; it is looked up in the DB.
        mpack: the mpack mapping; it is msgpack-encoded before upload.
        head: commit id stored as ``head`` in the job payload.
        db_session: session the pending job row is added to (flushed, not
            committed).
    """
    from musehub.core.genesis import compute_job_id as _compute_job_id
    from musehub.db.musehub_repo_models import MusehubRepo as _Repo

    repo_row = (await db_session.execute(
        select(_Repo).where(_Repo.slug == repo_slug)
    )).scalar_one()
    repo_id = repo_row.repo_id

    wire_bytes = msgpack.packb(mpack, use_bin_type=True)
    mpack_key = "sha256:" + hashlib.sha256(wire_bytes).hexdigest()
    n_objects = len(mpack.get("objects") or [])

    pr = await client.post(
        f"/gabriel/{repo_slug}/push/mpack-presign",
        content=msgpack.packb(
            {"mpack_key": mpack_key, "size_bytes": len(wire_bytes)},
            use_bin_type=True,
        ),
        headers={"Content-Type": "application/x-msgpack"},
    )
    assert pr.status_code == 200, pr.text
    presign = pr.json()
    # The URL is looked up under either spelling of the key.
    upload_url = presign.get("upload_url") or presign.get("uploadUrl")
    assert upload_url, f"presign response has no upload URL: {pr.text}"

    # The presigned URL is fetched with a plain client, not the in-process app client.
    async with AsyncClient() as raw:
        put = await raw.put(upload_url, content=wire_bytes)
    assert put.status_code in (200, 204), put.text

    now = datetime.datetime.now(tz=datetime.timezone.utc)
    job_id = _compute_job_id(repo_id, "mpack.index", now.isoformat())
    db_session.add(MusehubBackgroundJob(
        job_id=job_id,
        repo_id=repo_id,
        job_type="mpack.index",
        payload={
            "mpack_key": mpack_key,
            "pusher_id": "sha256:" + "0" * 64,
            "branch": "main",
            "head": head,
            "force": False,
            "declared_objects_count": n_objects,
        },
        status="pending",
        created_at=now,
        attempt=0,
    ))
    await db_session.flush()
    return job_id


def _make_tampered_mpack(mpack: JSONObject) -> JSONObject:
    """Return a copy of mpack with one object's content replaced.

    The object_id is preserved — sha256(new_content) != object_id, which must
    trigger MPackValidationError (2c).

    NOTE(review): no test in this file calls this helper, and the docstring of
    ``test_hash_mismatch_caught_at_unpack_mpack`` says per-object hash checking
    was intentionally removed — confirm whether this helper and invariant 2c
    are still current.
    """
    bad = copy.deepcopy(mpack)
    raw_objects = bad.get("objects") or []
    if not raw_objects:
        return bad

    obj = raw_objects[0]
    content = obj.get("content") or b""
    if obj.get("encoding") == "zstd" and content:
        import zstandard
        content = zstandard.ZstdDecompressor().decompress(content)
    # Store the tampered bytes uncompressed, so the encoding marker is dropped.
    obj["content"] = b"TAMPERED_" + content[:32]
    obj.pop("encoding", None)
    return bad


def _tiny_limit_settings() -> object:
    """Return a settings stand-in with ``mpack_max_decompressed_bytes == 1``.

    Every other attribute is read from the real ``musehub.config.settings``.
    """
    from musehub.config import settings as _real_settings

    class _TinyLimitSettings:
        mpack_max_decompressed_bytes = 1

        def __getattr__(self, name: str) -> object:
            # Only reached for attributes not defined on this class.
            return getattr(_real_settings, name)

    return _TinyLimitSettings()


# ── Phase 2 tests ───────────────────────────────────────────────────────────

@pytest.mark.asyncio
async def test_hash_mismatch_caught_at_unpack_mpack(
    client: AsyncClient,
    repo: str,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """Mpack-level integrity gate: bytes that don't hash to mpack_key → 422.

    The per-mpack sha256 check lives in wire_push_unpack_mpack, not in the
    background job. If a client claims mpack_key=sha256:X but the bytes stored
    in MinIO hash to Y, unpack-mpack rejects with 422 "integrity failure".

    Per-object hash checking was intentionally removed: sha256(wire_bytes) ==
    mpack_key already authenticates every byte in the mpack — no per-item
    re-hashing needed.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    wire_bytes = msgpack.packb(mpack, use_bin_type=True)
    real_key = "sha256:" + hashlib.sha256(wire_bytes).hexdigest()

    # PUT the real bytes under the real key
    from musehub.storage.backends import get_backend as _get_backend
    backend = _get_backend()
    await backend.put_mpack(real_key, wire_bytes)

    # Now claim a DIFFERENT key — the hash check will catch the mismatch
    wrong_key = "sha256:" + "a" * 64
    await backend.put_mpack(wrong_key, wire_bytes)

    resp = await client.post(
        f"/gabriel/{repo}/push/unpack-mpack",
        content=msgpack.packb(
            {"mpack_key": wrong_key, "branch": "main", "head": head},
            use_bin_type=True,
        ),
        headers={"Content-Type": "application/x-msgpack"},
    )
    assert resp.status_code == 422, resp.text
    body = resp.text.lower()
    assert "integrity" in body or "sha256" in body


@pytest.mark.asyncio
async def test_zip_bomb_raises_mpack_validation_error(
    client: AsyncClient,
    repo: str,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """Decompression size guard triggers on mpacks that expand beyond the limit.

    Setting mpack_max_decompressed_bytes to 1 byte forces the guard on the
    first object. The error fires before any DB or MinIO writes.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)

    from musehub.services.musehub_wire import process_mpack_index_job, MPackValidationError

    with patch(_WIRE_SETTINGS, _tiny_limit_settings()):
        with pytest.raises(MPackValidationError, match="decompressed size"):
            await process_mpack_index_job(db_session, job_id)


@pytest.mark.asyncio
async def test_count_mismatch_raises_mpack_validation_error(
    client: AsyncClient,
    repo: str,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """Object count mismatch: declared count > 10 off from actual.

    When the job payload has declared_objects_count and the actual mpack
    object count differs by more than 10, MPackValidationError is raised
    before any DB or MinIO writes.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)

    # Overwrite the declared count in the job payload with a wildly wrong value
    job_row = (await db_session.execute(
        select(MusehubBackgroundJob).where(MusehubBackgroundJob.job_id == job_id)
    )).scalar_one()
    actual_objects = len(mpack.get("objects") or [])
    # Assign a fresh dict rather than mutating the stored payload in place.
    new_payload = dict(job_row.payload)
    new_payload["declared_objects_count"] = actual_objects + 100
    job_row.payload = new_payload
    await db_session.flush()

    from musehub.services.musehub_wire import process_mpack_index_job, MPackValidationError

    with pytest.raises(MPackValidationError, match="object count mismatch"):
        await process_mpack_index_job(db_session, job_id)


@pytest.mark.asyncio
async def test_validation_failure_writes_no_objects_to_minio(
    client: AsyncClient,
    repo: str,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """After a validation failure, backend.put is never called for mpack objects.

    The zip bomb guard fires during decompression — before any backend.put()
    calls. We verify this by spying on backend.put: no call must occur while
    the job raises MPackValidationError.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)

    from musehub.storage.backends import get_backend as _get_backend

    # NOTE(review): the spy only sees calls if the job uses this same backend
    # instance — confirm get_backend() returns a shared object.
    backend = _get_backend()

    put_calls: list[str] = []
    _real_put = backend.put

    async def _spy_put(oid: str, data: bytes) -> object:
        put_calls.append(oid)
        return await _real_put(oid, data)

    from musehub.services.musehub_wire import process_mpack_index_job, MPackValidationError

    with patch.object(backend, "put", side_effect=_spy_put):
        with patch(_WIRE_SETTINGS, _tiny_limit_settings()):
            with pytest.raises(MPackValidationError):
                await process_mpack_index_job(db_session, job_id)

    assert not put_calls, (
        f"backend.put was called for {len(put_calls)} objects despite validation failure: "
        f"{[oid[:16] for oid in put_calls[:3]]}"
    )


@pytest.mark.asyncio
async def test_validation_failure_writes_no_commits_to_db(
    client: AsyncClient,
    repo: str,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """After a validation failure, no commits are inserted to the DB.

    Validation runs before all DB writes — snapshots and commits are not
    inserted when MPackValidationError fires.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)

    raw_commits = mpack.get("commits") or []
    all_cids = [c["commit_id"] for c in raw_commits if "commit_id" in c]
    assert all_cids, "mpack must contain commits"

    from musehub.services.musehub_wire import process_mpack_index_job, MPackValidationError

    with patch(_WIRE_SETTINGS, _tiny_limit_settings()):
        with pytest.raises(MPackValidationError):
            await process_mpack_index_job(db_session, job_id)

    rows = (await db_session.execute(
        select(MusehubCommit).where(MusehubCommit.commit_id.in_(all_cids))
    )).scalars().all()
    assert not rows, (
        f"{len(rows)} commit rows were inserted despite validation failure: "
        f"{[r.commit_id[:16] for r in rows[:3]]}"
    )


@pytest.mark.asyncio
async def test_quarantine_job_sets_status_and_reason(
    client: AsyncClient,
    repo: str,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """quarantine_job sets status='quarantined' and stores the reason.

    After a MPackValidationError the worker calls quarantine_job. The job row
    must reflect status='quarantined' and quarantine_reason must contain the
    validation error message.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)

    from musehub.services.musehub_jobs import quarantine_job

    reason = "object sha256:deadbeef content does not match declared id"
    await quarantine_job(db_session, job_id, reason)
    await db_session.commit()

    # Expire cached ORM state so the row is re-read after the commit.
    db_session.expire_all()
    job_row = (await db_session.execute(
        select(MusehubBackgroundJob).where(MusehubBackgroundJob.job_id == job_id)
    )).scalar_one()

    assert job_row.status == "quarantined", (
        f"expected status 'quarantined' after quarantine_job, got '{job_row.status}'"
    )
    assert job_row.quarantine_reason is not None, "quarantine_reason must be set"
    # Only the first 50 characters of the reason are required to be present.
    assert reason[:50] in job_row.quarantine_reason, (
        f"quarantine_reason {job_row.quarantine_reason!r} does not contain the error message"
    )


@pytest.mark.asyncio
async def test_process_mpack_index_job_sets_quarantined_on_validation_error(
    client: AsyncClient,
    repo: str,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """On MPackValidationError, the job row status is set to 'quarantined'.

    process_mpack_index_job marks the job quarantined in-session before
    raising. The caller must commit to persist the status. This avoids a
    two-call pattern (raise, then separately quarantine_job) in the worker.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)

    from musehub.services.musehub_wire import process_mpack_index_job, MPackValidationError

    with patch(_WIRE_SETTINGS, _tiny_limit_settings()):
        with pytest.raises(MPackValidationError):
            await process_mpack_index_job(db_session, job_id)

    await db_session.commit()
    db_session.expire_all()
    job_row = (await db_session.execute(
        select(MusehubBackgroundJob).where(MusehubBackgroundJob.job_id == job_id)
    )).scalar_one()

    assert job_row.status == "quarantined", (
        f"expected job status 'quarantined' after MPackValidationError, got '{job_row.status}'"
    )
    assert job_row.quarantine_reason is not None, (
        "quarantine_reason must be set when process_mpack_index_job catches a validation error"
    )


@pytest.mark.asyncio
async def test_valid_mpack_passes_all_validation_checks(
    client: AsyncClient,
    repo: str,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """Regression: a well-formed mpack passes all Phase 2 checks and is indexed.

    All objects have correct sha256 IDs, decompressed size is within limit,
    and count matches — process_mpack_index_job must complete normally.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)

    from musehub.services.musehub_wire import process_mpack_index_job

    result = await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    assert result["commits_written"] > 0, "expected commits written for a valid mpack"
    assert result["objects_written"] > 0, "expected objects written for a valid mpack"