test_musehub_coord.py
python
sha256:3c58668648c7323bb9f5c6881cfe6a3f14fc93fcb73b537d253732952a5bf8bf
chore: bump version to 0.2.0rc12
Sonnet 4.6
patch
10 days ago
| 1 | """Tests for the MuseHub coordination bus. |
| 2 | |
| 3 | Covers all acceptance criteria: |
| 4 | |
| 5 | Unit: |
| 6 | - CoordRecordIn validation (kind, record_id, run_id) |
| 7 | - CoordPollRequest validation (since_id, kinds, limit) |
| 8 | - CoordPushResponse and CoordPullResponse structure |
| 9 | |
| 10 | Integration (service layer): |
| 11 | - coord_push: insert, idempotent skip, heartbeat upsert |
| 12 | - coord_pull: cursor, kind filter, limit |
| 13 | - Push then pull round-trip |
| 14 | |
| 15 | E2E (HTTP endpoints via AsyncClient): |
| 16 | - POST /{owner}/{slug}/coord/push — 200 OK, 401 unauth, 403 wrong owner, |
| 17 | 404 unknown repo, 422 bad kind, 422 bad record_id |
| 18 | - POST /{owner}/{slug}/coord/pull — 200 OK, cursor pagination |
| 19 | - GET /{owner}/{slug}/coord/watch — SSE stream response headers |
| 20 | |
| 21 | Security: |
| 22 | - Path traversal in owner/slug blocked by 404 |
| 23 | - record_id path traversal rejected by Pydantic (422) |
| 24 | - Unknown kind rejected (422 on push/pull, 400 on watch) |
| 25 | - Private repo invisible to wrong user (404) |
| 26 | - Push requires auth (401) |
| 27 | |
| 28 | Stress: |
| 29 | - Push 500 records in one batch |
| 30 | - Pull 1000 records with cursor pagination |
| 31 | - 200 push + pull round-trips with correct cursor tracking |
| 32 | """ |
| 33 | |
| 34 | from __future__ import annotations |
| 35 | |
| 36 | import json |
| 37 | import secrets |
| 38 | from collections.abc import AsyncIterator |
| 39 | from datetime import datetime, timezone |
| 40 | from unittest.mock import patch |
| 41 | |
| 42 | import pytest |
| 43 | import pytest_asyncio |
| 44 | from httpx import AsyncClient |
| 45 | from sqlalchemy.ext.asyncio import AsyncSession |
| 46 | |
| 47 | from musehub.core.genesis import compute_repo_id |
| 48 | from musehub.db import coord_models as coord_db |
| 49 | from musehub.db.musehub_identity_models import MusehubIdentity |
| 50 | from musehub.db.musehub_repo_models import MusehubRepo |
| 51 | from musehub.types.json_types import JSONObject, StrDict |
| 52 | from musehub.models.coord import ( |
| 53 | CoordPollRequest, |
| 54 | CoordPushRequest, |
| 55 | CoordRecordIn, |
| 56 | _VALID_KINDS, |
| 57 | ) |
| 58 | from musehub.services.musehub_coord import coord_pull, coord_push |
| 59 | |
| 60 | |
| 61 | # ── Fixtures ─────────────────────────────────────────────────────────────────── |
| 62 | |
| 63 | |
| 64 | def _new_id() -> str: |
| 65 | return secrets.token_hex(16) |
| 66 | |
| 67 | |
| 68 | def _make_record( |
| 69 | kind: str = "reservation", |
| 70 | record_id: str | None = None, |
| 71 | run_id: str = "agent-1", |
| 72 | payload: JSONObject | None = None, |
| 73 | expires_at: datetime | None = None, |
| 74 | ) -> JSONObject: |
| 75 | return { |
| 76 | "kind": kind, |
| 77 | "record_id": record_id or _new_id(), |
| 78 | "run_id": run_id, |
| 79 | "payload": payload or {"note": "test"}, |
| 80 | "expires_at": expires_at, |
| 81 | } |
| 82 | |
| 83 | |
@pytest_asyncio.fixture
async def repo(db_session: AsyncSession, test_user: MusehubIdentity) -> MusehubRepo:
    """Persist and return a private repo owned by ``test_user``.

    The slug carries a random 8-character suffix so that repos created by
    different tests do not collide.
    """
    slug = "coord-test-" + _new_id()[:8]
    created_at = datetime.now(timezone.utc).isoformat()
    repo_row = MusehubRepo(
        repo_id=compute_repo_id(test_user.identity_id, slug, "", created_at),
        name="coord-test",
        owner="gabriel",
        slug=slug,
        visibility="private",
        owner_user_id=test_user.identity_id,
    )
    db_session.add(repo_row)
    await db_session.commit()
    # Reload the row after the commit before handing it to the test.
    await db_session.refresh(repo_row)
    return repo_row
| 101 | |
| 102 | |
@pytest_asyncio.fixture
async def public_repo(db_session: AsyncSession, test_user: MusehubIdentity) -> MusehubRepo:
    """Persist and return a public repo owned by ``test_user``.

    The slug carries a random 8-character suffix so that repos created by
    different tests do not collide.
    """
    slug = "coord-public-" + _new_id()[:8]
    created_at = datetime.now(timezone.utc).isoformat()
    repo_row = MusehubRepo(
        repo_id=compute_repo_id(test_user.identity_id, slug, "", created_at),
        name="coord-public",
        owner="gabriel",
        slug=slug,
        visibility="public",
        owner_user_id=test_user.identity_id,
    )
    db_session.add(repo_row)
    await db_session.commit()
    # Reload the row after the commit before handing it to the test.
    await db_session.refresh(repo_row)
    return repo_row
| 120 | |
| 121 | |
| 122 | # ── Unit: Pydantic model validation ─────────────────────────────────────────── |
| 123 | |
| 124 | |
class TestCoordRecordInValidation:
    """Unit tests for the field validation performed by ``CoordRecordIn``."""

    def test_valid_record(self) -> None:
        """A fully specified record is accepted and keeps its kind."""
        record = CoordRecordIn(
            kind="reservation", record_id=_new_id(), run_id="agent-1", payload={"x": 1}
        )
        assert record.kind == "reservation"

    def test_unknown_kind_rejected(self) -> None:
        """A kind outside the allowed set raises with the expected message."""
        with pytest.raises(Exception, match="kind must be one of"):
            CoordRecordIn(kind="unknown_kind", record_id=_new_id(), payload={})

    def test_all_valid_kinds_accepted(self) -> None:
        """Every member of ``_VALID_KINDS`` constructs successfully."""
        for valid_kind in _VALID_KINDS:
            record = CoordRecordIn(kind=valid_kind, record_id=_new_id(), payload={})
            assert record.kind == valid_kind

    def test_invalid_record_id_rejected(self) -> None:
        """A record_id containing a space raises with the expected message."""
        with pytest.raises(Exception, match="record_id must be alphanumeric"):
            CoordRecordIn(kind="reservation", record_id="has a space", payload={})

    def test_path_traversal_in_record_id_rejected(self) -> None:
        """A record_id that looks like a relative path is rejected."""
        traversal = "../../../etc/passwd"
        with pytest.raises(Exception):
            CoordRecordIn(kind="reservation", record_id=traversal, payload={})

    def test_null_byte_in_record_id_rejected(self) -> None:
        """A record_id starting with a NUL byte is rejected."""
        poisoned = "\x00" + _new_id()[1:]
        with pytest.raises(Exception):
            CoordRecordIn(kind="reservation", record_id=poisoned, payload={})

    def test_run_id_defaults_to_empty(self) -> None:
        """Omitting run_id yields the empty string."""
        record = CoordRecordIn(kind="reservation", record_id=_new_id(), payload={})
        assert record.run_id == ""

    def test_run_id_max_length(self) -> None:
        """A 256-character run_id is rejected."""
        too_long = "x" * 256
        with pytest.raises(Exception):
            CoordRecordIn(
                kind="reservation", record_id=_new_id(), run_id=too_long, payload={}
            )

    def test_expires_at_optional(self) -> None:
        """Omitting expires_at leaves it as ``None``."""
        record = CoordRecordIn(kind="heartbeat", record_id=_new_id(), payload={})
        assert record.expires_at is None

    def test_uppercase_record_id_accepted(self) -> None:
        """An upper-cased hex record_id is accepted and stored unchanged."""
        shouted = _new_id().upper()
        record = CoordRecordIn(kind="reservation", record_id=shouted, payload={})
        assert record.record_id == shouted
| 177 | |
| 178 | |
class TestCoordPollRequestValidation:
    """Unit tests for the field validation performed by ``CoordPollRequest``."""

    def test_defaults(self) -> None:
        """An argument-less request has since_id 0, no kinds and limit 500."""
        request = CoordPollRequest()
        assert request.since_id == 0
        assert request.kinds == []
        assert request.limit == 500

    def test_since_id_must_be_non_negative(self) -> None:
        """A since_id of -1 is rejected."""
        with pytest.raises(Exception):
            CoordPollRequest(since_id=-1)

    def test_limit_bounds(self) -> None:
        """Limits of 0 and 1001 are both rejected."""
        with pytest.raises(Exception):
            CoordPollRequest(limit=0)
        with pytest.raises(Exception):
            CoordPollRequest(limit=1001)

    def test_unknown_kind_in_filter_rejected(self) -> None:
        """An unknown kind in the filter raises with the expected message."""
        with pytest.raises(Exception, match="kind must be one of"):
            CoordPollRequest(kinds=["bad_kind"])

    def test_valid_kinds_filter(self) -> None:
        """Known kinds are accepted and kept in the filter list."""
        request = CoordPollRequest(kinds=["reservation", "heartbeat"])
        assert "reservation" in request.kinds
| 203 | |
| 204 | |
| 205 | # ── Integration: service layer ───────────────────────────────────────────────── |
| 206 | |
| 207 | |
class TestCoordPush:
    """Service-layer tests for ``coord_push`` against a real DB session."""

    async def test_push_inserts_records(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """Two new records of different kinds are both counted as inserted."""
        req = CoordPushRequest(records=[
            CoordRecordIn(kind="reservation", record_id=_new_id(), payload={"addr": "x"}),
            CoordRecordIn(kind="heartbeat", record_id=_new_id(), payload={"ping": 1}),
        ])
        resp = await coord_push(db_session, repo.repo_id, req)
        assert resp.inserted == 2
        assert resp.skipped == 0

    async def test_push_same_record_twice_is_skipped(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """Re-pushing an identical record is reported as skipped, not inserted."""
        uid = _new_id()
        rec = CoordRecordIn(kind="reservation", record_id=uid, payload={"x": 1})
        req = CoordPushRequest(records=[rec])

        resp1 = await coord_push(db_session, repo.repo_id, req)
        assert resp1.inserted == 1

        # Re-push the identical record.
        resp2 = await coord_push(db_session, repo.repo_id, req)
        assert resp2.inserted == 0
        assert resp2.skipped == 1

    async def test_heartbeat_upserted(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """A second heartbeat with the same record_id is counted as skipped."""
        uid = _new_id()
        req1 = CoordPushRequest(records=[
            CoordRecordIn(kind="heartbeat", record_id=uid, payload={"ts": "t1"}),
        ])
        resp1 = await coord_push(db_session, repo.repo_id, req1)
        assert resp1.inserted == 1

        req2 = CoordPushRequest(records=[
            CoordRecordIn(kind="heartbeat", record_id=uid, payload={"ts": "t2"}),
        ])
        resp2 = await coord_push(db_session, repo.repo_id, req2)
        # Heartbeat upsert counts as skipped (same row, payload updated).
        assert resp2.skipped == 1
        assert resp2.inserted == 0

    async def test_push_mixed_batch(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """A batch mixing three different kinds is inserted in full."""
        req = CoordPushRequest(records=[
            CoordRecordIn(kind="reservation", record_id=_new_id(), payload={}),
            CoordRecordIn(kind="intent", record_id=_new_id(), payload={}),
            CoordRecordIn(kind="dependency", record_id=_new_id(), payload={}),
        ])
        resp = await coord_push(db_session, repo.repo_id, req)
        assert resp.inserted == 3

    async def test_push_does_not_cross_repos(
        self, db_session: AsyncSession, repo: MusehubRepo, public_repo: MusehubRepo
    ) -> None:
        """The same record_id pushed to a second repo is inserted there too."""
        uid = _new_id()
        req = CoordPushRequest(records=[
            CoordRecordIn(kind="reservation", record_id=uid, payload={}),
        ])
        await coord_push(db_session, repo.repo_id, req)
        # Same ID but different repo_id → should insert, not skip.
        resp2 = await coord_push(db_session, public_repo.repo_id, req)
        assert resp2.inserted == 1
| 276 | |
| 277 | |
class TestCoordPull:
    """Service-layer tests for ``coord_pull`` against a real DB session."""

    async def test_pull_returns_inserted_records(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """Records pushed a moment ago show up in an unfiltered pull."""
        first_id = _new_id()
        second_id = _new_id()
        batch = CoordPushRequest(records=[
            CoordRecordIn(kind="reservation", record_id=first_id, payload={"a": 1}),
            CoordRecordIn(kind="heartbeat", record_id=second_id, payload={"b": 2}),
        ])
        await coord_push(db_session, repo.repo_id, batch)

        pulled = await coord_pull(db_session, repo.repo_id, CoordPollRequest())
        pulled_ids = {row.record_id for row in pulled.records}
        assert first_id in pulled_ids
        assert second_id in pulled_ids

    async def test_pull_cursor_advances(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """Pulling with the previous cursor returns only later records."""
        first_batch = CoordPushRequest(records=[
            CoordRecordIn(kind="reservation", record_id=_new_id(), payload={}),
        ])
        await coord_push(db_session, repo.repo_id, first_batch)
        first_page = await coord_pull(db_session, repo.repo_id, CoordPollRequest())
        bookmark = first_page.cursor

        # A second record arrives after the bookmark was taken.
        second_batch = CoordPushRequest(records=[
            CoordRecordIn(kind="intent", record_id=_new_id(), payload={}),
        ])
        await coord_push(db_session, repo.repo_id, second_batch)

        # Only the record pushed after the bookmark should come back.
        second_page = await coord_pull(
            db_session, repo.repo_id, CoordPollRequest(since_id=bookmark)
        )
        assert len(second_page.records) == 1
        assert second_page.records[0].kind == "intent"

    async def test_pull_empty_when_nothing_pushed(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """A fresh repo yields no records and a cursor of 0."""
        pulled = await coord_pull(db_session, repo.repo_id, CoordPollRequest())
        assert pulled.records == []
        assert pulled.cursor == 0

    async def test_pull_kind_filter(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """Filtering by one kind returns exactly the matching record."""
        batch = CoordPushRequest(records=[
            CoordRecordIn(kind="reservation", record_id=_new_id(), payload={}),
            CoordRecordIn(kind="heartbeat", record_id=_new_id(), payload={}),
            CoordRecordIn(kind="intent", record_id=_new_id(), payload={}),
        ])
        await coord_push(db_session, repo.repo_id, batch)

        only_reservations = CoordPollRequest(kinds=["reservation"])
        pulled = await coord_pull(db_session, repo.repo_id, only_reservations)
        assert all(row.kind == "reservation" for row in pulled.records)
        assert len(pulled.records) == 1

    async def test_pull_limit(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """With ten records stored, ``limit=3`` returns three of them."""
        ten_records = [
            CoordRecordIn(kind="reservation", record_id=_new_id(), payload={})
            for _ in range(10)
        ]
        await coord_push(db_session, repo.repo_id, CoordPushRequest(records=ten_records))

        pulled = await coord_pull(db_session, repo.repo_id, CoordPollRequest(limit=3))
        assert len(pulled.records) == 3

    async def test_pull_returns_oldest_first(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """Pulled records come back in ascending ``id`` order."""
        five_records = [
            CoordRecordIn(kind="reservation", record_id=uid, payload={})
            for uid in [_new_id() for _ in range(5)]
        ]
        await coord_push(db_session, repo.repo_id, CoordPushRequest(records=five_records))

        pulled = await coord_pull(db_session, repo.repo_id, CoordPollRequest())
        row_ids = [row.id for row in pulled.records]
        assert row_ids == sorted(row_ids)  # oldest first = ascending IDs
| 368 | |
| 369 | |
| 370 | # ── E2E: HTTP endpoints ──────────────────────────────────────────────────────── |
| 371 | |
| 372 | |
class TestPushEndpoint:
    """HTTP-level tests for ``POST /{owner}/{slug}/coord/push``."""

    async def test_push_success(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        repo: MusehubRepo,
    ) -> None:
        """An authenticated push of one record returns 200 with inserted=1."""
        url = f"/{repo.owner}/{repo.slug}/coord/push"
        resp = await client.post(url, json={"records": [_make_record()]}, headers=auth_headers)
        assert resp.status_code == 200
        counts = resp.json()
        assert counts["inserted"] == 1
        assert counts["skipped"] == 0

    async def test_push_requires_auth(
        self, client: AsyncClient, repo: MusehubRepo
    ) -> None:
        """A push sent without auth headers is answered with 401."""
        url = f"/{repo.owner}/{repo.slug}/coord/push"
        resp = await client.post(url, json={"records": [_make_record()]})
        assert resp.status_code == 401

    async def test_push_unknown_repo_returns_404(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """Pushing to a slug that does not exist returns 404."""
        body = {"records": [_make_record()]}
        resp = await client.post(
            "/gabriel/no-such-repo/coord/push", json=body, headers=auth_headers
        )
        assert resp.status_code == 404

    async def test_push_bad_kind_returns_400(
        self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
    ) -> None:
        """An unknown kind fails request validation.

        Despite the test name, the asserted status is 422.
        """
        url = f"/{repo.owner}/{repo.slug}/coord/push"
        body = {"records": [_make_record(kind="bad_kind")]}
        resp = await client.post(url, json=body, headers=auth_headers)
        assert resp.status_code == 422  # Pydantic validation error

    async def test_push_bad_record_id_returns_422(
        self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
    ) -> None:
        """A record_id containing spaces and a slash is rejected with 422."""
        url = f"/{repo.owner}/{repo.slug}/coord/push"
        bad_record = {
            "kind": "reservation",
            "record_id": "has spaces and slashes/bad",
            "run_id": "x",
            "payload": {},
        }
        resp = await client.post(url, json={"records": [bad_record]}, headers=auth_headers)
        assert resp.status_code == 422

    async def test_push_idempotent(
        self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
    ) -> None:
        """Posting the same body twice inserts once and then skips once."""
        url = f"/{repo.owner}/{repo.slug}/coord/push"
        body = {"records": [_make_record()]}

        first = await client.post(url, json=body, headers=auth_headers)
        assert first.status_code == 200
        assert first.json()["inserted"] == 1

        second = await client.post(url, json=body, headers=auth_headers)
        assert second.status_code == 200
        assert second.json()["skipped"] == 1
        assert second.json()["inserted"] == 0

    async def test_push_empty_records_rejected(
        self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
    ) -> None:
        """An empty ``records`` list is rejected with 422."""
        url = f"/{repo.owner}/{repo.slug}/coord/push"
        resp = await client.post(url, json={"records": []}, headers=auth_headers)
        assert resp.status_code == 422

    async def test_push_multiple_kinds(
        self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
    ) -> None:
        """One record each of three kinds is accepted in a single request."""
        url = f"/{repo.owner}/{repo.slug}/coord/push"
        batch = [_make_record(kind=name) for name in ("reservation", "heartbeat", "intent")]
        resp = await client.post(url, json={"records": batch}, headers=auth_headers)
        assert resp.status_code == 200
        assert resp.json()["inserted"] == 3
| 470 | |
| 471 | |
class TestPullEndpoint:
    """HTTP-level tests for ``POST /{owner}/{slug}/coord/pull``."""

    async def test_pull_empty_initially(
        self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
    ) -> None:
        """A repo with no pushes returns an empty list and cursor 0."""
        resp = await client.post(
            f"/{repo.owner}/{repo.slug}/coord/pull",
            json={},
            headers=auth_headers,
        )
        assert resp.status_code == 200
        body = resp.json()
        assert body["records"] == []
        assert body["cursor"] == 0

    async def test_pull_after_push(
        self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
    ) -> None:
        """A pushed record is returned by the next pull with a positive cursor."""
        rec = _make_record()
        push_resp = await client.post(
            f"/{repo.owner}/{repo.slug}/coord/push",
            json={"records": [rec]},
            headers=auth_headers,
        )
        assert push_resp.status_code == 200

        pull_resp = await client.post(
            f"/{repo.owner}/{repo.slug}/coord/pull",
            json={},
            headers=auth_headers,
        )
        assert pull_resp.status_code == 200
        body = pull_resp.json()
        assert len(body["records"]) == 1
        assert body["records"][0]["record_id"] == rec["record_id"]
        assert body["cursor"] > 0

    async def test_pull_cursor_pagination(
        self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
    ) -> None:
        """Five records pulled two at a time arrive as pages of 2, 2 and 1."""
        # Push 5 records, checking each push so a setup failure is reported
        # here rather than as a confusing page-size mismatch below.
        for _ in range(5):
            push_resp = await client.post(
                f"/{repo.owner}/{repo.slug}/coord/push",
                json={"records": [_make_record()]},
                headers=auth_headers,
            )
            assert push_resp.status_code == 200

        # Pull 2 at a time.
        resp1 = await client.post(
            f"/{repo.owner}/{repo.slug}/coord/pull",
            json={"limit": 2},
            headers=auth_headers,
        )
        assert len(resp1.json()["records"]) == 2
        cursor1 = resp1.json()["cursor"]

        resp2 = await client.post(
            f"/{repo.owner}/{repo.slug}/coord/pull",
            json={"since_id": cursor1, "limit": 2},
            headers=auth_headers,
        )
        assert len(resp2.json()["records"]) == 2
        cursor2 = resp2.json()["cursor"]

        resp3 = await client.post(
            f"/{repo.owner}/{repo.slug}/coord/pull",
            json={"since_id": cursor2, "limit": 2},
            headers=auth_headers,
        )
        assert len(resp3.json()["records"]) == 1  # last one

    async def test_pull_kind_filter_via_http(
        self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
    ) -> None:
        """A ``kinds`` filter in the request body returns only that kind."""
        records = [_make_record(kind="reservation"), _make_record(kind="heartbeat")]
        push_resp = await client.post(
            f"/{repo.owner}/{repo.slug}/coord/push",
            json={"records": records},
            headers=auth_headers,
        )
        assert push_resp.status_code == 200

        resp = await client.post(
            f"/{repo.owner}/{repo.slug}/coord/pull",
            json={"kinds": ["heartbeat"]},
            headers=auth_headers,
        )
        assert resp.status_code == 200
        body = resp.json()
        # all() is vacuously true for an empty list, so require at least one
        # record before checking that every record has the filtered kind.
        assert body["records"]
        assert all(r["kind"] == "heartbeat" for r in body["records"])

    async def test_pull_private_repo_requires_auth(
        self, client: AsyncClient, repo: MusehubRepo
    ) -> None:
        """An unauthenticated pull from a private repo is answered with 404."""
        resp = await client.post(
            f"/{repo.owner}/{repo.slug}/coord/pull",
            json={},
        )
        assert resp.status_code == 404  # private repo → 404 not 401

    async def test_pull_public_repo_no_auth_required(
        self, client: AsyncClient, public_repo: MusehubRepo
    ) -> None:
        """An unauthenticated pull from a public repo succeeds with 200."""
        resp = await client.post(
            f"/{public_repo.owner}/{public_repo.slug}/coord/pull",
            json={},
        )
        assert resp.status_code == 200
| 577 | |
| 578 | |
class TestWatchEndpoint:
    """HTTP-level tests for ``GET /{owner}/{slug}/coord/watch``.

    The real SSE stream never ends, so every test that reaches the streaming
    path replaces ``coord_watch_stream`` with a short finite generator. Tests
    for the checks that run before streaming (auth, repo lookup, kind
    validation) issue a plain GET and only inspect the status code.
    """

    @staticmethod
    async def _one_heartbeat(*args: str, **kwargs: str) -> AsyncIterator[str]:
        """Stand-in stream that emits a single heartbeat comment and ends."""
        yield ": heartbeat\n\n"

    async def test_watch_returns_sse_content_type(
        self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
    ) -> None:
        """The watch response is 200 with an event-stream content type."""
        url = f"/{repo.owner}/{repo.slug}/coord/watch"
        stream_patch = patch(
            "musehub.api.routes.coord.coord_watch_stream",
            side_effect=self._one_heartbeat,
        )
        with stream_patch:
            resp = await client.get(url, headers=auth_headers)
        assert resp.status_code == 200
        assert "text/event-stream" in resp.headers["content-type"]

    async def test_watch_no_cache_header(
        self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
    ) -> None:
        """The watch response carries ``Cache-Control: no-cache``."""
        url = f"/{repo.owner}/{repo.slug}/coord/watch"
        stream_patch = patch(
            "musehub.api.routes.coord.coord_watch_stream",
            side_effect=self._one_heartbeat,
        )
        with stream_patch:
            resp = await client.get(url, headers=auth_headers)
        assert resp.headers.get("cache-control") == "no-cache"

    async def test_watch_yields_heartbeat_event(
        self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
    ) -> None:
        """Text yielded by the stream appears in the response body."""
        url = f"/{repo.owner}/{repo.slug}/coord/watch"
        stream_patch = patch(
            "musehub.api.routes.coord.coord_watch_stream",
            side_effect=self._one_heartbeat,
        )
        with stream_patch:
            resp = await client.get(url, headers=auth_headers)
        assert ": heartbeat" in resp.text

    async def test_watch_yields_coord_record_event(
        self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
    ) -> None:
        """A ``coord_record`` event yielded by the stream reaches the client."""
        uid = _new_id()

        async def _one_record(*args: str, **kwargs: str) -> AsyncIterator[str]:
            yield f'id: 1\nevent: coord_record\ndata: {{"id":1,"kind":"reservation","record_id":"{uid}"}}\n\n'

        url = f"/{repo.owner}/{repo.slug}/coord/watch"
        with patch("musehub.api.routes.coord.coord_watch_stream", side_effect=_one_record):
            resp = await client.get(url, headers=auth_headers)
        body_text = resp.text
        assert "coord_record" in body_text
        assert uid in body_text

    async def test_watch_private_repo_no_auth_returns_404(
        self, client: AsyncClient, repo: MusehubRepo
    ) -> None:
        """Without auth a private repo is not visible, so the answer is 404."""
        # No stream mock here: the test expects the 404 before streaming starts.
        url = f"/{repo.owner}/{repo.slug}/coord/watch"
        resp = await client.get(url)
        assert resp.status_code == 404

    async def test_watch_invalid_kind_param_returns_400(
        self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
    ) -> None:
        """An unknown ``kinds`` query value is answered with 400."""
        # No stream mock here: the test expects the 400 before streaming starts.
        url = f"/{repo.owner}/{repo.slug}/coord/watch?kinds=bad_kind"
        resp = await client.get(url, headers=auth_headers)
        assert resp.status_code == 400

    async def test_watch_unknown_repo_returns_404(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """Watching a slug that does not exist returns 404."""
        resp = await client.get("/gabriel/no-such-repo/coord/watch", headers=auth_headers)
        assert resp.status_code == 404

    async def test_watch_since_id_param_passed_to_stream(
        self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
    ) -> None:
        """The ``since_id`` query parameter reaches ``coord_watch_stream`` as an int."""
        captured = {}

        async def _capture(repo_id: str, since_id: int | None, kinds: list[str] | None, get_session: type) -> AsyncIterator[str]:
            # Record the argument, then end after a single heartbeat.
            captured["since_id"] = since_id
            yield ": heartbeat\n\n"

        url = f"/{repo.owner}/{repo.slug}/coord/watch?since_id=99"
        with patch("musehub.api.routes.coord.coord_watch_stream", side_effect=_capture):
            await client.get(url, headers=auth_headers)
        assert captured.get("since_id") == 99

    async def test_watch_kind_filter_param_passed_to_stream(
        self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
    ) -> None:
        """Repeated ``kinds`` query parameters reach ``coord_watch_stream``."""
        captured = {}

        async def _capture(repo_id: str, since_id: int | None, kinds: list[str] | None, get_session: type) -> AsyncIterator[str]:
            # Record the argument, then end after a single heartbeat.
            captured["kinds"] = kinds
            yield ": heartbeat\n\n"

        url = f"/{repo.owner}/{repo.slug}/coord/watch?kinds=reservation&kinds=heartbeat"
        with patch("musehub.api.routes.coord.coord_watch_stream", side_effect=_capture):
            await client.get(url, headers=auth_headers)
        assert set(captured.get("kinds", [])) == {"reservation", "heartbeat"}
| 718 | |
| 719 | |
| 720 | # ── Security tests ───────────────────────────────────────────────────────────── |
| 721 | |
| 722 | |
class TestCoordSecurity:
    """Security-focused HTTP tests for the coord push and pull endpoints."""

    async def test_path_traversal_in_owner_blocked(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """A traversal-style owner path segment never yields a success status."""
        resp = await client.post(
            "/../../../etc/passwd/coord-test/coord/push",
            json={"records": [_make_record()]},
            headers=auth_headers,
        )
        # Any of these client-error statuses is accepted; the point is that
        # the request must not succeed.
        assert resp.status_code in (400, 404, 422)

    async def test_path_traversal_in_record_id_rejected(
        self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
    ) -> None:
        """A record_id that looks like a relative path is rejected with 422."""
        resp = await client.post(
            f"/{repo.owner}/{repo.slug}/coord/push",
            json={"records": [{
                "kind": "reservation",
                "record_id": "../../etc/passwd",
                "payload": {},
            }]},
            headers=auth_headers,
        )
        assert resp.status_code == 422

    async def test_null_byte_in_record_id_rejected(
        self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
    ) -> None:
        """A record_id starting with a NUL byte is rejected with 422."""
        resp = await client.post(
            f"/{repo.owner}/{repo.slug}/coord/push",
            json={"records": [{
                "kind": "reservation",
                "record_id": "\x00" + _new_id()[1:],
                "payload": {},
            }]},
            headers=auth_headers,
        )
        assert resp.status_code == 422

    async def test_oversized_batch_rejected(
        self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
    ) -> None:
        """A batch of 501 records is rejected with 422."""
        records = [_make_record() for _ in range(501)]
        resp = await client.post(
            f"/{repo.owner}/{repo.slug}/coord/push",
            json={"records": records},
            headers=auth_headers,
        )
        assert resp.status_code == 422

    async def test_different_user_cannot_push_to_private_repo(
        self, client: AsyncClient, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """A second identity pushing to someone else's private repo gets 403 or 404.

        The request-signing dependencies are overridden so the request is
        treated as coming from a newly created identity. The overrides are
        restored afterwards so they cannot affect other tests.
        """
        from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request
        from musehub.main import app as _app

        other_id = secrets.token_hex(16)
        other_identity = MusehubIdentity(identity_id=other_id, handle="othercoorduser", identity_type="human")
        db_session.add(other_identity)
        await db_session.commit()
        _other_ctx = MSignContext(handle="othercoorduser", identity_id=other_id, is_agent=False, is_admin=False)

        # Snapshot the overrides already in place so exactly that state can be
        # put back, whatever the outcome of the request.
        previous_overrides = dict(_app.dependency_overrides)
        _app.dependency_overrides[require_signed_request] = lambda: _other_ctx
        _app.dependency_overrides[optional_signed_request] = lambda: _other_ctx
        try:
            resp = await client.post(
                f"/{repo.owner}/{repo.slug}/coord/push",
                json={"records": [_make_record()]},
            )
        finally:
            _app.dependency_overrides.clear()
            _app.dependency_overrides.update(previous_overrides)

        # Repo is invisible to other user (404) or forbidden (403).
        assert resp.status_code in (403, 404)

    async def test_unknown_kind_in_pull_filter_rejected(
        self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
    ) -> None:
        """An injection-style string in the ``kinds`` filter is rejected with 422."""
        resp = await client.post(
            f"/{repo.owner}/{repo.slug}/coord/pull",
            json={"kinds": ["injection_kind']); DROP TABLE--"]},
            headers=auth_headers,
        )
        assert resp.status_code == 422

    async def test_negative_since_id_rejected(
        self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
    ) -> None:
        """A negative ``since_id`` in the pull body is rejected with 422."""
        resp = await client.post(
            f"/{repo.owner}/{repo.slug}/coord/pull",
            json={"since_id": -1},
            headers=auth_headers,
        )
        assert resp.status_code == 422
| 815 | |
| 816 | |
| 817 | # ── Stress tests ─────────────────────────────────────────────────────────────── |
| 818 | |
| 819 | |
class TestCoordStress:
    """Larger-volume tests for coord push and pull."""

    async def test_push_500_records_single_batch(
        self, client: AsyncClient, auth_headers: StrDict, repo: MusehubRepo
    ) -> None:
        """A single HTTP push of 500 records inserts all of them."""
        records = [_make_record() for _ in range(500)]
        resp = await client.post(
            f"/{repo.owner}/{repo.slug}/coord/push",
            json={"records": records},
            headers=auth_headers,
        )
        assert resp.status_code == 200
        body = resp.json()
        assert body["inserted"] == 500
        assert body["skipped"] == 0

    async def test_cursor_pagination_full_1000_records(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """Push 1000 records in two batches and paginate through all with cursor."""
        total_records = 1000
        batch_size = 500
        page_size = 100

        inserted_total = 0
        for _ in range(total_records // batch_size):  # two batches of 500
            records = [
                CoordRecordIn(kind="reservation", record_id=_new_id(), payload={})
                for _ in range(batch_size)
            ]
            push_req = CoordPushRequest(records=records)
            resp = await coord_push(db_session, repo.repo_id, push_req)
            inserted_total += resp.inserted
        assert inserted_total == total_records

        # Paginate with limit=100. The loop is bounded at the ten expected
        # full pages plus the terminating empty one, so a pull that never
        # returns an empty page fails the test instead of hanging the suite.
        max_pulls = total_records // page_size + 1
        cursor = 0
        total_pulled = 0
        pages = 0
        for _ in range(max_pulls):
            pull_resp = await coord_pull(
                db_session, repo.repo_id,
                CoordPollRequest(since_id=cursor, limit=page_size)
            )
            if not pull_resp.records:
                break
            total_pulled += len(pull_resp.records)
            cursor = pull_resp.cursor
            pages += 1
        else:
            pytest.fail("cursor pagination did not terminate within the expected number of pulls")

        assert total_pulled == total_records
        assert pages == 10

    async def test_all_kinds_push_and_pull(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """Push one record per kind, pull all, assert each kind present."""
        records = [
            CoordRecordIn(kind=k, record_id=_new_id(), payload={"kind": k})
            for k in sorted(_VALID_KINDS)
        ]
        push_req = CoordPushRequest(records=records)
        push_resp = await coord_push(db_session, repo.repo_id, push_req)
        assert push_resp.inserted == len(_VALID_KINDS)

        pull_resp = await coord_pull(db_session, repo.repo_id, CoordPollRequest())
        pulled_kinds = {r.kind for r in pull_resp.records}
        assert pulled_kinds == _VALID_KINDS

    async def test_idempotent_push_500_records_twice(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """Pushing the same 500 records twice: first all inserted, then all skipped."""
        records = [
            CoordRecordIn(kind="reservation", record_id=_new_id(), payload={})
            for _ in range(500)
        ]
        req = CoordPushRequest(records=records)

        resp1 = await coord_push(db_session, repo.repo_id, req)
        assert resp1.inserted == 500

        resp2 = await coord_push(db_session, repo.repo_id, req)
        assert resp2.inserted == 0
        assert resp2.skipped == 500
File History
1 commit
sha256:35d76015db2541686c33edd44343ea2d9f751325b4a5556cc9c4c9c0f84edbbe
chore: bump version to 0.2.0rc12
Sonnet 4.6
patch
8 days ago