"""TDD — Phase 1: mpack index written for every pushed object (issue #63). PI-1 After process_mpack_index_job, every object in the mpack has a row in musehub_mpack_index with the correct mpack_id (= mpack_key). PI-2 The mpack_id stored matches the mpack_key exactly. PI-3 A second push of different objects adds new rows; existing rows are untouched (on_conflict_do_nothing). PI-4 Objects from different repos are distinct in the global index (content-addressed objects are globally unique by object_id). NOTE: process_mpack_index_job was removed — indexing is now inline during push (musehub_wire_push.py step 7d). These tests will be revisited once the MVP wire protocol and pack index are fully wired in. """ from __future__ import annotations import datetime import hashlib from collections.abc import Mapping import msgpack import pytest pytestmark = pytest.mark.skip(reason="process_mpack_index_job removed — revisit with MWP pack index wiring") from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import blob_id from musehub.db import musehub_repo_models as db from musehub.core.genesis import compute_identity_id from musehub.services.musehub_repository import create_repo # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_mpack(objects: Mapping[str, bytes]) -> tuple[bytes, str]: """Build a minimal MPack and return (wire_bytes, mpack_key).""" mpack = { "commits": [], "snapshots": [], "objects": [ {"object_id": oid, "content": data} for oid, data in objects.items() ], "branch_heads": {}, } wire_bytes = msgpack.packb(mpack, use_bin_type=True) mpack_key = "sha256:" + hashlib.sha256(wire_bytes).hexdigest() return wire_bytes, mpack_key async def _store_mpack(wire_bytes: bytes, mpack_key: str) -> None: """Write mpack bytes to storage so process_mpack_index_job can fetch it.""" import musehub.storage.backends as _backends_mod backend = _backends_mod.get_backend() await backend.put_mpack(mpack_key, wire_bytes) async def _enqueue_and_process( session: AsyncSession, repo_id: str, mpack_key: str, n_objects: int, ) -> str: """Enqueue a mpack.index job and run it synchronously. Returns job_id.""" from musehub.core.genesis import compute_job_id from musehub.db.musehub_jobs_models import MusehubBackgroundJob from musehub.services.musehub_wire import process_mpack_index_job now = datetime.datetime.now(datetime.timezone.utc) job_id = compute_job_id(repo_id, "mpack.index", now.isoformat()) session.add(MusehubBackgroundJob( job_id=job_id, repo_id=repo_id, job_type="mpack.index", payload={ "mpack_key": mpack_key, "branch": "main", "head": "", "pusher_id": "", "declared_objects_count": n_objects, "declared_commits_count": 0, }, status="pending", created_at=now, attempt=0, )) await session.commit() await process_mpack_index_job(session, job_id) await session.commit() return job_id # --------------------------------------------------------------------------- # PI-1 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_pi1_mpack_index_written_for_every_object(db_session: AsyncSession) -> None: """Every object in the mpack must have a row in musehub_mpack_index.""" repo = await create_repo( db_session, name="pi-test-1", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) objects = {blob_id(f"pi1-obj-{i}".encode()): f"pi1-obj-{i}".encode() for i in range(5)} wire_bytes, mpack_key = _make_mpack(objects) await _store_mpack(wire_bytes, mpack_key) await _enqueue_and_process(db_session, repo.repo_id, mpack_key, len(objects)) rows_q = await db_session.execute( select(db.MusehubMPackIndex).where( db.MusehubMPackIndex.entity_id.in_(list(objects.keys())) ) ) rows = rows_q.scalars().all() indexed_oids = {r.entity_id for r in rows} assert indexed_oids == set(objects.keys()), ( f"expected {len(objects)} mpack index rows, got {len(rows)}\n" f"missing: {set(objects.keys()) - indexed_oids}" ) # --------------------------------------------------------------------------- # PI-2 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_pi2_mpack_id_matches_mpack_key(db_session: AsyncSession) -> None: """The mpack_id stored in musehub_mpack_index must equal the mpack_key.""" repo = await create_repo( db_session, name="pi-test-2", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) objects = {blob_id(b"pi2-only-obj"): b"pi2-only-obj"} wire_bytes, mpack_key = _make_mpack(objects) await _store_mpack(wire_bytes, mpack_key) await _enqueue_and_process(db_session, repo.repo_id, mpack_key, len(objects)) row = await db_session.get(db.MusehubMPackIndex, (list(objects.keys())[0], mpack_key)) assert row is not None, "mpack index row not found" assert row.mpack_id == mpack_key, f"mpack_id {row.mpack_id!r} != mpack_key {mpack_key!r}" # --------------------------------------------------------------------------- # PI-3 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_pi3_second_push_adds_rows_without_overwriting(db_session: AsyncSession) -> None: """A second push of different objects adds new rows; first push rows survive.""" repo = await create_repo( db_session, name="pi-test-3", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) objs_a = {blob_id(f"pi3-a-{i}".encode()): f"pi3-a-{i}".encode() for i in range(3)} wire_a, key_a = _make_mpack(objs_a) await _store_mpack(wire_a, key_a) await _enqueue_and_process(db_session, repo.repo_id, key_a, len(objs_a)) objs_b = {blob_id(f"pi3-b-{i}".encode()): f"pi3-b-{i}".encode() for i in range(3)} wire_b, key_b = _make_mpack(objs_b) await _store_mpack(wire_b, key_b) await _enqueue_and_process(db_session, repo.repo_id, key_b, len(objs_b)) all_oids = set(objs_a.keys()) | set(objs_b.keys()) rows_q = await db_session.execute( select(db.MusehubMPackIndex).where(db.MusehubMPackIndex.entity_id.in_(list(all_oids))) ) rows = rows_q.scalars().all() indexed_oids = {r.entity_id for r in rows} indexed_mpacks = {r.mpack_id for r in rows} assert set(objs_a.keys()) <= indexed_oids, "first push objects missing from index" assert set(objs_b.keys()) <= indexed_oids, "second push objects missing from index" assert key_a in indexed_mpacks, "first mpack_key missing from index" assert key_b in indexed_mpacks, "second mpack_key missing from index" # --------------------------------------------------------------------------- # PI-4 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_pi4_objects_from_different_repos_are_distinct(db_session: AsyncSession) -> None: """Objects pushed to different repos are distinct in the global index (no collisions).""" repo_a = await create_repo( db_session, name="pi-test-4a", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) repo_b = await create_repo( db_session, name="pi-test-4b", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) objs_a = {blob_id(b"pi4-repo-a-obj"): b"pi4-repo-a-obj"} wire_a, key_a = _make_mpack(objs_a) await _store_mpack(wire_a, key_a) await _enqueue_and_process(db_session, repo_a.repo_id, key_a, 1) objs_b = {blob_id(b"pi4-repo-b-obj"): b"pi4-repo-b-obj"} wire_b, key_b = _make_mpack(objs_b) await _store_mpack(wire_b, key_b) await _enqueue_and_process(db_session, repo_b.repo_id, key_b, 1) rows_a = (await db_session.execute( select(db.MusehubMPackIndex).where( db.MusehubMPackIndex.entity_id.in_(list(objs_a.keys())) ) )).scalars().all() rows_b = (await db_session.execute( select(db.MusehubMPackIndex).where( db.MusehubMPackIndex.entity_id.in_(list(objs_b.keys())) ) )).scalars().all() assert {r.entity_id for r in rows_a} == set(objs_a.keys()), "repo_a objects not indexed" assert {r.entity_id for r in rows_b} == set(objs_b.keys()), "repo_b objects not indexed" assert {r.entity_id for r in rows_a}.isdisjoint({r.entity_id for r in rows_b}), ( "object collision — two repos pushed the same object_id (seeds must be unique)" )