gabriel / musehub public
test_wire_mpack_unpack_step3_tiers4567.py python
495 lines 18.5 KB
Raw
sha256:ad616c6113d6c00f4efed6b2993734ca46d3e9b5bee25addd4ce8ae6b57136e5 chore: bump version to 0.2.0rc11; typing audit clean + all … Sonnet 4.6 minor ⚠ breaking 19 days ago
1 """Push Protocol Step 3 (unpack-mpack) — Tiers 4–7: stress, integrity, performance, security.
2
3 Tier 4 — Stress: concurrent wire_push_unpack_mpack calls all land without errors.
4 Tier 5 — Data integrity: DB invariants on MusehubBackgroundJob and MusehubBranch.
5 Tier 6 — Performance: unpack endpoint latency gate.
6 Tier 7 — Security: malformed keys, oversized keys, unauthenticated access.
7 """
from __future__ import annotations

import asyncio
import time
import typing
from collections.abc import AsyncIterator
from unittest.mock import AsyncMock, MagicMock, patch

import msgpack
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy import select, func, text
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker

from muse.core.mpack import build_wire_mpack
from muse.core.types import blob_id, fake_id
from musehub.auth.dependencies import require_valid_token
from musehub.auth.request_signing import MSignContext
from musehub.config import get_settings
from musehub.core.genesis import compute_identity_id
from musehub.db.database import get_db
from musehub.db.musehub_jobs_models import MusehubBackgroundJob
from musehub.db.musehub_repo_models import MusehubBranch, MusehubRepo
from musehub.main import app
from musehub.services.musehub_repository import create_repo
33
# Repo owner handle plus identity ids derived from raw handle bytes.
_OWNER = "gabriel"
_IDENTITY_ID = compute_identity_id(_OWNER.encode())
_OTHER_IDENTITY_ID = compute_identity_id(b"aria")  # second identity; currently unused in this module
_REPO_NAME = "step3-tiers-test"

# An empty wire mpack (no blobs/commits/snapshots) and its blob_id key; the
# autouse ``mock_get_mpack`` fixture serves these bytes by default.
_MPACK_BYTES = build_wire_mpack({"blobs": [], "commits": [], "snapshots": []})
_MPACK_KEY = blob_id(_MPACK_BYTES)
_HEAD = fake_id("step3-tiers-tip")

# Request context for the owner: neither an agent nor an admin.
_AUTH_CTX = MSignContext(handle=_OWNER, identity_id=_IDENTITY_ID, is_agent=False, is_admin=False)
49
50
51 # ---------------------------------------------------------------------------
52 # Fixtures
53 # ---------------------------------------------------------------------------
54
@pytest_asyncio.fixture()
async def repo(db_session: AsyncSession) -> MusehubRepo:
    """Create and commit the public test repository owned by ``_OWNER``.

    ``initialize=False`` is passed to ``create_repo`` (presumably so no
    default content is created — confirm against ``create_repo``).
    """
    created = await create_repo(
        db_session,
        name=_REPO_NAME,
        owner=_OWNER,
        owner_user_id=_IDENTITY_ID,
        visibility="public",
        initialize=False,
    )
    # Commit so sessions from other factories (tier-4 tests) can see the row.
    await db_session.commit()
    return created
67
68
@pytest_asyncio.fixture(autouse=True)
async def mock_get_mpack() -> AsyncIterator[MagicMock]:
    """Stub the storage backend for every test in this module.

    ``get_backend`` is patched in both ``musehub_wire`` and
    ``musehub_wire_push`` so whichever module performs the lookup receives the
    same mock, whose ``get_mpack`` returns the empty ``_MPACK_BYTES`` payload.

    Yields:
        The mock backend, so a test may re-program ``get_mpack``.
    """
    mock_backend = MagicMock()
    mock_backend.get_mpack = AsyncMock(return_value=_MPACK_BYTES)
    with patch("musehub.services.musehub_wire.get_backend", return_value=mock_backend), \
            patch("musehub.services.musehub_wire_push.get_backend", return_value=mock_backend):
        yield mock_backend
77
78
def _make_client(db_session: AsyncSession, auth_ctx: MSignContext = _AUTH_CTX) -> AsyncClient:
    """Build an in-process client with DB and auth dependencies overridden.

    Installs overrides on the global ``app`` so every request uses
    *db_session* and authenticates as *auth_ctx*. The overrides are NOT
    removed here: callers must call ``app.dependency_overrides.clear()`` when
    done, preferably in a ``finally`` so a failing request cannot leak them.

    Returns:
        An ``AsyncClient`` bound to ``app`` via ``ASGITransport``.
    """
    async def _override_db() -> AsyncIterator[AsyncSession]:
        # Generator dependency: FastAPI takes the yielded value as the session.
        yield db_session

    app.dependency_overrides[get_db] = _override_db
    app.dependency_overrides[require_valid_token] = lambda: auth_ctx
    return AsyncClient(transport=ASGITransport(app=app), base_url="https://localhost:1337")
85
86
def _body(
    mpack_key: str = _MPACK_KEY,
    branch: str = "main",
    head: str = "",
    commits_count: int = 1,
    blobs_count: int = 2,
) -> bytes:
    """Encode an unpack-mpack request body as msgpack.

    Every field is always present; keys are packed in the order
    mpack_key, branch, head, commits_count, blobs_count.
    """
    payload = dict(
        mpack_key=mpack_key,
        branch=branch,
        head=head,
        commits_count=commits_count,
        blobs_count=blobs_count,
    )
    return msgpack.packb(payload, use_bin_type=True)
104
105
async def _unpack(client: AsyncClient, repo_name: str = _REPO_NAME, **kwargs: typing.Any) -> int:
    """POST an unpack-mpack request for *repo_name* and return the HTTP status.

    Extra keyword arguments are forwarded to ``_body`` to shape the payload.
    """
    url = f"/{_OWNER}/{repo_name}/push/unpack-mpack"
    response = await client.post(
        url,
        content=_body(**kwargs),
        headers={"Content-Type": "application/x-msgpack"},
    )
    return response.status_code
113
114
115 # ---------------------------------------------------------------------------
116 # Tier 4 — Stress
117 # ---------------------------------------------------------------------------
118
@pytest.mark.tier4
@pytest.mark.asyncio
async def test_t4_concurrent_unpack_calls_all_succeed(
    repo: MusehubRepo, session_factory: async_sessionmaker[AsyncSession],
) -> None:
    """20 concurrent wire_push_unpack_mpack calls with distinct mpack_keys all succeed.

    Tests inline processing under concurrency — every call must complete without
    deadlocking. Phase 3: all processing is inline; no background job rows.

    The backend is patched ONCE around the whole fan-out. ``patch`` rebinds a
    module global, so patching inside each coroutine would let interleaved
    tasks see each other's mock and restore the globals out of order. Each
    task's payload is instead carried in a ``ContextVar``; ``asyncio.gather``
    runs every coroutine in its own Task, and each Task has its own context copy.
    """
    from contextvars import ContextVar
    from unittest.mock import AsyncMock, MagicMock, patch

    from musehub.services.musehub_wire import wire_push_unpack_mpack

    n = 20

    payload_var: ContextVar[bytes] = ContextVar("payload_var")
    mock_backend = MagicMock()
    # A sync side_effect on AsyncMock supplies the awaited value; read lazily per task.
    mock_backend.get_mpack = AsyncMock(side_effect=lambda *_args, **_kwargs: payload_var.get())

    async def _do(i: int) -> None:
        # Each call unpacks distinct content, hence a distinct mpack_key.
        content = f"object-{i}".encode()
        oid = blob_id(content)
        unique_bytes = build_wire_mpack(
            {"blobs": [{"object_id": oid, "content": content}], "commits": [], "snapshots": []}
        )
        unique_key = blob_id(unique_bytes)
        payload_var.set(unique_bytes)  # task-local
        async with session_factory() as sess:
            await wire_push_unpack_mpack(
                sess, repo.repo_id, unique_key, _OWNER,
                branch="main", head_commit_id="", commits_count=1, blobs_count=1,
            )
            await sess.commit()

    with patch("musehub.services.musehub_wire.get_backend", return_value=mock_backend), \
            patch("musehub.services.musehub_wire_push.get_backend", return_value=mock_backend):
        # If any call raises, gather re-raises immediately — no further assertion needed.
        await asyncio.gather(*[_do(i) for i in range(n)])
156
157
@pytest.mark.tier4
@pytest.mark.asyncio
async def test_t4_concurrent_branch_updates_for_same_branch(
    repo: MusehubRepo, session_factory: async_sessionmaker[AsyncSession],
) -> None:
    """10 concurrent calls advancing the same branch serialise without data corruption.

    The service locks the branch row (SELECT ... FOR UPDATE). Concurrent updates
    must leave exactly one branch row for the name, and its head must be one of
    the heads pushed here. The branch is pre-created so every concurrent call
    takes the UPDATE path (avoiding the INSERT race).

    The backend is patched once around the whole fan-out. Per-coroutine
    ``patch`` calls race on the same module global. Each task's payload travels
    in a ``ContextVar``, which is task-local because ``asyncio.gather`` wraps
    every coroutine in its own Task.
    """
    from contextvars import ContextVar
    from unittest.mock import AsyncMock, MagicMock, patch

    from musehub.core.genesis import compute_branch_id
    from musehub.services.musehub_wire import wire_push_unpack_mpack

    branch_name = "concurrent-branch"

    # Pre-create the branch so concurrent calls all SELECT-for-update and UPDATE
    async with session_factory() as seed_sess:
        seed_sess.add(MusehubBranch(
            branch_id=compute_branch_id(repo.repo_id, branch_name),
            repo_id=repo.repo_id,
            name=branch_name,
            head_commit_id=fake_id("seed-head"),
        ))
        await seed_sess.commit()

    n = 10
    heads = [fake_id(f"commit-{i}") for i in range(n)]

    payload_var: ContextVar[bytes] = ContextVar("payload_var")
    mock_backend = MagicMock()
    mock_backend.get_mpack = AsyncMock(side_effect=lambda *_args, **_kwargs: payload_var.get())

    async def _do(i: int) -> None:
        content = f"seq-{i}".encode()
        oid = blob_id(content)
        unique_bytes = build_wire_mpack(
            {"blobs": [{"object_id": oid, "content": content}], "commits": [], "snapshots": []}
        )
        unique_key = blob_id(unique_bytes)
        payload_var.set(unique_bytes)  # task-local
        async with session_factory() as sess:
            await wire_push_unpack_mpack(
                sess, repo.repo_id, unique_key, _OWNER,
                branch=branch_name, head_commit_id=heads[i],
                commits_count=1, blobs_count=0, force=True,
            )
            await sess.commit()

    with patch("musehub.services.musehub_wire.get_backend", return_value=mock_backend), \
            patch("musehub.services.musehub_wire_push.get_backend", return_value=mock_backend):
        await asyncio.gather(*[_do(i) for i in range(n)])

    async with session_factory() as check_sess:
        result = await check_sess.execute(
            select(MusehubBranch.head_commit_id).where(
                MusehubBranch.repo_id == repo.repo_id,
                MusehubBranch.name == branch_name,
            )
        )
        final_heads = list(result.scalars().all())

    assert len(final_heads) == 1, f"expected exactly 1 branch row, got {len(final_heads)}"
    # With force=True every call sets the head, so the winner must be one of ours.
    assert final_heads[0] in heads, (
        f"branch head {final_heads[0]!r} is not one of the concurrently pushed heads"
    )
217
218
219 # ---------------------------------------------------------------------------
220 # Tier 5 — Data integrity
221 # ---------------------------------------------------------------------------
222
@pytest.mark.tier5
@pytest.mark.asyncio
async def test_t5_unpack_returns_correct_shape(
    db_session: AsyncSession, repo: MusehubRepo,
) -> None:
    """An inline unpack result exposes head, branch and the mpack tallies (Phase 3 — no background job)."""
    from musehub.services.musehub_wire import wire_push_unpack_mpack

    outcome = await wire_push_unpack_mpack(
        db_session, repo.repo_id, _MPACK_KEY, _OWNER,
        branch="main", head_commit_id="", commits_count=3, blobs_count=7,
    )

    required = ("head", "branch", "blobs_in_mpack", "commits_in_mpack")
    absent = [key for key in required if key not in outcome]
    assert not absent
    assert outcome["branch"] == "main"
241
242
@pytest.mark.tier5
@pytest.mark.asyncio
async def test_t5_branch_row_created_when_head_provided(
    db_session: AsyncSession, repo: MusehubRepo,
) -> None:
    """Pushing a head to a branch that does not yet exist inserts that branch row."""
    from musehub.services.musehub_wire import wire_push_unpack_mpack

    branch_name = "feat/new-branch"
    await wire_push_unpack_mpack(
        db_session, repo.repo_id, _MPACK_KEY, _OWNER,
        branch=branch_name, head_commit_id=_HEAD,
        commits_count=1, blobs_count=0,
    )

    lookup = select(MusehubBranch).where(
        MusehubBranch.repo_id == repo.repo_id,
        MusehubBranch.name == branch_name,
    )
    created = (await db_session.execute(lookup)).scalar_one_or_none()
    assert created is not None, "branch row was not created"
    assert created.head_commit_id == _HEAD
266
267
@pytest.mark.tier5
@pytest.mark.asyncio
async def test_t5_branch_row_updated_when_branch_exists(
    db_session: AsyncSession, repo: MusehubRepo,
) -> None:
    """A second (forced) push to an existing branch moves its head without adding a row."""
    from unittest.mock import AsyncMock, MagicMock, patch

    from musehub.services.musehub_wire import wire_push_unpack_mpack

    first_head = fake_id("first-head")
    second_head = fake_id("second-head")

    # First push: served by the autouse stub (empty mpack under _MPACK_KEY).
    await wire_push_unpack_mpack(
        db_session, repo.repo_id, _MPACK_KEY, _OWNER,
        branch="main", head_commit_id=first_head,
        commits_count=1, blobs_count=0,
    )

    # Second push: a distinct mpack, and therefore a distinct key.
    second_content = b"second-push-content"
    second_bytes = build_wire_mpack(
        {
            "blobs": [{"object_id": blob_id(second_content), "content": second_content}],
            "commits": [],
            "snapshots": [],
        }
    )
    second_backend = MagicMock()
    second_backend.get_mpack = AsyncMock(return_value=second_bytes)
    with patch("musehub.services.musehub_wire.get_backend", return_value=second_backend), \
            patch("musehub.services.musehub_wire_push.get_backend", return_value=second_backend):
        await wire_push_unpack_mpack(
            db_session, repo.repo_id, blob_id(second_bytes), _OWNER,
            branch="main", head_commit_id=second_head,
            commits_count=1, blobs_count=0, force=True,
        )

    on_main = (
        MusehubBranch.repo_id == repo.repo_id,
        MusehubBranch.name == "main",
    )
    row_count = await db_session.scalar(
        select(func.count()).select_from(MusehubBranch).where(*on_main)
    )
    assert row_count == 1, "duplicate branch row created"

    current_head = await db_session.scalar(
        select(MusehubBranch.head_commit_id).where(*on_main)
    )
    assert current_head == second_head, "branch head not updated to second push"
318
319
@pytest.mark.tier5
@pytest.mark.asyncio
async def test_t5_job_row_not_written_on_mpack_not_found(
    db_session: AsyncSession, repo: MusehubRepo,
) -> None:
    """A missing mpack (backend returns None) raises ValueError and persists no mpack.index job."""
    from unittest.mock import AsyncMock, MagicMock, patch

    from musehub.services.musehub_wire import wire_push_unpack_mpack

    empty_backend = MagicMock()
    empty_backend.get_mpack = AsyncMock(return_value=None)

    wire_patch = patch("musehub.services.musehub_wire.get_backend", return_value=empty_backend)
    push_patch = patch("musehub.services.musehub_wire_push.get_backend", return_value=empty_backend)
    with wire_patch, push_patch, pytest.raises(ValueError):
        await wire_push_unpack_mpack(
            db_session, repo.repo_id, _MPACK_KEY, _OWNER,
            branch="main", head_commit_id="", commits_count=0, blobs_count=0,
        )

    job_count = await db_session.scalar(
        select(func.count()).select_from(MusehubBackgroundJob).where(
            MusehubBackgroundJob.repo_id == repo.repo_id,
            MusehubBackgroundJob.job_type == "mpack.index",
        )
    )
    assert job_count == 0, "job row written despite mpack not found"
346
347
348 # ---------------------------------------------------------------------------
349 # Tier 6 — Performance
350 # ---------------------------------------------------------------------------
351
@pytest.mark.tier6
@pytest.mark.asyncio
async def test_t6_unpack_latency_under_100ms(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """Unpack endpoint (stub MinIO, small mpack) completes in < 100ms.

    One warm-up request runs first so connection/app start-up cost is excluded
    from the timed request. Dependency overrides are cleared in ``finally`` so a
    failing request cannot leak them into later tests.
    """
    try:
        async with _make_client(db_session) as client:
            # Warm-up — exclude connection setup from gate
            await _unpack(client)

            t0 = time.perf_counter()
            status = await _unpack(client, commits_count=3, blobs_count=10)
            elapsed_ms = (time.perf_counter() - t0) * 1000
    finally:
        app.dependency_overrides.clear()

    assert status == 200, f"expected 200 from unpack, got {status}"
    assert elapsed_ms < 100, f"unpack took {elapsed_ms:.1f}ms — gate is 100ms"
375
376
@pytest.mark.tier6
@pytest.mark.asyncio
async def test_t6_job_index_query_uses_index(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """The pending-job queue query plan mentions an index.

    Intended to confirm ix_musehub_background_jobs_status_created is used. The
    check itself only requires "Index" or "index" to appear somewhere in the
    EXPLAIN output, not that specific index name.
    """
    explain_sql = (
        "EXPLAIN SELECT * FROM musehub_background_jobs "
        "WHERE status = 'pending' "
        "ORDER BY created_at "
        "LIMIT 10"
    )
    plan_rows = await db_session.execute(text(explain_sql))
    plan_text = "\n".join([row[0] for row in plan_rows])
    mentions_index = any(token in plan_text for token in ("Index", "index"))
    assert mentions_index, (
        f"job queue query not using an index:\n{plan_text}"
    )
391
392
393 # ---------------------------------------------------------------------------
394 # Tier 7 — Security
395 # ---------------------------------------------------------------------------
396
@pytest.mark.tier7
@pytest.mark.asyncio
async def test_t7_malformed_mpack_key_rejected(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """mpack_key without sha256: prefix is rejected with 422.

    Overrides installed by ``_make_client`` are cleared in ``finally`` so a
    failing request cannot leak them into later tests.
    """
    body = msgpack.packb(
        {"mpack_key": "not-a-real-key", "branch": "main", "commits_count": 1, "blobs_count": 0},
        use_bin_type=True,
    )
    try:
        async with _make_client(db_session) as client:
            resp = await client.post(
                f"/{_OWNER}/{_REPO_NAME}/push/unpack-mpack",
                content=body,
                headers={"Content-Type": "application/x-msgpack"},
            )
    finally:
        app.dependency_overrides.clear()
    assert resp.status_code == 422, f"expected 422 for malformed key, got {resp.status_code}"
413
414
@pytest.mark.tier7
@pytest.mark.asyncio
async def test_t7_oversized_key_field_rejected(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """A pathologically long mpack_key string is rejected, not stored.

    The rest of the body is well formed (same counts as the malformed-key
    test), so a 4xx is attributable to the oversized key rather than to
    possibly-required count fields being absent. Overrides are cleared in
    ``finally`` so they cannot leak into later tests.
    """
    body = msgpack.packb(
        {"mpack_key": "sha256:" + "a" * 10_000, "branch": "main", "commits_count": 1, "blobs_count": 0},
        use_bin_type=True,
    )
    try:
        async with _make_client(db_session) as client:
            resp = await client.post(
                f"/{_OWNER}/{_REPO_NAME}/push/unpack-mpack",
                content=body,
                headers={"Content-Type": "application/x-msgpack"},
            )
    finally:
        app.dependency_overrides.clear()
    assert resp.status_code in (400, 422), (
        f"expected 4xx for oversized key, got {resp.status_code}"
    )
433
434
@pytest.mark.tier7
@pytest.mark.asyncio
async def test_t7_unauthenticated_request_rejected(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """No auth header → 401/403 before any job enqueue logic runs.

    Only ``get_db`` is overridden. Any lingering ``require_valid_token``
    override is removed so the real auth dependency is exercised. All overrides
    are cleared in ``finally``.
    """
    async def _override_db() -> AsyncIterator[AsyncSession]:
        yield db_session

    app.dependency_overrides[get_db] = _override_db
    # Real enforcement: drop any auth override another test may have left behind.
    app.dependency_overrides.pop(require_valid_token, None)

    try:
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="https://localhost:1337",
        ) as client:
            status = await _unpack(client)
    finally:
        app.dependency_overrides.clear()

    assert status in (401, 403), f"expected 401/403 without auth, got {status}"

    # No job row must exist — auth rejected before any DB write
    result = await db_session.execute(
        select(func.count()).select_from(MusehubBackgroundJob).where(
            MusehubBackgroundJob.repo_id == repo.repo_id,
            MusehubBackgroundJob.job_type == "mpack.index",
        )
    )
    assert result.scalar() == 0, "job row written for unauthenticated request"
465
466
@pytest.mark.tier7
@pytest.mark.asyncio
async def test_t7_commits_count_overflow_rejected(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """commits_count wildly exceeding the limit is rejected with 422 — no job enqueued.

    Overrides installed by ``_make_client`` are cleared in ``finally`` so a
    failing request cannot leak them into later tests.
    """
    settings = get_settings()
    body = msgpack.packb(
        {
            "mpack_key": _MPACK_KEY,
            "branch": "main",
            # 100x the configured ceiling — far beyond any legitimate push.
            "commits_count": settings.mpack_max_commits * 100,
            "blobs_count": 0,
        },
        use_bin_type=True,
    )
    try:
        async with _make_client(db_session) as client:
            resp = await client.post(
                f"/{_OWNER}/{_REPO_NAME}/push/unpack-mpack",
                content=body,
                headers={"Content-Type": "application/x-msgpack"},
            )
    finally:
        app.dependency_overrides.clear()
    assert resp.status_code == 422, f"expected 422 for over-limit commits_count, got {resp.status_code}"

    result = await db_session.execute(
        select(func.count()).select_from(MusehubBackgroundJob).where(
            MusehubBackgroundJob.repo_id == repo.repo_id,
        )
    )
    assert result.scalar() == 0, "job row written despite over-limit commits_count"
File History 2 commits
sha256:ad616c6113d6c00f4efed6b2993734ca46d3e9b5bee25addd4ce8ae6b57136e5 chore: bump version to 0.2.0rc11; typing audit clean + all … Sonnet 4.6 minor 19 days ago
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor 22 days ago