gabriel / musehub public
test_wire_mpack_unpack_step3_tiers4567.py python
515 lines 20.2 KB
Raw
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 1 day ago
1 """Push Protocol Step 3 (unpack-mpack) — Tiers 4–7: stress, integrity, performance, security.
2
3 Tier 4 — Stress: concurrent wire_push_unpack_mpack calls all land without errors.
4 Tier 5 — Data integrity: DB invariants on MusehubBackgroundJob and MusehubBranch.
5 Tier 6 — Performance: unpack endpoint latency gate.
6 Tier 7 — Security: malformed keys, oversized keys, unauthenticated access.
7 """
8 from __future__ import annotations
9
10 import asyncio
11 import time
12
13 import msgpack
14 import pytest
15 import pytest_asyncio
16 from httpx import AsyncClient, ASGITransport
17 from sqlalchemy import select, func, text
18 from sqlalchemy.ext.asyncio import AsyncSession
19
20 from muse.core.mpack import build_wire_mpack
21 from muse.core.types import blob_id, fake_id
22 from musehub.auth.dependencies import require_valid_token
23 from musehub.auth.request_signing import MSignContext
24 from musehub.config import get_settings
25 from musehub.core.genesis import compute_identity_id
26 from musehub.db.database import get_db
27 from musehub.db.musehub_jobs_models import MusehubBackgroundJob
28 import typing
29 from sqlalchemy.ext.asyncio import async_sessionmaker
30 from musehub.db.musehub_repo_models import MusehubBranch, MusehubRepo
31 from musehub.main import app
32 from musehub.services.musehub_repository import create_repo
33
# Handle used both as the repo owner and as the authenticated caller's handle.
_OWNER = "gabriel"
# Identity id computed from the owner's handle bytes.
_IDENTITY_ID = compute_identity_id(b"gabriel")
# Identity id for a second handle; not referenced elsewhere in the visible file.
_OTHER_IDENTITY_ID = compute_identity_id(b"aria")
_REPO_NAME = "step3-tiers-test"

# Wire mpack built from empty blob/commit/snapshot lists, plus the id computed
# over those bytes — the default ``mpack_key`` sent by ``_body``.
_MPACK_BYTES = build_wire_mpack({"blobs": [], "commits": [], "snapshots": []})
_MPACK_KEY = blob_id(_MPACK_BYTES)
# Placeholder head commit id for tests that advance a branch.
_HEAD = fake_id("step3-tiers-tip")

# Auth context handed back by the ``require_valid_token`` override that
# ``_make_client`` installs: a non-agent, non-admin caller.
_AUTH_CTX = MSignContext(
    handle=_OWNER,
    identity_id=_IDENTITY_ID,
    is_agent=False,
    is_admin=False,
)
49
50
51 # ---------------------------------------------------------------------------
52 # Fixtures
53 # ---------------------------------------------------------------------------
54
@pytest_asyncio.fixture()
async def repo(db_session: AsyncSession) -> MusehubRepo:
    """Create the public, uninitialised test repo owned by ``_OWNER`` and commit it.

    Returns the object produced by ``create_repo`` after the session commit.
    """
    created_repo = await create_repo(
        db_session,
        name=_REPO_NAME,
        owner=_OWNER,
        owner_user_id=_IDENTITY_ID,
        visibility="public",
        initialize=False,
    )
    # Commit so the row is persisted before the test body runs.
    await db_session.commit()
    return created_repo
67
68
@pytest_asyncio.fixture(autouse=True)
async def mock_get_mpack() -> typing.AsyncIterator[typing.Any]:
    """Replace the storage backend with a stub for the duration of every test.

    The stub's ``get_mpack`` returns ``_MPACK_BYTES``; ``put``, ``put_mpack`` and
    ``presign_mpack_get`` return fixed placeholder values.

    Yields:
        The ``MagicMock`` backend, so a test can inspect or reconfigure it.
    """
    from contextlib import ExitStack
    from unittest.mock import AsyncMock, MagicMock, patch

    mock_backend = MagicMock()
    mock_backend.get_mpack = AsyncMock(return_value=_MPACK_BYTES)
    mock_backend.put = AsyncMock(return_value="s3://muse-objects/objects/fake")
    mock_backend.put_mpack = AsyncMock(return_value=None)
    mock_backend.presign_mpack_get = AsyncMock(return_value="https://minio.example.com/mpack?sig=x")

    # Patch all entry points — module-level imports and the canonical backends module
    # (deferred imports pick up musehub.storage.backends.get_backend at call time).
    patch_targets = (
        "musehub.storage.backends.get_backend",
        "musehub.services.musehub_wire.get_backend",
        "musehub.services.musehub_wire_push.get_backend",
        "musehub.services.musehub_wire_fetch.get_backend",
    )
    with ExitStack() as stack:
        for target in patch_targets:
            stack.enter_context(patch(target, return_value=mock_backend))
        # All patches are undone (in reverse order) when the fixture is torn down.
        yield mock_backend
84
85
def _make_client(db_session: AsyncSession, auth_ctx: MSignContext = _AUTH_CTX) -> AsyncClient:
    """Build an in-process ``AsyncClient`` for ``app`` with DB and auth overridden.

    Side effect: registers overrides in the process-wide
    ``app.dependency_overrides`` so that ``get_db`` yields ``db_session`` and
    ``require_valid_token`` returns ``auth_ctx``. Nothing here removes them —
    the caller must clear the overrides when it is done with the client.

    Args:
        db_session: Session every request handled by the client will receive.
        auth_ctx: Auth context returned in place of real token validation.

    Returns:
        An unopened ``AsyncClient`` bound to ``app`` through ``ASGITransport``.
    """
    async def _override_db() -> typing.AsyncIterator[AsyncSession]:
        yield db_session

    app.dependency_overrides[get_db] = _override_db
    app.dependency_overrides[require_valid_token] = lambda: auth_ctx
    return AsyncClient(transport=ASGITransport(app=app), base_url="https://localhost:1337")
92
93
def _body(
    mpack_key: str = _MPACK_KEY,
    branch: str = "main",
    head: str = "",
    commits_count: int = 1,
    blobs_count: int = 2,
) -> bytes:
    """Serialise an unpack-mpack request payload to msgpack bytes.

    The five arguments map one-to-one onto the payload keys of the same name.
    """
    payload = {
        "mpack_key": mpack_key,
        "branch": branch,
        "head": head,
        "commits_count": commits_count,
        "blobs_count": blobs_count,
    }
    return msgpack.packb(payload, use_bin_type=True)
111
112
async def _unpack(client: AsyncClient, repo_name: str = _REPO_NAME, **kwargs: typing.Any) -> int:
    """POST an unpack-mpack request for ``repo_name`` and return the HTTP status code.

    Extra keyword arguments are forwarded to ``_body`` to build the payload.
    """
    endpoint = f"/{_OWNER}/{repo_name}/push/unpack-mpack"
    payload = _body(**kwargs)
    response = await client.post(
        endpoint,
        content=payload,
        headers={"Content-Type": "application/x-msgpack"},
    )
    return response.status_code
120
121
122 # ---------------------------------------------------------------------------
123 # Tier 4 — Stress
124 # ---------------------------------------------------------------------------
125
@pytest.mark.tier4
@pytest.mark.asyncio
async def test_t4_concurrent_unpack_calls_all_succeed(
    repo: MusehubRepo, session_factory: async_sessionmaker[AsyncSession],
) -> None:
    """Run 20 wire_push_unpack_mpack calls concurrently, each with its own mpack_key.

    Every call opens its own session, installs a stub backend serving that
    call's mpack bytes, invokes the service and commits. The test passes when
    none of the calls raises.

    NOTE(review): ``patch`` replaces module attributes process-wide, so tasks
    that overlap may observe another task's stub backend — confirm the service
    does not depend on receiving the bytes that match its own key.
    """
    from musehub.services.musehub_wire import wire_push_unpack_mpack
    from unittest.mock import AsyncMock, MagicMock, patch

    call_count = 20

    def _stub_backend(payload: bytes) -> MagicMock:
        # Backend double whose get_mpack hands back this call's bytes.
        backend = MagicMock()
        backend.get_mpack = AsyncMock(return_value=payload)
        backend.put = AsyncMock(return_value="s3://muse-objects/objects/fake")
        backend.put_mpack = AsyncMock(return_value=None)
        backend.presign_mpack_get = AsyncMock(return_value="https://minio.example.com/mpack?sig=x")
        return backend

    async def _push_one(index: int) -> None:
        # Distinct blob content per call gives distinct mpack bytes and key.
        blob = f"object-{index}".encode()
        blob_oid = blob_id(blob)
        wire_bytes = build_wire_mpack(
            {"blobs": [{"object_id": blob_oid, "content": blob}], "commits": [], "snapshots": []}
        )
        wire_key = blob_id(wire_bytes)
        async with session_factory() as session:
            backend = _stub_backend(wire_bytes)
            with patch("musehub.storage.backends.get_backend", return_value=backend), \
                 patch("musehub.services.musehub_wire.get_backend", return_value=backend), \
                 patch("musehub.services.musehub_wire_push.get_backend", return_value=backend):
                await wire_push_unpack_mpack(
                    session, repo.repo_id, wire_key, _OWNER,
                    branch="main", head_commit_id="", commits_count=1, blobs_count=1,
                )
            await session.commit()

    # gather propagates the first exception raised by any call, which fails the test.
    pending = [_push_one(index) for index in range(call_count)]
    await asyncio.gather(*pending)
167
168
@pytest.mark.tier4
@pytest.mark.asyncio
async def test_t4_concurrent_branch_updates_for_same_branch(
    repo: MusehubRepo, session_factory: async_sessionmaker[AsyncSession],
) -> None:
    """Ten concurrent forced pushes to one branch must leave exactly one branch row.

    The branch is seeded first so that every concurrent call takes the update
    path rather than racing on insert. Per the original author's note the
    service serialises these updates with a FOR UPDATE row lock (not visible
    from this file).

    NOTE(review): ``patch`` replaces module attributes process-wide, so tasks
    that overlap may observe another task's stub backend — confirm the service
    does not depend on receiving the bytes that match its own key.
    """
    from musehub.services.musehub_wire import wire_push_unpack_mpack
    from musehub.core.genesis import compute_branch_id
    from unittest.mock import AsyncMock, MagicMock, patch

    branch_name = "concurrent-branch"

    # Seed the branch row up front so concurrent calls all hit the existing row.
    async with session_factory() as seed_sess:
        seed_branch = MusehubBranch(
            branch_id=compute_branch_id(repo.repo_id, "concurrent-branch"),
            repo_id=repo.repo_id,
            name="concurrent-branch",
            head_commit_id=fake_id("seed-head"),
        )
        seed_sess.add(seed_branch)
        await seed_sess.commit()

    call_count = 10

    def _stub_backend(payload: bytes) -> MagicMock:
        # Backend double whose get_mpack hands back this call's bytes.
        backend = MagicMock()
        backend.get_mpack = AsyncMock(return_value=payload)
        backend.put = AsyncMock(return_value="s3://muse-objects/objects/fake")
        backend.put_mpack = AsyncMock(return_value=None)
        backend.presign_mpack_get = AsyncMock(return_value="https://minio.example.com/mpack?sig=x")
        return backend

    async def _push_one(index: int) -> None:
        blob = f"seq-{index}".encode()
        blob_oid = blob_id(blob)
        wire_bytes = build_wire_mpack(
            {"blobs": [{"object_id": blob_oid, "content": blob}], "commits": [], "snapshots": []}
        )
        wire_key = blob_id(wire_bytes)
        new_head = fake_id(f"commit-{index}")
        async with session_factory() as session:
            backend = _stub_backend(wire_bytes)
            with patch("musehub.storage.backends.get_backend", return_value=backend), \
                 patch("musehub.services.musehub_wire.get_backend", return_value=backend), \
                 patch("musehub.services.musehub_wire_push.get_backend", return_value=backend):
                await wire_push_unpack_mpack(
                    session, repo.repo_id, wire_key, _OWNER,
                    branch="concurrent-branch", head_commit_id=new_head,
                    commits_count=1, blobs_count=0, force=True,
                )
            await session.commit()

    pending = [_push_one(index) for index in range(call_count)]
    await asyncio.gather(*pending)

    # Count branch rows with this name from a fresh session.
    count_stmt = select(func.count()).select_from(MusehubBranch).where(
        MusehubBranch.repo_id == repo.repo_id,
        MusehubBranch.name == branch_name,
    )
    async with session_factory() as check_sess:
        count = (await check_sess.execute(count_stmt)).scalar()

    assert count == 1, f"expected exactly 1 branch row, got {count}"
232
233
234 # ---------------------------------------------------------------------------
235 # Tier 5 — Data integrity
236 # ---------------------------------------------------------------------------
237
@pytest.mark.tier5
@pytest.mark.asyncio
async def test_t5_unpack_returns_correct_shape(
    db_session: AsyncSession, repo: MusehubRepo,
) -> None:
    """A successful unpack result exposes head, branch and the two mpack counters.

    Also checks that the echoed ``branch`` equals the branch that was pushed.
    """
    from musehub.services.musehub_wire import wire_push_unpack_mpack

    outcome = await wire_push_unpack_mpack(
        db_session, repo.repo_id, _MPACK_KEY, _OWNER,
        branch="main", head_commit_id="", commits_count=3, blobs_count=7,
    )

    for expected_key in ("head", "branch", "blobs_in_mpack", "commits_in_mpack"):
        assert expected_key in outcome
    assert outcome["branch"] == "main"
256
257
@pytest.mark.tier5
@pytest.mark.asyncio
async def test_t5_branch_row_created_when_head_provided(
    db_session: AsyncSession, repo: MusehubRepo,
) -> None:
    """Pushing a head to a branch that does not exist yet creates its row.

    The new row's ``head_commit_id`` must equal the pushed head.
    """
    from musehub.services.musehub_wire import wire_push_unpack_mpack

    target_branch = "feat/new-branch"

    await wire_push_unpack_mpack(
        db_session, repo.repo_id, _MPACK_KEY, _OWNER,
        branch=target_branch, head_commit_id=_HEAD,
        commits_count=1, blobs_count=0,
    )

    lookup = select(MusehubBranch).where(
        MusehubBranch.repo_id == repo.repo_id,
        MusehubBranch.name == target_branch,
    )
    branch_row = (await db_session.execute(lookup)).scalar_one_or_none()
    assert branch_row is not None, "branch row was not created"
    assert branch_row.head_commit_id == _HEAD
281
282
@pytest.mark.tier5
@pytest.mark.asyncio
async def test_t5_branch_row_updated_when_branch_exists(
    db_session: AsyncSession, repo: MusehubRepo,
) -> None:
    """A second push to an existing branch moves its head without adding a row.

    The first push uses the default stub backend; the second swaps in a backend
    serving a different mpack and is sent with ``force=True``.
    """
    from musehub.services.musehub_wire import wire_push_unpack_mpack
    from unittest.mock import AsyncMock, MagicMock, patch

    first_head = fake_id("first-head")
    second_head = fake_id("second-head")

    await wire_push_unpack_mpack(
        db_session, repo.repo_id, _MPACK_KEY, _OWNER,
        branch="main", head_commit_id=first_head,
        commits_count=1, blobs_count=0,
    )

    # Build a second, different mpack so the follow-up push has its own key.
    second_content = b"second-push-content"
    second_bytes = build_wire_mpack(
        {
            "blobs": [{"object_id": blob_id(second_content), "content": second_content}],
            "commits": [],
            "snapshots": [],
        }
    )
    second_key = blob_id(second_bytes)

    second_backend = MagicMock()
    second_backend.get_mpack = AsyncMock(return_value=second_bytes)
    second_backend.put = AsyncMock(return_value="s3://muse-objects/objects/fake2")
    second_backend.put_mpack = AsyncMock(return_value=None)
    second_backend.presign_mpack_get = AsyncMock(return_value="https://minio.example.com/mpack2?sig=x")

    with patch("musehub.storage.backends.get_backend", return_value=second_backend), \
         patch("musehub.services.musehub_wire.get_backend", return_value=second_backend), \
         patch("musehub.services.musehub_wire_push.get_backend", return_value=second_backend):
        await wire_push_unpack_mpack(
            db_session, repo.repo_id, second_key, _OWNER,
            branch="main", head_commit_id=second_head,
            commits_count=1, blobs_count=0, force=True,
        )

    on_main = (
        MusehubBranch.repo_id == repo.repo_id,
        MusehubBranch.name == "main",
    )

    row_count = await db_session.execute(
        select(func.count()).select_from(MusehubBranch).where(*on_main)
    )
    assert row_count.scalar() == 1, "duplicate branch row created"

    current_head = await db_session.execute(
        select(MusehubBranch.head_commit_id).where(*on_main)
    )
    assert current_head.scalar() == second_head, "branch head not updated to second push"
337
338
@pytest.mark.tier5
@pytest.mark.asyncio
async def test_t5_job_row_not_written_on_mpack_not_found(
    db_session: AsyncSession, repo: MusehubRepo,
) -> None:
    """A backend whose get_mpack returns None makes the service raise ValueError.

    Afterwards no ``mpack.index`` background-job row may exist for the repo.
    """
    from musehub.services.musehub_wire import wire_push_unpack_mpack
    from unittest.mock import AsyncMock, MagicMock, patch

    empty_backend = MagicMock()
    empty_backend.get_mpack = AsyncMock(return_value=None)

    with patch("musehub.storage.backends.get_backend", return_value=empty_backend), \
         patch("musehub.services.musehub_wire.get_backend", return_value=empty_backend), \
         patch("musehub.services.musehub_wire_push.get_backend", return_value=empty_backend), \
         pytest.raises(ValueError):
        await wire_push_unpack_mpack(
            db_session, repo.repo_id, _MPACK_KEY, _OWNER,
            branch="main", head_commit_id="", commits_count=0, blobs_count=0,
        )

    job_count_stmt = select(func.count()).select_from(MusehubBackgroundJob).where(
        MusehubBackgroundJob.repo_id == repo.repo_id,
        MusehubBackgroundJob.job_type == "mpack.index",
    )
    job_count = (await db_session.execute(job_count_stmt)).scalar()
    assert job_count == 0, "job row written despite mpack not found"
366
367
368 # ---------------------------------------------------------------------------
369 # Tier 6 — Performance
370 # ---------------------------------------------------------------------------
371
@pytest.mark.tier6
@pytest.mark.asyncio
async def test_t6_unpack_latency_under_100ms(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """Unpack endpoint (stub MinIO, small mpack) completes in < 100ms.

    One untimed warm-up request is sent first; only the second request is
    measured against the gate.
    """
    url = f"/{_OWNER}/{_REPO_NAME}/push/unpack-mpack"
    headers = {"Content-Type": "application/x-msgpack"}

    # try/finally: _make_client registers overrides on the shared app object, and
    # they must be removed even if a request raises, or later tests inherit them.
    try:
        async with _make_client(db_session) as client:
            # Warm-up — exclude connection setup from gate
            await client.post(url, content=_body(), headers=headers)

            t0 = time.perf_counter()
            resp = await client.post(
                url,
                content=_body(commits_count=3, blobs_count=10),
                headers=headers,
            )
            elapsed_ms = (time.perf_counter() - t0) * 1000
    finally:
        app.dependency_overrides.clear()

    assert resp.status_code == 200
    assert elapsed_ms < 100, f"unpack took {elapsed_ms:.1f}ms — gate is 100ms"
395
396
@pytest.mark.tier6
@pytest.mark.asyncio
async def test_t6_job_index_query_uses_index(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """EXPLAIN output for the pending-job query must mention an index.

    The check is textual: the joined plan has to contain "Index" or "index".
    It does not verify which index is used (the intended one is presumably
    ix_musehub_background_jobs_status_created — confirm against the schema).
    """
    explain_sql = text(
        "EXPLAIN SELECT * FROM musehub_background_jobs "
        "WHERE status = 'pending' "
        "ORDER BY created_at "
        "LIMIT 10"
    )
    plan_rows = await db_session.execute(explain_sql)
    # The first column of each plan row is treated as one line of plan text.
    plan_text = "\n".join(row[0] for row in plan_rows)
    mentions_index = any(token in plan_text for token in ("Index", "index"))
    assert mentions_index, (
        f"job queue query not using an index:\n{plan_text}"
    )
411
412
413 # ---------------------------------------------------------------------------
414 # Tier 7 — Security
415 # ---------------------------------------------------------------------------
416
@pytest.mark.tier7
@pytest.mark.asyncio
async def test_t7_malformed_mpack_key_rejected(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """mpack_key without sha256: prefix is rejected with 422."""
    body = msgpack.packb(
        {"mpack_key": "not-a-real-key", "branch": "main", "commits_count": 1, "blobs_count": 0},
        use_bin_type=True,
    )
    # try/finally: _make_client registers overrides on the shared app object, and
    # they must be removed even if the request raises, or later tests inherit them.
    try:
        async with _make_client(db_session) as client:
            resp = await client.post(
                f"/{_OWNER}/{_REPO_NAME}/push/unpack-mpack",
                content=body,
                headers={"Content-Type": "application/x-msgpack"},
            )
    finally:
        app.dependency_overrides.clear()
    assert resp.status_code == 422, f"expected 422 for malformed key, got {resp.status_code}"
433
434
@pytest.mark.tier7
@pytest.mark.asyncio
async def test_t7_oversized_key_field_rejected(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """A pathologically long mpack_key string is rejected, not stored.

    The key keeps the ``sha256:`` prefix but carries 10,000 characters after it;
    the endpoint must answer 400 or 422.
    """
    body = msgpack.packb(
        {"mpack_key": "sha256:" + "a" * 10_000, "branch": "main"},
        use_bin_type=True,
    )
    # try/finally: _make_client registers overrides on the shared app object, and
    # they must be removed even if the request raises, or later tests inherit them.
    try:
        async with _make_client(db_session) as client:
            resp = await client.post(
                f"/{_OWNER}/{_REPO_NAME}/push/unpack-mpack",
                content=body,
                headers={"Content-Type": "application/x-msgpack"},
            )
    finally:
        app.dependency_overrides.clear()
    assert resp.status_code in (400, 422), (
        f"expected 4xx for oversized key, got {resp.status_code}"
    )
453
454
@pytest.mark.tier7
@pytest.mark.asyncio
async def test_t7_unauthenticated_request_rejected(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """No auth header → 401/403 before any job enqueue logic runs.

    Only ``get_db`` is overridden; token validation is left to the real
    dependency. Afterwards no ``mpack.index`` job row may exist for the repo.
    """
    async def _override_db() -> typing.AsyncIterator[AsyncSession]:
        yield db_session

    # try/finally: overrides live on the shared app object and must be removed
    # even if the request raises, or later tests inherit them.
    try:
        app.dependency_overrides[get_db] = _override_db
        # No require_valid_token override — real enforcement. Drop any stale
        # override left behind by another test so auth is genuinely exercised.
        app.dependency_overrides.pop(require_valid_token, None)

        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="https://localhost:1337",
        ) as client:
            resp = await client.post(
                f"/{_OWNER}/{_REPO_NAME}/push/unpack-mpack",
                content=_body(),
                headers={"Content-Type": "application/x-msgpack"},
            )
    finally:
        app.dependency_overrides.clear()

    assert resp.status_code in (401, 403)

    # No job row must exist — auth rejected before any DB write
    result = await db_session.execute(
        select(func.count()).select_from(MusehubBackgroundJob).where(
            MusehubBackgroundJob.repo_id == repo.repo_id,
            MusehubBackgroundJob.job_type == "mpack.index",
        )
    )
    assert result.scalar() == 0, "job row written for unauthenticated request"
485
486
@pytest.mark.tier7
@pytest.mark.asyncio
async def test_t7_commits_count_overflow_rejected(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """commits_count wildly exceeding the limit is rejected with 422 — no job enqueued.

    The request declares 100 times ``settings.mpack_max_commits``. Afterwards
    the repo must have no background-job rows of any type.
    """
    settings = get_settings()
    body = msgpack.packb(
        {
            "mpack_key": _MPACK_KEY,
            "branch": "main",
            "commits_count": settings.mpack_max_commits * 100,
            "blobs_count": 0,
        },
        use_bin_type=True,
    )
    # try/finally: _make_client registers overrides on the shared app object, and
    # they must be removed even if the request raises, or later tests inherit them.
    try:
        async with _make_client(db_session) as client:
            resp = await client.post(
                f"/{_OWNER}/{_REPO_NAME}/push/unpack-mpack",
                content=body,
                headers={"Content-Type": "application/x-msgpack"},
            )
    finally:
        app.dependency_overrides.clear()
    assert resp.status_code == 422

    result = await db_session.execute(
        select(func.count()).select_from(MusehubBackgroundJob).where(
            MusehubBackgroundJob.repo_id == repo.repo_id,
        )
    )
    assert result.scalar() == 0, "job row written despite over-limit commits_count"
File History 3 commits
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 1 day ago
sha256:6b1949fc2797ca4c1936a637a4cbfec828ef56cf52398a2e74ca3c4f494e728f fix: use wire_bytes not mpack_bytes_raw in compute_object_b… Sonnet 4.6 patch 10 days ago
sha256:4aed3d8601c8dd3ed37074de35f11f4a9699a0a4b99d43727048fd3f8e6fd13d chore: doc sweep, ignore wrangler build state, misc fixes Sonnet 4.6 minor 12 days ago