"""TDD — mpack.index job: object writes, object refs, idempotency. The one principle: Content-addressing is a proof, not a label. sha256(wire_bytes) == mpack_key already authenticated every byte in the mpack — including every object's content and its declared ID. Phase 1 invariants: 1. After job runs: every object blob is in MinIO under its sha256 key. 2. After job runs: musehub_objects has one row per object. 3. After job runs: musehub_object_refs has rows for every (repo_id, object_id). 4. No per-object sha256 re-verification — that is checking the work of the hash. 5. Ghost guard passes on stream push immediately after job completes. 6. Job is idempotent — running it twice produces identical DB state. 7. Return dict includes objects_written, object_refs_written, elapsed_ms. """ from __future__ import annotations import datetime import hashlib import pathlib import msgpack import pytest import pytest_asyncio pytestmark = pytest.mark.skip(reason="muse wire protocol in flux") from httpx import AsyncClient, ASGITransport from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request from musehub.db.musehub_repo_models import MusehubObject, MusehubObjectRef, MusehubRepo from musehub.db.database import get_db from musehub.main import app from muse.core.object_store import write_object from muse.core.mpack import build_mpack from muse.core.paths import muse_dir from muse.core.snapshot import compute_commit_id, compute_snapshot_id from muse.core.commits import CommitRecord, write_commit from muse.core.refs import write_branch_ref from muse.core.snapshots import SnapshotRecord, write_snapshot from muse.core.types import blob_id from musehub.types.json_types import JSONObject _AUTH_CTX = MSignContext( handle="gabriel", identity_id="sha256:" + "0" * 64, is_agent=False, is_admin=True, ) # Small enough to be fast, large enough to exercise batching 
from collections.abc import AsyncIterator

_N_FILES = 30        # files in the base snapshot
_N_COMMITS = 15      # linear commits on main
_FILES_CHANGED = 3   # files rewritten by each commit
_BLOB_SIZE = 256     # padding bytes appended to each blob's unique prefix


# ── fixtures ───────────────────────────────────────────────────────────────


@pytest_asyncio.fixture()
async def client(db_session: AsyncSession) -> AsyncIterator[AsyncClient]:
    """Yield an in-process HTTP client bound to ``app``.

    ``get_db`` is pinned to the test's ``db_session`` and both signed-request
    dependencies resolve to ``_AUTH_CTX``. All dependency overrides are
    cleared on teardown.
    """

    async def _override_get_db() -> AsyncIterator[AsyncSession]:
        yield db_session

    app.dependency_overrides[get_db] = _override_get_db
    app.dependency_overrides[require_signed_request] = lambda: _AUTH_CTX
    app.dependency_overrides[optional_signed_request] = lambda: _AUTH_CTX
    try:
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="https://localhost:1337",
        ) as c:
            yield c
    finally:
        # Clear even when client setup fails, so overrides never leak into
        # other tests.
        app.dependency_overrides.clear()


@pytest_asyncio.fixture()
async def repo(client: AsyncClient) -> AsyncIterator[str]:
    """Create a public, non-initialized repo; yield its slug; delete it after."""
    resp = await client.post(
        "/api/repos",
        json={
            "owner": "gabriel",
            "name": "mpack-index-test",
            "visibility": "public",
            "initialize": False,
        },
    )
    assert resp.status_code in (200, 201), resp.text
    data = resp.json()
    yield data["slug"]
    await client.delete(f"/api/repos/{data['repoId']}")


# ── helpers ────────────────────────────────────────────────────────────────


def _make_repo(tmp: pathlib.Path) -> tuple[pathlib.Path, str, dict]:
    """Build a local muse repo under *tmp* and pack its full history.

    Writes ``_N_FILES`` base blobs, then ``_N_COMMITS`` linear commits on
    ``main``. Each commit starts from the base manifest and points
    ``_FILES_CHANGED`` paths at fresh blobs.

    Returns:
        ``(repo_path, head_commit_id, mpack_dict)`` where ``mpack_dict`` is
        ``build_mpack(repo_path, [head], have=[])``.
    """
    tmp.mkdir(parents=True, exist_ok=True)
    dot = muse_dir(tmp)
    dot.mkdir()
    (dot / "repo.json").write_text('{"repo_id":"idx-test","owner":"gabriel"}')
    for d in ("commits", "snapshots", "objects"):
        (dot / d).mkdir()
    (dot / "refs" / "heads").mkdir(parents=True)
    (dot / "HEAD").write_text("ref: refs/heads/main\n")
    (dot / "config.toml").write_text("")

    blob_ids: list[str] = []
    for i in range(_N_FILES):
        data = f"base-{i:04d}".encode() + b"x" * _BLOB_SIZE
        oid = blob_id(data)
        write_object(tmp, oid, data)
        blob_ids.append(oid)
    base_manifest = {f"src/file_{i:04d}.py": oid for i, oid in enumerate(blob_ids)}

    parent: str | None = None
    tip = ""
    ts = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    for i in range(_N_COMMITS):
        manifest = dict(base_manifest)
        for j in range(_FILES_CHANGED):
            idx = (i * _FILES_CHANGED + j) % _N_FILES
            raw = f"c{i:04d}-f{j}".encode() + b"y" * _BLOB_SIZE
            oid = blob_id(raw)
            write_object(tmp, oid, raw)
            manifest[f"src/file_{idx:04d}.py"] = oid

        sid = compute_snapshot_id(manifest)
        write_snapshot(tmp, SnapshotRecord(snapshot_id=sid, manifest=manifest))

        msg = f"commit-{i:05d}"
        cid = compute_commit_id(
            parent_ids=[parent] if parent else [],
            snapshot_id=sid,
            message=msg,
            committed_at_iso=ts.isoformat(),
            author="gabriel",
        )
        write_commit(tmp, CommitRecord(
            commit_id=cid,
            branch="main",
            snapshot_id=sid,
            message=msg,
            committed_at=ts,
            parent_commit_id=parent,
            parent2_commit_id=None,
            author="gabriel",
            metadata={},
            structured_delta=None,
            sem_ver_bump="none",
            breaking_changes=[],
            agent_id="",
            model_id="",
            toolchain_id="",
            prompt_hash="",
            signature="",
            signer_key_id="",
        ))
        parent = cid
        tip = cid
        ts += datetime.timedelta(seconds=60)

    write_branch_ref(tmp, "main", tip)
    mpack = build_mpack(tmp, [tip], have=[])
    return tmp, tip, mpack


def _object_ids(mpack: JSONObject) -> list[str]:
    """Return the object_id of every object declared in *mpack*, in order."""
    return [obj["object_id"] for obj in mpack.get("objects") or []]


async def _repo_id(db_session: AsyncSession, repo_slug: str) -> str:
    """Return the repo_id of the musehub_repos row whose slug is *repo_slug*.

    Selects the column rather than the ORM row, so the value cannot be
    expired by a later commit. (Assumes repo_id is a string column — confirm
    against MusehubRepo.)
    """
    return (await db_session.execute(
        select(MusehubRepo.repo_id).where(MusehubRepo.slug == repo_slug)
    )).scalar_one()


async def _count_objects(db_session: AsyncSession, object_ids: list[str]) -> int:
    """Count musehub_objects rows whose object_id is one of *object_ids*."""
    return (await db_session.execute(
        select(func.count()).select_from(MusehubObject).where(
            MusehubObject.object_id.in_(object_ids)
        )
    )).scalar_one()


async def _count_object_refs(db_session: AsyncSession, repo_id: str) -> int:
    """Count musehub_object_refs rows belonging to *repo_id*."""
    return (await db_session.execute(
        select(func.count()).select_from(MusehubObjectRef).where(
            MusehubObjectRef.repo_id == repo_id
        )
    )).scalar_one()


async def _run_index_job(db_session: AsyncSession, job_id: str) -> dict:
    """Run ``process_mpack_index_job`` for *job_id*, commit, and return its result."""
    # Imported at call time rather than at module import.
    from musehub.services.musehub_wire import process_mpack_index_job

    result = await process_mpack_index_job(db_session, job_id)
    await db_session.commit()
    return result


async def _push_mpack(
    client: AsyncClient,
    repo_slug: str,
    mpack: JSONObject,
    head: str,
    db_session: AsyncSession,
) -> str:
    """Upload *mpack* via a presigned URL and insert a pending mpack.index job row.

    Does NOT call push/unpack-mpack: that route is fully synchronous and does
    not return a job_id. The job row is added and flushed (not committed)
    directly, so tests can call ``process_mpack_index_job`` in isolation.

    Returns:
        The job_id of the inserted ``MusehubBackgroundJob`` row.
    """
    from musehub.core.genesis import compute_job_id
    from musehub.db.musehub_jobs_models import MusehubBackgroundJob

    repo_id = await _repo_id(db_session, repo_slug)

    wire_bytes = msgpack.packb(mpack, use_bin_type=True)
    # The mpack key is the content address of exactly the bytes uploaded.
    mpack_key = "sha256:" + hashlib.sha256(wire_bytes).hexdigest()
    n_objects = len(mpack.get("objects") or [])

    presign_resp = await client.post(
        f"/gabriel/{repo_slug}/push/mpack-presign",
        content=msgpack.packb(
            {"mpack_key": mpack_key, "size_bytes": len(wire_bytes)},
            use_bin_type=True,
        ),
        headers={"Content-Type": "application/x-msgpack"},
    )
    assert presign_resp.status_code == 200, presign_resp.text
    presign = presign_resp.json()
    # Accept either snake_case or camelCase from the presign endpoint.
    upload_url = presign.get("upload_url") or presign.get("uploadUrl")
    assert upload_url, f"presign response has no upload URL: {presign!r}"

    # Plain client with no ASGI transport: the PUT goes to the presigned URL
    # itself, not through the app.
    async with AsyncClient() as raw:
        put = await raw.put(upload_url, content=wire_bytes)
    assert put.status_code in (200, 204), put.text

    now = datetime.datetime.now(tz=datetime.timezone.utc)
    job_id = compute_job_id(repo_id, "mpack.index", now.isoformat())
    db_session.add(MusehubBackgroundJob(
        job_id=job_id,
        repo_id=repo_id,
        job_type="mpack.index",
        payload={
            "mpack_key": mpack_key,
            "pusher_id": "sha256:" + "0" * 64,
            "branch": "main",
            "head": head,
            "force": False,
            "declared_objects_count": n_objects,
        },
        status="pending",
        created_at=now,
        attempt=0,
    ))
    await db_session.flush()
    return job_id


# ── Phase 1 tests ──────────────────────────────────────────────────────────


@pytest.mark.asyncio
async def test_objects_written_to_minio_after_job(
    client: AsyncClient,
    repo: str,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """After mpack.index runs: every object is addressable through the mpack.

    This is the load-bearing test. Fetch/clone/pull are all blocked until
    objects are addressable in storage. Since Phase 2, objects are not written
    to MinIO individually. Each musehub_objects row must instead carry a
    ``mpack://`` storage_uri.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)
    await _run_index_job(db_session, job_id)

    all_oids = _object_ids(mpack)
    assert all_oids, "mpack must contain objects"

    rows = (await db_session.execute(
        select(MusehubObject).where(MusehubObject.object_id.in_(all_oids))
    )).scalars().all()
    row_map = {r.object_id: r for r in rows}

    missing = [oid for oid in all_oids if oid not in row_map]
    assert not missing, (
        f"{len(missing)}/{len(all_oids)} objects have no musehub_objects row after job: "
        f"{missing[:3]}"
    )

    bad_uri = [oid for oid in all_oids if not (row_map[oid].storage_uri or "").startswith("mpack://")]
    assert not bad_uri, (
        f"{len(bad_uri)} objects have unexpected storage_uri (expected mpack://...): "
        + str({oid: row_map[oid].storage_uri for oid in bad_uri[:3]})
    )


@pytest.mark.asyncio
async def test_musehub_objects_rows_after_job(
    client: AsyncClient,
    repo: str,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """After mpack.index runs: musehub_objects has one row per object."""
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)
    result = await _run_index_job(db_session, job_id)

    all_oids = _object_ids(mpack)
    n_rows = await _count_objects(db_session, all_oids)
    assert n_rows == len(all_oids), (
        f"expected {len(all_oids)} musehub_objects rows, got {n_rows}"
    )
    assert result["objects_written"] == len(all_oids), result


@pytest.mark.asyncio
async def test_object_refs_after_job(
    client: AsyncClient,
    repo: str,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """After mpack.index runs: musehub_object_refs links every object to this repo."""
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)
    result = await _run_index_job(db_session, job_id)

    ref_count = await _count_object_refs(db_session, await _repo_id(db_session, repo))
    n_objects = len(mpack.get("objects") or [])
    assert ref_count == n_objects, (
        f"expected {n_objects} object_refs rows, got {ref_count}"
    )
    assert result["object_refs_written"] == n_objects, result


@pytest.mark.asyncio
async def test_no_per_object_sha256_verification(
    client: AsyncClient,
    repo: str,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """mpack.index must not call hashlib.sha256 on individual object content.

    sha256(wire_bytes) == mpack_key already authenticated every byte in the
    mpack. Re-hashing each object is checking the work of the hash. The
    mpack-level sha256 is called once on the sync path (unpack-mpack). After
    that: trust the math.
    """
    import musehub.services.musehub_wire as _wire_mod
    from unittest.mock import patch

    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)

    # Every blob from _make_repo is a short prefix plus _BLOB_SIZE (256)
    # padding bytes, so objects are just over 256 bytes; the mpack wire bytes
    # are far larger. Any hash input >= 256 bytes during the job is therefore
    # either per-object re-verification or a re-hash of the whole mpack.
    _OBJECT_SIZE_THRESHOLD = 256
    sha256_calls_on_large_data: list[int] = []
    _real_sha256 = hashlib.sha256

    def _record(data: object) -> None:
        # memoryview is included so zero-copy slices of the mpack are caught.
        if isinstance(data, (bytes, bytearray, memoryview)):
            size = memoryview(data).nbytes
            if size >= _OBJECT_SIZE_THRESHOLD:
                sha256_calls_on_large_data.append(size)

    class _SpyHash:
        """Delegate to a real sha256 object, recording ``update()`` input too.

        Without this, ``h = hashlib.sha256(); h.update(content)`` would evade
        the spy entirely.
        """

        def __init__(self, inner) -> None:
            self._inner = inner

        def update(self, data: object) -> None:
            _record(data)
            self._inner.update(data)

        def __getattr__(self, name: str) -> object:
            return getattr(self._inner, name)

    def _spy_sha256(
        data: bytes | bytearray | memoryview = b"", *args: object, **kwargs: object
    ) -> _SpyHash:
        _record(data)
        return _SpyHash(_real_sha256(data, *args, **kwargs))

    # Patch only the wire module's view of hashlib; everything else keeps the
    # real implementation.
    with patch.object(_wire_mod, "hashlib", wraps=_wire_mod.hashlib) as mock_hl:
        mock_hl.sha256 = _spy_sha256
        await _run_index_job(db_session, job_id)

    assert not sha256_calls_on_large_data, (
        f"mpack.index called sha256 on object-sized data "
        f"({len(sha256_calls_on_large_data)} times, sizes: {sha256_calls_on_large_data[:5]}) — "
        "this is checking the work of the mpack hash. Trust the math."
    )


@pytest.mark.asyncio
async def test_job_is_idempotent(
    client: AsyncClient,
    repo: str,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """Running mpack.index twice produces identical DB state to running it once.

    Worker crashes, retries, and duplicate enqueues must all be safe. Both the
    object rows and this repo's object refs are compared across the two runs.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)
    # Read the id before any commit so no expired ORM state is touched later.
    repo_id = await _repo_id(db_session, repo)
    all_oids = _object_ids(mpack)

    await _run_index_job(db_session, job_id)
    refs_after_first = await _count_object_refs(db_session, repo_id)
    objects_after_first = await _count_objects(db_session, all_oids)

    await _run_index_job(db_session, job_id)
    refs_after_second = await _count_object_refs(db_session, repo_id)
    objects_after_second = await _count_objects(db_session, all_oids)

    assert refs_after_second == len(all_oids), (
        f"after two runs: expected {len(all_oids)} object_refs, got {refs_after_second} "
        "(idempotency failure — rows were doubled)"
    )
    assert objects_after_second == len(all_oids), (
        f"after two runs: expected {len(all_oids)} musehub_objects rows, "
        f"got {objects_after_second}"
    )
    assert (refs_after_second, objects_after_second) == (refs_after_first, objects_after_first), (
        "second run changed DB state: "
        f"refs {refs_after_first} -> {refs_after_second}, "
        f"objects {objects_after_first} -> {objects_after_second}"
    )


@pytest.mark.asyncio
async def test_return_dict_includes_counts_and_timing(
    client: AsyncClient,
    repo: str,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """Return dict contains objects_written, object_refs_written, elapsed_ms."""
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)
    result = await _run_index_job(db_session, job_id)

    for key in ("objects_written", "object_refs_written", "elapsed_ms"):
        assert key in result, result
    assert result["objects_written"] > 0, result
    assert result["object_refs_written"] > 0, result

    # A fast job can legitimately report 0 ms (integer truncation or a coarse
    # clock), so require only a non-negative number rather than > 0.
    elapsed = result["elapsed_ms"]
    assert isinstance(elapsed, (int, float)) and not isinstance(elapsed, bool), result
    assert elapsed >= 0, result