"""TDD — R2 presigned fetch path (per-object URL map, not mpack). Problem ------- Cloudflare times out streaming responses after ~100 seconds. For large fetches the same presign pattern used for push should apply — but mirrored: Push presign: server returns {oid → presigned PUT URL}, client PUTs in parallel. Fetch presign: server returns {oid → presigned GET URL}, client GETs in parallel. The server does a BFS walk, collects needed OIDs, calls backend.presign_get(oid, ttl) for each one (no object reads), and returns the map. The client downloads all objects in parallel directly from R2, bypassing Cloudflare entirely. Architecture ------------ ``POST /{owner}/{slug}/fetch/presign`` Request body (msgpack): want list[str] — commit IDs the client wants have list[str] — commit IDs the client already has (ancestry cut) ttl_seconds int — presigned URL TTL (default 3600) Response (msgpack): presign bool — False when below threshold or backend can't presign blob_urls dict[str, str] — {oid: presigned_GET_url} for each needed object commits list[dict] — commit records for apply_mpack snapshots list[dict] — snapshot records for apply_mpack branch_heads dict[str, str] — branch → tip commit_id repo_id str domain str default_branch str expires_at str|null — ISO-8601 expiry object_count int commit_count int Threshold (same as push): presign when ≥ 500 objects OR total raw size ≥ 50 MB Test plan --------- Unit / integration FP0 Below threshold → presign=False, blob_urls={}. FP1 Above object threshold with S3 backend → presign=True, blob_urls has all OIDs. FP2 Above size threshold (few but large objects) → presign=True. FP3 have set excludes commits the client already has. FP4 Above threshold with private repo → presign=True (auth required). FP5 Empty want list → presign=False, blob_urls={}, counts zero. FP6 commit_count and object_count are accurate across a commit chain. FP7 Route 404 for missing repo. FP8 Route returns presign=False for small public repo (no auth needed). 
FP9 Route returns 404 for private repo without auth (don't leak existence). FP10 blob_urls map keys match exactly the new OIDs in the needed manifests. Security FPS0 TTL forwarded verbatim to presign_get; no negative or zero TTL. FPS1 presign_get never called for OIDs in the have set. FPS2 Private repo returns 404 even to a non-owner authenticated user. FPS3 blob_urls count equals new_oids count — no extras, no leakage. Performance FPP0 All presign_get calls complete even when N > semaphore limit (50). FPP1 Custom ttl_seconds is honoured in the presigned URL. Stress / state integrity FPST0 600 OIDs → every OID appears in blob_urls (no dropped presign calls). FPST1 presign_get raising for one OID propagates the exception out. End-to-end FPE0 Full HTTP route with S3 backend mock → msgpack response contains correct presign=True, all blob_urls present and well-formed. """ from __future__ import annotations from datetime import datetime, timezone from unittest.mock import AsyncMock, patch import msgpack import pytest from httpx import AsyncClient from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import blob_id, fake_id from musehub.db import musehub_repo_models as db from musehub.models.wire import WireFetchRequest from musehub.services.musehub_wire import ( wire_fetch_presign, ) # Object count used in tests that iterate N objects to verify presign behavior. # Arbitrary small number; presign is now unconditional (no threshold logic). 
FETCH_PRESIGN_OBJECT_THRESHOLD = 5
FETCH_PRESIGN_SIZE_THRESHOLD = 0

from tests.factories import create_repo


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)


def _uid(seed: str) -> str:
    """Return the ``fake_id`` derived from *seed* (used for snapshot/commit IDs)."""
    return fake_id(seed)


async def _store_object(
    session: AsyncSession,
    repo_id: str,
    oid: str,
    content: bytes,
    size_override: int | None = None,
) -> None:
    """Store *content* in the backend and register the object for *repo_id*.

    Writes the bytes through ``get_backend().put``, inserts a
    ``MusehubObject`` row and a ``MusehubObjectRef`` row (both inserts are
    no-ops on conflict), then commits the session.

    ``size_override`` replaces the recorded ``size_bytes`` when it is not
    ``None``; otherwise the real length of *content* is recorded.
    """
    # Imported lazily so that tests patching get_backend on the module see
    # the lookup happen at call time.
    from musehub.services.musehub_wire import get_backend

    backend = get_backend()
    uri = await backend.put(oid, content)
    await session.execute(
        pg_insert(db.MusehubObject)
        .values(
            object_id=oid,
            path="file.dat",
            size_bytes=size_override if size_override is not None else len(content),
            storage_uri=uri,
        )
        .on_conflict_do_nothing(index_elements=["object_id"])
    )
    await session.execute(
        pg_insert(db.MusehubObjectRef)
        .values(repo_id=repo_id, object_id=oid)
        .on_conflict_do_nothing()
    )
    await session.commit()


async def _make_commit(
    session: AsyncSession,
    repo_id: str,
    *,
    manifest: dict[str, str],
    seed: str = "c1",
    parent_ids: list[str] | None = None,
) -> tuple[db.MusehubCommit, db.MusehubSnapshot]:
    """Create a snapshot and a commit on branch ``main`` and link both to *repo_id*.

    The snapshot stores *manifest* (path → object ID) msgpack-encoded.  IDs
    are derived from *seed* via ``_uid``, so the same seed always yields the
    same snapshot and commit IDs.  A ``MusehubCommitGraph`` row is written as
    well; its ``generation`` is always 0 here, regardless of *parent_ids*.

    Returns the ``(commit, snapshot)`` ORM objects after committing the session.
    """
    parents = parent_ids or []

    snap_id = _uid(f"snap-{seed}")
    snap = db.MusehubSnapshot(
        snapshot_id=snap_id,
        directories=[],
        manifest_blob=msgpack.packb(manifest, use_bin_type=True),
        entry_count=len(manifest),
        created_at=_now(),
    )
    session.add(snap)
    await session.execute(
        pg_insert(db.MusehubSnapshotRef)
        .values(repo_id=repo_id, snapshot_id=snap_id)
        .on_conflict_do_nothing()
    )

    commit_id = _uid(f"commit-{seed}")
    commit = db.MusehubCommit(
        commit_id=commit_id,
        branch="main",
        parent_ids=parents,
        message=f"commit {seed}",
        author="gabriel",
        timestamp=_now(),
        snapshot_id=snap_id,
    )
    session.add(commit)
    await session.execute(
        pg_insert(db.MusehubCommitRef)
        .values(repo_id=repo_id, commit_id=commit_id)
        .on_conflict_do_nothing()
    )
    await session.execute(
        pg_insert(db.MusehubCommitGraph)
        .values(
            commit_id=commit_id,
            parent_ids=parents,
            generation=0,
            snapshot_id=snap_id,
        )
        .on_conflict_do_nothing()
    )
    await session.commit()
    return commit, snap


# ---------------------------------------------------------------------------
# FP0 — single-object repo always presigns (no inline path)
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_fp0_single_object_always_presigns(db_session: AsyncSession) -> None:
    """A fetch of one commit with one small object still returns presign=True."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    oid = blob_id(b"small object")
    await _store_object(db_session, repo.repo_id, oid, b"small object")
    commit, _ = await _make_commit(
        db_session, repo.repo_id, manifest={"file.dat": oid}, seed="c1"
    )

    req = WireFetchRequest(want=[commit.commit_id], have=[], depth=None)
    result = await wire_fetch_presign(db_session, repo.repo_id, req)

    assert result["presign"] is True
    assert oid in result["blob_urls"]
    assert result["commit_count"] == 1
    assert result["blob_count"] == 1


# ---------------------------------------------------------------------------
# FP1 — above object threshold with S3 backend → presign=True, per-object URLs
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_fp1_above_object_threshold_s3_presigns(db_session: AsyncSession) -> None:
    """Every OID in the manifest gets a presigned URL from the fake S3 backend."""
    from musehub.services.musehub_wire import get_backend as _get_real_backend

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    manifest: dict[str, str] = {}
    for i in range(FETCH_PRESIGN_OBJECT_THRESHOLD):
        oid = blob_id(f"obj-{i}".encode())
        await _store_object(db_session, repo.repo_id, oid, f"obj-{i}".encode())
        manifest[f"file_{i}.dat"] = oid
    commit, _ = await _make_commit(db_session, repo.repo_id, manifest=manifest, seed="big")

    real_backend = _get_real_backend()

    class _FakeS3:
        # Reads are delegated to the real backend; only presigning is faked.
        supports_presign = True
        get = real_backend.get
        exists = real_backend.exists
        put = AsyncMock(return_value="s3://bucket/obj")

        async def presign_get(self, oid: str, ttl: int) -> str:
            return f"https://r2.example.com/{oid}?sig=fake&ttl={ttl}"

    req = WireFetchRequest(want=[commit.commit_id], have=[], depth=None)
    _fake_s3 = _FakeS3()
    with patch("musehub.services.musehub_wire.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_fetch.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_shared.get_backend", return_value=_fake_s3):
        result = await wire_fetch_presign(db_session, repo.repo_id, req)

    assert result["presign"] is True
    assert len(result["blob_urls"]) == FETCH_PRESIGN_OBJECT_THRESHOLD
    # Every OID in the manifest must have a presigned URL
    for oid in manifest.values():
        assert oid in result["blob_urls"]
        assert result["blob_urls"][oid].startswith("https://r2.example.com/")
    assert result["blob_count"] == FETCH_PRESIGN_OBJECT_THRESHOLD
    assert result["commit_count"] == 1
    assert result["expires_at"] is not None
    # No mpack URL — per-object map only
    assert result.get("url") is None


# ---------------------------------------------------------------------------
# FP2 — above size threshold → presign=True
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_fp2_above_size_threshold_presigns(db_session: AsyncSession) -> None:
    """A single object with an overridden stored size is presigned.

    The recorded size is ``FETCH_PRESIGN_SIZE_THRESHOLD`` (0 in this module),
    so this no longer exercises a real size threshold; it only checks that
    the object is presigned and counted once.
    """
    from musehub.services.musehub_wire import get_backend as _get_real_backend

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    oid = blob_id(b"large-content-placeholder")
    await _store_object(
        db_session,
        repo.repo_id,
        oid,
        b"large-content-placeholder",
        size_override=FETCH_PRESIGN_SIZE_THRESHOLD,
    )
    commit, _ = await _make_commit(
        db_session, repo.repo_id, manifest={"big.dat": oid}, seed="big2"
    )

    real_backend = _get_real_backend()

    class _FakeS3:
        supports_presign = True
        get = real_backend.get
        exists = real_backend.exists
        put = AsyncMock(return_value="s3://bucket/obj")

        async def presign_get(self, oid: str, ttl: int) -> str:
            return f"https://r2.example.com/{oid}?sig=fake"

    req = WireFetchRequest(want=[commit.commit_id], have=[], depth=None)
    _fake_s3 = _FakeS3()
    # Patch the same three lookup sites as FP1 so the fake backend is used
    # wherever the service resolves get_backend.
    with patch("musehub.services.musehub_wire.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_fetch.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_shared.get_backend", return_value=_fake_s3):
        result = await wire_fetch_presign(db_session, repo.repo_id, req)

    assert result["presign"] is True
    assert result["blob_count"] == 1
    assert oid in result["blob_urls"]


# ---------------------------------------------------------------------------
# FP3 — have set excludes already-known commits/objects
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_fp3_have_set_excludes_known_objects(db_session: AsyncSession) -> None:
    """Commits in ``have`` and the objects they carry are left out of the response."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    oid_a = blob_id(b"obj-a")
    oid_b = blob_id(b"obj-b")
    await _store_object(db_session, repo.repo_id, oid_a, b"obj-a")
    await _store_object(db_session, repo.repo_id, oid_b, b"obj-b")
    commit_a, _ = await _make_commit(
        db_session, repo.repo_id, manifest={"a.dat": oid_a}, seed="a"
    )
    commit_b, _ = await _make_commit(
        db_session,
        repo.repo_id,
        manifest={"a.dat": oid_a, "b.dat": oid_b},
        seed="b",
        parent_ids=[commit_a.commit_id],
    )

    req = WireFetchRequest(want=[commit_b.commit_id], have=[commit_a.commit_id], depth=None)
    result = await wire_fetch_presign(db_session, repo.repo_id, req)

    assert result["commit_count"] == 1  # only commit_b is new
    assert result["blob_count"] == 1  # only oid_b is new
    assert result["presign"] is True
    assert oid_b in result["blob_urls"]
    assert oid_a not in result["blob_urls"]  # excluded via have


# ---------------------------------------------------------------------------
# FP5 — empty want → presign=False, all counts zero
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_fp5_empty_want_returns_presign_false(db_session: AsyncSession) -> None:
    """An empty ``want`` list yields presign=False, no URLs and zero counts."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    req = WireFetchRequest(want=[], have=[], depth=None)
    result = await wire_fetch_presign(db_session, repo.repo_id, req)

    assert result["presign"] is False
    assert result["blob_urls"] == {}
    assert result["commit_count"] == 0
    assert result["blob_count"] == 0


# ---------------------------------------------------------------------------
# FP6 — commit_count and object_count accurate across commit chain
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_fp6_counts_accurate(db_session: AsyncSession) -> None:
    """A three-commit chain reports 3 commits and 3 unique objects."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    oids = [blob_id(f"obj-{i}".encode()) for i in range(3)]
    for i, oid in enumerate(oids):
        await _store_object(db_session, repo.repo_id, oid, f"obj-{i}".encode())

    c1, _ = await _make_commit(db_session, repo.repo_id, manifest={"a.dat": oids[0]}, seed="s1")
    c2, _ = await _make_commit(
        db_session,
        repo.repo_id,
        manifest={"a.dat": oids[0], "b.dat": oids[1]},
        seed="s2",
        parent_ids=[c1.commit_id],
    )
    c3, _ = await _make_commit(
        db_session,
        repo.repo_id,
        manifest={"a.dat": oids[0], "b.dat": oids[1], "c.dat": oids[2]},
        seed="s3",
        parent_ids=[c2.commit_id],
    )

    req = WireFetchRequest(want=[c3.commit_id], have=[], depth=None)
    result = await wire_fetch_presign(db_session, repo.repo_id, req)

    assert result["commit_count"] == 3
    assert result["blob_count"] == 3  # 3 unique objects across all commits


# ---------------------------------------------------------------------------
# FP7 — route 404 for missing repo
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_fp7_route_404_missing_repo(client: AsyncClient) -> None:
    """POSTing to fetch/presign for an unknown owner/slug returns 404."""
    resp = await client.post(
        "/nobody/no-such-repo/fetch/presign",
        content=msgpack.packb({"want": [], "have": []}, use_bin_type=True),
        headers={"Content-Type": "application/x-msgpack"},
    )
    assert resp.status_code == 404


# ---------------------------------------------------------------------------
# FP8 — route 200/presign=True
# for small public repo
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_fp8_route_small_public_repo(
    client: AsyncClient, db_session: AsyncSession, wire_headers: dict[str, str]
) -> None:
    """The HTTP route answers 200 with presign=True for a one-object public repo."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    oid = blob_id(b"tiny")
    await _store_object(db_session, repo.repo_id, oid, b"tiny")
    commit, _ = await _make_commit(
        db_session, repo.repo_id, manifest={"t.dat": oid}, seed="tiny"
    )

    resp = await client.post(
        f"/gabriel/{repo.slug}/fetch/presign",
        content=msgpack.packb({"want": [commit.commit_id], "have": []}, use_bin_type=True),
        headers={**wire_headers, "Content-Type": "application/x-msgpack"},
    )

    assert resp.status_code == 200
    data = msgpack.unpackb(resp.content, raw=False)
    assert data["presign"] is True
    assert oid in data["blob_urls"]


# ---------------------------------------------------------------------------
# FP9 — private repo returns 404 to non-owner
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_fp9_route_private_repo_non_owner_gets_404(
    client: AsyncClient, db_session: AsyncSession, wire_headers: dict[str, str]
) -> None:
    """A private repo answers 404 to a caller who is not its owner.

    NOTE(review): the test plan describes FP9 as the *unauthenticated* case,
    but this request carries ``wire_headers`` and is therefore the same
    request as FPS2 — confirm whether the headers should be dropped here.
    """
    repo = await create_repo(db_session, owner="gabriel", visibility="private")

    resp = await client.post(
        f"/gabriel/{repo.slug}/fetch/presign",
        content=msgpack.packb({"want": [], "have": []}, use_bin_type=True),
        headers={**wire_headers, "Content-Type": "application/x-msgpack"},
    )

    assert resp.status_code == 404


# ---------------------------------------------------------------------------
# FP10 — blob_urls keys match exactly the new OIDs in the needed manifests
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_fp10_blob_urls_keys_match_manifest_oids(db_session: AsyncSession) -> None:
    """blob_urls must contain exactly the OIDs from needed commits, not more, not less."""
    from musehub.services.musehub_wire import get_backend as _get_real_backend

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    real_backend = _get_real_backend()

    manifest: dict[str, str] = {}
    expected_oids: set[str] = set()
    for i in range(FETCH_PRESIGN_OBJECT_THRESHOLD):
        oid = blob_id(f"fp10-obj-{i}".encode())
        await _store_object(db_session, repo.repo_id, oid, f"fp10-obj-{i}".encode())
        manifest[f"file_{i}.dat"] = oid
        expected_oids.add(oid)
    commit, _ = await _make_commit(db_session, repo.repo_id, manifest=manifest, seed="fp10")

    class _FakeS3:
        # Reads are delegated to the real backend; only presigning is faked.
        supports_presign = True
        get = real_backend.get
        exists = real_backend.exists
        put = AsyncMock(return_value="s3://bucket/obj")

        async def presign_get(self, oid: str, ttl: int) -> str:
            return f"https://r2.example.com/{oid}?sig=fp10"

    req = WireFetchRequest(want=[commit.commit_id], have=[], depth=None)
    _fake_s3 = _FakeS3()
    # Patch all three lookup sites, as the other backend-faking tests in this
    # module do, so the fake is used wherever get_backend is resolved.
    with patch("musehub.services.musehub_wire.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_fetch.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_shared.get_backend", return_value=_fake_s3):
        result = await wire_fetch_presign(db_session, repo.repo_id, req)

    assert result["presign"] is True
    assert set(result["blob_urls"].keys()) == expected_oids


# ===========================================================================
# Security tests
# ===========================================================================

# ---------------------------------------------------------------------------
# FPS0 — TTL forwarded verbatim to presign_get; never negative or zero
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_fps0_ttl_forwarded_to_presign_get(db_session: AsyncSession) -> None:
    """presign_get must receive the exact ttl_seconds argument; cannot produce negative TTL."""
    from musehub.services.musehub_wire import get_backend as _get_real_backend

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    real_backend = _get_real_backend()

    manifest: dict[str, str] = {}
    for i in range(FETCH_PRESIGN_OBJECT_THRESHOLD):
        oid = blob_id(f"fps0-{i}".encode())
        await _store_object(db_session, repo.repo_id, oid, f"fps0-{i}".encode())
        manifest[f"f{i}.dat"] = oid
    commit, _ = await _make_commit(db_session, repo.repo_id, manifest=manifest, seed="fps0")

    received_ttls: list[int] = []

    class _RecordTTL:
        supports_presign = True
        get = real_backend.get
        exists = real_backend.exists
        put = AsyncMock(return_value="s3://bucket/obj")

        async def presign_get(self, oid: str, ttl: int) -> str:
            # Record every TTL so the test can check all calls, not just one.
            received_ttls.append(ttl)
            assert ttl > 0, "TTL must be positive"
            return f"https://r2.example.com/{oid}?ttl={ttl}"

    custom_ttl = 1800
    req = WireFetchRequest(want=[commit.commit_id], have=[], depth=None)
    _record_ttl = _RecordTTL()
    with patch("musehub.services.musehub_wire.get_backend", return_value=_record_ttl), \
         patch("musehub.services.musehub_wire_fetch.get_backend", return_value=_record_ttl), \
         patch("musehub.services.musehub_wire_shared.get_backend", return_value=_record_ttl):
        result = await wire_fetch_presign(db_session, repo.repo_id, req, ttl_seconds=custom_ttl)

    assert result["presign"] is True
    assert all(t == custom_ttl for t in received_ttls), "All presign_get calls must use the custom TTL"
    # The length check also guards the all() above against passing vacuously.
    assert len(received_ttls) == FETCH_PRESIGN_OBJECT_THRESHOLD


# ---------------------------------------------------------------------------
# FPS1 — presign_get never called for have-set OIDs
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_fps1_presign_get_not_called_for_have_oids(db_session: AsyncSession) -> None:
    """presign_get must not be invoked for objects the client already has.

    Commit A carries base_oid (client already has commit A).  Commit B adds
    FETCH_PRESIGN_OBJECT_THRESHOLD new objects.  The delta is exactly those
    new objects; base_oid must not be presigned.
    """
    from musehub.services.musehub_wire import get_backend as _get_real_backend

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    real_backend = _get_real_backend()

    # Commit A: one object that the client already has.
    base_oid = blob_id(b"fps1-base")
    await _store_object(db_session, repo.repo_id, base_oid, b"fps1-base")
    commit_a, _ = await _make_commit(
        db_session,
        repo.repo_id,
        manifest={"base.dat": base_oid},
        seed="fps1a",
    )

    # Commit B: the base object plus THRESHOLD new objects.
    manifest_b: dict[str, str] = {"base.dat": base_oid}
    new_oids: set[str] = set()
    for i in range(FETCH_PRESIGN_OBJECT_THRESHOLD):
        oid = blob_id(f"fps1-new-{i}".encode())
        await _store_object(db_session, repo.repo_id, oid, f"fps1-new-{i}".encode())
        manifest_b[f"new_{i}.dat"] = oid
        new_oids.add(oid)
    commit_b, _ = await _make_commit(
        db_session,
        repo.repo_id,
        manifest=manifest_b,
        seed="fps1b",
        parent_ids=[commit_a.commit_id],
    )

    presigned_oids: set[str] = set()

    class _TrackCalls:
        supports_presign = True
        get = real_backend.get
        exists = real_backend.exists
        put = AsyncMock(return_value="s3://bucket/obj")

        async def presign_get(self, oid: str, ttl: int) -> str:
            presigned_oids.add(oid)
            return f"https://r2.example.com/{oid}?sig=fps1"

    # Client has commit_a — only the THRESHOLD new objects are delta.
    req = WireFetchRequest(want=[commit_b.commit_id], have=[commit_a.commit_id], depth=None)
    _track = _TrackCalls()
    with patch("musehub.services.musehub_wire.get_backend", return_value=_track), \
         patch("musehub.services.musehub_wire_fetch.get_backend", return_value=_track), \
         patch("musehub.services.musehub_wire_shared.get_backend", return_value=_track):
        result = await wire_fetch_presign(db_session, repo.repo_id, req)

    assert result["presign"] is True
    assert base_oid not in presigned_oids, "base OID known by client must not be presigned"
    assert new_oids == presigned_oids, "exactly the new OIDs must be presigned"
    assert result["blob_count"] == FETCH_PRESIGN_OBJECT_THRESHOLD


# ---------------------------------------------------------------------------
# FPS2 — private repo returns 404 to non-owner authenticated user
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_fps2_private_repo_404_for_non_owner(
    client: AsyncClient,
    db_session: AsyncSession,
    wire_headers: dict[str, str],
) -> None:
    """Authenticated non-owner must not discover a private repo via fetch/presign."""
    # wire_headers injects handle="test-user-wire"; repo owner is "gabriel" — different user.
    repo = await create_repo(db_session, owner="gabriel", visibility="private")

    resp = await client.post(
        f"/gabriel/{repo.slug}/fetch/presign",
        content=msgpack.packb({"want": [], "have": []}, use_bin_type=True),
        headers={**wire_headers, "Content-Type": "application/x-msgpack"},
    )

    assert resp.status_code == 404


# ---------------------------------------------------------------------------
# FPS3 — blob_urls count equals new_oids count, no leakage
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_fps3_blob_urls_no_extras(db_session: AsyncSession) -> None:
    """blob_urls must contain exactly as many keys as new OIDs — no leakage."""
    from musehub.services.musehub_wire import get_backend as _get_real_backend

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    real_backend = _get_real_backend()

    manifest: dict[str, str] = {}
    for i in range(FETCH_PRESIGN_OBJECT_THRESHOLD):
        oid = blob_id(f"fps3-{i}".encode())
        await _store_object(db_session, repo.repo_id, oid, f"fps3-{i}".encode())
        manifest[f"f{i}.dat"] = oid
    commit, _ = await _make_commit(db_session, repo.repo_id, manifest=manifest, seed="fps3")

    class _FakeS3:
        supports_presign = True
        get = real_backend.get
        exists = real_backend.exists
        put = AsyncMock(return_value="s3://bucket/obj")

        async def presign_get(self, oid: str, ttl: int) -> str:
            return f"https://r2.example.com/{oid}?sig=fps3"

    req = WireFetchRequest(want=[commit.commit_id], have=[], depth=None)
    _fake_s3 = _FakeS3()
    # Patch all three lookup sites, as the other backend-faking tests in this
    # module do, so the fake is used wherever get_backend is resolved.
    with patch("musehub.services.musehub_wire.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_fetch.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_shared.get_backend", return_value=_fake_s3):
        result = await wire_fetch_presign(db_session, repo.repo_id, req)

    assert result["presign"] is True
    assert len(result["blob_urls"]) == result["blob_count"]
    assert set(result["blob_urls"].keys()) == set(manifest.values())


# ===========================================================================
# Performance tests
# ===========================================================================
# ---------------------------------------------------------------------------
# FPP0 — all presign_get calls complete when N > semaphore limit (50)
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_fpp0_all_presign_calls_complete_above_semaphore_limit(
    db_session: AsyncSession,
) -> None:
    """Every one of 75 objects is presigned exactly once.

    75 is chosen to exceed a concurrency cap of 50 that the service presumably
    applies to its presign calls (the cap itself is not visible in this
    module — confirm against the service implementation).
    """
    from musehub.services.musehub_wire import get_backend as _get_real_backend

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    real_backend = _get_real_backend()

    n = 75  # deliberately above the semaphore limit of 50
    manifest: dict[str, str] = {}
    for index in range(n):
        payload = f"fpp0-{index}".encode()
        object_id = blob_id(payload)
        await _store_object(db_session, repo.repo_id, object_id, payload)
        manifest[f"f{index}.dat"] = object_id
    commit, _ = await _make_commit(db_session, repo.repo_id, manifest=manifest, seed="fpp0")

    # One entry per presign_get invocation; its length is the call count.
    seen_calls: list[str] = []

    class _CountingS3:
        supports_presign = True
        get = real_backend.get
        exists = real_backend.exists
        put = AsyncMock(return_value="s3://bucket/obj")

        async def presign_get(self, oid: str, ttl: int) -> str:
            seen_calls.append(oid)
            return f"https://r2.example.com/{oid}?sig=fpp0"

    req = WireFetchRequest(want=[commit.commit_id], have=[], depth=None)
    counting_backend = _CountingS3()
    patch_wire = patch(
        "musehub.services.musehub_wire.get_backend", return_value=counting_backend
    )
    patch_fetch = patch(
        "musehub.services.musehub_wire_fetch.get_backend", return_value=counting_backend
    )
    patch_shared = patch(
        "musehub.services.musehub_wire_shared.get_backend", return_value=counting_backend
    )
    with patch_wire, patch_fetch, patch_shared:
        result = await wire_fetch_presign(db_session, repo.repo_id, req)

    presign_count = len(seen_calls)
    assert result["presign"] is True
    assert presign_count == n, f"Expected {n} presign_get calls, got {presign_count}"
    assert len(result["blob_urls"]) == n


# ---------------------------------------------------------------------------
# FPP1 — custom ttl_seconds appears in expires_at timestamp
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_fpp1_custom_ttl_reflected_in_expires_at(db_session: AsyncSession) -> None:
    """expires_at must be approximately now() + ttl_seconds."""
    from datetime import timedelta

    import dateutil.parser

    from musehub.services.musehub_wire import get_backend as _get_real_backend

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    real_backend = _get_real_backend()

    manifest: dict[str, str] = {}
    for i in range(FETCH_PRESIGN_OBJECT_THRESHOLD):
        oid = blob_id(f"fpp1-{i}".encode())
        await _store_object(db_session, repo.repo_id, oid, f"fpp1-{i}".encode())
        manifest[f"f{i}.dat"] = oid
    commit, _ = await _make_commit(db_session, repo.repo_id, manifest=manifest, seed="fpp1")

    class _FakeS3:
        # Reads are delegated to the real backend; only presigning is faked.
        supports_presign = True
        get = real_backend.get
        exists = real_backend.exists
        put = AsyncMock(return_value="s3://bucket/obj")

        async def presign_get(self, oid: str, ttl: int) -> str:
            return f"https://r2.example.com/{oid}?ttl={ttl}"

    before = datetime.now(tz=timezone.utc)
    custom_ttl = 300
    req = WireFetchRequest(want=[commit.commit_id], have=[], depth=None)
    _fake_s3 = _FakeS3()
    # Patch all three lookup sites, as the other backend-faking tests in this
    # module do, so the fake is used wherever get_backend is resolved.
    with patch("musehub.services.musehub_wire.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_fetch.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_shared.get_backend", return_value=_fake_s3):
        result = await wire_fetch_presign(db_session, repo.repo_id, req, ttl_seconds=custom_ttl)
    after = datetime.now(tz=timezone.utc)

    assert result["expires_at"] is not None
    expires_dt = dateutil.parser.parse(result["expires_at"])
    # Allow 5 seconds of slack on either side of the before/after window.
    earliest = before + timedelta(seconds=custom_ttl - 5)
    latest = after + timedelta(seconds=custom_ttl + 5)
    assert earliest <= expires_dt <= latest


# ===========================================================================
# Stress / state integrity tests
# ===========================================================================

# ---------------------------------------------------------------------------
# FPST0 — 600 OIDs, every one appears in blob_urls
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_fpst0_600_oids_all_presigned(db_session: AsyncSession) -> None:
    """All 600 OIDs must appear in blob_urls — no dropped presign calls."""
    from musehub.services.musehub_wire import get_backend as _get_real_backend

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    real_backend = _get_real_backend()

    n = 600  # large batch; far more than FETCH_PRESIGN_OBJECT_THRESHOLD used elsewhere
    manifest: dict[str, str] = {}
    all_oids: set[str] = set()
    for i in range(n):
        oid = blob_id(f"fpst0-{i}".encode())
        await _store_object(db_session, repo.repo_id, oid, f"fpst0-{i}".encode())
        manifest[f"f{i}.dat"] = oid
        all_oids.add(oid)
    commit, _ = await _make_commit(db_session, repo.repo_id, manifest=manifest, seed="fpst0")

    class _FakeS3:
        supports_presign = True
        get = real_backend.get
        exists = real_backend.exists
        put = AsyncMock(return_value="s3://bucket/obj")

        async def presign_get(self, oid: str, ttl: int) -> str:
            return f"https://r2.example.com/{oid}?sig=fpst0"

    req = WireFetchRequest(want=[commit.commit_id], have=[], depth=None)
    _fake_s3 = _FakeS3()
    # Patch all three lookup sites, as the other backend-faking tests in this
    # module do, so the fake is used wherever get_backend is resolved.
    with patch("musehub.services.musehub_wire.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_fetch.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_shared.get_backend", return_value=_fake_s3):
        result = await wire_fetch_presign(db_session, repo.repo_id, req)

    assert result["presign"] is True
    assert result["blob_count"] == n
    missing = all_oids - set(result["blob_urls"].keys())
    assert not missing, f"Missing presigned URLs for {len(missing)} OIDs"


# ---------------------------------------------------------------------------
# FPST1 — presign_get raising propagates the exception
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_fpst1_presign_get_exception_propagates(db_session: AsyncSession) -> None:
    """If presign_get raises, wire_fetch_presign must propagate — no silent partial failure."""
    from musehub.services.musehub_wire import get_backend as _get_real_backend

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    real_backend = _get_real_backend()

    manifest: dict[str, str] = {}
    for i in range(FETCH_PRESIGN_OBJECT_THRESHOLD):
        oid = blob_id(f"fpst1-{i}".encode())
        await _store_object(db_session, repo.repo_id, oid, f"fpst1-{i}".encode())
        manifest[f"f{i}.dat"] = oid
    commit, _ = await _make_commit(db_session, repo.repo_id, manifest=manifest, seed="fpst1")

    call_count = 0

    class _FailingS3:
        supports_presign = True
        get = real_backend.get
        exists = real_backend.exists
        put = AsyncMock(return_value="s3://bucket/obj")

        async def presign_get(self, oid: str, ttl: int) -> str:
            nonlocal call_count
            call_count += 1
            # Fail on the third call so some presigns succeed before the error.
            if call_count == 3:
                raise RuntimeError("R2 presign service unavailable")
            return f"https://r2.example.com/{oid}?sig=fpst1"

    req = WireFetchRequest(want=[commit.commit_id], have=[], depth=None)
    _failing_s3 = _FailingS3()
    with patch("musehub.services.musehub_wire.get_backend", return_value=_failing_s3), \
         patch("musehub.services.musehub_wire_fetch.get_backend", return_value=_failing_s3), \
         patch("musehub.services.musehub_wire_shared.get_backend", return_value=_failing_s3), \
         pytest.raises(RuntimeError, match="R2 presign service unavailable"):
        await wire_fetch_presign(db_session, repo.repo_id, req)


# ===========================================================================
# End-to-end tests
# ===========================================================================

# ---------------------------------------------------------------------------
# FPE0 — full HTTP route with mocked S3 → msgpack response correct
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_fpe0_route_presign_true_full_response(
    client: AsyncClient,
    db_session: AsyncSession,
    wire_headers: dict[str, str],
) -> None:
    """Full HTTP round-trip: route → service → FakeS3 → msgpack response with blob_urls."""
    from musehub.services.musehub_wire import get_backend as _get_real_backend

    real_backend = _get_real_backend()
    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    manifest: dict[str, str] = {}
    all_oids: set[str] = set()
    for i in range(FETCH_PRESIGN_OBJECT_THRESHOLD):
        oid = blob_id(f"fpe0-{i}".encode())
        await _store_object(db_session, repo.repo_id, oid, f"fpe0-{i}".encode())
        manifest[f"f{i}.dat"] = oid
        all_oids.add(oid)
    commit, _ = await _make_commit(db_session, repo.repo_id, manifest=manifest, seed="fpe0")

    class _FakeS3:
        supports_presign = True
        get = real_backend.get
        exists = real_backend.exists
        put = AsyncMock(return_value="s3://bucket/obj")

        async def presign_get(self, oid: str, ttl: int) -> str:
            return f"https://r2.example.com/{oid}?sig=fpe0&ttl={ttl}"

    _fake_s3 = _FakeS3()
    with patch("musehub.services.musehub_wire.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_fetch.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_shared.get_backend", return_value=_fake_s3):
        resp = await client.post(
            f"/gabriel/{repo.slug}/fetch/presign",
            content=msgpack.packb(
                {"want": [commit.commit_id], "have": []}, use_bin_type=True
            ),
            headers={**wire_headers, "Content-Type": "application/x-msgpack"},
        )

    assert resp.status_code == 200
    data = msgpack.unpackb(resp.content, raw=False)
    assert data["presign"] is True
    assert set(data["blob_urls"].keys()) == all_oids
    for oid, url in data["blob_urls"].items():
        assert url.startswith("https://r2.example.com/"), f"Unexpected URL: {url}"
        assert oid in url, "OID must appear in its own presigned URL"
    assert data["commit_count"] == 1
    assert data["blob_count"] == FETCH_PRESIGN_OBJECT_THRESHOLD
    assert data["expires_at"] is not None
    assert data["repo_id"] == repo.repo_id
    # No legacy mpack URL field
    assert data.get("url") is None