"""TDD — Phase 4: mpack GC / consolidation (issue #63). PG-1 process_mpack_gc_job merges all mpack index rows for a repo into a single consolidated mpack and updates the mpack index so every object_id maps to the new consolidated mpack_key. PG-2 After GC, the number of distinct mpack_ids for any repo is ≤ 1 (a freshly-consolidated repo has exactly one mpack). PG-3 The consolidated mpack is a valid msgpack mpack containing all objects from the source mpacks. PG-4 process_mpack_gc_job is idempotent: running it twice produces the same consolidated mpack_key (same content → same sha256 key) and does not create duplicate mpack index rows. PG-5 A repo with ≤ 1 distinct mpack is skipped (no-op) — GC only acts when consolidation would reduce mpack count. PG-6 After GC, wire_fetch_mpack returns exactly 1 mpack_url (the consolidated mpack) for a repo that was previously spread across N mpacks. """ from __future__ import annotations import datetime import hashlib import msgpack import pytest from sqlalchemy import select, func pytestmark = pytest.mark.skip(reason="muse wire protocol in flux") from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import blob_id from musehub.db import musehub_repo_models as db from musehub.core.genesis import compute_identity_id from musehub.services.musehub_repository import create_repo # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_mpack( objects: dict[str, bytes], commits: list[dict] | None = None, snapshots: list[dict] | None = None, branch_heads: dict[str, str] | None = None, ) -> tuple[bytes, str]: mpack = { "commits": commits or [], "snapshots": snapshots or [], "objects": [ {"object_id": oid, "content": data} for oid, data in objects.items() ], "branch_heads": branch_heads or {}, } wire_bytes = msgpack.packb(mpack, use_bin_type=True) mpack_key = "sha256:" + hashlib.sha256(wire_bytes).hexdigest() return wire_bytes, mpack_key async def _store_mpack(wire_bytes: bytes, mpack_key: str) -> None: import musehub.storage.backends as _backends_mod backend = _backends_mod.get_backend() await backend.put_mpack(mpack_key, wire_bytes) async def _push_and_index( session: AsyncSession, repo_id: str, objects: dict[str, bytes], commits: list[dict] | None = None, snapshots: list[dict] | None = None, branch_heads: dict[str, str] | None = None, ) -> str: """Push an mpack and run process_mpack_index_job. Returns mpack_key.""" from musehub.core.genesis import compute_job_id from musehub.db.musehub_jobs_models import MusehubBackgroundJob from musehub.services.musehub_wire import process_mpack_index_job wire_bytes, mpack_key = _make_mpack(objects, commits, snapshots, branch_heads) await _store_mpack(wire_bytes, mpack_key) now = datetime.datetime.now(datetime.timezone.utc) job_id = compute_job_id(repo_id, "mpack.index", now.isoformat()) session.add(MusehubBackgroundJob( job_id=job_id, repo_id=repo_id, job_type="mpack.index", payload={ "mpack_key": mpack_key, "branch": "main", "head": (commits or [{}])[-1].get("commit_id", ""), "pusher_id": "", "declared_objects_count": len(objects), "declared_commits_count": len(commits or []), }, status="pending", created_at=now, attempt=0, )) await session.commit() await process_mpack_index_job(session, job_id) await session.commit() return mpack_key def _make_commit_chain(n: int, seed: str, parent_tip: str | None = None) -> tuple[list[dict], list[dict], str, dict[str, str]]: """Return (commits, snapshots, tip_commit_id, objects_dict).""" objects: dict[str, bytes] = {} commits = [] snapshots = [] parent_id = parent_tip for i in range(n): oid = blob_id(f"{seed}-obj-{i}".encode()) objects[oid] = f"{seed}-obj-{i}".encode() snap_id = blob_id(f"{seed}-snap-{i}".encode()) snapshots.append({ "snapshot_id": snap_id, "parent_snapshot_id": None, "delta_upsert": {f"file_{i}.txt": oid}, "delta_remove": [], }) cid = blob_id(f"{seed}-commit-{i}-p={parent_id}".encode()) commits.append({ "commit_id": cid, "branch": "main", "message": f"commit {i}", "author": "gabriel", "committed_at": datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc).isoformat(), "parent_commit_id": parent_id, "parent2_commit_id": None, "snapshot_id": snap_id, "agent_id": "", "model_id": "", "toolchain_id": "", "sem_ver_bump": "none", "breaking_changes": [], "signature": "", "signer_key_id": "", "signer_public_key": "", "prompt_hash": "", }) parent_id = cid return commits, snapshots, parent_id, objects # type: ignore[return-value] async def _repo_pack_oids(session: AsyncSession, repo_id: str) -> list[str]: """Return object_ids that belong to this repo (via MusehubObjectRef).""" q = await session.execute( select(db.MusehubObjectRef.object_id).where(db.MusehubObjectRef.repo_id == repo_id) ) return [row[0] for row in q] async def _distinct_mpack_count(session: AsyncSession, repo_id: str) -> int: """Count distinct mpacks that cover this repo's objects.""" oid_set = await _repo_pack_oids(session, repo_id) if not oid_set: return 0 result = await session.execute( select(db.MusehubMPackIndex.mpack_id) .where(db.MusehubMPackIndex.entity_id.in_(oid_set)) .where(db.MusehubMPackIndex.entity_type == "object") .distinct() ) return len(result.all()) # --------------------------------------------------------------------------- # PG-1 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_pg1_gc_consolidates_multiple_mpacks(db_session: AsyncSession) -> None: """process_mpack_gc_job merges N mpacks into one and updates the mpack index.""" from musehub.services.musehub_wire import process_mpack_gc_job repo = await create_repo( db_session, name="pg-test-1", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) # Three separate pushes → three mpacks commits_a, snaps_a, tip_a, objs_a = _make_commit_chain(2, "pg1a") await _push_and_index(db_session, repo.repo_id, objs_a, commits_a, snaps_a, branch_heads={"main": tip_a}) commits_b, snaps_b, tip_b, objs_b = _make_commit_chain(2, "pg1b", parent_tip=tip_a) await _push_and_index(db_session, repo.repo_id, objs_b, commits_b, snaps_b, branch_heads={"main": tip_b}) commits_c, snaps_c, tip_c, objs_c = _make_commit_chain(2, "pg1c", parent_tip=tip_b) await _push_and_index(db_session, repo.repo_id, objs_c, commits_c, snaps_c, branch_heads={"main": tip_c}) before = await _distinct_mpack_count(db_session, repo.repo_id) assert before == 3, f"expected 3 mpacks before GC, got {before}" result = await process_mpack_gc_job(db_session, repo.repo_id) await db_session.commit() assert result["packs_before"] == 3 assert result["packs_after"] == 1 assert result["consolidated_key"], "no consolidated mpack_key returned" # --------------------------------------------------------------------------- # PG-2 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_pg2_gc_leaves_exactly_one_mpack(db_session: AsyncSession) -> None: """After GC, distinct mpack_id count for the repo is exactly 1.""" from musehub.services.musehub_wire import process_mpack_gc_job repo = await create_repo( db_session, name="pg-test-2", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) for seed in ("pg2a", "pg2b", "pg2c", "pg2d"): commits, snaps, tip, objs = _make_commit_chain(2, seed) await _push_and_index(db_session, repo.repo_id, objs, commits, snaps, branch_heads={"main": tip}) await process_mpack_gc_job(db_session, repo.repo_id) await db_session.commit() after = await _distinct_mpack_count(db_session, repo.repo_id) assert after == 1, f"expected 1 mpack after GC, got {after}" # --------------------------------------------------------------------------- # PG-3 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_pg3_consolidated_mpack_contains_all_objects(db_session: AsyncSession) -> None: """The consolidated mpack stored in MinIO is a valid msgpack mpack with all objects.""" import musehub.storage.backends as _backends_mod from musehub.services.musehub_wire import process_mpack_gc_job repo = await create_repo( db_session, name="pg-test-3", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) all_oids: set[str] = set() commits_a, snaps_a, tip_a, objs_a = _make_commit_chain(3, "pg3a") await _push_and_index(db_session, repo.repo_id, objs_a, commits_a, snaps_a, branch_heads={"main": tip_a}) all_oids.update(objs_a) commits_b, snaps_b, tip_b, objs_b = _make_commit_chain(3, "pg3b", parent_tip=tip_a) await _push_and_index(db_session, repo.repo_id, objs_b, commits_b, snaps_b, branch_heads={"main": tip_b}) all_oids.update(objs_b) result = await process_mpack_gc_job(db_session, repo.repo_id) await db_session.commit() consolidated_key = result["consolidated_key"] backend = _backends_mod.get_backend() mpack_bytes = await backend.get_mpack(consolidated_key) assert mpack_bytes, "consolidated mpack not found in storage" if mpack_bytes[:4] == b"MUSE": from muse.core.mpack import parse_wire_mpack as _parse_wm mpack = _parse_wm(mpack_bytes) else: mpack = msgpack.unpackb(mpack_bytes, raw=False) stored_oids = {obj["object_id"] for obj in mpack.get("objects", [])} assert all_oids == stored_oids, ( f"missing objects in consolidated mpack: {all_oids - stored_oids}" ) # --------------------------------------------------------------------------- # PG-4 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_pg4_gc_is_idempotent(db_session: AsyncSession) -> None: """Running GC twice produces the same consolidated_key and no duplicate rows.""" from musehub.services.musehub_wire import process_mpack_gc_job repo = await create_repo( db_session, name="pg-test-4", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) commits_a, snaps_a, tip_a, objs_a = _make_commit_chain(2, "pg4a") await _push_and_index(db_session, repo.repo_id, objs_a, commits_a, snaps_a, branch_heads={"main": tip_a}) commits_b, snaps_b, tip_b, objs_b = _make_commit_chain(2, "pg4b", parent_tip=tip_a) await _push_and_index(db_session, repo.repo_id, objs_b, commits_b, snaps_b, branch_heads={"main": tip_b}) result1 = await process_mpack_gc_job(db_session, repo.repo_id) await db_session.commit() result2 = await process_mpack_gc_job(db_session, repo.repo_id) await db_session.commit() assert result1["consolidated_key"] == result2["consolidated_key"], ( "second GC produced a different key — consolidation is not content-addressed" ) # No duplicate rows — each (object_id, mpack_id) pair must be unique. # Query via object refs since mpack index is global. oid_set = await _repo_pack_oids(db_session, repo.repo_id) total_q = await db_session.execute( select(func.count()).select_from(db.MusehubMPackIndex) .where(db.MusehubMPackIndex.entity_id.in_(oid_set)) .where(db.MusehubMPackIndex.entity_type == "object") ) total = total_q.scalar_one() all_oids = len(objs_a) + len(objs_b) assert total == all_oids, f"expected {all_oids} rows (no dups), got {total}" # --------------------------------------------------------------------------- # PG-5 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_pg5_gc_skips_single_mpack_repo(db_session: AsyncSession) -> None: """GC is a no-op when the repo already has ≤ 1 mpack.""" from musehub.services.musehub_wire import process_mpack_gc_job repo = await create_repo( db_session, name="pg-test-5", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) commits, snaps, tip, objs = _make_commit_chain(3, "pg5") await _push_and_index(db_session, repo.repo_id, objs, commits, snaps, branch_heads={"main": tip}) result = await process_mpack_gc_job(db_session, repo.repo_id) await db_session.commit() assert result["skipped"] is True, "expected GC to be skipped for single-mpack repo" assert result.get("packs_before", 1) <= 1 after = await _distinct_mpack_count(db_session, repo.repo_id) assert after == 1 # --------------------------------------------------------------------------- # PG-6 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_pg6_fetch_works_after_gc_and_index_converges(db_session: AsyncSession) -> None: """After GC, wire_fetch_mpack still works and MPackIndex collapses to 1 covering mpack. Pre-GC: two pushes → two source mpacks in MPackIndex. Post-GC: consolidated mpack → all objects point to same mpack_id. wire_fetch_mpack must assemble a correct fetch mpack in both cases. """ from musehub.services.musehub_wire import process_mpack_gc_job, wire_fetch_mpack repo = await create_repo( db_session, name="pg-test-6", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) commits_a, snaps_a, tip_a, objs_a = _make_commit_chain(3, "pg6a") await _push_and_index(db_session, repo.repo_id, objs_a, commits_a, snaps_a, branch_heads={"main": tip_a}) commits_b, snaps_b, tip_b, objs_b = _make_commit_chain(3, "pg6b", parent_tip=tip_a) await _push_and_index(db_session, repo.repo_id, objs_b, commits_b, snaps_b, branch_heads={"main": tip_b}) # Before GC: 2 distinct covering mpacks in MPackIndex for this repo's objects assert await _distinct_mpack_count(db_session, repo.repo_id) == 2, ( "expected 2 distinct mpacks in MPackIndex before GC" ) # Before GC: fetch must already work before_result = await wire_fetch_mpack( db_session, repo.repo_id, want=[tip_b], have=[], ttl_seconds=3600, ) assert before_result.get("mpack_url"), "fetch must return mpack_url before GC" assert before_result["object_count"] >= 1, "fetch must return at least one object before GC" before_object_count = before_result["object_count"] await process_mpack_gc_job(db_session, repo.repo_id) await db_session.commit() # After GC: exactly 1 distinct covering mpack in MPackIndex assert await _distinct_mpack_count(db_session, repo.repo_id) == 1, ( "expected 1 distinct mpack in MPackIndex after GC" ) # After GC: fetch must still work and return the same objects after_result = await wire_fetch_mpack( db_session, repo.repo_id, want=[tip_b], have=[], ttl_seconds=3600, ) assert after_result.get("mpack_url"), "fetch must return mpack_url after GC" assert after_result["object_count"] == before_object_count, ( f"object count changed after GC: {before_object_count} → {after_result['object_count']}" )