"""TDD — POST /{owner}/{slug}/fetch — Step 1 of the fetch protocol (issue #68). The fetch endpoint is the server side of the three-step fetch flow: 1. POST /{owner}/{slug}/fetch body: {"want": ["sha256:..."], "have": ["sha256:..."]} → {"mpack_url": "...", "mpack_id": "sha256:...", "commit_count": N, "object_count": N} 2. Client GETs mpack bytes directly from MinIO via presigned URL. 3. Client calls apply_mpack() locally. Tests: F01 Happy path — want=[tip], have=[] → 200, mpack_url set, sha256 mpack_id. F02 Up-to-date — want=[tip], have=[tip] → 200, commit_count=0, object_count=0, mpack_url null. F03 Delta — want=[c2], have=[c1] → only new commit + new object in delta. F04 Missing repo → 404. F05 Empty want → 422. F06 Malformed want entry (not sha256:*) → 422. F07 Unknown want commit_id → 404. F08 Objects not in pack_index → 503 with Retry-After header. F09 Private repo without auth → 404. F10 mpack_id == sha256(mpack_bytes) — content-addressing proof survives end-to-end. """ from __future__ import annotations import hashlib import typing from collections.abc import Coroutine, Mapping from datetime import datetime, timezone from unittest.mock import AsyncMock import msgpack import pytest from httpx import AsyncClient from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from muse.core.mpack import parse_wire_mpack from muse.core.types import blob_id, fake_id from musehub.db.musehub_repo_models import ( MusehubBranch, MusehubCommit, MusehubCommitGraph, MusehubCommitRef, MusehubMPackIndex, MusehubObject, MusehubObjectRef, MusehubSnapshot, MusehubSnapshotRef, ) from tests.factories import create_repo from musehub.types.json_types import StrDict type _ByteStore = dict[str, bytes] # ── helpers ─────────────────────────────────────────────────────────────────── def _now() -> datetime: return datetime.now(tz=timezone.utc) def _stub_backend(monkeypatch: pytest.MonkeyPatch) -> _ByteStore: """Replace MinIO backend with 
an in-memory dict. Returns the store.""" store: dict[str, bytes] = {} async def _put(oid: str, data: bytes, **_: typing.Any) -> str: store[oid] = data return f"mem://{oid}" async def _get(oid: str) -> bytes | None: return store.get(oid) async def _exists(oid: str, **_: typing.Any) -> bool: return oid in store async def _presign_get(oid: str, ttl_seconds: int = 3600) -> str: return f"https://minio.test/mpacks/{oid}?ttl={ttl_seconds}" async def _get_mpack(mpack_id: str) -> bytes | None: return store.get(mpack_id) async def _put_mpack(mpack_id: str, data: bytes) -> str: store[mpack_id] = data return f"mem://mpacks/{mpack_id}" async def _presign_mpack_get(mpack_id: str, ttl: int) -> str: return f"https://minio.test/mpacks/{mpack_id}?ttl={ttl}" backend = AsyncMock() backend.put = _put backend.get = _get backend.exists = _exists backend.presign_get = _presign_get backend.get_mpack = _get_mpack backend.put_mpack = _put_mpack backend.presign_mpack_get = _presign_mpack_get backend.supports_presign = True monkeypatch.setattr("musehub.services.musehub_wire.get_backend", lambda: backend) monkeypatch.setattr("musehub.services.musehub_wire_fetch.get_backend", lambda: backend) monkeypatch.setattr("musehub.services.musehub_wire_shared.get_backend", lambda: backend) return store async def _store_object( session: AsyncSession, repo_id: str, oid: str, content: bytes, *, indexed: bool = True, mpack_key: str = "sha256:" + "a" * 64, ) -> None: """Write a musehub_objects row + ref + optional pack_index entry.""" await session.execute( pg_insert(MusehubObject) .values(object_id=oid, path="file.dat", size_bytes=len(content), storage_uri=f"mem://{oid}", content_cache=content) .on_conflict_do_nothing(index_elements=["object_id"]) ) await session.execute( pg_insert(MusehubObjectRef) .values(repo_id=repo_id, object_id=oid) .on_conflict_do_nothing() ) if indexed: await session.execute( pg_insert(MusehubMPackIndex) .values(entity_id=oid, mpack_id=mpack_key, entity_type="object") 
.on_conflict_do_nothing() ) await session.commit() async def _make_commit( session: AsyncSession, repo_id: str, *, manifest: dict[str, str], seed: str, parent_ids: list[str] | None = None, generation: int = 0, ) -> tuple[MusehubCommit, MusehubSnapshot]: snap_id = fake_id(f"snap-f01-{seed}") snap = MusehubSnapshot( snapshot_id=snap_id, directories=[], manifest_blob=msgpack.packb(manifest, use_bin_type=True), entry_count=len(manifest), created_at=_now(), ) session.add(snap) await session.execute( pg_insert(MusehubSnapshotRef) .values(repo_id=repo_id, snapshot_id=snap_id) .on_conflict_do_nothing() ) commit_id = fake_id(f"commit-f01-{seed}") pids = parent_ids or [] commit = MusehubCommit( commit_id=commit_id, branch="main", parent_ids=pids, message=f"commit {seed}", author="gabriel", timestamp=_now(), snapshot_id=snap_id, ) session.add(commit) await session.execute( pg_insert(MusehubCommitRef) .values(repo_id=repo_id, commit_id=commit_id) .on_conflict_do_nothing() ) graph = MusehubCommitGraph( commit_id=commit_id, parent_ids=pids, generation=generation, snapshot_id=snap_id, ) session.add(graph) from sqlalchemy import select as _select branch_q = await session.execute( _select(MusehubBranch).where( MusehubBranch.repo_id == repo_id, MusehubBranch.name == "main", ) ) branch = branch_q.scalar_one_or_none() if branch: branch.head_commit_id = commit_id await session.commit() return commit, snap def _fetch( client: AsyncClient, owner: str, slug: str, want: list[str], have: list[str], headers: dict[str, str] | None = None, ) -> Coroutine[typing.Any, typing.Any, typing.Any]: merged = {"Content-Type": "application/x-msgpack", **(headers or {})} return client.post( f"/{owner}/{slug}/fetch", content=msgpack.packb({"want": want, "have": have}, use_bin_type=True), headers=merged, ) # ══════════════════════════════════════════════════════════════════════════════ # F01 — happy path: want=[tip], have=[] → 200, mpack_url, sha256 mpack_id # 
# ══════════════════════════════════════════════════════════════════════════════
# F01 — happy path: want=[tip], have=[] → 200, mpack_url, sha256 mpack_id
# ══════════════════════════════════════════════════════════════════════════════


@pytest.mark.asyncio
async def test_f01_happy_path_returns_presigned_url(
    client: AsyncClient,
    db_session: AsyncSession,
    monkeypatch: pytest.MonkeyPatch,
    wire_headers: StrDict,
) -> None:
    """A client with nothing locally receives a presigned mpack for the tip.

    Expect 200, a non-empty mpack_url, a sha256-prefixed mpack_id, and a
    pack holding exactly one commit and one blob.
    """
    _stub_backend(monkeypatch)
    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    payload = b"hello fetch world"
    blob = blob_id(payload)
    await _store_object(db_session, repo.repo_id, blob, payload)
    tip, _snapshot = await _make_commit(
        db_session, repo.repo_id, manifest={"f.txt": blob}, seed="f01"
    )

    response = await _fetch(
        client,
        "gabriel",
        repo.slug,
        want=[tip.commit_id],
        have=[],
        headers=wire_headers,
    )
    assert response.status_code == 200

    body = msgpack.unpackb(response.content, raw=False)
    mpack_id = body["mpack_id"]
    assert mpack_id.startswith("sha256:"), f"bad mpack_id: {mpack_id!r}"
    assert body["mpack_url"], "mpack_url must be non-empty"
    assert body["commit_count"] == 1
    assert body["blob_count"] == 1


# ══════════════════════════════════════════════════════════════════════════════
# F02 — up-to-date: want=[tip], have=[tip] → 200, all counts zero, mpack_url null
# ══════════════════════════════════════════════════════════════════════════════


@pytest.mark.asyncio
async def test_f02_up_to_date_returns_zero_counts(
    client: AsyncClient,
    db_session: AsyncSession,
    monkeypatch: pytest.MonkeyPatch,
    wire_headers: StrDict,
) -> None:
    """When have already covers want, the server answers 200 with nothing to send.

    Both counts must be zero and no mpack URL may be handed out.
    """
    _stub_backend(monkeypatch)
    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    payload = b"already have this"
    blob = blob_id(payload)
    await _store_object(db_session, repo.repo_id, blob, payload)
    tip, _snapshot = await _make_commit(
        db_session, repo.repo_id, manifest={"g.txt": blob}, seed="f02"
    )

    # The client advertises the very commit it asks for.
    known = [tip.commit_id]
    response = await _fetch(
        client, "gabriel", repo.slug, want=known, have=known, headers=wire_headers
    )
    assert response.status_code == 200

    body = msgpack.unpackb(response.content, raw=False)
    assert body["commit_count"] == 0
    assert body["blob_count"] == 0
    assert not body.get("mpack_url"), "mpack_url must be null/empty when nothing to send"


# ══════════════════════════════════════════════════════════════════════════════
# F03 — delta: want=[c2], have=[c1] → only new commit + new object
# ══════════════════════════════════════════════════════════════════════════════


@pytest.mark.asyncio
async def test_f03_delta_excludes_have_side_objects(
    client: AsyncClient,
    db_session: AsyncSession,
    monkeypatch: pytest.MonkeyPatch,
    wire_headers: StrDict,
) -> None:
    """Advertising c1 in have trims the pack to c2 and the blob c2 introduced.

    Neither c1 nor the blob it already carried may appear in the mpack.
    """
    store = _stub_backend(monkeypatch)
    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    base_payload = b"base object"
    base_blob = blob_id(base_payload)
    await _store_object(db_session, repo.repo_id, base_blob, base_payload)
    parent, _ = await _make_commit(
        db_session,
        repo.repo_id,
        manifest={"base.txt": base_blob},
        seed="f03-c1",
        generation=0,
    )

    new_payload = b"delta object"
    new_blob = blob_id(new_payload)
    await _store_object(db_session, repo.repo_id, new_blob, new_payload)
    child, _ = await _make_commit(
        db_session,
        repo.repo_id,
        manifest={"base.txt": base_blob, "delta.txt": new_blob},
        seed="f03-c2",
        parent_ids=[parent.commit_id],
        generation=1,
    )

    response = await _fetch(
        client,
        "gabriel",
        repo.slug,
        want=[child.commit_id],
        have=[parent.commit_id],
        headers=wire_headers,
    )
    assert response.status_code == 200

    body = msgpack.unpackb(response.content, raw=False)
    assert body["commit_count"] == 1
    assert body["blob_count"] == 1

    # Inspect the bytes actually written to the (stubbed) backend.
    pack = parse_wire_mpack(store[body["mpack_id"]])

    sent_commits = {entry["commit_id"] for entry in pack["commits"]}
    assert child.commit_id in sent_commits
    assert parent.commit_id not in sent_commits, "have-side commit must be excluded"

    sent_blobs = {entry["object_id"] for entry in pack["blobs"]}
    assert new_blob in sent_blobs
    assert base_blob not in sent_blobs, "have-side object must be excluded"
# ══════════════════════════════════════════════════════════════════════════════
# F04 — missing repo → 404
# ══════════════════════════════════════════════════════════════════════════════


@pytest.mark.asyncio
async def test_f04_missing_repo_returns_404(
    client: AsyncClient,
    wire_headers: StrDict,
) -> None:
    """POST to a repo that doesn't exist → 404."""
    resp = await _fetch(
        client, "nobody", "no-such-repo", ["sha256:" + "a" * 64], [], headers=wire_headers
    )
    assert resp.status_code == 404


# ══════════════════════════════════════════════════════════════════════════════
# F05 — empty want → 422
# ══════════════════════════════════════════════════════════════════════════════


@pytest.mark.asyncio
async def test_f05_empty_want_returns_422(
    client: AsyncClient,
    db_session: AsyncSession,
    wire_headers: StrDict,
) -> None:
    """Empty want list → 422 (not a valid fetch request)."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    resp = await _fetch(client, "gabriel", repo.slug, [], [], headers=wire_headers)
    assert resp.status_code == 422


# ══════════════════════════════════════════════════════════════════════════════
# F06 — malformed want entry → 422
# ══════════════════════════════════════════════════════════════════════════════


@pytest.mark.asyncio
async def test_f06_malformed_want_entry_returns_422(
    client: AsyncClient,
    db_session: AsyncSession,
    wire_headers: StrDict,
) -> None:
    """want entries that don't match sha256:* → 422."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    resp = await _fetch(client, "gabriel", repo.slug, ["not-a-hash"], [], headers=wire_headers)
    assert resp.status_code == 422


# ══════════════════════════════════════════════════════════════════════════════
# F07 — unknown want commit_id → 404
# ══════════════════════════════════════════════════════════════════════════════


@pytest.mark.asyncio
async def test_f07_unknown_want_commit_returns_404(
    client: AsyncClient,
    db_session: AsyncSession,
    wire_headers: StrDict,
) -> None:
    """want contains a sha256 ID that doesn't exist in musehub_commits → 404."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    ghost_id = "sha256:" + "b" * 64
    resp = await _fetch(client, "gabriel", repo.slug, [ghost_id], [], headers=wire_headers)
    assert resp.status_code == 404


# ══════════════════════════════════════════════════════════════════════════════
# F08 — objects not in pack_index → 503 with Retry-After
# ══════════════════════════════════════════════════════════════════════════════


@pytest.mark.asyncio
async def test_f08_unindexed_objects_returns_503(
    client: AsyncClient,
    db_session: AsyncSession,
    monkeypatch: pytest.MonkeyPatch,
    wire_headers: StrDict,
) -> None:
    """Objects that haven't been indexed in musehub_mpack_index → 503 Retry-After.

    This happens when unpack-mpack returned 200 but the mpack.index background
    job hasn't completed yet. The client must retry rather than receiving a
    partial or corrupt fetch mpack.
    """
    _stub_backend(monkeypatch)
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    raw = b"not yet indexed"
    oid = blob_id(raw)
    # Write object WITHOUT a pack_index entry (indexed=False)
    await _store_object(db_session, repo.repo_id, oid, raw, indexed=False)
    commit, _ = await _make_commit(
        db_session, repo.repo_id, manifest={"pending.txt": oid}, seed="f08"
    )
    resp = await _fetch(client, "gabriel", repo.slug, [commit.commit_id], [], headers=wire_headers)
    assert resp.status_code == 503
    # httpx.Headers membership checks are case-insensitive.
    assert "retry-after" in resp.headers, (
        "503 response must include Retry-After header"
    )


# ══════════════════════════════════════════════════════════════════════════════
# F09 — private repo without auth → 404
# ══════════════════════════════════════════════════════════════════════════════


@pytest.mark.asyncio
async def test_f09_private_repo_without_auth_returns_404(
    client: AsyncClient,
    db_session: AsyncSession,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Private repo must 404 for unauthenticated requests (don't leak existence).

    The wanted commit genuinely exists (with an indexed blob), so a 404 here
    can only come from the visibility check. Asking for a non-existent commit
    would also 404 via the unknown-commit path (F07) and hide an auth leak.
    No ``wire_headers`` are sent, so the request is unauthenticated.
    """
    # Stub the backend so a leaky server fails with a clear non-404 status
    # instead of reaching real storage.
    _stub_backend(monkeypatch)
    repo = await create_repo(db_session, owner="gabriel", visibility="private")
    raw = b"private repo content"
    oid = blob_id(raw)
    await _store_object(db_session, repo.repo_id, oid, raw)
    commit, _ = await _make_commit(
        db_session, repo.repo_id, manifest={"secret.txt": oid}, seed="f09"
    )
    resp = await _fetch(client, "gabriel", repo.slug, [commit.commit_id], [])
    assert resp.status_code == 404


# ══════════════════════════════════════════════════════════════════════════════
# F10 — mpack_id == sha256(mpack_bytes) end-to-end
# ══════════════════════════════════════════════════════════════════════════════


@pytest.mark.asyncio
async def test_f10_mpack_id_equals_sha256_of_stored_bytes(
    client: AsyncClient,
    db_session: AsyncSession,
    monkeypatch: pytest.MonkeyPatch,
    wire_headers: StrDict,
) -> None:
    """mpack_id in the HTTP response == sha256(bytes written to MinIO).

    Content-addressing is the integrity proof — no secondary verification needed.
    """
    store = _stub_backend(monkeypatch)
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    raw = b"content-addressing proof"
    oid = blob_id(raw)
    await _store_object(db_session, repo.repo_id, oid, raw)
    commit, _ = await _make_commit(
        db_session, repo.repo_id, manifest={"proof.bin": oid}, seed="f10"
    )
    resp = await _fetch(client, "gabriel", repo.slug, [commit.commit_id], [], headers=wire_headers)
    assert resp.status_code == 200
    data = msgpack.unpackb(resp.content, raw=False)
    mpack_id = data["mpack_id"]
    # Fail with a readable message (not a bare KeyError) if the advertised
    # mpack was never written to the backend.
    assert mpack_id in store, f"mpack {mpack_id!r} was never written to the backend"
    stored_bytes = store[mpack_id]
    expected_id = blob_id(stored_bytes)
    assert mpack_id == expected_id, (
        f"mpack_id {mpack_id!r} != blob_id(stored_bytes) {expected_id!r}"
    )