gabriel / musehub public
test_musehub_coord.py python
899 lines 32.7 KB
Raw
sha256:0997d6250ae6476362f6fe2025af7789f46d03df3e9f34356d5e8ee79b201923 fix(issues): use issue number as pagination cursor, not cre… Sonnet 4.6 patch 9 days ago
1 """Tests for the MuseHub coordination bus.
2
3 Covers all acceptance criteria:
4
5 Unit:
6 - CoordRecordIn validation (kind, record_id, run_id)
7 - CoordPollRequest validation (since_id, kinds, limit)
8 - CoordPushResponse and CoordPullResponse structure
9
10 Integration (service layer):
11 - coord_push: insert, idempotent skip, heartbeat upsert
12 - coord_pull: cursor, kind filter, limit
13 - Push then pull round-trip
14
15 E2E (HTTP endpoints via AsyncClient):
16 - POST /{owner}/{slug}/coord/push — 200 OK, 401 unauth, 403 wrong owner,
17 404 unknown repo, 400 bad kind, 400 bad record_id
18 - POST /{owner}/{slug}/coord/pull — 200 OK, cursor pagination
19 - GET /{owner}/{slug}/coord/watch — SSE stream response headers
20
21 Security:
22 - Path traversal in owner/slug blocked by 404
23 - record_id path traversal rejected by Pydantic (400)
24 - Unknown kind rejected (400)
25 - Private repo invisible to wrong user (404)
26 - Push requires auth (401)
27
28 Stress:
29 - Push 500 records in one batch
30 - Pull 1000 records with cursor pagination
31 - 200 push + pull round-trips with correct cursor tracking
32 """
33
34 from __future__ import annotations
35
36 import json
37 import secrets
38 from collections.abc import AsyncIterator
39 from datetime import datetime, timezone
40 from unittest.mock import patch
41
42 import pytest
43 import pytest_asyncio
44 from httpx import AsyncClient
45 from sqlalchemy.ext.asyncio import AsyncSession
46
47 from musehub.core.genesis import compute_repo_id
48 from musehub.db import coord_models as coord_db
49 from musehub.db.musehub_identity_models import MusehubIdentity
50 from musehub.db.musehub_repo_models import MusehubRepo
51 from musehub.types.json_types import JSONObject, StrDict
52 from musehub.models.coord import (
53 CoordPollRequest,
54 CoordPushRequest,
55 CoordRecordIn,
56 _VALID_KINDS,
57 )
58 from musehub.services.musehub_coord import coord_pull, coord_push
59
60
61 # ── Fixtures ───────────────────────────────────────────────────────────────────
62
63
64 def _new_id() -> str:
65 return secrets.token_hex(16)
66
67
68 def _make_record(
69 kind: str = "reservation",
70 record_id: str | None = None,
71 run_id: str = "agent-1",
72 payload: JSONObject | None = None,
73 expires_at: datetime | None = None,
74 ) -> JSONObject:
75 return {
76 "kind": kind,
77 "record_id": record_id or _new_id(),
78 "run_id": run_id,
79 "payload": payload or {"note": "test"},
80 "expires_at": expires_at,
81 }
82
83
84 @pytest_asyncio.fixture
85 async def repo(db_session: AsyncSession, test_user: MusehubIdentity) -> MusehubRepo:
86 """Create a private test repo with a unique slug to prevent cross-test conflicts."""
87 suffix = _new_id()[:8]
88 slug = f"coord-test-{suffix}"
89 r = MusehubRepo(
90 repo_id=compute_repo_id(test_user.identity_id, slug, "", datetime.now(timezone.utc).isoformat()),
91 name="coord-test",
92 owner="gabriel",
93 slug=slug,
94 visibility="private",
95 owner_user_id=test_user.identity_id,
96 )
97 db_session.add(r)
98 await db_session.commit()
99 await db_session.refresh(r)
100 return r
101
102
103 @pytest_asyncio.fixture
104 async def public_repo(db_session: AsyncSession, test_user: MusehubIdentity) -> MusehubRepo:
105 """Create a public test repo with a unique slug to prevent cross-test conflicts."""
106 suffix = _new_id()[:8]
107 slug = f"coord-public-{suffix}"
108 r = MusehubRepo(
109 repo_id=compute_repo_id(test_user.identity_id, slug, "", datetime.now(timezone.utc).isoformat()),
110 name="coord-public",
111 owner="gabriel",
112 slug=slug,
113 visibility="public",
114 owner_user_id=test_user.identity_id,
115 )
116 db_session.add(r)
117 await db_session.commit()
118 await db_session.refresh(r)
119 return r
120
121
122 # ── Unit: Pydantic model validation ───────────────────────────────────────────
123
124
125 class TestCoordRecordInValidation:
126 def test_valid_record(self) -> None:
127 rec = CoordRecordIn(
128 kind="reservation",
129 record_id=_new_id(),
130 run_id="agent-1",
131 payload={"x": 1},
132 )
133 assert rec.kind == "reservation"
134
135 def test_unknown_kind_rejected(self) -> None:
136 with pytest.raises(Exception, match="kind must be one of"):
137 CoordRecordIn(kind="unknown_kind", record_id=_new_id(), payload={})
138
139 def test_all_valid_kinds_accepted(self) -> None:
140 for kind in _VALID_KINDS:
141 rec = CoordRecordIn(kind=kind, record_id=_new_id(), payload={})
142 assert rec.kind == kind
143
144 def test_invalid_record_id_rejected(self) -> None:
145 with pytest.raises(Exception, match="record_id must be alphanumeric"):
146 CoordRecordIn(kind="reservation", record_id="has a space", payload={})
147
148 def test_path_traversal_in_record_id_rejected(self) -> None:
149 with pytest.raises(Exception):
150 CoordRecordIn(kind="reservation", record_id="../../../etc/passwd", payload={})
151
152 def test_null_byte_in_record_id_rejected(self) -> None:
153 with pytest.raises(Exception):
154 CoordRecordIn(kind="reservation", record_id="\x00" + _new_id()[1:], payload={})
155
156 def test_run_id_defaults_to_empty(self) -> None:
157 rec = CoordRecordIn(kind="reservation", record_id=_new_id(), payload={})
158 assert rec.run_id == ""
159
160 def test_run_id_max_length(self) -> None:
161 with pytest.raises(Exception):
162 CoordRecordIn(
163 kind="reservation",
164 record_id=_new_id(),
165 run_id="x" * 256,
166 payload={},
167 )
168
169 def test_expires_at_optional(self) -> None:
170 rec = CoordRecordIn(kind="heartbeat", record_id=_new_id(), payload={})
171 assert rec.expires_at is None
172
173 def test_uppercase_record_id_accepted(self) -> None:
174 upper = _new_id().upper()
175 rec = CoordRecordIn(kind="reservation", record_id=upper, payload={})
176 assert rec.record_id == upper
177
178
179 class TestCoordPollRequestValidation:
180 def test_defaults(self) -> None:
181 req = CoordPollRequest()
182 assert req.since_id == 0
183 assert req.kinds == []
184 assert req.limit == 500
185
186 def test_since_id_must_be_non_negative(self) -> None:
187 with pytest.raises(Exception):
188 CoordPollRequest(since_id=-1)
189
190 def test_limit_bounds(self) -> None:
191 with pytest.raises(Exception):
192 CoordPollRequest(limit=0)
193 with pytest.raises(Exception):
194 CoordPollRequest(limit=1001)
195
196 def test_unknown_kind_in_filter_rejected(self) -> None:
197 with pytest.raises(Exception, match="kind must be one of"):
198 CoordPollRequest(kinds=["bad_kind"])
199
200 def test_valid_kinds_filter(self) -> None:
201 req = CoordPollRequest(kinds=["reservation", "heartbeat"])
202 assert "reservation" in req.kinds
203
204
205 # ── Integration: service layer ─────────────────────────────────────────────────
206
207
208 class TestCoordPush:
209 async def test_push_inserts_records(
210 self, db_session: AsyncSession, repo: MusehubRepo
211 ) -> None:
212 req = CoordPushRequest(records=[
213 CoordRecordIn(kind="reservation", record_id=_new_id(), payload={"addr": "x"}),
214 CoordRecordIn(kind="heartbeat", record_id=_new_id(), payload={"ping": 1}),
215 ])
216 resp = await coord_push(db_session, repo.repo_id, req)
217 assert resp.inserted == 2
218 assert resp.skipped == 0
219
220 async def test_push_same_record_twice_is_skipped(
221 self, db_session: AsyncSession, repo: MusehubRepo
222 ) -> None:
223 uid = _new_id()
224 rec = CoordRecordIn(kind="reservation", record_id=uid, payload={"x": 1})
225 req = CoordPushRequest(records=[rec])
226
227 resp1 = await coord_push(db_session, repo.repo_id, req)
228 assert resp1.inserted == 1
229
230 # Re-push the identical record.
231 resp2 = await coord_push(db_session, repo.repo_id, req)
232 assert resp2.inserted == 0
233 assert resp2.skipped == 1
234
235 async def test_heartbeat_upserted(
236 self, db_session: AsyncSession, repo: MusehubRepo
237 ) -> None:
238 uid = _new_id()
239 req1 = CoordPushRequest(records=[
240 CoordRecordIn(kind="heartbeat", record_id=uid, payload={"ts": "t1"}),
241 ])
242 resp1 = await coord_push(db_session, repo.repo_id, req1)
243 assert resp1.inserted == 1
244
245 req2 = CoordPushRequest(records=[
246 CoordRecordIn(kind="heartbeat", record_id=uid, payload={"ts": "t2"}),
247 ])
248 resp2 = await coord_push(db_session, repo.repo_id, req2)
249 # Heartbeat upsert counts as skipped (same row, payload updated).
250 assert resp2.skipped == 1
251 assert resp2.inserted == 0
252
253 async def test_push_mixed_batch(
254 self, db_session: AsyncSession, repo: MusehubRepo
255 ) -> None:
256 uid_dup = _new_id()
257 req = CoordPushRequest(records=[
258 CoordRecordIn(kind="reservation", record_id=_new_id(), payload={}),
259 CoordRecordIn(kind="intent", record_id=_new_id(), payload={}),
260 CoordRecordIn(kind="dependency", record_id=_new_id(), payload={}),
261 ])
262 resp = await coord_push(db_session, repo.repo_id, req)
263 assert resp.inserted == 3
264
265 async def test_push_does_not_cross_repos(
266 self, db_session: AsyncSession, repo: MusehubRepo, public_repo: MusehubRepo
267 ) -> None:
268 uid = _new_id()
269 req = CoordPushRequest(records=[
270 CoordRecordIn(kind="reservation", record_id=uid, payload={}),
271 ])
272 await coord_push(db_session, repo.repo_id, req)
273 # Same ID but different repo_id → should insert, not skip.
274 resp2 = await coord_push(db_session, public_repo.repo_id, req)
275 assert resp2.inserted == 1
276
277
278 class TestCoordPull:
279 async def test_pull_returns_inserted_records(
280 self, db_session: AsyncSession, repo: MusehubRepo
281 ) -> None:
282 uid1, uid2 = _new_id(), _new_id()
283 push_req = CoordPushRequest(records=[
284 CoordRecordIn(kind="reservation", record_id=uid1, payload={"a": 1}),
285 CoordRecordIn(kind="heartbeat", record_id=uid2, payload={"b": 2}),
286 ])
287 await coord_push(db_session, repo.repo_id, push_req)
288
289 poll_req = CoordPollRequest()
290 resp = await coord_pull(db_session, repo.repo_id, poll_req)
291 record_ids = {r.record_id for r in resp.records}
292 assert uid1 in record_ids
293 assert uid2 in record_ids
294
295 async def test_pull_cursor_advances(
296 self, db_session: AsyncSession, repo: MusehubRepo
297 ) -> None:
298 push_req = CoordPushRequest(records=[
299 CoordRecordIn(kind="reservation", record_id=_new_id(), payload={}),
300 ])
301 await coord_push(db_session, repo.repo_id, push_req)
302 resp1 = await coord_pull(db_session, repo.repo_id, CoordPollRequest())
303 cursor = resp1.cursor
304
305 # Push a second record.
306 push_req2 = CoordPushRequest(records=[
307 CoordRecordIn(kind="intent", record_id=_new_id(), payload={}),
308 ])
309 await coord_push(db_session, repo.repo_id, push_req2)
310
311 # Pull since cursor — should only return the second record.
312 resp2 = await coord_pull(
313 db_session, repo.repo_id, CoordPollRequest(since_id=cursor)
314 )
315 assert len(resp2.records) == 1
316 assert resp2.records[0].kind == "intent"
317
318 async def test_pull_empty_when_nothing_pushed(
319 self, db_session: AsyncSession, repo: MusehubRepo
320 ) -> None:
321 resp = await coord_pull(db_session, repo.repo_id, CoordPollRequest())
322 assert resp.records == []
323 assert resp.cursor == 0
324
325 async def test_pull_kind_filter(
326 self, db_session: AsyncSession, repo: MusehubRepo
327 ) -> None:
328 push_req = CoordPushRequest(records=[
329 CoordRecordIn(kind="reservation", record_id=_new_id(), payload={}),
330 CoordRecordIn(kind="heartbeat", record_id=_new_id(), payload={}),
331 CoordRecordIn(kind="intent", record_id=_new_id(), payload={}),
332 ])
333 await coord_push(db_session, repo.repo_id, push_req)
334
335 resp = await coord_pull(
336 db_session, repo.repo_id, CoordPollRequest(kinds=["reservation"])
337 )
338 assert all(r.kind == "reservation" for r in resp.records)
339 assert len(resp.records) == 1
340
341 async def test_pull_limit(
342 self, db_session: AsyncSession, repo: MusehubRepo
343 ) -> None:
344 push_req = CoordPushRequest(records=[
345 CoordRecordIn(kind="reservation", record_id=_new_id(), payload={})
346 for _ in range(10)
347 ])
348 await coord_push(db_session, repo.repo_id, push_req)
349
350 resp = await coord_pull(
351 db_session, repo.repo_id, CoordPollRequest(limit=3)
352 )
353 assert len(resp.records) == 3
354
355 async def test_pull_returns_oldest_first(
356 self, db_session: AsyncSession, repo: MusehubRepo
357 ) -> None:
358 uids = [_new_id() for _ in range(5)]
359 push_req = CoordPushRequest(records=[
360 CoordRecordIn(kind="reservation", record_id=uid, payload={})
361 for uid in uids
362 ])
363 await coord_push(db_session, repo.repo_id, push_req)
364
365 resp = await coord_pull(db_session, repo.repo_id, CoordPollRequest())
366 ids = [r.id for r in resp.records]
367 assert ids == sorted(ids) # oldest first = ascending IDs
368
369
370 # ── E2E: HTTP endpoints ────────────────────────────────────────────────────────
371
372
373 class TestPushEndpoint:
374 async def test_push_success(
375 self,
376 client: AsyncClient,
377 auth_headers: StrDict,
378 repo: MusehubRepo,
379 ) -> None:
380 resp = await client.post(
381 f"/{repo.owner}/{repo.slug}/coord/push",
382 json={"records": [_make_record()]},
383 headers=auth_headers,
384 )
385 assert resp.status_code == 200
386 body = resp.json()
387 assert body["inserted"] == 1
388 assert body["skipped"] == 0
389
390 async def test_push_requires_auth(
391 self, client: AsyncClient, repo: MusehubRepo
392 ) -> None:
393 resp = await client.post(
394 f"/{repo.owner}/{repo.slug}/coord/push",
395 json={"records": [_make_record()]},
396 )
397 assert resp.status_code == 401
398
399 async def test_push_unknown_repo_returns_404(
400 self, client: AsyncClient, auth_headers: StrDict
401 ) -> None:
402 resp = await client.post(
403 "/gabriel/no-such-repo/coord/push",
404 json={"records": [_make_record()]},
405 headers=auth_headers,
406 )
407 assert resp.status_code == 404
408
409 async def test_push_bad_kind_returns_400(
410 self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
411 ) -> None:
412 resp = await client.post(
413 f"/{repo.owner}/{repo.slug}/coord/push",
414 json={"records": [_make_record(kind="bad_kind")]},
415 headers=auth_headers,
416 )
417 assert resp.status_code == 422 # Pydantic validation error
418
419 async def test_push_bad_record_id_returns_422(
420 self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
421 ) -> None:
422 resp = await client.post(
423 f"/{repo.owner}/{repo.slug}/coord/push",
424 json={"records": [{
425 "kind": "reservation",
426 "record_id": "has spaces and slashes/bad",
427 "run_id": "x",
428 "payload": {},
429 }]},
430 headers=auth_headers,
431 )
432 assert resp.status_code == 422
433
434 async def test_push_idempotent(
435 self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
436 ) -> None:
437 rec = _make_record()
438 payload = {"records": [rec]}
439
440 r1 = await client.post(f"/{repo.owner}/{repo.slug}/coord/push", json=payload, headers=auth_headers)
441 assert r1.status_code == 200
442 assert r1.json()["inserted"] == 1
443
444 r2 = await client.post(f"/{repo.owner}/{repo.slug}/coord/push", json=payload, headers=auth_headers)
445 assert r2.status_code == 200
446 assert r2.json()["skipped"] == 1
447 assert r2.json()["inserted"] == 0
448
449 async def test_push_empty_records_rejected(
450 self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
451 ) -> None:
452 resp = await client.post(
453 f"/{repo.owner}/{repo.slug}/coord/push",
454 json={"records": []},
455 headers=auth_headers,
456 )
457 assert resp.status_code == 422
458
459 async def test_push_multiple_kinds(
460 self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
461 ) -> None:
462 records = [_make_record(kind=k) for k in ("reservation", "heartbeat", "intent")]
463 resp = await client.post(
464 f"/{repo.owner}/{repo.slug}/coord/push",
465 json={"records": records},
466 headers=auth_headers,
467 )
468 assert resp.status_code == 200
469 assert resp.json()["inserted"] == 3
470
471
472 class TestPullEndpoint:
473 async def test_pull_empty_initially(
474 self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
475 ) -> None:
476 resp = await client.post(
477 f"/{repo.owner}/{repo.slug}/coord/pull",
478 json={},
479 headers=auth_headers,
480 )
481 assert resp.status_code == 200
482 body = resp.json()
483 assert body["records"] == []
484 assert body["cursor"] == 0
485
486 async def test_pull_after_push(
487 self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
488 ) -> None:
489 rec = _make_record()
490 push_resp = await client.post(
491 f"/{repo.owner}/{repo.slug}/coord/push",
492 json={"records": [rec]},
493 headers=auth_headers,
494 )
495 assert push_resp.status_code == 200
496
497 pull_resp = await client.post(
498 f"/{repo.owner}/{repo.slug}/coord/pull",
499 json={},
500 headers=auth_headers,
501 )
502 assert pull_resp.status_code == 200
503 body = pull_resp.json()
504 assert len(body["records"]) == 1
505 assert body["records"][0]["record_id"] == rec["record_id"]
506 assert body["cursor"] > 0
507
508 async def test_pull_cursor_pagination(
509 self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
510 ) -> None:
511 # Push 5 records.
512 for _ in range(5):
513 await client.post(
514 f"/{repo.owner}/{repo.slug}/coord/push",
515 json={"records": [_make_record()]},
516 headers=auth_headers,
517 )
518
519 # Pull 2 at a time.
520 resp1 = await client.post(
521 f"/{repo.owner}/{repo.slug}/coord/pull",
522 json={"limit": 2},
523 headers=auth_headers,
524 )
525 assert len(resp1.json()["records"]) == 2
526 cursor1 = resp1.json()["cursor"]
527
528 resp2 = await client.post(
529 f"/{repo.owner}/{repo.slug}/coord/pull",
530 json={"since_id": cursor1, "limit": 2},
531 headers=auth_headers,
532 )
533 assert len(resp2.json()["records"]) == 2
534 cursor2 = resp2.json()["cursor"]
535
536 resp3 = await client.post(
537 f"/{repo.owner}/{repo.slug}/coord/pull",
538 json={"since_id": cursor2, "limit": 2},
539 headers=auth_headers,
540 )
541 assert len(resp3.json()["records"]) == 1 # last one
542
543 async def test_pull_kind_filter_via_http(
544 self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
545 ) -> None:
546 records = [_make_record(kind="reservation"), _make_record(kind="heartbeat")]
547 await client.post(
548 f"/{repo.owner}/{repo.slug}/coord/push",
549 json={"records": records},
550 headers=auth_headers,
551 )
552 resp = await client.post(
553 f"/{repo.owner}/{repo.slug}/coord/pull",
554 json={"kinds": ["heartbeat"]},
555 headers=auth_headers,
556 )
557 body = resp.json()
558 assert all(r["kind"] == "heartbeat" for r in body["records"])
559
560 async def test_pull_private_repo_requires_auth(
561 self, client: AsyncClient, repo: MusehubRepo
562 ) -> None:
563 resp = await client.post(
564 f"/{repo.owner}/{repo.slug}/coord/pull",
565 json={},
566 )
567 assert resp.status_code == 404 # private repo → 404 not 401
568
569 async def test_pull_public_repo_no_auth_required(
570 self, client: AsyncClient, public_repo: MusehubRepo
571 ) -> None:
572 resp = await client.post(
573 f"/{public_repo.owner}/{public_repo.slug}/coord/pull",
574 json={},
575 )
576 assert resp.status_code == 200
577
578
579 class TestWatchEndpoint:
580 """Watch endpoint tests.
581
582 The SSE stream is infinite by design (it polls forever). All tests that
583 hit the streaming path mock ``coord_watch_stream`` with a finite generator
584 so the test completes without blocking. Tests that exercise pre-stream
585 guard logic (auth, repo resolution, kind validation) send a regular GET
586 request and assert the HTTP status code — those code paths return before
587 the stream generator is entered.
588 """
589
590 @staticmethod
591 async def _one_heartbeat(*args: str, **kwargs: str) -> AsyncIterator[str]:
592 """Finite mock stream — yields one heartbeat then stops."""
593 yield ": heartbeat\n\n"
594
595 async def test_watch_returns_sse_content_type(
596 self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
597 ) -> None:
598 with patch(
599 "musehub.api.routes.coord.coord_watch_stream",
600 side_effect=self._one_heartbeat,
601 ):
602 resp = await client.get(
603 f"/{repo.owner}/{repo.slug}/coord/watch",
604 headers=auth_headers,
605 )
606 assert resp.status_code == 200
607 assert "text/event-stream" in resp.headers["content-type"]
608
609 async def test_watch_no_cache_header(
610 self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
611 ) -> None:
612 with patch(
613 "musehub.api.routes.coord.coord_watch_stream",
614 side_effect=self._one_heartbeat,
615 ):
616 resp = await client.get(
617 f"/{repo.owner}/{repo.slug}/coord/watch",
618 headers=auth_headers,
619 )
620 assert resp.headers.get("cache-control") == "no-cache"
621
622 async def test_watch_yields_heartbeat_event(
623 self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
624 ) -> None:
625 with patch(
626 "musehub.api.routes.coord.coord_watch_stream",
627 side_effect=self._one_heartbeat,
628 ):
629 resp = await client.get(
630 f"/{repo.owner}/{repo.slug}/coord/watch",
631 headers=auth_headers,
632 )
633 assert ": heartbeat" in resp.text
634
635 async def test_watch_yields_coord_record_event(
636 self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
637 ) -> None:
638 uid = _new_id()
639
640 async def _one_record(*args: str, **kwargs: str) -> AsyncIterator[str]:
641 yield f'id: 1\nevent: coord_record\ndata: {{"id":1,"kind":"reservation","record_id":"{uid}"}}\n\n'
642
643 with patch(
644 "musehub.api.routes.coord.coord_watch_stream",
645 side_effect=_one_record,
646 ):
647 resp = await client.get(
648 f"/{repo.owner}/{repo.slug}/coord/watch",
649 headers=auth_headers,
650 )
651 assert "coord_record" in resp.text
652 assert uid in resp.text
653
654 async def test_watch_private_repo_no_auth_returns_404(
655 self, client: AsyncClient, repo: MusehubRepo
656 ) -> None:
657 # No auth → private repo is invisible (404 before stream starts).
658 resp = await client.get(f"/{repo.owner}/{repo.slug}/coord/watch")
659 assert resp.status_code == 404
660
661 async def test_watch_invalid_kind_param_returns_400(
662 self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
663 ) -> None:
664 # Bad kind → 400 before stream starts.
665 resp = await client.get(
666 f"/{repo.owner}/{repo.slug}/coord/watch?kinds=bad_kind",
667 headers=auth_headers,
668 )
669 assert resp.status_code == 400
670
671 async def test_watch_unknown_repo_returns_404(
672 self, client: AsyncClient, auth_headers: StrDict
673 ) -> None:
674 resp = await client.get(
675 "/gabriel/no-such-repo/coord/watch",
676 headers=auth_headers,
677 )
678 assert resp.status_code == 404
679
680 async def test_watch_since_id_param_passed_to_stream(
681 self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
682 ) -> None:
683 """since_id query param is forwarded to coord_watch_stream."""
684 captured = {}
685
686 async def _capture(repo_id: str, since_id: int | None, kinds: list[str] | None, get_session: type) -> AsyncIterator[str]:
687 captured["since_id"] = since_id
688 yield ": heartbeat\n\n"
689
690 with patch(
691 "musehub.api.routes.coord.coord_watch_stream",
692 side_effect=_capture,
693 ):
694 await client.get(
695 f"/{repo.owner}/{repo.slug}/coord/watch?since_id=99",
696 headers=auth_headers,
697 )
698 assert captured.get("since_id") == 99
699
700 async def test_watch_kind_filter_param_passed_to_stream(
701 self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
702 ) -> None:
703 captured = {}
704
705 async def _capture(repo_id: str, since_id: int | None, kinds: list[str] | None, get_session: type) -> AsyncIterator[str]:
706 captured["kinds"] = kinds
707 yield ": heartbeat\n\n"
708
709 with patch(
710 "musehub.api.routes.coord.coord_watch_stream",
711 side_effect=_capture,
712 ):
713 await client.get(
714 f"/{repo.owner}/{repo.slug}/coord/watch?kinds=reservation&kinds=heartbeat",
715 headers=auth_headers,
716 )
717 assert set(captured.get("kinds", [])) == {"reservation", "heartbeat"}
718
719
720 # ── Security tests ─────────────────────────────────────────────────────────────
721
722
723 class TestCoordSecurity:
724 async def test_path_traversal_in_owner_blocked(
725 self, client: AsyncClient, auth_headers: StrDict
726 ) -> None:
727 resp = await client.post(
728 "/../../../etc/passwd/coord-test/coord/push",
729 json={"records": [_make_record()]},
730 headers=auth_headers,
731 )
732 # FastAPI/Starlette normalizes the path, resulting in 404 or 400.
733 assert resp.status_code in (400, 404, 422)
734
735 async def test_path_traversal_in_record_id_rejected(
736 self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
737 ) -> None:
738 resp = await client.post(
739 f"/{repo.owner}/{repo.slug}/coord/push",
740 json={"records": [{
741 "kind": "reservation",
742 "record_id": "../../etc/passwd",
743 "payload": {},
744 }]},
745 headers=auth_headers,
746 )
747 assert resp.status_code == 422
748
749 async def test_null_byte_in_record_id_rejected(
750 self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
751 ) -> None:
752 resp = await client.post(
753 f"/{repo.owner}/{repo.slug}/coord/push",
754 json={"records": [{
755 "kind": "reservation",
756 "record_id": "\x00" + _new_id()[1:],
757 "payload": {},
758 }]},
759 headers=auth_headers,
760 )
761 assert resp.status_code == 422
762
763 async def test_oversized_batch_rejected(
764 self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
765 ) -> None:
766 records = [_make_record() for _ in range(501)]
767 resp = await client.post(
768 f"/{repo.owner}/{repo.slug}/coord/push",
769 json={"records": records},
770 headers=auth_headers,
771 )
772 assert resp.status_code == 422
773
774 async def test_different_user_cannot_push_to_private_repo(
775 self, client: AsyncClient, db_session: AsyncSession, repo: MusehubRepo
776 ) -> None:
777 from musehub.db.musehub_identity_models import MusehubIdentity
778 from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request
779 from musehub.main import app as _app
780
781 other_id = secrets.token_hex(16)
782 other_identity = MusehubIdentity(identity_id=other_id, handle="othercoorduser", identity_type="human")
783 db_session.add(other_identity)
784 await db_session.commit()
785 _other_ctx = MSignContext(handle="othercoorduser", identity_id=other_id, is_agent=False, is_admin=False)
786 _app.dependency_overrides[require_signed_request] = lambda: _other_ctx
787 _app.dependency_overrides[optional_signed_request] = lambda: _other_ctx
788
789 resp = await client.post(
790 f"/{repo.owner}/{repo.slug}/coord/push",
791 json={"records": [_make_record()]},
792 )
793 # Repo is invisible to other user (404) or forbidden (403).
794 assert resp.status_code in (403, 404)
795
796 async def test_unknown_kind_in_pull_filter_rejected(
797 self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
798 ) -> None:
799 resp = await client.post(
800 f"/{repo.owner}/{repo.slug}/coord/pull",
801 json={"kinds": ["injection_kind']); DROP TABLE--"]},
802 headers=auth_headers,
803 )
804 assert resp.status_code == 422
805
806 async def test_negative_since_id_rejected(
807 self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
808 ) -> None:
809 resp = await client.post(
810 f"/{repo.owner}/{repo.slug}/coord/pull",
811 json={"since_id": -1},
812 headers=auth_headers,
813 )
814 assert resp.status_code == 422
815
816
817 # ── Stress tests ───────────────────────────────────────────────────────────────
818
819
820 class TestCoordStress:
821 async def test_push_500_records_single_batch(
822 self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
823 ) -> None:
824 records = [_make_record() for _ in range(500)]
825 resp = await client.post(
826 f"/{repo.owner}/{repo.slug}/coord/push",
827 json={"records": records},
828 headers=auth_headers,
829 )
830 assert resp.status_code == 200
831 body = resp.json()
832 assert body["inserted"] == 500
833 assert body["skipped"] == 0
834
835 async def test_cursor_pagination_full_1000_records(
836 self, db_session: AsyncSession, repo: MusehubRepo
837 ) -> None:
838 """Push 1000 records in two batches and paginate through all with cursor."""
839 inserted_total = 0
840 for _ in range(2): # two batches of 500
841 records = [
842 CoordRecordIn(kind="reservation", record_id=_new_id(), payload={})
843 for _ in range(500)
844 ]
845 push_req = CoordPushRequest(records=records)
846 resp = await coord_push(db_session, repo.repo_id, push_req)
847 inserted_total += resp.inserted
848 assert inserted_total == 1000
849
850 # Paginate with limit=100.
851 cursor = 0
852 total_pulled = 0
853 pages = 0
854 while True:
855 pull_resp = await coord_pull(
856 db_session, repo.repo_id,
857 CoordPollRequest(since_id=cursor, limit=100)
858 )
859 if not pull_resp.records:
860 break
861 total_pulled += len(pull_resp.records)
862 cursor = pull_resp.cursor
863 pages += 1
864
865 assert total_pulled == 1000
866 assert pages == 10
867
868 async def test_all_kinds_push_and_pull(
869 self, db_session: AsyncSession, repo: MusehubRepo
870 ) -> None:
871 """Push one record per kind, pull all, assert each kind present."""
872 records = [
873 CoordRecordIn(kind=k, record_id=_new_id(), payload={"kind": k})
874 for k in sorted(_VALID_KINDS)
875 ]
876 push_req = CoordPushRequest(records=records)
877 push_resp = await coord_push(db_session, repo.repo_id, push_req)
878 assert push_resp.inserted == len(_VALID_KINDS)
879
880 pull_resp = await coord_pull(db_session, repo.repo_id, CoordPollRequest())
881 pulled_kinds = {r.kind for r in pull_resp.records}
882 assert pulled_kinds == _VALID_KINDS
883
884 async def test_idempotent_push_500_records_twice(
885 self, db_session: AsyncSession, repo: MusehubRepo
886 ) -> None:
887 """Pushing the same 500 records twice: first all inserted, then all skipped."""
888 records = [
889 CoordRecordIn(kind="reservation", record_id=_new_id(), payload={})
890 for _ in range(500)
891 ]
892 req = CoordPushRequest(records=records)
893
894 resp1 = await coord_push(db_session, repo.repo_id, req)
895 assert resp1.inserted == 500
896
897 resp2 = await coord_push(db_session, repo.repo_id, req)
898 assert resp2.inserted == 0
899 assert resp2.skipped == 500
File History 1 commit
sha256:0997d6250ae6476362f6fe2025af7789f46d03df3e9f34356d5e8ee79b201923 fix(issues): use issue number as pagination cursor, not cre… Sonnet 4.6 patch 9 days ago