"""Tests for the MuseHub coordination bus. Covers all acceptance criteria: Unit: - CoordRecordIn validation (kind, record_id, run_id) - CoordPollRequest validation (since_id, kinds, limit) - CoordPushResponse and CoordPullResponse structure Integration (service layer): - coord_push: insert, idempotent skip, heartbeat upsert - coord_pull: cursor, kind filter, limit - Push then pull round-trip E2E (HTTP endpoints via AsyncClient): - POST /{owner}/{slug}/coord/push — 200 OK, 401 unauth, 403 wrong owner, 404 unknown repo, 400 bad kind, 400 bad record_id - POST /{owner}/{slug}/coord/pull — 200 OK, cursor pagination - GET /{owner}/{slug}/coord/watch — SSE stream response headers Security: - Path traversal in owner/slug blocked by 404 - record_id path traversal rejected by Pydantic (400) - Unknown kind rejected (400) - Private repo invisible to wrong user (404) - Push requires auth (401) Stress: - Push 500 records in one batch - Pull 1000 records with cursor pagination - 200 push + pull round-trips with correct cursor tracking """ from __future__ import annotations import json import secrets from collections.abc import AsyncIterator from datetime import datetime, timezone from unittest.mock import patch import pytest import pytest_asyncio from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession from musehub.core.genesis import compute_repo_id from musehub.db import coord_models as coord_db from musehub.db.musehub_identity_models import MusehubIdentity from musehub.db.musehub_repo_models import MusehubRepo from musehub.types.json_types import JSONObject, StrDict from musehub.models.coord import ( CoordPollRequest, CoordPushRequest, CoordRecordIn, _VALID_KINDS, ) from musehub.services.musehub_coord import coord_pull, coord_push # ── Fixtures ─────────────────────────────────────────────────────────────────── def _new_id() -> str: return secrets.token_hex(16) def _make_record( kind: str = "reservation", record_id: str | None = None, run_id: str = "agent-1", payload: JSONObject | None = None, expires_at: datetime | None = None, ) -> JSONObject: return { "kind": kind, "record_id": record_id or _new_id(), "run_id": run_id, "payload": payload or {"note": "test"}, "expires_at": expires_at, } @pytest_asyncio.fixture async def repo(db_session: AsyncSession, test_user: MusehubIdentity) -> MusehubRepo: """Create a private test repo with a unique slug to prevent cross-test conflicts.""" suffix = _new_id()[:8] slug = f"coord-test-{suffix}" r = MusehubRepo( repo_id=compute_repo_id(test_user.identity_id, slug, "", datetime.now(timezone.utc).isoformat()), name="coord-test", owner="gabriel", slug=slug, visibility="private", owner_user_id=test_user.identity_id, ) db_session.add(r) await db_session.commit() await db_session.refresh(r) return r @pytest_asyncio.fixture async def public_repo(db_session: AsyncSession, test_user: MusehubIdentity) -> MusehubRepo: """Create a public test repo with a unique slug to prevent cross-test conflicts.""" suffix = _new_id()[:8] slug = f"coord-public-{suffix}" r = MusehubRepo( repo_id=compute_repo_id(test_user.identity_id, slug, "", datetime.now(timezone.utc).isoformat()), name="coord-public", owner="gabriel", slug=slug, visibility="public", owner_user_id=test_user.identity_id, ) db_session.add(r) await db_session.commit() await db_session.refresh(r) return r # ── Unit: Pydantic model validation ─────────────────────────────────────────── class TestCoordRecordInValidation: def test_valid_record(self) -> None: rec = CoordRecordIn( kind="reservation", record_id=_new_id(), run_id="agent-1", payload={"x": 1}, ) assert rec.kind == "reservation" def test_unknown_kind_rejected(self) -> None: with pytest.raises(Exception, match="kind must be one of"): CoordRecordIn(kind="unknown_kind", record_id=_new_id(), payload={}) def test_all_valid_kinds_accepted(self) -> None: for kind in _VALID_KINDS: rec = CoordRecordIn(kind=kind, record_id=_new_id(), payload={}) assert rec.kind == kind def test_invalid_record_id_rejected(self) -> None: with pytest.raises(Exception, match="record_id must be alphanumeric"): CoordRecordIn(kind="reservation", record_id="has a space", payload={}) def test_path_traversal_in_record_id_rejected(self) -> None: with pytest.raises(Exception): CoordRecordIn(kind="reservation", record_id="../../../etc/passwd", payload={}) def test_null_byte_in_record_id_rejected(self) -> None: with pytest.raises(Exception): CoordRecordIn(kind="reservation", record_id="\x00" + _new_id()[1:], payload={}) def test_run_id_defaults_to_empty(self) -> None: rec = CoordRecordIn(kind="reservation", record_id=_new_id(), payload={}) assert rec.run_id == "" def test_run_id_max_length(self) -> None: with pytest.raises(Exception): CoordRecordIn( kind="reservation", record_id=_new_id(), run_id="x" * 256, payload={}, ) def test_expires_at_optional(self) -> None: rec = CoordRecordIn(kind="heartbeat", record_id=_new_id(), payload={}) assert rec.expires_at is None def test_uppercase_record_id_accepted(self) -> None: upper = _new_id().upper() rec = CoordRecordIn(kind="reservation", record_id=upper, payload={}) assert rec.record_id == upper class TestCoordPollRequestValidation: def test_defaults(self) -> None: req = CoordPollRequest() assert req.since_id == 0 assert req.kinds == [] assert req.limit == 500 def test_since_id_must_be_non_negative(self) -> None: with pytest.raises(Exception): CoordPollRequest(since_id=-1) def test_limit_bounds(self) -> None: with pytest.raises(Exception): CoordPollRequest(limit=0) with pytest.raises(Exception): CoordPollRequest(limit=1001) def test_unknown_kind_in_filter_rejected(self) -> None: with pytest.raises(Exception, match="kind must be one of"): CoordPollRequest(kinds=["bad_kind"]) def test_valid_kinds_filter(self) -> None: req = CoordPollRequest(kinds=["reservation", "heartbeat"]) assert "reservation" in req.kinds # ── Integration: service layer ───────────────────────────────────────────────── class TestCoordPush: async def test_push_inserts_records( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: req = CoordPushRequest(records=[ CoordRecordIn(kind="reservation", record_id=_new_id(), payload={"addr": "x"}), CoordRecordIn(kind="heartbeat", record_id=_new_id(), payload={"ping": 1}), ]) resp = await coord_push(db_session, repo.repo_id, req) assert resp.inserted == 2 assert resp.skipped == 0 async def test_push_same_record_twice_is_skipped( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: uid = _new_id() rec = CoordRecordIn(kind="reservation", record_id=uid, payload={"x": 1}) req = CoordPushRequest(records=[rec]) resp1 = await coord_push(db_session, repo.repo_id, req) assert resp1.inserted == 1 # Re-push the identical record. resp2 = await coord_push(db_session, repo.repo_id, req) assert resp2.inserted == 0 assert resp2.skipped == 1 async def test_heartbeat_upserted( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: uid = _new_id() req1 = CoordPushRequest(records=[ CoordRecordIn(kind="heartbeat", record_id=uid, payload={"ts": "t1"}), ]) resp1 = await coord_push(db_session, repo.repo_id, req1) assert resp1.inserted == 1 req2 = CoordPushRequest(records=[ CoordRecordIn(kind="heartbeat", record_id=uid, payload={"ts": "t2"}), ]) resp2 = await coord_push(db_session, repo.repo_id, req2) # Heartbeat upsert counts as skipped (same row, payload updated). assert resp2.skipped == 1 assert resp2.inserted == 0 async def test_push_mixed_batch( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: uid_dup = _new_id() req = CoordPushRequest(records=[ CoordRecordIn(kind="reservation", record_id=_new_id(), payload={}), CoordRecordIn(kind="intent", record_id=_new_id(), payload={}), CoordRecordIn(kind="dependency", record_id=_new_id(), payload={}), ]) resp = await coord_push(db_session, repo.repo_id, req) assert resp.inserted == 3 async def test_push_does_not_cross_repos( self, db_session: AsyncSession, repo: MusehubRepo, public_repo: MusehubRepo ) -> None: uid = _new_id() req = CoordPushRequest(records=[ CoordRecordIn(kind="reservation", record_id=uid, payload={}), ]) await coord_push(db_session, repo.repo_id, req) # Same ID but different repo_id → should insert, not skip. resp2 = await coord_push(db_session, public_repo.repo_id, req) assert resp2.inserted == 1 class TestCoordPull: async def test_pull_returns_inserted_records( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: uid1, uid2 = _new_id(), _new_id() push_req = CoordPushRequest(records=[ CoordRecordIn(kind="reservation", record_id=uid1, payload={"a": 1}), CoordRecordIn(kind="heartbeat", record_id=uid2, payload={"b": 2}), ]) await coord_push(db_session, repo.repo_id, push_req) poll_req = CoordPollRequest() resp = await coord_pull(db_session, repo.repo_id, poll_req) record_ids = {r.record_id for r in resp.records} assert uid1 in record_ids assert uid2 in record_ids async def test_pull_cursor_advances( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: push_req = CoordPushRequest(records=[ CoordRecordIn(kind="reservation", record_id=_new_id(), payload={}), ]) await coord_push(db_session, repo.repo_id, push_req) resp1 = await coord_pull(db_session, repo.repo_id, CoordPollRequest()) cursor = resp1.cursor # Push a second record. push_req2 = CoordPushRequest(records=[ CoordRecordIn(kind="intent", record_id=_new_id(), payload={}), ]) await coord_push(db_session, repo.repo_id, push_req2) # Pull since cursor — should only return the second record. resp2 = await coord_pull( db_session, repo.repo_id, CoordPollRequest(since_id=cursor) ) assert len(resp2.records) == 1 assert resp2.records[0].kind == "intent" async def test_pull_empty_when_nothing_pushed( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: resp = await coord_pull(db_session, repo.repo_id, CoordPollRequest()) assert resp.records == [] assert resp.cursor == 0 async def test_pull_kind_filter( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: push_req = CoordPushRequest(records=[ CoordRecordIn(kind="reservation", record_id=_new_id(), payload={}), CoordRecordIn(kind="heartbeat", record_id=_new_id(), payload={}), CoordRecordIn(kind="intent", record_id=_new_id(), payload={}), ]) await coord_push(db_session, repo.repo_id, push_req) resp = await coord_pull( db_session, repo.repo_id, CoordPollRequest(kinds=["reservation"]) ) assert all(r.kind == "reservation" for r in resp.records) assert len(resp.records) == 1 async def test_pull_limit( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: push_req = CoordPushRequest(records=[ CoordRecordIn(kind="reservation", record_id=_new_id(), payload={}) for _ in range(10) ]) await coord_push(db_session, repo.repo_id, push_req) resp = await coord_pull( db_session, repo.repo_id, CoordPollRequest(limit=3) ) assert len(resp.records) == 3 async def test_pull_returns_oldest_first( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: uids = [_new_id() for _ in range(5)] push_req = CoordPushRequest(records=[ CoordRecordIn(kind="reservation", record_id=uid, payload={}) for uid in uids ]) await coord_push(db_session, repo.repo_id, push_req) resp = await coord_pull(db_session, repo.repo_id, CoordPollRequest()) ids = [r.id for r in resp.records] assert ids == sorted(ids) # oldest first = ascending IDs # ── E2E: HTTP endpoints ──────────────────────────────────────────────────────── class TestPushEndpoint: async def test_push_success( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo, ) -> None: resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/push", json={"records": [_make_record()]}, headers=auth_headers, ) assert resp.status_code == 200 body = resp.json() assert body["inserted"] == 1 assert body["skipped"] == 0 async def test_push_requires_auth( self, client: AsyncClient, repo: MusehubRepo ) -> None: resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/push", json={"records": [_make_record()]}, ) assert resp.status_code == 401 async def test_push_unknown_repo_returns_404( self, client: AsyncClient, auth_headers: StrDict ) -> None: resp = await client.post( "/gabriel/no-such-repo/coord/push", json={"records": [_make_record()]}, headers=auth_headers, ) assert resp.status_code == 404 async def test_push_bad_kind_returns_400( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo ) -> None: resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/push", json={"records": [_make_record(kind="bad_kind")]}, headers=auth_headers, ) assert resp.status_code == 422 # Pydantic validation error async def test_push_bad_record_id_returns_422( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo ) -> None: resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/push", json={"records": [{ "kind": "reservation", "record_id": "has spaces and slashes/bad", "run_id": "x", "payload": {}, }]}, headers=auth_headers, ) assert resp.status_code == 422 async def test_push_idempotent( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo ) -> None: rec = _make_record() payload = {"records": [rec]} r1 = await client.post(f"/{repo.owner}/{repo.slug}/coord/push", json=payload, headers=auth_headers) assert r1.status_code == 200 assert r1.json()["inserted"] == 1 r2 = await client.post(f"/{repo.owner}/{repo.slug}/coord/push", json=payload, headers=auth_headers) assert r2.status_code == 200 assert r2.json()["skipped"] == 1 assert r2.json()["inserted"] == 0 async def test_push_empty_records_rejected( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo ) -> None: resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/push", json={"records": []}, headers=auth_headers, ) assert resp.status_code == 422 async def test_push_multiple_kinds( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo ) -> None: records = [_make_record(kind=k) for k in ("reservation", "heartbeat", "intent")] resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/push", json={"records": records}, headers=auth_headers, ) assert resp.status_code == 200 assert resp.json()["inserted"] == 3 class TestPullEndpoint: async def test_pull_empty_initially( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo ) -> None: resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/pull", json={}, headers=auth_headers, ) assert resp.status_code == 200 body = resp.json() assert body["records"] == [] assert body["cursor"] == 0 async def test_pull_after_push( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo ) -> None: rec = _make_record() push_resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/push", json={"records": [rec]}, headers=auth_headers, ) assert push_resp.status_code == 200 pull_resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/pull", json={}, headers=auth_headers, ) assert pull_resp.status_code == 200 body = pull_resp.json() assert len(body["records"]) == 1 assert body["records"][0]["record_id"] == rec["record_id"] assert body["cursor"] > 0 async def test_pull_cursor_pagination( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo ) -> None: # Push 5 records. for _ in range(5): await client.post( f"/{repo.owner}/{repo.slug}/coord/push", json={"records": [_make_record()]}, headers=auth_headers, ) # Pull 2 at a time. resp1 = await client.post( f"/{repo.owner}/{repo.slug}/coord/pull", json={"limit": 2}, headers=auth_headers, ) assert len(resp1.json()["records"]) == 2 cursor1 = resp1.json()["cursor"] resp2 = await client.post( f"/{repo.owner}/{repo.slug}/coord/pull", json={"since_id": cursor1, "limit": 2}, headers=auth_headers, ) assert len(resp2.json()["records"]) == 2 cursor2 = resp2.json()["cursor"] resp3 = await client.post( f"/{repo.owner}/{repo.slug}/coord/pull", json={"since_id": cursor2, "limit": 2}, headers=auth_headers, ) assert len(resp3.json()["records"]) == 1 # last one async def test_pull_kind_filter_via_http( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo ) -> None: records = [_make_record(kind="reservation"), _make_record(kind="heartbeat")] await client.post( f"/{repo.owner}/{repo.slug}/coord/push", json={"records": records}, headers=auth_headers, ) resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/pull", json={"kinds": ["heartbeat"]}, headers=auth_headers, ) body = resp.json() assert all(r["kind"] == "heartbeat" for r in body["records"]) async def test_pull_private_repo_requires_auth( self, client: AsyncClient, repo: MusehubRepo ) -> None: resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/pull", json={}, ) assert resp.status_code == 404 # private repo → 404 not 401 async def test_pull_public_repo_no_auth_required( self, client: AsyncClient, public_repo: MusehubRepo ) -> None: resp = await client.post( f"/{public_repo.owner}/{public_repo.slug}/coord/pull", json={}, ) assert resp.status_code == 200 class TestWatchEndpoint: """Watch endpoint tests. The SSE stream is infinite by design (it polls forever). All tests that hit the streaming path mock ``coord_watch_stream`` with a finite generator so the test completes without blocking. Tests that exercise pre-stream guard logic (auth, repo resolution, kind validation) send a regular GET request and assert the HTTP status code — those code paths return before the stream generator is entered. """ @staticmethod async def _one_heartbeat(*args: str, **kwargs: str) -> AsyncIterator[str]: """Finite mock stream — yields one heartbeat then stops.""" yield ": heartbeat\n\n" async def test_watch_returns_sse_content_type( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo ) -> None: with patch( "musehub.api.routes.coord.coord_watch_stream", side_effect=self._one_heartbeat, ): resp = await client.get( f"/{repo.owner}/{repo.slug}/coord/watch", headers=auth_headers, ) assert resp.status_code == 200 assert "text/event-stream" in resp.headers["content-type"] async def test_watch_no_cache_header( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo ) -> None: with patch( "musehub.api.routes.coord.coord_watch_stream", side_effect=self._one_heartbeat, ): resp = await client.get( f"/{repo.owner}/{repo.slug}/coord/watch", headers=auth_headers, ) assert resp.headers.get("cache-control") == "no-cache" async def test_watch_yields_heartbeat_event( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo ) -> None: with patch( "musehub.api.routes.coord.coord_watch_stream", side_effect=self._one_heartbeat, ): resp = await client.get( f"/{repo.owner}/{repo.slug}/coord/watch", headers=auth_headers, ) assert ": heartbeat" in resp.text async def test_watch_yields_coord_record_event( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo ) -> None: uid = _new_id() async def _one_record(*args: str, **kwargs: str) -> AsyncIterator[str]: yield f'id: 1\nevent: coord_record\ndata: {{"id":1,"kind":"reservation","record_id":"{uid}"}}\n\n' with patch( "musehub.api.routes.coord.coord_watch_stream", side_effect=_one_record, ): resp = await client.get( f"/{repo.owner}/{repo.slug}/coord/watch", headers=auth_headers, ) assert "coord_record" in resp.text assert uid in resp.text async def test_watch_private_repo_no_auth_returns_404( self, client: AsyncClient, repo: MusehubRepo ) -> None: # No auth → private repo is invisible (404 before stream starts). resp = await client.get(f"/{repo.owner}/{repo.slug}/coord/watch") assert resp.status_code == 404 async def test_watch_invalid_kind_param_returns_400( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo ) -> None: # Bad kind → 400 before stream starts. resp = await client.get( f"/{repo.owner}/{repo.slug}/coord/watch?kinds=bad_kind", headers=auth_headers, ) assert resp.status_code == 400 async def test_watch_unknown_repo_returns_404( self, client: AsyncClient, auth_headers: StrDict ) -> None: resp = await client.get( "/gabriel/no-such-repo/coord/watch", headers=auth_headers, ) assert resp.status_code == 404 async def test_watch_since_id_param_passed_to_stream( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo ) -> None: """since_id query param is forwarded to coord_watch_stream.""" captured = {} async def _capture(repo_id: str, since_id: int | None, kinds: list[str] | None, get_session: type) -> AsyncIterator[str]: captured["since_id"] = since_id yield ": heartbeat\n\n" with patch( "musehub.api.routes.coord.coord_watch_stream", side_effect=_capture, ): await client.get( f"/{repo.owner}/{repo.slug}/coord/watch?since_id=99", headers=auth_headers, ) assert captured.get("since_id") == 99 async def test_watch_kind_filter_param_passed_to_stream( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo ) -> None: captured = {} async def _capture(repo_id: str, since_id: int | None, kinds: list[str] | None, get_session: type) -> AsyncIterator[str]: captured["kinds"] = kinds yield ": heartbeat\n\n" with patch( "musehub.api.routes.coord.coord_watch_stream", side_effect=_capture, ): await client.get( f"/{repo.owner}/{repo.slug}/coord/watch?kinds=reservation&kinds=heartbeat", headers=auth_headers, ) assert set(captured.get("kinds", [])) == {"reservation", "heartbeat"} # ── Security tests ───────────────────────────────────────────────────────────── class TestCoordSecurity: async def test_path_traversal_in_owner_blocked( self, client: AsyncClient, auth_headers: StrDict ) -> None: resp = await client.post( "/../../../etc/passwd/coord-test/coord/push", json={"records": [_make_record()]}, headers=auth_headers, ) # FastAPI/Starlette normalizes the path, resulting in 404 or 400. assert resp.status_code in (400, 404, 422) async def test_path_traversal_in_record_id_rejected( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo ) -> None: resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/push", json={"records": [{ "kind": "reservation", "record_id": "../../etc/passwd", "payload": {}, }]}, headers=auth_headers, ) assert resp.status_code == 422 async def test_null_byte_in_record_id_rejected( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo ) -> None: resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/push", json={"records": [{ "kind": "reservation", "record_id": "\x00" + _new_id()[1:], "payload": {}, }]}, headers=auth_headers, ) assert resp.status_code == 422 async def test_oversized_batch_rejected( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo ) -> None: records = [_make_record() for _ in range(501)] resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/push", json={"records": records}, headers=auth_headers, ) assert resp.status_code == 422 async def test_different_user_cannot_push_to_private_repo( self, client: AsyncClient, db_session: AsyncSession, repo: MusehubRepo ) -> None: from musehub.db.musehub_identity_models import MusehubIdentity from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request from musehub.main import app as _app other_id = secrets.token_hex(16) other_identity = MusehubIdentity(identity_id=other_id, handle="othercoorduser", identity_type="human") db_session.add(other_identity) await db_session.commit() _other_ctx = MSignContext(handle="othercoorduser", identity_id=other_id, is_agent=False, is_admin=False) _app.dependency_overrides[require_signed_request] = lambda: _other_ctx _app.dependency_overrides[optional_signed_request] = lambda: _other_ctx resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/push", json={"records": [_make_record()]}, ) # Repo is invisible to other user (404) or forbidden (403). assert resp.status_code in (403, 404) async def test_unknown_kind_in_pull_filter_rejected( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo ) -> None: resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/pull", json={"kinds": ["injection_kind']); DROP TABLE--"]}, headers=auth_headers, ) assert resp.status_code == 422 async def test_negative_since_id_rejected( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo ) -> None: resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/pull", json={"since_id": -1}, headers=auth_headers, ) assert resp.status_code == 422 # ── Stress tests ─────────────────────────────────────────────────────────────── class TestCoordStress: async def test_push_500_records_single_batch( self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo ) -> None: records = [_make_record() for _ in range(500)] resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/push", json={"records": records}, headers=auth_headers, ) assert resp.status_code == 200 body = resp.json() assert body["inserted"] == 500 assert body["skipped"] == 0 async def test_cursor_pagination_full_1000_records( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Push 1000 records in two batches and paginate through all with cursor.""" inserted_total = 0 for _ in range(2): # two batches of 500 records = [ CoordRecordIn(kind="reservation", record_id=_new_id(), payload={}) for _ in range(500) ] push_req = CoordPushRequest(records=records) resp = await coord_push(db_session, repo.repo_id, push_req) inserted_total += resp.inserted assert inserted_total == 1000 # Paginate with limit=100. cursor = 0 total_pulled = 0 pages = 0 while True: pull_resp = await coord_pull( db_session, repo.repo_id, CoordPollRequest(since_id=cursor, limit=100) ) if not pull_resp.records: break total_pulled += len(pull_resp.records) cursor = pull_resp.cursor pages += 1 assert total_pulled == 1000 assert pages == 10 async def test_all_kinds_push_and_pull( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Push one record per kind, pull all, assert each kind present.""" records = [ CoordRecordIn(kind=k, record_id=_new_id(), payload={"kind": k}) for k in sorted(_VALID_KINDS) ] push_req = CoordPushRequest(records=records) push_resp = await coord_push(db_session, repo.repo_id, push_req) assert push_resp.inserted == len(_VALID_KINDS) pull_resp = await coord_pull(db_session, repo.repo_id, CoordPollRequest()) pulled_kinds = {r.kind for r in pull_resp.records} assert pulled_kinds == _VALID_KINDS async def test_idempotent_push_500_records_twice( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Pushing the same 500 records twice: first all inserted, then all skipped.""" records = [ CoordRecordIn(kind="reservation", record_id=_new_id(), payload={}) for _ in range(500) ] req = CoordPushRequest(records=records) resp1 = await coord_push(db_session, repo.repo_id, req) assert resp1.inserted == 500 resp2 = await coord_push(db_session, repo.repo_id, req) assert resp2.inserted == 0 assert resp2.skipped == 500