gabriel / musehub public
test_wire_fetch_presign.py python
877 lines 36.0 KB
Raw
sha256:ab9eda7b6479e1c35cdba9a54f62bacd2825de8faacec3ba67a9a8ef45914b7d fix: migration and wire protocol alignment Sonnet 4.6 minor ⚠ breaking 20 days ago
1 """TDD — R2 presigned fetch path (per-object URL map, not mpack).
2
3 Problem
4 -------
5 Cloudflare times out streaming responses after ~100 seconds. For large fetches
6 the same presign pattern used for push should apply — but mirrored:
7
8 Push presign: server returns {oid → presigned PUT URL}, client PUTs in parallel.
9 Fetch presign: server returns {oid → presigned GET URL}, client GETs in parallel.
10
11 The server does a BFS walk, collects needed OIDs, calls backend.presign_get(oid, ttl)
12 for each one (no object reads), and returns the map. The client downloads all
13 objects in parallel directly from R2, bypassing Cloudflare entirely.
14
15 Architecture
16 ------------
17 ``POST /{owner}/{slug}/fetch/presign``
18
19 Request body (msgpack):
20 want list[str] — commit IDs the client wants
21 have list[str] — commit IDs the client already has (ancestry cut)
22 ttl_seconds int — presigned URL TTL (default 3600)
23
24 Response (msgpack):
25 presign bool — False when below threshold or backend can't presign
26 blob_urls dict[str, str] — {oid: presigned_GET_url} for each needed object
27 commits list[dict] — commit records for apply_mpack
28 snapshots list[dict] — snapshot records for apply_mpack
29 branch_heads dict[str, str] — branch → tip commit_id
30 repo_id str
31 domain str
32 default_branch str
33 expires_at str|null — ISO-8601 expiry
34 object_count int
35 commit_count int
36
Threshold:
  none — presign is unconditional whenever there is something to fetch
  (the earlier "≥ 500 objects OR total raw size ≥ 50 MB" gate no longer applies).
39
40 Test plan
41 ---------
42 Unit / integration
  FP0   Single-object repo → presign=True, blob_urls has the one OID (no inline path).
44 FP1 Above object threshold with S3 backend → presign=True, blob_urls has all OIDs.
45 FP2 Above size threshold (few but large objects) → presign=True.
46 FP3 have set excludes commits the client already has.
47 FP4 Above threshold with private repo → presign=True (auth required).
48 FP5 Empty want list → presign=False, blob_urls={}, counts zero.
49 FP6 commit_count and object_count are accurate across a commit chain.
50 FP7 Route 404 for missing repo.
  FP8   Route returns 200 with presign=True for a small public repo.
52 FP9 Route returns 404 for private repo without auth (don't leak existence).
53 FP10 blob_urls map keys match exactly the new OIDs in the needed manifests.
54
55 Security
56 FPS0 TTL forwarded verbatim to presign_get; no negative or zero TTL.
57 FPS1 presign_get never called for OIDs in the have set.
58 FPS2 Private repo returns 404 even to a non-owner authenticated user.
59 FPS3 blob_urls count equals new_oids count — no extras, no leakage.
60
61 Performance
62 FPP0 All presign_get calls complete even when N > semaphore limit (50).
63 FPP1 Custom ttl_seconds is honoured in the presigned URL.
64
65 Stress / state integrity
66 FPST0 600 OIDs → every OID appears in blob_urls (no dropped presign calls).
67 FPST1 presign_get raising for one OID propagates the exception out.
68
69 End-to-end
70 FPE0 Full HTTP route with S3 backend mock → msgpack response contains
71 correct presign=True, all blob_urls present and well-formed.
72 """
73 from __future__ import annotations
74
75 from datetime import datetime, timezone
76 from unittest.mock import AsyncMock, patch
77
78 import msgpack
79 import pytest
80 from httpx import AsyncClient
81 from sqlalchemy.dialects.postgresql import insert as pg_insert
82 from sqlalchemy.ext.asyncio import AsyncSession
83
84 from muse.core.types import blob_id, fake_id
85 from musehub.db import musehub_repo_models as db
86 from musehub.models.wire import WireFetchRequest
87 from musehub.services.musehub_wire import (
88 wire_fetch_presign,
89 )
90
from tests.factories import create_repo

# Presign is now unconditional (no threshold logic), so these values only
# parameterise the tests below.
# Number of objects created by tests that iterate N objects (arbitrary, small).
FETCH_PRESIGN_OBJECT_THRESHOLD = 5
# Recorded as ``size_bytes`` (via ``size_override``) in the FP2 test.
FETCH_PRESIGN_SIZE_THRESHOLD = 0
96
97 # ---------------------------------------------------------------------------
98 # Helpers
99 # ---------------------------------------------------------------------------
100
def _now() -> datetime:
    """Return the current time as a timezone-aware UTC ``datetime``."""
    return datetime.now(timezone.utc)
103
104
def _uid(seed: str) -> str:
    """Return the ``fake_id`` derived from *seed* (used for snapshot/commit IDs)."""
    identifier = fake_id(seed)
    return identifier
107
108
async def _store_object(
    session: AsyncSession,
    repo_id: str,
    oid: str,
    content: bytes,
    size_override: int | None = None,
) -> None:
    """Write *content* to the storage backend and register it for *repo_id*.

    Inserts a ``MusehubObject`` row (skipped when ``object_id`` already exists)
    and a ``MusehubObjectRef`` row linking the object to the repo, then commits.
    ``size_override`` replaces the recorded ``size_bytes``; when it is ``None``
    the real length of *content* is recorded.
    """
    # Resolved at call time rather than at module import.
    from musehub.services.musehub_wire import get_backend

    storage_uri = await get_backend().put(oid, content)
    recorded_size = len(content) if size_override is None else size_override

    object_insert = pg_insert(db.MusehubObject).values(
        object_id=oid,
        path="file.dat",
        size_bytes=recorded_size,
        storage_uri=storage_uri,
    )
    await session.execute(
        object_insert.on_conflict_do_nothing(index_elements=["object_id"])
    )

    ref_insert = pg_insert(db.MusehubObjectRef).values(repo_id=repo_id, object_id=oid)
    await session.execute(ref_insert.on_conflict_do_nothing())

    await session.commit()
135
136
async def _make_commit(
    session: AsyncSession,
    repo_id: str,
    *,
    manifest: dict[str, str],
    seed: str = "c1",
    parent_ids: list[str] | None = None,
) -> tuple[db.MusehubCommit, db.MusehubSnapshot]:
    """Create one snapshot + commit pair for *repo_id* and return ``(commit, snapshot)``.

    The snapshot stores *manifest* msgpack-encoded; both IDs are derived from
    *seed* via ``_uid``.  Ref rows tie the snapshot and commit to the repo and a
    ``MusehubCommitGraph`` row is inserted alongside.  Everything is committed
    before returning.
    """
    parents = parent_ids or []

    # --- snapshot -----------------------------------------------------------
    snap_id = _uid(f"snap-{seed}")
    snapshot = db.MusehubSnapshot(
        snapshot_id=snap_id,
        directories=[],
        manifest_blob=msgpack.packb(manifest, use_bin_type=True),
        entry_count=len(manifest),
        created_at=_now(),
    )
    session.add(snapshot)
    snapshot_ref = pg_insert(db.MusehubSnapshotRef).values(
        repo_id=repo_id, snapshot_id=snap_id
    )
    await session.execute(snapshot_ref.on_conflict_do_nothing())

    # --- commit -------------------------------------------------------------
    commit_id = _uid(f"commit-{seed}")
    commit_row = db.MusehubCommit(
        commit_id=commit_id,
        branch="main",
        parent_ids=parents,
        message=f"commit {seed}",
        author="gabriel",
        timestamp=_now(),
        snapshot_id=snap_id,
    )
    session.add(commit_row)
    commit_ref = pg_insert(db.MusehubCommitRef).values(
        repo_id=repo_id, commit_id=commit_id
    )
    await session.execute(commit_ref.on_conflict_do_nothing())

    # --- commit graph -------------------------------------------------------
    # generation is always written as 0 here, regardless of parents.
    graph_row = pg_insert(db.MusehubCommitGraph).values(
        commit_id=commit_id,
        parent_ids=parents,
        generation=0,
        snapshot_id=snap_id,
    )
    await session.execute(graph_row.on_conflict_do_nothing())

    await session.commit()
    return commit_row, snapshot
187
188
189 # ---------------------------------------------------------------------------
190 # FP0 — single-object repo always presigns (no inline path)
191 # ---------------------------------------------------------------------------
192
@pytest.mark.asyncio
async def test_fp0_single_object_always_presigns(db_session: AsyncSession) -> None:
    """FP0: a repo holding a single small object still gets a presigned response."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    payload = b"small object"
    oid = blob_id(payload)
    await _store_object(db_session, repo.repo_id, oid, payload)
    commit, _ = await _make_commit(
        db_session,
        repo.repo_id,
        manifest={"file.dat": oid},
        seed="c1",
    )

    request = WireFetchRequest(want=[commit.commit_id], have=[], depth=None)
    result = await wire_fetch_presign(db_session, repo.repo_id, request)

    assert result["presign"] is True
    assert oid in result["blob_urls"]
    # Exactly one commit and one blob are reported.
    assert result["commit_count"] == 1
    assert result["blob_count"] == 1
208
209
210 # ---------------------------------------------------------------------------
211 # FP1 — above object threshold with S3 backend → presign=True, per-object URLs
212 # ---------------------------------------------------------------------------
213
@pytest.mark.asyncio
async def test_fp1_above_object_threshold_s3_presigns(db_session: AsyncSession) -> None:
    """FP1: with a presign-capable fake backend every manifest OID gets its own URL."""
    from musehub.services.musehub_wire import get_backend as _get_real_backend

    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    manifest: dict[str, str] = {}
    for index in range(FETCH_PRESIGN_OBJECT_THRESHOLD):
        payload = f"obj-{index}".encode()
        object_id = blob_id(payload)
        await _store_object(db_session, repo.repo_id, object_id, payload)
        manifest[f"file_{index}.dat"] = object_id

    commit, _ = await _make_commit(db_session, repo.repo_id, manifest=manifest, seed="big")
    real_backend = _get_real_backend()

    class _FakeS3:
        # Reads delegate to the real backend; only presign_get is faked.
        supports_presign = True
        get = real_backend.get
        exists = real_backend.exists
        put = AsyncMock(return_value="s3://bucket/obj")

        async def presign_get(self, oid: str, ttl: int) -> str:
            return f"https://r2.example.com/{oid}?sig=fake&ttl={ttl}"

    request = WireFetchRequest(want=[commit.commit_id], have=[], depth=None)
    fake_backend = _FakeS3()

    with patch("musehub.services.musehub_wire.get_backend", return_value=fake_backend), \
         patch("musehub.services.musehub_wire_fetch.get_backend", return_value=fake_backend), \
         patch("musehub.services.musehub_wire_shared.get_backend", return_value=fake_backend):
        result = await wire_fetch_presign(db_session, repo.repo_id, request)

    urls = result["blob_urls"]
    assert result["presign"] is True
    assert len(urls) == FETCH_PRESIGN_OBJECT_THRESHOLD
    # Every OID in the manifest must map to a URL issued by the fake backend.
    for expected_oid in manifest.values():
        assert expected_oid in urls
        assert urls[expected_oid].startswith("https://r2.example.com/")
    assert result["blob_count"] == FETCH_PRESIGN_OBJECT_THRESHOLD
    assert result["commit_count"] == 1
    assert result["expires_at"] is not None
    # Per-object map only: a single mpack "url" must be absent or None.
    assert result.get("url") is None
256
257
258 # ---------------------------------------------------------------------------
259 # FP2 — above size threshold → presign=True
260 # ---------------------------------------------------------------------------
261
@pytest.mark.asyncio
async def test_fp2_above_size_threshold_presigns(db_session: AsyncSession) -> None:
    """FP2: a single object with an overridden recorded size is presigned.

    ``size_bytes`` is recorded as ``FETCH_PRESIGN_SIZE_THRESHOLD`` via
    ``size_override`` rather than the real payload length.

    ``get_backend`` is patched in all three service modules, matching the other
    fake-backend tests in this file, so the fake is in effect regardless of
    which of those modules looks the backend up.
    """
    from musehub.services.musehub_wire import get_backend as _get_real_backend

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    payload = b"large-content-placeholder"
    oid = blob_id(payload)
    await _store_object(
        db_session, repo.repo_id, oid, payload,
        size_override=FETCH_PRESIGN_SIZE_THRESHOLD,
    )
    commit, _ = await _make_commit(
        db_session, repo.repo_id, manifest={"big.dat": oid}, seed="big2"
    )
    real_backend = _get_real_backend()

    class _FakeS3:
        # Reads delegate to the real backend; only presign_get is faked.
        supports_presign = True
        get = real_backend.get
        exists = real_backend.exists
        put = AsyncMock(return_value="s3://bucket/obj")

        async def presign_get(self, oid: str, ttl: int) -> str:
            return f"https://r2.example.com/{oid}?sig=fake"

    req = WireFetchRequest(want=[commit.commit_id], have=[], depth=None)

    _fake_s3 = _FakeS3()
    with patch("musehub.services.musehub_wire.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_fetch.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_shared.get_backend", return_value=_fake_s3):
        result = await wire_fetch_presign(db_session, repo.repo_id, req)

    assert result["presign"] is True
    assert result["blob_count"] == 1
    assert oid in result["blob_urls"]
293
294
295 # ---------------------------------------------------------------------------
296 # FP3 — have set excludes already-known commits/objects
297 # ---------------------------------------------------------------------------
298
@pytest.mark.asyncio
async def test_fp3_have_set_excludes_known_objects(db_session: AsyncSession) -> None:
    """FP3: commits listed in ``have`` (and their objects) are left out of the response."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    payload_a, payload_b = b"obj-a", b"obj-b"
    oid_a, oid_b = blob_id(payload_a), blob_id(payload_b)
    await _store_object(db_session, repo.repo_id, oid_a, payload_a)
    await _store_object(db_session, repo.repo_id, oid_b, payload_b)

    # Parent commit carries only oid_a; the child adds oid_b on top.
    commit_a, _ = await _make_commit(
        db_session, repo.repo_id, manifest={"a.dat": oid_a}, seed="a"
    )
    commit_b, _ = await _make_commit(
        db_session,
        repo.repo_id,
        manifest={"a.dat": oid_a, "b.dat": oid_b},
        seed="b",
        parent_ids=[commit_a.commit_id],
    )

    request = WireFetchRequest(
        want=[commit_b.commit_id], have=[commit_a.commit_id], depth=None
    )
    result = await wire_fetch_presign(db_session, repo.repo_id, request)
    urls = result["blob_urls"]

    assert result["commit_count"] == 1  # only commit_b is new
    assert result["blob_count"] == 1  # only oid_b is new
    assert result["presign"] is True
    assert oid_b in urls
    assert oid_a not in urls  # excluded via have
325
326
327 # ---------------------------------------------------------------------------
328 # FP5 — empty want → presign=False, all counts zero
329 # ---------------------------------------------------------------------------
330
@pytest.mark.asyncio
async def test_fp5_empty_want_returns_presign_false(db_session: AsyncSession) -> None:
    """FP5: an empty ``want`` list yields presign=False, no URLs and zero counts."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    empty_request = WireFetchRequest(want=[], have=[], depth=None)

    result = await wire_fetch_presign(db_session, repo.repo_id, empty_request)

    assert result["presign"] is False
    assert result["blob_urls"] == {}
    for counter in ("commit_count", "blob_count"):
        assert result[counter] == 0
341
342
343 # ---------------------------------------------------------------------------
344 # FP6 — commit_count and object_count accurate across commit chain
345 # ---------------------------------------------------------------------------
346
@pytest.mark.asyncio
async def test_fp6_counts_accurate(db_session: AsyncSession) -> None:
    """FP6: commit_count and blob_count are correct over a three-commit chain."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    payloads = [f"obj-{i}".encode() for i in range(3)]
    oids = [blob_id(payload) for payload in payloads]
    for oid, payload in zip(oids, payloads):
        await _store_object(db_session, repo.repo_id, oid, payload)

    # Each commit keeps its parent's files and adds one more.
    first, _ = await _make_commit(
        db_session, repo.repo_id, manifest={"a.dat": oids[0]}, seed="s1"
    )
    second, _ = await _make_commit(
        db_session,
        repo.repo_id,
        manifest={"a.dat": oids[0], "b.dat": oids[1]},
        seed="s2",
        parent_ids=[first.commit_id],
    )
    third, _ = await _make_commit(
        db_session,
        repo.repo_id,
        manifest={"a.dat": oids[0], "b.dat": oids[1], "c.dat": oids[2]},
        seed="s3",
        parent_ids=[second.commit_id],
    )

    request = WireFetchRequest(want=[third.commit_id], have=[], depth=None)
    result = await wire_fetch_presign(db_session, repo.repo_id, request)

    assert result["commit_count"] == 3
    assert result["blob_count"] == 3  # 3 unique objects across all commits
371
372
373 # ---------------------------------------------------------------------------
374 # FP7 — route 404 for missing repo
375 # ---------------------------------------------------------------------------
376
@pytest.mark.asyncio
async def test_fp7_route_404_missing_repo(client: AsyncClient) -> None:
    """FP7: POSTing to fetch/presign for a repo that does not exist returns 404."""
    body = msgpack.packb({"want": [], "have": []}, use_bin_type=True)
    response = await client.post(
        "/nobody/no-such-repo/fetch/presign",
        content=body,
        headers={"Content-Type": "application/x-msgpack"},
    )
    assert response.status_code == 404
385
386
387 # ---------------------------------------------------------------------------
# FP8 — route 200/presign=True for small public repo
389 # ---------------------------------------------------------------------------
390
@pytest.mark.asyncio
async def test_fp8_route_small_public_repo(
    client: AsyncClient, db_session: AsyncSession, wire_headers: dict[str, str]
) -> None:
    """FP8: the HTTP route answers 200 with presign=True for a one-object public repo."""
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    payload = b"tiny"
    oid = blob_id(payload)
    await _store_object(db_session, repo.repo_id, oid, payload)
    commit, _ = await _make_commit(
        db_session, repo.repo_id, manifest={"t.dat": oid}, seed="tiny"
    )

    request_body = msgpack.packb(
        {"want": [commit.commit_id], "have": []}, use_bin_type=True
    )
    request_headers = {**wire_headers, "Content-Type": "application/x-msgpack"}
    response = await client.post(
        f"/gabriel/{repo.slug}/fetch/presign",
        content=request_body,
        headers=request_headers,
    )

    assert response.status_code == 200
    decoded = msgpack.unpackb(response.content, raw=False)
    assert decoded["presign"] is True
    assert oid in decoded["blob_urls"]
411
412
413 # ---------------------------------------------------------------------------
414 # FP9 — private repo returns 404 to non-owner
415 # ---------------------------------------------------------------------------
416
@pytest.mark.asyncio
async def test_fp9_route_private_repo_non_owner_gets_404(
    client: AsyncClient, db_session: AsyncSession, wire_headers: dict[str, str]
) -> None:
    """FP9: a private repo owned by "gabriel" answers 404 to the wire_headers caller.

    NOTE(review): the module test plan describes FP9 as the "without auth" case,
    but this test sends ``wire_headers`` and so issues the same request as
    ``test_fps2_private_repo_404_for_non_owner`` — confirm which is intended.
    """
    repo = await create_repo(db_session, owner="gabriel", visibility="private")
    request_body = msgpack.packb({"want": [], "have": []}, use_bin_type=True)
    request_headers = {**wire_headers, "Content-Type": "application/x-msgpack"}

    response = await client.post(
        f"/gabriel/{repo.slug}/fetch/presign",
        content=request_body,
        headers=request_headers,
    )

    assert response.status_code == 404
428
429
430 # ---------------------------------------------------------------------------
431 # FP10 — blob_urls keys match exactly the new OIDs in the needed manifests
432 # ---------------------------------------------------------------------------
433
@pytest.mark.asyncio
async def test_fp10_blob_urls_keys_match_manifest_oids(db_session: AsyncSession) -> None:
    """blob_urls must contain exactly the OIDs from needed commits, not more, not less.

    ``get_backend`` is patched in all three service modules, matching the other
    fake-backend tests in this file, so the fake is in effect regardless of
    which of those modules looks the backend up.
    """
    from musehub.services.musehub_wire import get_backend as _get_real_backend

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    real_backend = _get_real_backend()

    manifest: dict[str, str] = {}
    for i in range(FETCH_PRESIGN_OBJECT_THRESHOLD):
        content = f"fp10-obj-{i}".encode()
        oid = blob_id(content)
        await _store_object(db_session, repo.repo_id, oid, content)
        manifest[f"file_{i}.dat"] = oid
    # The expected key set is exactly the manifest's object IDs.
    expected_oids: set[str] = set(manifest.values())

    commit, _ = await _make_commit(db_session, repo.repo_id, manifest=manifest, seed="fp10")

    class _FakeS3:
        # Reads delegate to the real backend; only presign_get is faked.
        supports_presign = True
        get = real_backend.get
        exists = real_backend.exists
        put = AsyncMock(return_value="s3://bucket/obj")

        async def presign_get(self, oid: str, ttl: int) -> str:
            return f"https://r2.example.com/{oid}?sig=fp10"

    req = WireFetchRequest(want=[commit.commit_id], have=[], depth=None)

    _fake_s3 = _FakeS3()
    with patch("musehub.services.musehub_wire.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_fetch.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_shared.get_backend", return_value=_fake_s3):
        result = await wire_fetch_presign(db_session, repo.repo_id, req)

    assert result["presign"] is True
    assert set(result["blob_urls"].keys()) == expected_oids
467
468
469 # ===========================================================================
470 # Security tests
471 # ===========================================================================
472
473 # ---------------------------------------------------------------------------
474 # FPS0 — TTL forwarded verbatim to presign_get; never negative or zero
475 # ---------------------------------------------------------------------------
476
@pytest.mark.asyncio
async def test_fps0_ttl_forwarded_to_presign_get(db_session: AsyncSession) -> None:
    """FPS0: every presign_get call receives the caller's ttl_seconds, and it is positive."""
    from musehub.services.musehub_wire import get_backend as _get_real_backend

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    real_backend = _get_real_backend()

    manifest: dict[str, str] = {}
    for index in range(FETCH_PRESIGN_OBJECT_THRESHOLD):
        payload = f"fps0-{index}".encode()
        object_id = blob_id(payload)
        await _store_object(db_session, repo.repo_id, object_id, payload)
        manifest[f"f{index}.dat"] = object_id
    commit, _ = await _make_commit(db_session, repo.repo_id, manifest=manifest, seed="fps0")

    # One entry is appended per presign_get invocation.
    received_ttls: list[int] = []

    class _RecordTTL:
        supports_presign = True
        get = real_backend.get
        exists = real_backend.exists
        put = AsyncMock(return_value="s3://bucket/obj")

        async def presign_get(self, oid: str, ttl: int) -> str:
            received_ttls.append(ttl)
            assert ttl > 0, "TTL must be positive"
            return f"https://r2.example.com/{oid}?ttl={ttl}"

    custom_ttl = 1800
    request = WireFetchRequest(want=[commit.commit_id], have=[], depth=None)
    recorder = _RecordTTL()

    with patch("musehub.services.musehub_wire.get_backend", return_value=recorder), \
         patch("musehub.services.musehub_wire_fetch.get_backend", return_value=recorder), \
         patch("musehub.services.musehub_wire_shared.get_backend", return_value=recorder):
        result = await wire_fetch_presign(
            db_session, repo.repo_id, request, ttl_seconds=custom_ttl
        )

    assert result["presign"] is True
    assert all(
        seen == custom_ttl for seen in received_ttls
    ), "All presign_get calls must use the custom TTL"
    # One call per object guards the all() above against an empty list.
    assert len(received_ttls) == FETCH_PRESIGN_OBJECT_THRESHOLD
516
517
518 # ---------------------------------------------------------------------------
519 # FPS1 — presign_get never called for have-set OIDs
520 # ---------------------------------------------------------------------------
521
@pytest.mark.asyncio
async def test_fps1_presign_get_not_called_for_have_oids(db_session: AsyncSession) -> None:
    """FPS1: objects reachable from a ``have`` commit are never presigned.

    Commit A holds ``base_oid`` and is listed in ``have``.  Commit B (child of
    A) keeps ``base_oid`` and adds FETCH_PRESIGN_OBJECT_THRESHOLD new objects.
    Only those new objects may be passed to ``presign_get``.
    """
    from musehub.services.musehub_wire import get_backend as _get_real_backend

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    real_backend = _get_real_backend()

    # Commit A: the single object the client already holds.
    base_payload = b"fps1-base"
    base_oid = blob_id(base_payload)
    await _store_object(db_session, repo.repo_id, base_oid, base_payload)
    commit_a, _ = await _make_commit(
        db_session, repo.repo_id, manifest={"base.dat": base_oid}, seed="fps1a",
    )

    # Commit B: base object plus the new ones that make up the delta.
    manifest_b: dict[str, str] = {"base.dat": base_oid}
    new_oids: set[str] = set()
    for index in range(FETCH_PRESIGN_OBJECT_THRESHOLD):
        payload = f"fps1-new-{index}".encode()
        fresh_oid = blob_id(payload)
        await _store_object(db_session, repo.repo_id, fresh_oid, payload)
        manifest_b[f"new_{index}.dat"] = fresh_oid
        new_oids.add(fresh_oid)
    commit_b, _ = await _make_commit(
        db_session,
        repo.repo_id,
        manifest=manifest_b,
        seed="fps1b",
        parent_ids=[commit_a.commit_id],
    )

    # Filled in by the fake backend with every OID it is asked to presign.
    presigned_oids: set[str] = set()

    class _TrackCalls:
        supports_presign = True
        get = real_backend.get
        exists = real_backend.exists
        put = AsyncMock(return_value="s3://bucket/obj")

        async def presign_get(self, oid: str, ttl: int) -> str:
            presigned_oids.add(oid)
            return f"https://r2.example.com/{oid}?sig=fps1"

    request = WireFetchRequest(
        want=[commit_b.commit_id], have=[commit_a.commit_id], depth=None
    )
    tracker = _TrackCalls()

    with patch("musehub.services.musehub_wire.get_backend", return_value=tracker), \
         patch("musehub.services.musehub_wire_fetch.get_backend", return_value=tracker), \
         patch("musehub.services.musehub_wire_shared.get_backend", return_value=tracker):
        result = await wire_fetch_presign(db_session, repo.repo_id, request)

    assert result["presign"] is True
    assert base_oid not in presigned_oids, "base OID known by client must not be presigned"
    assert new_oids == presigned_oids, "exactly the new OIDs must be presigned"
    assert result["blob_count"] == FETCH_PRESIGN_OBJECT_THRESHOLD
579
580
581 # ---------------------------------------------------------------------------
582 # FPS2 — private repo returns 404 to non-owner authenticated user
583 # ---------------------------------------------------------------------------
584
@pytest.mark.asyncio
async def test_fps2_private_repo_404_for_non_owner(
    client: AsyncClient, db_session: AsyncSession, wire_headers: dict[str, str],
) -> None:
    """FPS2: an authenticated non-owner gets 404 from fetch/presign on a private repo."""
    # wire_headers injects handle="test-user-wire"; the repo belongs to "gabriel",
    # so the caller is a different user.
    repo = await create_repo(db_session, owner="gabriel", visibility="private")
    request_body = msgpack.packb({"want": [], "have": []}, use_bin_type=True)
    request_headers = {**wire_headers, "Content-Type": "application/x-msgpack"}

    response = await client.post(
        f"/gabriel/{repo.slug}/fetch/presign",
        content=request_body,
        headers=request_headers,
    )

    assert response.status_code == 404
598
599
600 # ---------------------------------------------------------------------------
601 # FPS3 — blob_urls count equals new_oids count, no leakage
602 # ---------------------------------------------------------------------------
603
@pytest.mark.asyncio
async def test_fps3_blob_urls_no_extras(db_session: AsyncSession) -> None:
    """blob_urls must contain exactly as many keys as new OIDs — no leakage.

    ``get_backend`` is patched in all three service modules, matching the other
    fake-backend tests in this file, so the fake is in effect regardless of
    which of those modules looks the backend up.
    """
    from musehub.services.musehub_wire import get_backend as _get_real_backend

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    real_backend = _get_real_backend()

    manifest: dict[str, str] = {}
    for i in range(FETCH_PRESIGN_OBJECT_THRESHOLD):
        content = f"fps3-{i}".encode()
        oid = blob_id(content)
        await _store_object(db_session, repo.repo_id, oid, content)
        manifest[f"f{i}.dat"] = oid
    commit, _ = await _make_commit(db_session, repo.repo_id, manifest=manifest, seed="fps3")

    class _FakeS3:
        # Reads delegate to the real backend; only presign_get is faked.
        supports_presign = True
        get = real_backend.get
        exists = real_backend.exists
        put = AsyncMock(return_value="s3://bucket/obj")

        async def presign_get(self, oid: str, ttl: int) -> str:
            return f"https://r2.example.com/{oid}?sig=fps3"

    req = WireFetchRequest(want=[commit.commit_id], have=[], depth=None)

    _fake_s3 = _FakeS3()
    with patch("musehub.services.musehub_wire.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_fetch.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_shared.get_backend", return_value=_fake_s3):
        result = await wire_fetch_presign(db_session, repo.repo_id, req)

    assert result["presign"] is True
    # The reported count and the URL map must agree, and the keys must be
    # exactly the manifest's OIDs.
    assert len(result["blob_urls"]) == result["blob_count"]
    assert set(result["blob_urls"].keys()) == set(manifest.values())
634
635
636 # ===========================================================================
637 # Performance tests
638 # ===========================================================================
639
640 # ---------------------------------------------------------------------------
641 # FPP0 — all presign_get calls complete when N > semaphore limit (50)
642 # ---------------------------------------------------------------------------
643
@pytest.mark.asyncio
async def test_fpp0_all_presign_calls_complete_above_semaphore_limit(
    db_session: AsyncSession,
) -> None:
    """FPP0: with 75 objects, presign_get runs once per object and none are dropped.

    75 is chosen to sit above the semaphore limit of 50 named in the test plan.
    """
    from musehub.services.musehub_wire import get_backend as _get_real_backend

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    real_backend = _get_real_backend()

    n = 75  # deliberately above the semaphore limit of 50
    manifest: dict[str, str] = {}
    for index in range(n):
        payload = f"fpp0-{index}".encode()
        object_id = blob_id(payload)
        await _store_object(db_session, repo.repo_id, object_id, payload)
        manifest[f"f{index}.dat"] = object_id

    commit, _ = await _make_commit(db_session, repo.repo_id, manifest=manifest, seed="fpp0")

    # Incremented by the fake backend on every presign_get call.
    presign_count = 0

    class _CountingS3:
        supports_presign = True
        get = real_backend.get
        exists = real_backend.exists
        put = AsyncMock(return_value="s3://bucket/obj")

        async def presign_get(self, oid: str, ttl: int) -> str:
            nonlocal presign_count
            presign_count += 1
            return f"https://r2.example.com/{oid}?sig=fpp0"

    request = WireFetchRequest(want=[commit.commit_id], have=[], depth=None)
    counting_backend = _CountingS3()

    with patch("musehub.services.musehub_wire.get_backend", return_value=counting_backend), \
         patch("musehub.services.musehub_wire_fetch.get_backend", return_value=counting_backend), \
         patch("musehub.services.musehub_wire_shared.get_backend", return_value=counting_backend):
        result = await wire_fetch_presign(db_session, repo.repo_id, request)

    assert result["presign"] is True
    assert presign_count == n, f"Expected {n} presign_get calls, got {presign_count}"
    assert len(result["blob_urls"]) == n
684
685
686 # ---------------------------------------------------------------------------
687 # FPP1 — custom ttl_seconds appears in expires_at timestamp
688 # ---------------------------------------------------------------------------
689
@pytest.mark.asyncio
async def test_fpp1_custom_ttl_reflected_in_expires_at(db_session: AsyncSession) -> None:
    """expires_at must be approximately now() + ttl_seconds.

    The call is bracketed by two UTC timestamps and ``expires_at`` must fall in
    ``[before + ttl - 5s, after + ttl + 5s]``.

    ``get_backend`` is patched in all three service modules, matching the other
    fake-backend tests in this file, so the fake is in effect regardless of
    which of those modules looks the backend up.
    """
    from datetime import timedelta

    import dateutil.parser

    from musehub.services.musehub_wire import get_backend as _get_real_backend

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    real_backend = _get_real_backend()

    manifest: dict[str, str] = {}
    for i in range(FETCH_PRESIGN_OBJECT_THRESHOLD):
        content = f"fpp1-{i}".encode()
        oid = blob_id(content)
        await _store_object(db_session, repo.repo_id, oid, content)
        manifest[f"f{i}.dat"] = oid
    commit, _ = await _make_commit(db_session, repo.repo_id, manifest=manifest, seed="fpp1")

    class _FakeS3:
        # Reads delegate to the real backend; only presign_get is faked.
        supports_presign = True
        get = real_backend.get
        exists = real_backend.exists
        put = AsyncMock(return_value="s3://bucket/obj")

        async def presign_get(self, oid: str, ttl: int) -> str:
            return f"https://r2.example.com/{oid}?ttl={ttl}"

    custom_ttl = 300
    req = WireFetchRequest(want=[commit.commit_id], have=[], depth=None)

    _fake_s3 = _FakeS3()
    before = datetime.now(tz=timezone.utc)
    with patch("musehub.services.musehub_wire.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_fetch.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_shared.get_backend", return_value=_fake_s3):
        result = await wire_fetch_presign(db_session, repo.repo_id, req, ttl_seconds=custom_ttl)
    after = datetime.now(tz=timezone.utc)

    assert result["expires_at"] is not None
    expires_dt = dateutil.parser.parse(result["expires_at"])
    # 5 seconds of slack on either side of the expected expiry.
    earliest = before + timedelta(seconds=custom_ttl - 5)
    latest = after + timedelta(seconds=custom_ttl + 5)
    assert earliest <= expires_dt <= latest
727
728
729 # ===========================================================================
730 # Stress / state integrity tests
731 # ===========================================================================
732
733 # ---------------------------------------------------------------------------
734 # FPST0 — 600 OIDs, every one appears in blob_urls
735 # ---------------------------------------------------------------------------
736
@pytest.mark.asyncio
async def test_fpst0_600_oids_all_presigned(db_session: AsyncSession) -> None:
    """All 600 OIDs must appear in blob_urls — no dropped presign calls.

    ``get_backend`` is patched in all three service modules, matching the other
    fake-backend tests in this file, so the fake is in effect regardless of
    which of those modules looks the backend up.
    """
    from musehub.services.musehub_wire import get_backend as _get_real_backend

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    real_backend = _get_real_backend()

    # Stress size: far more objects than the FETCH_PRESIGN_OBJECT_THRESHOLD
    # used by the other tests in this file.
    n = 600
    manifest: dict[str, str] = {}
    for i in range(n):
        content = f"fpst0-{i}".encode()
        oid = blob_id(content)
        await _store_object(db_session, repo.repo_id, oid, content)
        manifest[f"f{i}.dat"] = oid
    all_oids: set[str] = set(manifest.values())
    commit, _ = await _make_commit(db_session, repo.repo_id, manifest=manifest, seed="fpst0")

    class _FakeS3:
        # Reads delegate to the real backend; only presign_get is faked.
        supports_presign = True
        get = real_backend.get
        exists = real_backend.exists
        put = AsyncMock(return_value="s3://bucket/obj")

        async def presign_get(self, oid: str, ttl: int) -> str:
            return f"https://r2.example.com/{oid}?sig=fpst0"

    req = WireFetchRequest(want=[commit.commit_id], have=[], depth=None)

    _fake_s3 = _FakeS3()
    with patch("musehub.services.musehub_wire.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_fetch.get_backend", return_value=_fake_s3), \
         patch("musehub.services.musehub_wire_shared.get_backend", return_value=_fake_s3):
        result = await wire_fetch_presign(db_session, repo.repo_id, req)

    assert result["presign"] is True
    assert result["blob_count"] == n
    missing = all_oids - set(result["blob_urls"].keys())
    assert not missing, f"Missing presigned URLs for {len(missing)} OIDs"
772
773
774 # ---------------------------------------------------------------------------
775 # FPST1 — presign_get raising propagates the exception
776 # ---------------------------------------------------------------------------
777
@pytest.mark.asyncio
async def test_fpst1_presign_get_exception_propagates(db_session: AsyncSession) -> None:
    """A failure inside presign_get must surface from wire_fetch_presign.

    The fake backend raises ``RuntimeError`` on its third ``presign_get``
    call; the service is expected to propagate that error rather than
    return a partial result.
    """
    from musehub.services.musehub_wire import get_backend as _get_real_backend
    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    real_backend = _get_real_backend()

    # One blob per manifest entry, exactly FETCH_PRESIGN_OBJECT_THRESHOLD of them.
    manifest: dict[str, str] = {}
    for idx in range(FETCH_PRESIGN_OBJECT_THRESHOLD):
        payload = f"fpst1-{idx}".encode()
        oid = blob_id(payload)
        await _store_object(db_session, repo.repo_id, oid, payload)
        manifest[f"f{idx}.dat"] = oid
    commit, _ = await _make_commit(db_session, repo.repo_id, manifest=manifest, seed="fpst1")

    presign_calls = 0

    class _FailingS3:
        supports_presign = True
        get = real_backend.get
        exists = real_backend.exists
        put = AsyncMock(return_value="s3://bucket/obj")

        async def presign_get(self, oid: str, ttl: int) -> str:
            # Succeeds on every call except the third, which raises.
            nonlocal presign_calls
            presign_calls += 1
            if presign_calls != 3:
                return f"https://r2.example.com/{oid}?sig=fpst1"
            raise RuntimeError("R2 presign service unavailable")

    req = WireFetchRequest(want=[commit.commit_id], have=[], depth=None)

    # The same failing instance is installed in all three service modules.
    backend = _FailingS3()
    patch_wire = patch("musehub.services.musehub_wire.get_backend", return_value=backend)
    patch_fetch = patch("musehub.services.musehub_wire_fetch.get_backend", return_value=backend)
    patch_shared = patch("musehub.services.musehub_wire_shared.get_backend", return_value=backend)
    expect_failure = pytest.raises(RuntimeError, match="R2 presign service unavailable")

    with patch_wire, patch_fetch, patch_shared, expect_failure:
        await wire_fetch_presign(db_session, repo.repo_id, req)
815
816
817 # ===========================================================================
818 # End-to-end tests
819 # ===========================================================================
820
821 # ---------------------------------------------------------------------------
822 # FPE0 — full HTTP route with mocked S3 → msgpack response correct
823 # ---------------------------------------------------------------------------
824
@pytest.mark.asyncio
async def test_fpe0_route_presign_true_full_response(
    client: AsyncClient, db_session: AsyncSession, wire_headers: dict[str, str],
) -> None:
    """End-to-end check of ``POST /{owner}/{slug}/fetch/presign``.

    Posts a msgpack fetch request through the HTTP client with a fake
    presign-capable backend installed, then verifies the decoded msgpack
    response: presign flag, one URL per stored OID, counts, expiry,
    repo id, and the absence of a populated ``url`` field.
    """
    from musehub.services.musehub_wire import get_backend as _get_real_backend
    real_backend = _get_real_backend()

    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    # Store exactly FETCH_PRESIGN_OBJECT_THRESHOLD blobs under one commit.
    manifest: dict[str, str] = {}
    expected_oids: set[str] = set()
    for idx in range(FETCH_PRESIGN_OBJECT_THRESHOLD):
        payload = f"fpe0-{idx}".encode()
        oid = blob_id(payload)
        await _store_object(db_session, repo.repo_id, oid, payload)
        manifest[f"f{idx}.dat"] = oid
        expected_oids.add(oid)
    commit, _ = await _make_commit(db_session, repo.repo_id, manifest=manifest, seed="fpe0")

    class _FakeS3:
        supports_presign = True
        get = real_backend.get
        exists = real_backend.exists
        put = AsyncMock(return_value="s3://bucket/obj")

        async def presign_get(self, oid: str, ttl: int) -> str:
            return f"https://r2.example.com/{oid}?sig=fpe0&ttl={ttl}"

    backend = _FakeS3()
    patch_wire = patch("musehub.services.musehub_wire.get_backend", return_value=backend)
    patch_fetch = patch("musehub.services.musehub_wire_fetch.get_backend", return_value=backend)
    patch_shared = patch("musehub.services.musehub_wire_shared.get_backend", return_value=backend)

    request_body = msgpack.packb(
        {"want": [commit.commit_id], "have": []}, use_bin_type=True
    )
    request_headers = {**wire_headers, "Content-Type": "application/x-msgpack"}

    with patch_wire, patch_fetch, patch_shared:
        resp = await client.post(
            f"/gabriel/{repo.slug}/fetch/presign",
            content=request_body,
            headers=request_headers,
        )

    assert resp.status_code == 200
    data = msgpack.unpackb(resp.content, raw=False)

    assert data["presign"] is True
    blob_urls = data["blob_urls"]
    assert set(blob_urls.keys()) == expected_oids
    for oid, url in blob_urls.items():
        assert url.startswith("https://r2.example.com/"), f"Unexpected URL: {url}"
        assert oid in url, "OID must appear in its own presigned URL"
    assert data["commit_count"] == 1
    assert data["blob_count"] == FETCH_PRESIGN_OBJECT_THRESHOLD
    assert data["expires_at"] is not None
    assert data["repo_id"] == repo.repo_id
    # No legacy mpack URL field: either absent or explicitly None.
    assert data.get("url") is None
File History 1 commit
sha256:ab9eda7b6479e1c35cdba9a54f62bacd2825de8faacec3ba67a9a8ef45914b7d fix: migration and wire protocol alignment Sonnet 4.6 minor 20 days ago