"""TDD — Phase 2: MPack index covers all three entity types. The MusehubMPackIndex table must map every commit, snapshot, and object in a pushed mpack to the mpack_id that contains it. This enables the fetch path to locate covering mpacks for any entity without O(N) individual GET calls. PI-5 After process_mpack_index_job, every commit_id in the mpack has a row in MusehubMPackIndex with entity_type="commit" and the correct mpack_id. PI-6 After process_mpack_index_job, every snapshot_id in the mpack has a row in MusehubMPackIndex with entity_type="snapshot" and the correct mpack_id. PI-7 All three entity types (object, commit, snapshot) are written atomically in one process_mpack_index_job call — a single push indexes everything. PI-8 A second push with different commits and snapshots adds new rows without disturbing the first push's rows (on_conflict_do_nothing idempotency). """ from __future__ import annotations import datetime import hashlib from collections.abc import Mapping import msgpack import pytest from sqlalchemy import select pytestmark = pytest.mark.skip(reason="muse wire protocol in flux") from sqlalchemy.ext.asyncio import AsyncSession from muse.core.snapshot import compute_commit_id, compute_snapshot_id from muse.core.types import blob_id from musehub.core.genesis import compute_identity_id from musehub.db import musehub_repo_models as db from musehub.services.musehub_repository import create_repo # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- _DT = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) def _make_commit_dict( repo_id: str, message: str, snapshot_id: str, parent_ids: list[str] | None = None, ) -> tuple[dict, str]: """Build a minimal commit dict and return (dict, commit_id).""" cid = compute_commit_id( parent_ids=parent_ids or [], snapshot_id=snapshot_id, message=message, committed_at_iso=_DT.isoformat(), author="gabriel", ) return { "commit_id": cid, "repo_id": repo_id, "branch": "main", "snapshot_id": snapshot_id, "parent_commit_id": None, "parent2_commit_id": None, "message": message, "committed_at": _DT.isoformat(), "author": "gabriel", "metadata": {}, "structured_delta": None, "sem_ver_bump": "none", "breaking_changes": [], "agent_id": "", "model_id": "", "toolchain_id": "", "prompt_hash": "", "signature": "", "signer_key_id": "", }, cid def _make_snapshot_dict(manifest: Mapping[str, str]) -> tuple[dict, str]: """Build a minimal snapshot dict and return (dict, snapshot_id).""" sid = compute_snapshot_id(manifest) return { "snapshot_id": sid, "parent_snapshot_id": None, "delta_upsert": manifest, "delta_remove": [], }, sid def _make_full_mpack( repo_id: str, objects: dict[str, bytes], extra_tag: str = "", ) -> tuple[bytes, str, list[str], list[str]]: """Build an mpack with objects, one snapshot, and one commit. Returns (wire_bytes, mpack_key, [commit_id], [snapshot_id]). """ manifest = {f"src/file_{extra_tag}_{k[-8:]}.py": k for k in objects} snap_dict, sid = _make_snapshot_dict(manifest) commit_dict, cid = _make_commit_dict(repo_id, f"commit {extra_tag}", sid) mpack = { "commits": [commit_dict], "snapshots": [snap_dict], "objects": [ {"object_id": oid, "content": content} for oid, content in objects.items() ], } wire_bytes = msgpack.packb(mpack, use_bin_type=True) mpack_key = "sha256:" + hashlib.sha256(wire_bytes).hexdigest() return wire_bytes, mpack_key, [cid], [sid] async def _store_mpack(wire_bytes: bytes, mpack_key: str) -> None: import musehub.storage.backends as _backends_mod backend = _backends_mod.get_backend() await backend.put_mpack(mpack_key, wire_bytes) async def _enqueue_and_process( session: AsyncSession, repo_id: str, mpack_key: str, n_objects: int, n_commits: int = 1, ) -> None: from musehub.core.genesis import compute_job_id from musehub.db.musehub_jobs_models import MusehubBackgroundJob from musehub.services.musehub_wire import process_mpack_index_job now = datetime.datetime.now(datetime.timezone.utc) job_id = compute_job_id(repo_id, "mpack.index", now.isoformat()) session.add(MusehubBackgroundJob( job_id=job_id, repo_id=repo_id, job_type="mpack.index", payload={ "mpack_key": mpack_key, "branch": "main", "head": "", "pusher_id": "", "declared_objects_count": n_objects, "declared_commits_count": n_commits, }, status="pending", created_at=now, attempt=0, )) await session.commit() await process_mpack_index_job(session, job_id) await session.commit() # --------------------------------------------------------------------------- # PI-5 commit_ids are indexed # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_pi5_commit_ids_indexed_after_push(db_session: AsyncSession) -> None: """Every commit_id in the mpack must appear in MusehubMPackIndex with entity_type='commit'.""" repo = await create_repo( db_session, name="pi5-test", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) objects = {blob_id(b"pi5-obj"): b"pi5-obj"} wire_bytes, mpack_key, commit_ids, _ = _make_full_mpack( repo.repo_id, objects, extra_tag="pi5" ) await _store_mpack(wire_bytes, mpack_key) await _enqueue_and_process(db_session, repo.repo_id, mpack_key, len(objects)) rows = (await db_session.execute( select(db.MusehubMPackIndex).where( db.MusehubMPackIndex.entity_id.in_(commit_ids), db.MusehubMPackIndex.entity_type == "commit", ) )).scalars().all() assert {r.entity_id for r in rows} == set(commit_ids), ( f"commit_ids not indexed — missing: {set(commit_ids) - {r.entity_id for r in rows}}" ) assert all(r.mpack_id == mpack_key for r in rows), ( "commit index rows have wrong mpack_id" ) # --------------------------------------------------------------------------- # PI-6 snapshot_ids are indexed # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_pi6_snapshot_ids_indexed_after_push(db_session: AsyncSession) -> None: """Every snapshot_id in the mpack must appear in MusehubMPackIndex with entity_type='snapshot'.""" repo = await create_repo( db_session, name="pi6-test", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) objects = {blob_id(b"pi6-obj"): b"pi6-obj"} wire_bytes, mpack_key, _, snapshot_ids = _make_full_mpack( repo.repo_id, objects, extra_tag="pi6" ) await _store_mpack(wire_bytes, mpack_key) await _enqueue_and_process(db_session, repo.repo_id, mpack_key, len(objects)) rows = (await db_session.execute( select(db.MusehubMPackIndex).where( db.MusehubMPackIndex.entity_id.in_(snapshot_ids), db.MusehubMPackIndex.entity_type == "snapshot", ) )).scalars().all() assert {r.entity_id for r in rows} == set(snapshot_ids), ( f"snapshot_ids not indexed — missing: {set(snapshot_ids) - {r.entity_id for r in rows}}" ) assert all(r.mpack_id == mpack_key for r in rows), ( "snapshot index rows have wrong mpack_id" ) # --------------------------------------------------------------------------- # PI-7 All three types indexed atomically in one job # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_pi7_all_three_types_indexed_atomically(db_session: AsyncSession) -> None: """One process_mpack_index_job call must index objects, commits, and snapshots.""" repo = await create_repo( db_session, name="pi7-test", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) objects = { blob_id(f"pi7-obj-{i}".encode()): f"pi7-obj-{i}".encode() for i in range(3) } wire_bytes, mpack_key, commit_ids, snapshot_ids = _make_full_mpack( repo.repo_id, objects, extra_tag="pi7" ) await _store_mpack(wire_bytes, mpack_key) await _enqueue_and_process(db_session, repo.repo_id, mpack_key, len(objects)) all_entity_ids = set(objects.keys()) | set(commit_ids) | set(snapshot_ids) rows = (await db_session.execute( select(db.MusehubMPackIndex).where( db.MusehubMPackIndex.entity_id.in_(list(all_entity_ids)) ) )).scalars().all() indexed = {r.entity_id for r in rows} by_type = {} for r in rows: by_type.setdefault(r.entity_type, set()).add(r.entity_id) assert set(objects.keys()) <= indexed, ( f"object_ids not fully indexed: {set(objects.keys()) - indexed}" ) assert set(commit_ids) <= indexed, ( f"commit_ids not indexed: {set(commit_ids) - indexed}" ) assert set(snapshot_ids) <= indexed, ( f"snapshot_ids not indexed: {set(snapshot_ids) - indexed}" ) assert by_type.get("object", set()) == set(objects.keys()), "wrong entity_type for objects" assert by_type.get("commit", set()) == set(commit_ids), "wrong entity_type for commits" assert by_type.get("snapshot", set()) == set(snapshot_ids), "wrong entity_type for snapshots" # --------------------------------------------------------------------------- # PI-8 Second push adds new rows without overwriting first push # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_pi8_second_push_indexes_without_overwriting(db_session: AsyncSession) -> None: """A second push adds new commit/snapshot/object rows; first push rows survive.""" repo = await create_repo( db_session, name="pi8-test", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) objs_a = {blob_id(f"pi8-a-{i}".encode()): f"pi8-a-{i}".encode() for i in range(2)} wire_a, key_a, cids_a, sids_a = _make_full_mpack(repo.repo_id, objs_a, extra_tag="pi8a") await _store_mpack(wire_a, key_a) await _enqueue_and_process(db_session, repo.repo_id, key_a, len(objs_a)) objs_b = {blob_id(f"pi8-b-{i}".encode()): f"pi8-b-{i}".encode() for i in range(2)} wire_b, key_b, cids_b, sids_b = _make_full_mpack(repo.repo_id, objs_b, extra_tag="pi8b") await _store_mpack(wire_b, key_b) await _enqueue_and_process(db_session, repo.repo_id, key_b, len(objs_b)) all_ids = ( set(objs_a) | set(objs_b) | set(cids_a) | set(cids_b) | set(sids_a) | set(sids_b) ) rows = (await db_session.execute( select(db.MusehubMPackIndex).where( db.MusehubMPackIndex.entity_id.in_(list(all_ids)) ) )).scalars().all() indexed = {r.entity_id for r in rows} mpack_ids_found = {r.mpack_id for r in rows} assert set(objs_a) <= indexed, "first push objects missing after second push" assert set(objs_b) <= indexed, "second push objects missing" assert set(cids_a) <= indexed, "first push commits missing after second push" assert set(cids_b) <= indexed, "second push commits missing" assert set(sids_a) <= indexed, "first push snapshots missing after second push" assert set(sids_b) <= indexed, "second push snapshots missing" assert key_a in mpack_ids_found, "first mpack_key lost from index" assert key_b in mpack_ids_found, "second mpack_key missing from index"