"""TDD — MPack-based fetch path: wire_fetch_mpack (issue #47). The core principle: content-addressing is a proof, not a label. sha256(mpack_bytes) == mpack_id IS the integrity check for everything inside. No per-object hash loop. No secondary verification. New function: wire_fetch_mpack(session, repo_id, want, have, ttl_seconds) Returns FetchMPackResult: mpack_id — "sha256:" — content-addressed ID of the mpack mpack_url — presigned GET URL for the entire mpack (None when empty) commit_count — number of commits in the delta object_count — number of unique new objects Tests: FB0 Single commit + single object → mpack_id present, mpack_url set. FB1 sha256(mpack_bytes) == mpack_id — the content-addressing proof. FB2 MPack deserializes to expected commits and objects. FB3 have cuts the delta — commits the client already has are excluded. FB4 Empty want list → empty result (mpack_id=None, mpack_url=None). FB5 All objects in the mpack satisfy sha256(content) == object_id (individual proofs). """ from __future__ import annotations import hashlib import msgpack import pytest from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from unittest.mock import AsyncMock from datetime import datetime, timezone from sqlalchemy import select from muse.core.types import blob_id, fake_id from muse.core.mpack import parse_wire_mpack from musehub.db import musehub_repo_models as db from tests.factories import create_repo type ObjectStore = dict[str, bytes] # ── helpers ─────────────────────────────────────────────────────────────────── def _now() -> datetime: return datetime.now(tz=timezone.utc) def _stub_backend(monkeypatch: pytest.MonkeyPatch) -> ObjectStore: store: dict[str, bytes] = {} async def _put(oid: str, data: bytes, **_: typing.Any) -> str: store[oid] = data return f"mem://{oid}" async def _get(oid: str) -> bytes | None: return store.get(oid) async def _exists(oid: str, **_: typing.Any) -> bool: return oid in store async def _presign_get(oid: str, ttl: int) -> str: return f"https://minio.test/{oid}?ttl={ttl}" async def _get_mpack(mpack_id: str) -> bytes | None: return store.get(mpack_id) async def _put_mpack(mpack_id: str, data: bytes) -> str: store[mpack_id] = data return f"mem://mpacks/{mpack_id}" async def _presign_mpack_get(mpack_id: str, ttl: int) -> str: return f"https://minio.test/mpacks/{mpack_id}?ttl={ttl}" backend = AsyncMock() backend.put = _put backend.get = _get backend.exists = _exists backend.presign_get = _presign_get backend.get_mpack = _get_mpack backend.put_mpack = _put_mpack backend.presign_mpack_get = _presign_mpack_get backend.supports_presign = True monkeypatch.setattr("musehub.services.musehub_wire.get_backend", lambda: backend) monkeypatch.setattr("musehub.services.musehub_wire_fetch.get_backend", lambda: backend) monkeypatch.setattr("musehub.services.musehub_wire_shared.get_backend", lambda: backend) return store async def _store_object( session: AsyncSession, repo_id: str, oid: str, content: bytes, store: dict[str, bytes], ) -> None: # Store content in mock backend (legacy fallback path) store[oid] = content # Build a per-object fake mpack so the MPackIndex path also works fake_mpack_bytes = msgpack.packb( {"blobs": [{"object_id": oid, "content": content}], "commits": [], "snapshots": []}, use_bin_type=True, ) fake_mpack_id = blob_id(fake_mpack_bytes) store[fake_mpack_id] = fake_mpack_bytes await session.execute( pg_insert(db.MusehubObject) .values( object_id=oid, path="file.dat", size_bytes=len(content), storage_uri=f"mem://{oid}", ) .on_conflict_do_nothing(index_elements=["object_id"]) ) await session.execute( pg_insert(db.MusehubObjectRef) .values(repo_id=repo_id, object_id=oid) .on_conflict_do_nothing() ) await session.execute( pg_insert(db.MusehubMPackIndex) .values(entity_id=oid, mpack_id=fake_mpack_id, entity_type="object") .on_conflict_do_nothing() ) await session.commit() async def _make_commit( session: AsyncSession, repo_id: str, *, manifest: dict[str, str], seed: str = "c1", parent_ids: list[str] | None = None, ) -> tuple[db.MusehubCommit, db.MusehubSnapshot]: snap_id = fake_id(f"snap-{seed}") snap = db.MusehubSnapshot( snapshot_id=snap_id, directories=[], manifest_blob=msgpack.packb(manifest, use_bin_type=True), entry_count=len(manifest), created_at=_now(), ) session.add(snap) await session.execute( pg_insert(db.MusehubSnapshotRef) .values(repo_id=repo_id, snapshot_id=snap_id) .on_conflict_do_nothing() ) commit_id = fake_id(f"commit-{seed}") commit = db.MusehubCommit( commit_id=commit_id, branch="main", parent_ids=parent_ids or [], message=f"commit {seed}", author="gabriel", timestamp=_now(), snapshot_id=snap_id, ) session.add(commit) await session.execute( pg_insert(db.MusehubCommitRef) .values(repo_id=repo_id, commit_id=commit_id) .on_conflict_do_nothing() ) await session.execute( pg_insert(db.MusehubCommitGraph) .values( commit_id=commit_id, parent_ids=parent_ids or [], generation=0, snapshot_id=snap_id, ) .on_conflict_do_nothing() ) # Update branch head branch_q = await session.execute( select(db.MusehubBranch).where( db.MusehubBranch.repo_id == repo_id, db.MusehubBranch.name == "main", ) ) branch = branch_q.scalar_one_or_none() if branch: branch.head_commit_id = commit_id await session.commit() return commit, snap # ══════════════════════════════════════════════════════════════════════════════ # FB0 — single commit + single object → mpack_id and mpack_url returned # ══════════════════════════════════════════════════════════════════════════════ @pytest.mark.asyncio async def test_fb0_single_commit_returns_mpack( db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch ) -> None: """Single commit with one object → wire_fetch_mpack returns mpack_id and mpack_url.""" from musehub.services.musehub_wire import wire_fetch_mpack store = _stub_backend(monkeypatch) repo = await create_repo(db_session, owner="gabriel", visibility="public") raw = b"hello world content" oid = blob_id(raw) await _store_object(db_session, repo.repo_id, oid, raw, store) commit, _ = await _make_commit( db_session, repo.repo_id, manifest={"hello.txt": oid}, seed="fb0" ) result = await wire_fetch_mpack( db_session, repo.repo_id, want=[commit.commit_id], have=[] ) assert result["mpack_id"].startswith("sha256:"), "mpack_id must be sha256-prefixed" assert result["mpack_url"], "mpack_url must be non-empty" assert result["commit_count"] == 1 assert result["blob_count"] == 1 # ══════════════════════════════════════════════════════════════════════════════ # FB1 — sha256(mpack_bytes) == mpack_id — the proof # ══════════════════════════════════════════════════════════════════════════════ @pytest.mark.asyncio async def test_fb1_mpack_id_equals_sha256_of_bytes( db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch ) -> None: """sha256(mpack_bytes) == mpack_id — content-addressing is the integrity check.""" from musehub.services.musehub_wire import wire_fetch_mpack store = _stub_backend(monkeypatch) repo = await create_repo(db_session, owner="gabriel", visibility="public") raw = b"content for proof test" oid = blob_id(raw) await _store_object(db_session, repo.repo_id, oid, raw, store) commit, _ = await _make_commit( db_session, repo.repo_id, manifest={"proof.bin": oid}, seed="fb1" ) result = await wire_fetch_mpack( db_session, repo.repo_id, want=[commit.commit_id], have=[] ) actual_bytes = store[result["mpack_id"]] expected_id = blob_id(actual_bytes) assert result["mpack_id"] == expected_id, ( f"mpack_id {result['mpack_id']!r} != blob_id(stored_bytes) {expected_id!r}" ) # ══════════════════════════════════════════════════════════════════════════════ # FB2 — mpack deserializes with expected commits and objects # ══════════════════════════════════════════════════════════════════════════════ @pytest.mark.asyncio async def test_fb2_mpack_contains_commits_and_objects( db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch ) -> None: """MPack bytes deserialize to a dict with commits and objects matching what was pushed.""" from musehub.services.musehub_wire import wire_fetch_mpack store = _stub_backend(monkeypatch) repo = await create_repo(db_session, owner="gabriel", visibility="public") raws = [f"object-{i}".encode() for i in range(3)] oids = [blob_id(r) for r in raws] for oid, raw in zip(oids, raws): await _store_object(db_session, repo.repo_id, oid, raw, store) manifest = {f"file{i}.bin": oid for i, oid in enumerate(oids)} commit, _ = await _make_commit( db_session, repo.repo_id, manifest=manifest, seed="fb2" ) result = await wire_fetch_mpack( db_session, repo.repo_id, want=[commit.commit_id], have=[] ) mpack = parse_wire_mpack(store[result["mpack_id"]]) assert "commits" in mpack, "mpack must contain 'commits'" assert "blobs" in mpack, "mpack must contain 'objects'" mpack_commit_ids = {c["commit_id"] for c in mpack["commits"]} assert commit.commit_id in mpack_commit_ids, "expected commit must be in mpack" mpack_oids = {o["object_id"] for o in mpack["blobs"]} assert set(oids) == mpack_oids, ( f"mpack objects {mpack_oids} != expected {set(oids)}" ) # ══════════════════════════════════════════════════════════════════════════════ # FB3 — have cuts the delta # ══════════════════════════════════════════════════════════════════════════════ @pytest.mark.asyncio async def test_fb3_have_excludes_known_commits( db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch ) -> None: """Commits in have are excluded from the mpack delta.""" from musehub.services.musehub_wire import wire_fetch_mpack store = _stub_backend(monkeypatch) repo = await create_repo(db_session, owner="gabriel", visibility="public") raw1 = b"first commit content" oid1 = blob_id(raw1) await _store_object(db_session, repo.repo_id, oid1, raw1, store) c1, _ = await _make_commit( db_session, repo.repo_id, manifest={"f1.bin": oid1}, seed="fb3-c1" ) raw2 = b"second commit content" oid2 = blob_id(raw2) await _store_object(db_session, repo.repo_id, oid2, raw2, store) c2, _ = await _make_commit( db_session, repo.repo_id, manifest={"f1.bin": oid1, "f2.bin": oid2}, seed="fb3-c2", parent_ids=[c1.commit_id], ) # Client already has c1 — only c2's delta should be in the mpack result = await wire_fetch_mpack( db_session, repo.repo_id, want=[c2.commit_id], have=[c1.commit_id] ) mpack = parse_wire_mpack(store[result["mpack_id"]]) mpack_commit_ids = {c["commit_id"] for c in mpack["commits"]} assert c2.commit_id in mpack_commit_ids, "new commit must be in mpack" assert c1.commit_id not in mpack_commit_ids, "known commit must be excluded by have" assert result["commit_count"] == 1 mpack_oids = {o["object_id"] for o in mpack["blobs"]} assert oid1 not in mpack_oids, "object from have-side commit must be excluded" assert oid2 in mpack_oids, "new object must be included" # ══════════════════════════════════════════════════════════════════════════════ # FB4 — empty want → empty result # ══════════════════════════════════════════════════════════════════════════════ @pytest.mark.asyncio async def test_fb4_empty_want_returns_empty( db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch ) -> None: """Empty want list → zero commits, zero objects, no mpack produced.""" from musehub.services.musehub_wire import wire_fetch_mpack _stub_backend(monkeypatch) repo = await create_repo(db_session, owner="gabriel", visibility="public") result = await wire_fetch_mpack( db_session, repo.repo_id, want=[], have=[] ) assert result["commit_count"] == 0 assert result["blob_count"] == 0 assert result["mpack_id"] is None # ══════════════════════════════════════════════════════════════════════════════ # FB5 — per-object integrity (each object's content hashes to its ID) # ══════════════════════════════════════════════════════════════════════════════ @pytest.mark.asyncio async def test_fb5_object_content_hashes_to_id( db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch ) -> None: """Every object in the mpack has sha256(content) == object_id.""" from musehub.services.musehub_wire import wire_fetch_mpack store = _stub_backend(monkeypatch) repo = await create_repo(db_session, owner="gabriel", visibility="public") raws = [f"verifiable-content-{i}".encode() for i in range(4)] oids = [blob_id(r) for r in raws] for oid, raw in zip(oids, raws): await _store_object(db_session, repo.repo_id, oid, raw, store) manifest = {f"f{i}.bin": oid for i, oid in enumerate(oids)} commit, _ = await _make_commit( db_session, repo.repo_id, manifest=manifest, seed="fb5" ) result = await wire_fetch_mpack( db_session, repo.repo_id, want=[commit.commit_id], have=[] ) mpack = parse_wire_mpack(store[result["mpack_id"]]) for obj in mpack["blobs"]: actual_id = blob_id(obj["content"]) assert actual_id == obj["object_id"], ( f"object integrity failure: sha256(content)={actual_id} != id={obj['object_id']}" ) # ══════════════════════════════════════════════════════════════════════════════ # FB6 — server-side merge commit snapshot is included even when CommitGraph # has snapshot_id=None (defensive fallback via MusehubCommit row) # ══════════════════════════════════════════════════════════════════════════════ @pytest.mark.asyncio async def test_fb6_merge_commit_snapshot_included_via_commit_fallback( db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch ) -> None: """wire_fetch_mpack returns the merge snapshot even when MusehubCommitGraph.snapshot_id is NULL. Regression test for the original wire-fetch bug: server-side merge commits were not included in fetch mpack responses because _walk_commit_delta only read snapshot_id from CommitGraph rows (which were NULL before the proposals fix), ignoring the snapshot_id already present on the MusehubCommit row. The fix: wire_fetch_mpack falls back to MusehubCommit.snapshot_id for any commit whose CommitGraph row has snapshot_id=None. """ from musehub.services.musehub_wire import wire_fetch_mpack store = _stub_backend(monkeypatch) repo = await create_repo(db_session, owner="gabriel", visibility="public") # Two parent commits (client already has these) raw1 = b"parent-1-content" oid1 = blob_id(raw1) await _store_object(db_session, repo.repo_id, oid1, raw1, store) c1, _ = await _make_commit( db_session, repo.repo_id, manifest={"f1.bin": oid1}, seed="fb6-c1" ) raw2 = b"parent-2-content" oid2 = blob_id(raw2) await _store_object(db_session, repo.repo_id, oid2, raw2, store) c2, _ = await _make_commit( db_session, repo.repo_id, manifest={"f2.bin": oid2}, seed="fb6-c2", parent_ids=[c1.commit_id], ) # Merge commit: has a real snapshot on MusehubCommit, but CommitGraph row # has snapshot_id=None — this is the pre-fix bug state. merge_manifest = {"f1.bin": oid1, "f2.bin": oid2} merge_snap_id = fake_id("fb6-merge-snap") merge_snap = db.MusehubSnapshot( snapshot_id=merge_snap_id, directories=[], manifest_blob=msgpack.packb(merge_manifest, use_bin_type=True), entry_count=len(merge_manifest), created_at=_now(), ) db_session.add(merge_snap) await db_session.execute( pg_insert(db.MusehubSnapshotRef) .values(repo_id=repo.repo_id, snapshot_id=merge_snap_id) .on_conflict_do_nothing() ) merge_commit_id = fake_id("fb6-merge-commit") parent_ids = [c1.commit_id, c2.commit_id] merge_commit = db.MusehubCommit( commit_id=merge_commit_id, branch="main", parent_ids=parent_ids, message="Merge branch 'feat' into 'main'", author="gabriel", timestamp=_now(), snapshot_id=merge_snap_id, # MusehubCommit has it ) db_session.add(merge_commit) await db_session.execute( pg_insert(db.MusehubCommitRef) .values(repo_id=repo.repo_id, commit_id=merge_commit_id) .on_conflict_do_nothing() ) # CommitGraph row with snapshot_id=None — the pre-fix bug state await db_session.execute( pg_insert(db.MusehubCommitGraph) .values( commit_id=merge_commit_id, parent_ids=parent_ids, generation=1, snapshot_id=None, # ← bug: missing snapshot_id ) .on_conflict_do_nothing() ) await db_session.commit() # Client has both parents; needs the merge commit result = await wire_fetch_mpack( db_session, repo.repo_id, want=[merge_commit_id], have=[c1.commit_id, c2.commit_id], ) assert result["mpack_id"] is not None, "mpack must be produced for the merge commit" assert result["commit_count"] == 1 mpack = parse_wire_mpack(store[result["mpack_id"]]) snap_ids_in_mpack = {s["snapshot_id"] for s in mpack.get("snapshots", [])} assert merge_snap_id in snap_ids_in_mpack, ( f"merge snapshot {merge_snap_id!r} must be in mpack snapshots; got {snap_ids_in_mpack}" ) # ══════════════════════════════════════════════════════════════════════════════ # FB7 — server-side merge commit snapshot included when CommitGraph has it # (the post-fix happy path; also ensures no regression) # ══════════════════════════════════════════════════════════════════════════════ @pytest.mark.asyncio async def test_fb7_merge_commit_snapshot_included_via_commit_graph( db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch ) -> None: """wire_fetch_mpack returns the merge snapshot when CommitGraph.snapshot_id is set. This is the happy path after merge_proposal was fixed to always insert CommitGraph rows with snapshot_id populated. """ from musehub.services.musehub_wire import wire_fetch_mpack store = _stub_backend(monkeypatch) repo = await create_repo(db_session, owner="gabriel", visibility="public") raw1 = b"fb7-parent-1" oid1 = blob_id(raw1) await _store_object(db_session, repo.repo_id, oid1, raw1, store) c1, _ = await _make_commit( db_session, repo.repo_id, manifest={"f1.bin": oid1}, seed="fb7-c1" ) raw2 = b"fb7-parent-2" oid2 = blob_id(raw2) await _store_object(db_session, repo.repo_id, oid2, raw2, store) c2, _ = await _make_commit( db_session, repo.repo_id, manifest={"f2.bin": oid2}, seed="fb7-c2", parent_ids=[c1.commit_id], ) merge_manifest = {"f1.bin": oid1, "f2.bin": oid2} merge_snap_id = fake_id("fb7-merge-snap") merge_snap = db.MusehubSnapshot( snapshot_id=merge_snap_id, directories=[], manifest_blob=msgpack.packb(merge_manifest, use_bin_type=True), entry_count=len(merge_manifest), created_at=_now(), ) db_session.add(merge_snap) await db_session.execute( pg_insert(db.MusehubSnapshotRef) .values(repo_id=repo.repo_id, snapshot_id=merge_snap_id) .on_conflict_do_nothing() ) merge_commit_id = fake_id("fb7-merge-commit") parent_ids = [c1.commit_id, c2.commit_id] merge_commit = db.MusehubCommit( commit_id=merge_commit_id, branch="main", parent_ids=parent_ids, message="Merge branch 'feat' into 'main'", author="gabriel", timestamp=_now(), snapshot_id=merge_snap_id, ) db_session.add(merge_commit) await db_session.execute( pg_insert(db.MusehubCommitRef) .values(repo_id=repo.repo_id, commit_id=merge_commit_id) .on_conflict_do_nothing() ) # CommitGraph row WITH snapshot_id — the post-fix state await db_session.execute( pg_insert(db.MusehubCommitGraph) .values( commit_id=merge_commit_id, parent_ids=parent_ids, generation=1, snapshot_id=merge_snap_id, # ← correct ) .on_conflict_do_nothing() ) await db_session.commit() result = await wire_fetch_mpack( db_session, repo.repo_id, want=[merge_commit_id], have=[c1.commit_id, c2.commit_id], ) assert result["mpack_id"] is not None assert result["commit_count"] == 1 mpack = parse_wire_mpack(store[result["mpack_id"]]) snap_ids_in_mpack = {s["snapshot_id"] for s in mpack.get("snapshots", [])} assert merge_snap_id in snap_ids_in_mpack, ( f"merge snapshot {merge_snap_id!r} must be in mpack snapshots; got {snap_ids_in_mpack}" )