gabriel / musehub public
test_wire_mpack_presign_step1_tiers4567.py python
427 lines 15.4 KB
Raw
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 1 day ago
1 """Push Protocol Step 1 — Tiers 4–7: stress, integrity, performance, security.
2
3 Tier 4 — Stress: concurrent presign requests don't corrupt the quota counter.
4 Tier 5 — Data integrity: DB schema invariants on musehub_daily_push_bytes.
5 Tier 6 — Performance: presign endpoint latency gate with index coverage.
6 Tier 7 — Security: malformed keys, cross-user quota isolation, unauthenticated access.
7 """
from __future__ import annotations

import asyncio
import datetime
import time
import typing
from collections.abc import AsyncIterator
from unittest.mock import AsyncMock, MagicMock, patch

import msgpack
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy import select, func, text
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker

from muse.core.mpack import build_presign_payload
from muse.core.types import blob_id
from musehub.auth.dependencies import require_valid_token
from musehub.auth.request_signing import MSignContext
from musehub.config import get_settings
from musehub.core.genesis import compute_identity_id
from musehub.db.database import get_db
from musehub.db.musehub_abuse_models import MusehubDailyPushBytes
from musehub.db.musehub_repo_models import MusehubRepo
from musehub.main import app
from musehub.services.musehub_repository import create_repo
34
# URL handed back by the stubbed storage backend for every presign request.
_FAKE_UPLOAD_URL = "https://minio.example.com/mpacks/sha256:fake?sig=presigned"

# The repository every test pushes to, and the handle that owns it.
_OWNER = "gabriel"
_REPO_NAME = "step1-tiers-test"

# Identity ids for the repo owner and for a second, unrelated user.
_IDENTITY_ID = compute_identity_id(b"gabriel")
_OTHER_IDENTITY_ID = compute_identity_id(b"aria")

# Auth contexts substituted for require_valid_token; both users are
# ordinary humans (not agents, not admins).
_AUTH_CTX = MSignContext(
    handle=_OWNER,
    identity_id=_IDENTITY_ID,
    is_agent=False,
    is_admin=False,
)
_OTHER_AUTH_CTX = MSignContext(
    handle="aria",
    identity_id=_OTHER_IDENTITY_ID,
    is_agent=False,
    is_admin=False,
)
53
54
55 # ---------------------------------------------------------------------------
56 # Fixtures
57 # ---------------------------------------------------------------------------
58
@pytest_asyncio.fixture()
async def repo(db_session: AsyncSession) -> MusehubRepo:
    """Create the public, uninitialised test repository owned by ``_OWNER``.

    The row is committed before the fixture returns so it is persisted for
    the test body (including work done in separate sessions).
    """
    created_repo = await create_repo(
        db_session,
        name=_REPO_NAME,
        owner=_OWNER,
        owner_user_id=_IDENTITY_ID,
        visibility="public",
        initialize=False,
    )
    await db_session.commit()
    return created_repo
71
72
@pytest_asyncio.fixture(autouse=True)
async def mock_presign_put() -> AsyncIterator[MagicMock]:
    """Stub out the object-storage backend for every test in this module.

    ``get_backend`` is patched in both ``musehub.services.musehub_wire`` and
    ``musehub.services.musehub_wire_push`` so presign calls never reach real
    storage. The stub's ``presign_mpack_put`` is an ``AsyncMock`` that always
    resolves to ``_FAKE_UPLOAD_URL``.

    Yields:
        The stub backend, so a test can inspect the calls made to it.
    """
    mock_backend = MagicMock()
    mock_backend.presign_mpack_put = AsyncMock(return_value=_FAKE_UPLOAD_URL)
    wire_patch = patch("musehub.services.musehub_wire.get_backend", return_value=mock_backend)
    push_patch = patch("musehub.services.musehub_wire_push.get_backend", return_value=mock_backend)
    # Both patches stay active for the whole test and are undone on teardown.
    with wire_patch, push_patch:
        yield mock_backend
81
82
def _make_client(db_session: AsyncSession, auth_ctx: MSignContext = _AUTH_CTX) -> AsyncClient:
    """Build an in-process HTTP client for ``app`` with DB and auth overridden.

    Installs two entries in the global ``app.dependency_overrides``:
    ``get_db`` yields the test's ``db_session``, and ``require_valid_token``
    returns ``auth_ctx`` (defaults to the repo owner). This helper does NOT
    remove those overrides; callers must call
    ``app.dependency_overrides.clear()`` when they are done.

    Args:
        db_session: Session the endpoint should use instead of a fresh one.
        auth_ctx: Identity the request is treated as signed by.

    Returns:
        An ``AsyncClient`` bound to ``app`` through ``ASGITransport``; use it
        as an async context manager so it is closed.
    """
    async def _override_db() -> AsyncIterator[AsyncSession]:
        yield db_session

    app.dependency_overrides[get_db] = _override_db
    app.dependency_overrides[require_valid_token] = lambda: auth_ctx
    return AsyncClient(transport=ASGITransport(app=app), base_url="https://localhost:1337")
89
90
def _body(mpack_bytes: bytes) -> bytes:
    """Encode the presign request payload for *mpack_bytes* as msgpack."""
    payload = build_presign_payload(mpack_bytes)
    return msgpack.packb(payload, use_bin_type=True)
93
94
async def _presign(client: AsyncClient, mpack_bytes: bytes) -> int:
    """POST a presign request for *mpack_bytes* and return just the HTTP status."""
    url = f"/{_OWNER}/{_REPO_NAME}/push/mpack-presign"
    response = await client.post(
        url,
        content=_body(mpack_bytes),
        headers={"Content-Type": "application/x-msgpack"},
    )
    return response.status_code
102
103
104 # ---------------------------------------------------------------------------
105 # Tier 4 — Stress
106 # ---------------------------------------------------------------------------
107
@pytest.mark.tier4
@pytest.mark.asyncio
async def test_t4_concurrent_quota_writes_not_corrupted(
    repo: MusehubRepo, session_factory: async_sessionmaker[AsyncSession],
) -> None:
    """Simultaneous quota upserts for one identity must add up exactly.

    Launches ``writers`` concurrent ``record_mpack_bytes_uploaded`` calls for
    the same identity, each in its own session and transaction. Today's stored
    total must equal ``writers * chunk_size``, i.e. no increment is lost. The
    upsert is exercised directly rather than through HTTP.
    """
    from musehub.services.musehub_wire import record_mpack_bytes_uploaded

    if get_settings().mpack_daily_upload_limit_bytes <= 0:
        pytest.skip("daily quota disabled")

    chunk_size = 1000
    writers = 20

    async def _record_once() -> None:
        # Each writer commits its own transaction on its own session.
        async with session_factory() as session:
            await record_mpack_bytes_uploaded(session, _IDENTITY_ID, chunk_size)
            await session.commit()

    await asyncio.gather(*(_record_once() for _ in range(writers)))

    total_stmt = select(
        func.coalesce(func.sum(MusehubDailyPushBytes.bytes_uploaded), 0)
    ).where(
        MusehubDailyPushBytes.identity_id == _IDENTITY_ID,
        MusehubDailyPushBytes.date == datetime.date.today(),
    )
    async with session_factory() as verify_session:
        recorded = int((await verify_session.execute(total_stmt)).scalar())

    expected = writers * chunk_size
    assert recorded == expected, f"concurrent quota corrupted: got {recorded}, expected {expected}"
145
146
@pytest.mark.tier4
@pytest.mark.asyncio
async def test_t4_concurrent_distinct_quota_writes_all_land(
    repo: MusehubRepo, session_factory: async_sessionmaker[AsyncSession],
) -> None:
    """20 concurrent quota writes for different identities each land exactly once.

    Each writer records ``chunk`` bytes for its own freshly derived identity,
    in its own session. Afterwards exactly one row per generated identity must
    exist for today, each holding ``chunk`` bytes. The query is scoped to the
    generated identities, so rows belonging to anyone else cannot make the
    test pass or fail.
    """
    from musehub.services.musehub_wire import record_mpack_bytes_uploaded

    n = 20
    chunk = 512
    identities = [compute_identity_id(f"stress-user-{i}".encode()) for i in range(n)]

    async def _do(identity: str) -> None:
        async with session_factory() as sess:
            await record_mpack_bytes_uploaded(sess, identity, chunk)
            await sess.commit()

    await asyncio.gather(*[_do(identity) for identity in identities])

    today = datetime.date.today()
    async with session_factory() as check_sess:
        result = await check_sess.execute(
            select(
                MusehubDailyPushBytes.identity_id,
                MusehubDailyPushBytes.bytes_uploaded,
            ).where(
                MusehubDailyPushBytes.identity_id.in_(identities),
                MusehubDailyPushBytes.date == today,
            )
        )
        rows = result.all()

    assert len(rows) == n, f"expected {n} rows, got {len(rows)}"
    assert {identity for identity, _ in rows} == set(identities), "some identities have no quota row"
    assert all(size == chunk for _, size in rows), f"unexpected byte counts: {rows}"
176
177
178 # ---------------------------------------------------------------------------
179 # Tier 5 — Data integrity
180 # ---------------------------------------------------------------------------
181
@pytest.mark.tier5
@pytest.mark.asyncio
async def test_t5_quota_pk_is_identity_and_date(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """Inserting a second row with the same (identity_id, date) raises IntegrityError."""
    import sqlalchemy.exc

    today = datetime.date.today()
    now = datetime.datetime.now(datetime.timezone.utc)
    first, duplicate = (
        MusehubDailyPushBytes(
            identity_id=_IDENTITY_ID, date=today, bytes_uploaded=size, updated_at=now,
        )
        for size in (100, 200)
    )

    db_session.add(first)
    await db_session.commit()
    # Detach the committed object so the session's identity map no longer
    # holds this key; otherwise adding the duplicate would trigger a warning.
    db_session.expunge(first)
    db_session.add(duplicate)
    with pytest.raises(sqlalchemy.exc.IntegrityError):
        await db_session.commit()
202
203
@pytest.mark.tier5
@pytest.mark.asyncio
async def test_t5_different_dates_different_rows(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """One identity with uploads on two calendar days is stored as two rows."""
    now = datetime.datetime.now(datetime.timezone.utc)
    today = datetime.date.today()
    per_day = ((today, 100), (today - datetime.timedelta(days=1), 50))

    for day, size in per_day:
        db_session.add(MusehubDailyPushBytes(
            identity_id=_IDENTITY_ID, date=day, bytes_uploaded=size, updated_at=now,
        ))
    await db_session.commit()

    count_stmt = (
        select(func.count())
        .select_from(MusehubDailyPushBytes)
        .where(MusehubDailyPushBytes.identity_id == _IDENTITY_ID)
    )
    row_count = (await db_session.execute(count_stmt)).scalar()
    assert row_count == 2
226
227
@pytest.mark.tier5
@pytest.mark.asyncio
async def test_t5_different_identities_isolated(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """Each identity's daily quota row holds only its own byte count."""
    now = datetime.datetime.now(datetime.timezone.utc)
    today = datetime.date.today()
    expected = {_IDENTITY_ID: 1000, _OTHER_IDENTITY_ID: 500}

    for identity, size in expected.items():
        db_session.add(MusehubDailyPushBytes(
            identity_id=identity, date=today, bytes_uploaded=size, updated_at=now,
        ))
    await db_session.commit()

    stored = {}
    for identity in expected:
        result = await db_session.execute(
            select(MusehubDailyPushBytes.bytes_uploaded).where(
                MusehubDailyPushBytes.identity_id == identity,
                MusehubDailyPushBytes.date == today,
            )
        )
        stored[identity] = result.scalar()

    assert stored == expected
257
258
@pytest.mark.tier5
@pytest.mark.asyncio
async def test_t5_upsert_accumulates_not_overwrites(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """Repeated record_mpack_bytes_uploaded calls add to today's total instead of replacing it."""
    from musehub.services.musehub_wire import record_mpack_bytes_uploaded

    today = datetime.date.today()
    for size in (300, 200):
        await record_mpack_bytes_uploaded(db_session, _IDENTITY_ID, size)
        await db_session.commit()

    total_stmt = select(MusehubDailyPushBytes.bytes_uploaded).where(
        MusehubDailyPushBytes.identity_id == _IDENTITY_ID,
        MusehubDailyPushBytes.date == today,
    )
    total = (await db_session.execute(total_stmt)).scalar()
    assert total == 500
278
279
280 # ---------------------------------------------------------------------------
281 # Tier 6 — Performance
282 # ---------------------------------------------------------------------------
283
@pytest.mark.tier6
@pytest.mark.asyncio
async def test_t6_presign_latency_under_50ms(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """Presign endpoint (stubbed storage backend, no MinIO round-trip) answers in < 50ms.

    A warm-up request runs first so connection setup is excluded from the
    gate. The request body is encoded before the timer starts, so only the
    HTTP round-trip through the app is measured. The dependency overrides
    installed by ``_make_client`` are removed in ``finally`` so a failure here
    cannot leak them into later tests.
    """
    url = f"/{_OWNER}/{_REPO_NAME}/push/mpack-presign"
    headers = {"Content-Type": "application/x-msgpack"}
    try:
        async with _make_client(db_session) as client:
            # Warm-up — exclude connection setup from gate
            await client.post(url, content=_body(b"warmup"), headers=headers)

            payload = _body(b"perf-mpack-bytes" * 64)
            t0 = time.perf_counter()
            resp = await client.post(url, content=payload, headers=headers)
            elapsed_ms = (time.perf_counter() - t0) * 1000
    finally:
        app.dependency_overrides.clear()

    assert resp.status_code == 200
    assert elapsed_ms < 50, f"presign took {elapsed_ms:.1f}ms — gate is 50ms"
307
308
@pytest.mark.tier6
@pytest.mark.asyncio
async def test_t6_quota_query_uses_index(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """EXPLAIN of the per-identity, per-day quota SUM query mentions an index.

    The check is case-insensitive and only requires the word "index" somewhere
    in the plan text; it does not pin which index the planner picks.
    NOTE(review): an earlier docstring named ix_daily_push_bytes_identity_date,
    but that name was never asserted — tighten this if the index name is stable.

    The WHERE values are a module constant and today's date, interpolated
    directly into the SQL text (no untrusted input reaches it).
    """
    today = datetime.date.today()
    plan = await db_session.execute(text(
        "EXPLAIN SELECT COALESCE(SUM(bytes_uploaded), 0) "
        "FROM musehub_daily_push_bytes "
        f"WHERE identity_id = '{_IDENTITY_ID}' AND date = '{today}'"
    ))
    # str() guards against drivers returning non-text plan cells.
    plan_text = "\n".join(str(row[0]) for row in plan)
    assert "index" in plan_text.lower(), (
        f"quota query not using an index:\n{plan_text}"
    )
323
324
325 # ---------------------------------------------------------------------------
326 # Tier 7 — Security
327 # ---------------------------------------------------------------------------
328
@pytest.mark.tier7
@pytest.mark.asyncio
async def test_t7_malformed_mpack_key_rejected(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """mpack_key without the ``sha256:`` prefix is rejected with 422.

    Dependency overrides are cleared in ``finally`` so a failing request
    cannot leak the test's DB/auth overrides into later tests.
    """
    body = msgpack.packb(
        {"mpack_key": "not-a-real-key", "size_bytes": 100},
        use_bin_type=True,
    )
    try:
        async with _make_client(db_session) as client:
            resp = await client.post(
                f"/{_OWNER}/{_REPO_NAME}/push/mpack-presign",
                content=body,
                headers={"Content-Type": "application/x-msgpack"},
            )
    finally:
        app.dependency_overrides.clear()
    assert resp.status_code == 422, f"expected 422 for malformed key, got {resp.status_code}"
345
346
@pytest.mark.tier7
@pytest.mark.asyncio
async def test_t7_cross_user_quota_isolation(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """A presign by gabriel leaves aria's daily quota row untouched.

    Skipped when the daily quota is disabled. Dependency overrides are cleared
    in ``finally`` so a failing request cannot leak them into later tests.
    """
    settings = get_settings()
    if settings.mpack_daily_upload_limit_bytes <= 0:
        pytest.skip("daily quota disabled")

    today = datetime.date.today()
    mpack_bytes = b"y" * 2048

    try:
        async with _make_client(db_session, _AUTH_CTX) as client:
            resp = await client.post(
                f"/{_OWNER}/{_REPO_NAME}/push/mpack-presign",
                content=_body(mpack_bytes),
                headers={"Content-Type": "application/x-msgpack"},
            )
    finally:
        app.dependency_overrides.clear()
    assert resp.status_code == 200

    result = await db_session.execute(
        select(func.coalesce(func.sum(MusehubDailyPushBytes.bytes_uploaded), 0)).where(
            MusehubDailyPushBytes.identity_id == _OTHER_IDENTITY_ID,
            MusehubDailyPushBytes.date == today,
        )
    )
    aria_bytes = int(result.scalar())
    assert aria_bytes == 0, f"aria's quota was touched: {aria_bytes} bytes"
375
376
@pytest.mark.tier7
@pytest.mark.asyncio
async def test_t7_unauthenticated_request_rejected(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """No auth header → 401/403, and no quota row is written for the owner.

    Only ``get_db`` is overridden. Any ``require_valid_token`` override left
    behind by another test is removed first, so the real auth dependency runs.
    Overrides are cleared in ``finally`` so this test cannot leak them either.
    """
    async def _override_db() -> AsyncIterator[AsyncSession]:
        yield db_session

    app.dependency_overrides[get_db] = _override_db
    # No require_valid_token override — real enforcement. Drop a stale one if present.
    app.dependency_overrides.pop(require_valid_token, None)

    today = datetime.date.today()
    try:
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="https://localhost:1337",
        ) as client:
            resp = await client.post(
                f"/{_OWNER}/{_REPO_NAME}/push/mpack-presign",
                content=_body(b"should not reach quota logic"),
                headers={"Content-Type": "application/x-msgpack"},
            )
    finally:
        app.dependency_overrides.clear()

    assert resp.status_code in (401, 403)

    # Quota row must not exist — auth rejected before any DB write
    result = await db_session.execute(
        select(func.count()).select_from(MusehubDailyPushBytes).where(
            MusehubDailyPushBytes.identity_id == _IDENTITY_ID,
            MusehubDailyPushBytes.date == today,
        )
    )
    assert result.scalar() == 0, "quota row written for unauthenticated request"
408
409
@pytest.mark.tier7
@pytest.mark.asyncio
async def test_t7_oversized_key_field_rejected(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """A pathologically long mpack_key (10k hex chars) is rejected with 400/422.

    Dependency overrides are cleared in ``finally`` so a failing request
    cannot leak them into later tests.
    """
    body = msgpack.packb(
        {"mpack_key": "sha256:" + "a" * 10_000, "size_bytes": 100},
        use_bin_type=True,
    )
    try:
        async with _make_client(db_session) as client:
            resp = await client.post(
                f"/{_OWNER}/{_REPO_NAME}/push/mpack-presign",
                content=body,
                headers={"Content-Type": "application/x-msgpack"},
            )
    finally:
        app.dependency_overrides.clear()
    assert resp.status_code in (400, 422), (
        f"expected 4xx for oversized key, got {resp.status_code}"
    )
File History 3 commits
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 1 day ago
sha256:6b1949fc2797ca4c1936a637a4cbfec828ef56cf52398a2e74ca3c4f494e728f fix: use wire_bytes not mpack_bytes_raw in compute_object_b… Sonnet 4.6 patch 9 days ago
sha256:4aed3d8601c8dd3ed37074de35f11f4a9699a0a4b99d43727048fd3f8e6fd13d chore: doc sweep, ignore wrangler build state, misc fixes Sonnet 4.6 minor 12 days ago