"""Section 7 — Coordination (muse coord): 7-layer test suite. Covers: - musehub/api/routes/coord.py (push_coord, pull_coord, watch_coord HTTP handlers, _resolve_repo, _assert_readable, _assert_writable) - musehub/services/musehub_coord.py (coord_push, coord_pull, coord_watch_stream, _row_to_out, write-once semantics, heartbeat upsert) - musehub/services/musehub_coord_server.py (materialize_coord_record, list_reservations, conflict_check, extend_reservation, list_tasks, claim_task, complete_task, fail_task) - musehub/models/coord.py (CoordRecordIn validators, CoordPushRequest, CoordPollRequest) - musehub/db/coord_models.py (MusehubCoordRecord, MusehubCoordReservation, MusehubCoordTask) Layers: 1. Unit — model validators, pure helpers, no DB 2. Integration — real DB (PostgreSQL), service-layer calls, no HTTP 3. End-to-End — full HTTP via AsyncClient, real DB 4. Stress — 500-record push, 100 tasks, cursor pagination 5. Data Integrity — write-once, heartbeat upsert, constraint enforcement, task lifecycle state machine 6. Security — auth guards, ownership enforcement, private-repo 404, invalid kind rejection 7. Performance — latency budgets for push/pull/materialize """ import asyncio import secrets import time from datetime import datetime, timedelta, timezone import pytest import pytest_asyncio from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession from musehub.core.genesis import compute_identity_id from musehub.types.json_types import JSONObject, StrDict from musehub.models.coord import ( CoordPollRequest, CoordPushRequest, CoordRecordIn, _VALID_KINDS, ) from muse.core.types import fake_id from tests.factories import create_repo # --------------------------------------------------------------------------- # Local helpers # --------------------------------------------------------------------------- def _now() -> datetime: return datetime.now(tz=timezone.utc) def _new_id() -> str: return secrets.token_hex(16) def _record( kind: str = "intent", record_id: str | None = None, run_id: str = "agent-1", payload: JSONObject | None = None, expires_at: datetime | None = None, ) -> CoordRecordIn: return CoordRecordIn( kind=kind, record_id=record_id or _new_id(), run_id=run_id, payload=payload or {"action": kind, "data": "test"}, expires_at=expires_at, ) def _push_body(*records: CoordRecordIn) -> JSONObject: return {"records": [r.model_dump(mode="json") for r in records]} def _pull_body(since_id: int = 0, kinds: list[str] | None = None, limit: int = 500) -> JSONObject: return {"since_id": since_id, "kinds": kinds or [], "limit": limit} # =========================================================================== # Layer 1 — Unit tests (model validators, pure helpers) # =========================================================================== class TestCoordRecordInValidators: def test_valid_kinds_accepted(self) -> None: for kind in _VALID_KINDS: r = _record(kind=kind) assert r.kind == kind def test_invalid_kind_raises(self) -> None: import pytest with pytest.raises(Exception): _record(kind="garbage") def test_invalid_record_id_rejected(self) -> None: with pytest.raises(Exception): _record(record_id="has spaces and slashes/bad") def test_uppercase_record_id_accepted(self) -> None: uid = secrets.token_hex(16).upper() r = _record(record_id=uid) assert r.record_id == uid def test_run_id_empty_string_allowed(self) -> None: r = _record(run_id="") assert r.run_id == "" def test_expires_at_optional(self) -> None: r = _record() assert r.expires_at is None def test_expires_at_accepted(self) -> None: exp = _now() + timedelta(seconds=300) r = _record(expires_at=exp) assert r.expires_at is not None class TestCoordPushRequestValidators: def test_empty_records_rejected(self) -> None: with pytest.raises(Exception): CoordPushRequest(records=[]) def test_max_500_records_accepted(self) -> None: records = [_record() for _ in range(500)] req = CoordPushRequest(records=records) assert len(req.records) == 500 def test_501_records_rejected(self) -> None: with pytest.raises(Exception): CoordPushRequest(records=[_record() for _ in range(501)]) def test_single_record_accepted(self) -> None: req = CoordPushRequest(records=[_record()]) assert len(req.records) == 1 class TestCoordPollRequestValidators: def test_default_values(self) -> None: req = CoordPollRequest() assert req.since_id == 0 assert req.kinds == [] assert req.limit == 500 def test_since_id_must_be_non_negative(self) -> None: with pytest.raises(Exception): CoordPollRequest(since_id=-1) def test_invalid_kind_in_pull_rejected(self) -> None: with pytest.raises(Exception): CoordPollRequest(kinds=["garbage"]) def test_valid_kinds_filter_accepted(self) -> None: req = CoordPollRequest(kinds=["reservation", "task"]) assert set(req.kinds) == {"reservation", "task"} def test_limit_range(self) -> None: assert CoordPollRequest(limit=1).limit == 1 assert CoordPollRequest(limit=1000).limit == 1000 with pytest.raises(Exception): CoordPollRequest(limit=0) with pytest.raises(Exception): CoordPollRequest(limit=1001) class TestValidKinds: def test_all_expected_kinds_present(self) -> None: expected = {"reservation", "intent", "release", "heartbeat", "dependency", "task", "claim"} assert expected == _VALID_KINDS # =========================================================================== # Layer 2 — Integration tests (real DB, service layer, no HTTP) # =========================================================================== class TestCoordPushIntegration: @pytest.mark.asyncio async def test_push_inserts_records(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push repo = await create_repo(db_session, slug="push-insert") req = CoordPushRequest(records=[_record("intent"), _record("dependency")]) resp = await coord_push(db_session, repo.repo_id, req) assert resp.inserted == 2 assert resp.skipped == 0 @pytest.mark.asyncio async def test_push_write_once_skips_duplicate(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push repo = await create_repo(db_session, slug="push-writeonce") rec = _record("intent") req = CoordPushRequest(records=[rec]) resp1 = await coord_push(db_session, repo.repo_id, req) assert resp1.inserted == 1 resp2 = await coord_push(db_session, repo.repo_id, req) assert resp2.skipped == 1 assert resp2.inserted == 0 @pytest.mark.asyncio async def test_push_heartbeat_upserts_payload(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push, coord_pull repo = await create_repo(db_session, slug="push-hb-upsert") uid = _new_id() req1 = CoordPushRequest(records=[_record("heartbeat", record_id=uid, payload={"tick": 1})]) req2 = CoordPushRequest(records=[_record("heartbeat", record_id=uid, payload={"tick": 2})]) r1 = await coord_push(db_session, repo.repo_id, req1) assert r1.inserted == 1 r2 = await coord_push(db_session, repo.repo_id, req2) # Re-push of same heartbeat → upsert, counted as skipped (no new row) assert r2.skipped == 1 # Payload should be updated pull_resp = await coord_pull(db_session, repo.repo_id, CoordPollRequest(kinds=["heartbeat"])) assert len(pull_resp.records) == 1 assert pull_resp.records[0].payload["tick"] == 2 @pytest.mark.asyncio async def test_push_all_valid_kinds(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push repo = await create_repo(db_session, slug="push-all-kinds") records = [_record(k) for k in _VALID_KINDS] req = CoordPushRequest(records=records) resp = await coord_push(db_session, repo.repo_id, req) assert resp.inserted == len(_VALID_KINDS) @pytest.mark.asyncio async def test_push_materializes_reservation(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push from musehub.services.musehub_coord_server import list_reservations repo = await create_repo(db_session, slug="push-materialize-res") rec_id = _new_id() res_id = fake_id(rec_id) payload = { "reservation_id": res_id, "run_id": "agent-42", "addresses": ["src/main.py::process"], "ttl_s": 300, } req = CoordPushRequest(records=[_record("reservation", record_id=rec_id, payload=payload)]) await coord_push(db_session, repo.repo_id, req) reservations = await list_reservations(db_session, repo.repo_id) assert len(reservations) == 1 assert reservations[0].symbol_address == "src/main.py::process" assert reservations[0].agent_id == "agent-42" @pytest.mark.asyncio async def test_push_materializes_task(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push from musehub.services.musehub_coord_server import list_tasks repo = await create_repo(db_session, slug="push-materialize-task") rec_id = _new_id() task_id = fake_id(rec_id) payload = { "task_id": task_id, "queue": "ci", "priority": 10, "created_by": "dispatcher", } req = CoordPushRequest(records=[_record("task", record_id=rec_id, payload=payload)]) await coord_push(db_session, repo.repo_id, req) tasks = await list_tasks(db_session, repo.repo_id) assert len(tasks) == 1 assert tasks[0].task_id == task_id assert tasks[0].queue == "ci" assert tasks[0].priority == 10 assert tasks[0].status == "pending" class TestCoordPullIntegration: @pytest.mark.asyncio async def test_pull_empty_returns_cursor_zero(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_pull repo = await create_repo(db_session, slug="pull-empty") resp = await coord_pull(db_session, repo.repo_id, CoordPollRequest()) assert resp.records == [] assert resp.cursor == 0 @pytest.mark.asyncio async def test_pull_returns_all_pushed_records(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push, coord_pull repo = await create_repo(db_session, slug="pull-all") req = CoordPushRequest(records=[_record("intent"), _record("dependency"), _record("heartbeat")]) await coord_push(db_session, repo.repo_id, req) resp = await coord_pull(db_session, repo.repo_id, CoordPollRequest()) assert len(resp.records) == 3 assert resp.cursor == resp.records[-1].id @pytest.mark.asyncio async def test_pull_since_id_cursor_pagination(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push, coord_pull repo = await create_repo(db_session, slug="pull-cursor") for _ in range(5): await coord_push(db_session, repo.repo_id, CoordPushRequest(records=[_record("intent")])) # Fetch first 3 resp1 = await coord_pull(db_session, repo.repo_id, CoordPollRequest(limit=3)) assert len(resp1.records) == 3 cursor = resp1.cursor # Fetch next 2 using cursor resp2 = await coord_pull(db_session, repo.repo_id, CoordPollRequest(since_id=cursor)) assert len(resp2.records) == 2 # IDs must be strictly greater than cursor assert all(r.id > cursor for r in resp2.records) @pytest.mark.asyncio async def test_pull_kinds_filter(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push, coord_pull repo = await create_repo(db_session, slug="pull-kinds-filter") await coord_push(db_session, repo.repo_id, CoordPushRequest(records=[_record("intent"), _record("heartbeat"), _record("dependency")])) resp = await coord_pull(db_session, repo.repo_id, CoordPollRequest(kinds=["intent"])) assert len(resp.records) == 1 assert resp.records[0].kind == "intent" @pytest.mark.asyncio async def test_pull_ordered_oldest_first(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push, coord_pull repo = await create_repo(db_session, slug="pull-ordered") for _ in range(3): await coord_push(db_session, repo.repo_id, CoordPushRequest(records=[_record("intent")])) resp = await coord_pull(db_session, repo.repo_id, CoordPollRequest()) ids = [r.id for r in resp.records] assert ids == sorted(ids) @pytest.mark.asyncio async def test_pull_limit_respected(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push, coord_pull repo = await create_repo(db_session, slug="pull-limit") for _ in range(10): await coord_push(db_session, repo.repo_id, CoordPushRequest(records=[_record("intent")])) resp = await coord_pull(db_session, repo.repo_id, CoordPollRequest(limit=4)) assert len(resp.records) == 4 class TestCoordServerIntegration: @pytest.mark.asyncio async def test_conflict_check_no_reservations(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord_server import conflict_check repo = await create_repo(db_session, slug="conflict-empty") result = await conflict_check(db_session, repo.repo_id, ["a.py::Fn"]) assert result == [] @pytest.mark.asyncio async def test_conflict_check_finds_active_reservation( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_coord import coord_push from musehub.services.musehub_coord_server import conflict_check repo = await create_repo(db_session, slug="conflict-found") rec_id = _new_id() res_id = fake_id(rec_id) exp = _now() + timedelta(seconds=300) payload = { "reservation_id": res_id, "run_id": "worker-1", "addresses": ["a.py::MyFn"], "ttl_s": 300, "expires_at": exp.isoformat(), } await coord_push(db_session, repo.repo_id, CoordPushRequest(records=[_record("reservation", record_id=rec_id, payload=payload, expires_at=exp)])) conflicts = await conflict_check(db_session, repo.repo_id, ["a.py::MyFn"]) assert len(conflicts) == 1 assert conflicts[0]["symbol_address"] == "a.py::MyFn" @pytest.mark.asyncio async def test_conflict_check_ignores_expired_reservation( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_coord import coord_push from musehub.services.musehub_coord_server import conflict_check from musehub.db import coord_models as _cm repo = await create_repo(db_session, slug="conflict-expired") # Insert reservation directly with past expires_at past = _now() - timedelta(seconds=10) row = _cm.MusehubCoordReservation( reservation_id=fake_id("expired-reservation"), repo_id=repo.repo_id, symbol_address="a.py::OldFn", agent_id="old-agent", ttl_s=10, created_at=_now() - timedelta(seconds=20), expires_at=past, ) db_session.add(row) await db_session.commit() conflicts = await conflict_check(db_session, repo.repo_id, ["a.py::OldFn"]) assert conflicts == [] @pytest.mark.asyncio async def test_extend_reservation(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push from musehub.services.musehub_coord_server import extend_reservation, list_reservations repo = await create_repo(db_session, slug="extend-reservation") rec_id = _new_id() res_id = fake_id(rec_id) exp = _now() + timedelta(seconds=60) payload = { "reservation_id": res_id, "run_id": "agent-ext", "addresses": ["b.py::Fn"], "ttl_s": 60, "expires_at": exp.isoformat(), } await coord_push(db_session, repo.repo_id, CoordPushRequest(records=[_record("reservation", record_id=rec_id, payload=payload, expires_at=exp)])) res_before = await list_reservations(db_session, repo.repo_id) old_exp = res_before[0].expires_at updated = await extend_reservation(db_session, repo.repo_id, res_id, extend_by_s=600) assert updated is not None # New expiry must be later than original new_exp = updated.expires_at if old_exp.tzinfo is None: old_exp = old_exp.replace(tzinfo=timezone.utc) if new_exp.tzinfo is None: new_exp = new_exp.replace(tzinfo=timezone.utc) assert new_exp > old_exp @pytest.mark.asyncio async def test_task_lifecycle_claim_complete(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push from musehub.services.musehub_coord_server import claim_task, complete_task repo = await create_repo(db_session, slug="task-lifecycle") rec_id = _new_id() task_id = fake_id(rec_id) payload = {"task_id": task_id, "queue": "default", "priority": 50, "created_by": "dispatcher"} await coord_push(db_session, repo.repo_id, CoordPushRequest(records=[_record("task", record_id=rec_id, payload=payload)])) claimed = await claim_task(db_session, repo.repo_id, task_id, "worker-1") assert claimed is not None assert claimed.status == "claimed" assert claimed.claimed_by == "worker-1" completed = await complete_task(db_session, repo.repo_id, task_id, "worker-1", result={"output": "done"}) assert completed is not None assert completed.status == "completed" assert completed.payload.get("result") == {"output": "done"} @pytest.mark.asyncio async def test_task_lifecycle_claim_fail(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push from musehub.services.musehub_coord_server import claim_task, fail_task repo = await create_repo(db_session, slug="task-fail") rec_id = _new_id() task_id = fake_id(rec_id) payload = {"task_id": task_id, "queue": "default", "priority": 50} await coord_push(db_session, repo.repo_id, CoordPushRequest(records=[_record("task", record_id=rec_id, payload=payload)])) await claim_task(db_session, repo.repo_id, task_id, "worker-2") failed = await fail_task(db_session, repo.repo_id, task_id, "worker-2", reason="OOM") assert failed is not None assert failed.status == "failed" assert failed.payload.get("failure_reason") == "OOM" @pytest.mark.asyncio async def test_claim_already_claimed_task_returns_none( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_coord import coord_push from musehub.services.musehub_coord_server import claim_task repo = await create_repo(db_session, slug="double-claim") rec_id = _new_id() task_id = fake_id(rec_id) payload = {"task_id": task_id, "queue": "default"} await coord_push(db_session, repo.repo_id, CoordPushRequest(records=[_record("task", record_id=rec_id, payload=payload)])) r1 = await claim_task(db_session, repo.repo_id, task_id, "worker-A") assert r1 is not None r2 = await claim_task(db_session, repo.repo_id, task_id, "worker-B") assert r2 is None # Already claimed by worker-A @pytest.mark.asyncio async def test_list_tasks_filter_by_status(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push from musehub.services.musehub_coord_server import claim_task, list_tasks repo = await create_repo(db_session, slug="list-tasks-status") tids = [] for i in range(3): rec_id = _new_id() tid = fake_id(rec_id) tids.append(tid) await coord_push(db_session, repo.repo_id, CoordPushRequest(records=[_record("task", record_id=rec_id, payload={"task_id": tid})])) if i == 0: await claim_task(db_session, repo.repo_id, tid, "worker-X") pending = await list_tasks(db_session, repo.repo_id, status="pending") claimed = await list_tasks(db_session, repo.repo_id, status="claimed") assert len(pending) == 2 assert len(claimed) == 1 # =========================================================================== # Layer 3 — End-to-End tests (full HTTP via AsyncClient, real DB) # =========================================================================== class TestCoordEndToEnd: @pytest.mark.asyncio async def test_push_404_unknown_repo( self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict ) -> None: resp = await client.post( "/ghost-owner/ghost-repo/coord/push", json=_push_body(_record()), headers=auth_headers, ) assert resp.status_code == 404 @pytest.mark.asyncio async def test_push_requires_auth( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="push-noauth") await db_session.commit() resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/push", json=_push_body(_record()), ) assert resp.status_code == 401 @pytest.mark.asyncio async def test_push_403_non_owner( self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict ) -> None: # auth_headers gives identity_id = _TEST_IDENTITY_ID; create repo with different owner_user_id repo = await create_repo(db_session, slug="push-nonowner", owner_user_id=compute_identity_id(b"other-user")) await db_session.commit() resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/push", json=_push_body(_record()), headers=auth_headers, ) assert resp.status_code == 403 @pytest.mark.asyncio async def test_push_success_returns_counts( self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict ) -> None: from tests.conftest import _TEST_IDENTITY_ID repo = await create_repo(db_session, slug="push-e2e-ok", owner_user_id=_TEST_IDENTITY_ID) await db_session.commit() resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/push", json=_push_body(_record("intent"), _record("dependency")), headers=auth_headers, ) assert resp.status_code == 200 data = resp.json() assert data["inserted"] == 2 assert data["skipped"] == 0 @pytest.mark.asyncio async def test_pull_public_repo_no_auth( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="pull-e2e-pub", visibility="public") await db_session.commit() resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/pull", json=_pull_body(), ) assert resp.status_code == 200 data = resp.json() assert "records" in data assert "cursor" in data @pytest.mark.asyncio async def test_pull_private_repo_404_no_auth( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="pull-e2e-priv", visibility="private") await db_session.commit() resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/pull", json=_pull_body(), ) assert resp.status_code == 404 @pytest.mark.asyncio async def test_pull_returns_pushed_records( self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict ) -> None: from tests.conftest import _TEST_IDENTITY_ID repo = await create_repo(db_session, slug="pull-e2e-round", owner_user_id=_TEST_IDENTITY_ID, visibility="public") await db_session.commit() push_resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/push", json=_push_body(_record("intent"), _record("heartbeat")), headers=auth_headers, ) assert push_resp.status_code == 200 pull_resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/pull", json=_pull_body(), ) assert pull_resp.status_code == 200 data = pull_resp.json() assert len(data["records"]) == 2 @pytest.mark.asyncio async def test_pull_kinds_filter_via_http( self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict ) -> None: from tests.conftest import _TEST_IDENTITY_ID repo = await create_repo(db_session, slug="pull-e2e-filter", owner_user_id=_TEST_IDENTITY_ID, visibility="public") await db_session.commit() await client.post( f"/{repo.owner}/{repo.slug}/coord/push", json=_push_body(_record("intent"), _record("dependency"), _record("heartbeat")), headers=auth_headers, ) pull_resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/pull", json=_pull_body(kinds=["heartbeat"]), ) assert pull_resp.status_code == 200 records = pull_resp.json()["records"] assert len(records) == 1 assert records[0]["kind"] == "heartbeat" @pytest.mark.asyncio async def test_watch_invalid_kind_400( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="watch-invalid-kind", visibility="public") await db_session.commit() resp = await client.get( f"/{repo.owner}/{repo.slug}/coord/watch", params={"kinds": "garbage"}, ) assert resp.status_code == 400 @pytest.mark.asyncio async def test_watch_404_unknown_repo( self, client: AsyncClient, db_session: AsyncSession ) -> None: resp = await client.get("/ghost/norepo/coord/watch") assert resp.status_code == 404 # =========================================================================== # Layer 4 — Stress tests # =========================================================================== class TestStress: @pytest.mark.asyncio async def test_push_500_records_single_call(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push, coord_pull repo = await create_repo(db_session, slug="stress-push-500") records = [_record("intent") for _ in range(500)] req = CoordPushRequest(records=records) resp = await coord_push(db_session, repo.repo_id, req) assert resp.inserted == 500 assert resp.skipped == 0 # All 500 must be pullable pull = await coord_pull(db_session, repo.repo_id, CoordPollRequest(limit=1000)) assert len(pull.records) == 500 @pytest.mark.asyncio async def test_cursor_pagination_through_500_records( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_coord import coord_push, coord_pull repo = await create_repo(db_session, slug="stress-cursor-500") records = [_record("dependency") for _ in range(500)] await coord_push(db_session, repo.repo_id, CoordPushRequest(records=records)) cursor = 0 fetched = 0 pages = 0 while True: page = await coord_pull(db_session, repo.repo_id, CoordPollRequest(since_id=cursor, limit=100)) if not page.records: break fetched += len(page.records) cursor = page.cursor pages += 1 assert fetched == 500 assert pages == 5 @pytest.mark.asyncio async def test_task_queue_100_tasks(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push from musehub.services.musehub_coord_server import list_tasks, claim_task repo = await create_repo(db_session, slug="stress-100-tasks") for _ in range(100): rec_id = _new_id() tid = fake_id(rec_id) payload = {"task_id": tid, "queue": "batch", "priority": 50} await coord_push(db_session, repo.repo_id, CoordPushRequest(records=[_record("task", record_id=rec_id, payload=payload)])) tasks = await list_tasks(db_session, repo.repo_id, queue="batch", limit=100) assert len(tasks) == 100 # Claim first 10 claimed_count = 0 for task in tasks[:10]: result = await claim_task(db_session, repo.repo_id, task.task_id, "batch-worker") if result is not None: claimed_count += 1 assert claimed_count == 10 @pytest.mark.asyncio async def test_conflict_check_100_reserved_symbols( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_coord import coord_push from musehub.services.musehub_coord_server import conflict_check from musehub.db import coord_models as _cm repo = await create_repo(db_session, slug="stress-conflict-100") # Insert 100 reservations directly exp = _now() + timedelta(seconds=300) for i in range(100): row = _cm.MusehubCoordReservation( reservation_id=fake_id(f"stress-res-{i}"), repo_id=repo.repo_id, symbol_address=f"module/file_{i}.py::Fn{i}", agent_id=f"agent-{i}", ttl_s=300, created_at=_now(), expires_at=exp, ) db_session.add(row) await db_session.commit() # Check the last 50 — all should conflict addresses = [f"module/file_{i}.py::Fn{i}" for i in range(50, 100)] conflicts = await conflict_check(db_session, repo.repo_id, addresses) assert len(conflicts) == 50 # =========================================================================== # Layer 5 — Data Integrity tests # =========================================================================== class TestDataIntegrity: @pytest.mark.asyncio async def test_write_once_constraint_enforced(self, db_session: AsyncSession) -> None: """The UniqueConstraint on (repo_id, kind, record_id) must hold.""" from musehub.services.musehub_coord import coord_push repo = await create_repo(db_session, slug="di-unique-constraint") uid = _new_id() rec = _record("intent", record_id=uid) r1 = await coord_push(db_session, repo.repo_id, CoordPushRequest(records=[rec])) r2 = await coord_push(db_session, repo.repo_id, CoordPushRequest(records=[rec])) # First → inserted, second → skipped (not error) assert r1.inserted == 1 assert r2.skipped == 1 @pytest.mark.asyncio async def test_heartbeat_upsert_does_not_create_new_row( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_coord import coord_push, coord_pull repo = await create_repo(db_session, slug="di-hb-no-dup") uid = _new_id() for i in range(5): rec = _record("heartbeat", record_id=uid, payload={"tick": i}) await coord_push(db_session, repo.repo_id, CoordPushRequest(records=[rec])) resp = await coord_pull(db_session, repo.repo_id, CoordPollRequest(kinds=["heartbeat"])) # Only 1 row despite 5 pushes assert len(resp.records) == 1 assert resp.records[0].payload["tick"] == 4 @pytest.mark.asyncio async def test_coord_record_fields_complete(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push, coord_pull repo = await create_repo(db_session, slug="di-record-fields") uid = _new_id() exp = _now() + timedelta(seconds=120) rec = _record("dependency", record_id=uid, run_id="run-99", payload={"dep": "x"}, expires_at=exp) await coord_push(db_session, repo.repo_id, CoordPushRequest(records=[rec])) resp = await coord_pull(db_session, repo.repo_id, CoordPollRequest(kinds=["dependency"])) r = resp.records[0] assert r.kind == "dependency" assert r.record_id == uid assert r.run_id == "run-99" assert r.payload == {"dep": "x"} assert r.repo_id == repo.repo_id assert r.created_at is not None @pytest.mark.asyncio async def test_task_depends_on_preserved(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push from musehub.services.musehub_coord_server import list_tasks repo = await create_repo(db_session, slug="di-depends-on") dep_a = fake_id("dep-a") dep_b = fake_id("dep-b") rec_id = _new_id() task_id = fake_id(rec_id) payload = {"task_id": task_id, "queue": "default", "depends_on": [dep_a, dep_b]} await coord_push(db_session, repo.repo_id, CoordPushRequest(records=[_record("task", record_id=rec_id, payload=payload)])) tasks = await list_tasks(db_session, repo.repo_id) assert tasks[0].depends_on == [dep_a, dep_b] @pytest.mark.asyncio async def test_release_marks_reservation_released(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push from musehub.services.musehub_coord_server import list_reservations from musehub.db import coord_models as _cm repo = await create_repo(db_session, slug="di-release") rec_id = _new_id() res_id = fake_id(rec_id) exp = _now() + timedelta(seconds=300) res_payload = { "reservation_id": res_id, "run_id": "agent-r", "addresses": ["c.py::Fn"], "ttl_s": 300, "expires_at": exp.isoformat(), } await coord_push(db_session, repo.repo_id, CoordPushRequest(records=[_record("reservation", record_id=rec_id, payload=res_payload, expires_at=exp)])) # Confirm reservation exists active = await list_reservations(db_session, repo.repo_id) assert len(active) == 1 # Push a release record rel_id = _new_id() rel_payload = {"reservation_id": res_id} await coord_push(db_session, repo.repo_id, CoordPushRequest(records=[_record("release", record_id=rel_id, payload=rel_payload)])) # Reservation should now be gone from active list active_after = await list_reservations(db_session, repo.repo_id) assert len(active_after) == 0 @pytest.mark.asyncio async def test_reservation_multi_address_stores_all_rows(self, db_session: AsyncSession) -> None: """A reservation covering N addresses must create N rows — one per address. This is a regression test for a bug where session.get(PK) on the second loop iteration found the first row and skipped the remaining addresses, leaving only the first address in the DB. """ from musehub.services.musehub_coord import coord_push from musehub.services.musehub_coord_server import list_reservations, conflict_check from sqlalchemy import select from musehub.db import coord_models as _cm repo = await create_repo(db_session, slug="multi-addr-reservation") rec_id = _new_id() res_id = fake_id(rec_id) addresses = ["src/engine.py::AudioEngine", "src/mixer.py::Mixer", "src/output.py::Output"] exp = _now() + timedelta(seconds=300) res_payload = { "reservation_id": res_id, "run_id": "agent-multi", "addresses": addresses, "ttl_s": 300, "expires_at": exp.isoformat(), } await coord_push( db_session, repo.repo_id, CoordPushRequest(records=[_record("reservation", record_id=rec_id, payload=res_payload, expires_at=exp)]) ) # Fetch all rows for this reservation_id directly result = await db_session.execute( select(_cm.MusehubCoordReservation).where( _cm.MusehubCoordReservation.reservation_id == res_id ) ) rows = result.scalars().all() stored_addresses = {r.symbol_address for r in rows} assert stored_addresses == set(addresses), ( f"Expected all 3 addresses stored, got: {stored_addresses}" ) # All three addresses should show in conflict_check conflicts = await conflict_check(db_session, repo.repo_id, addresses) assert len(conflicts) == len(addresses), ( f"Expected {len(addresses)} conflicts, got {len(conflicts)}" ) # =========================================================================== # Layer 6 — Security tests # =========================================================================== class TestSecurity: @pytest.mark.asyncio async def test_push_requires_authentication( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="sec-push-noauth", visibility="public") await db_session.commit() resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/push", json=_push_body(_record()), ) assert resp.status_code == 401 @pytest.mark.asyncio async def test_push_403_for_non_owner_authenticated( self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict ) -> None: repo = await create_repo(db_session, slug="sec-push-nonowner", owner_user_id=compute_identity_id(b"other-owner")) await db_session.commit() resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/push", json=_push_body(_record()), headers=auth_headers, ) assert resp.status_code == 403 @pytest.mark.asyncio async def test_private_repo_pull_returns_404_unauthenticated( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="sec-priv-pull", visibility="private") await db_session.commit() resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/pull", json=_pull_body(), ) assert resp.status_code == 404 @pytest.mark.asyncio async def test_push_invalid_kind_rejected( self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict ) -> None: from tests.conftest import _TEST_IDENTITY_ID repo = await create_repo(db_session, slug="sec-invalid-kind", owner_user_id=_TEST_IDENTITY_ID) await db_session.commit() bad_payload = { "records": [{ "kind": "INJECT_SQL", "record_id": secrets.token_hex(16), "run_id": "", "payload": {}, }] } resp = await client.post( f"/{repo.owner}/{repo.slug}/coord/push", json=bad_payload, headers=auth_headers, ) assert resp.status_code == 422 @pytest.mark.asyncio async def test_watch_invalid_kind_query_param_400( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="sec-watch-kind", visibility="public") await db_session.commit() resp = await client.get( f"/{repo.owner}/{repo.slug}/coord/watch", params={"kinds": "evil_kind"}, ) assert resp.status_code == 400 @pytest.mark.asyncio async def test_complete_task_wrong_agent_rejected( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_coord import coord_push from musehub.services.musehub_coord_server import claim_task, complete_task repo = await create_repo(db_session, slug="sec-complete-wrong-agent") rec_id = _new_id() task_id = fake_id(rec_id) payload = {"task_id": task_id, "queue": "default"} await coord_push(db_session, repo.repo_id, CoordPushRequest(records=[_record("task", record_id=rec_id, payload=payload)])) await claim_task(db_session, repo.repo_id, task_id, "worker-A") result = await complete_task(db_session, repo.repo_id, task_id, "worker-B") # worker-B did not claim it — must return None assert result is None # =========================================================================== # Layer 7 — Performance tests # =========================================================================== class TestPerformance: @pytest.mark.asyncio async def test_push_100_records_under_500ms(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push repo = await create_repo(db_session, slug="perf-push-100") records = [_record("intent") for _ in range(100)] req = CoordPushRequest(records=records) t0 = time.perf_counter() resp = await coord_push(db_session, repo.repo_id, req) elapsed_ms = (time.perf_counter() - t0) * 1000 assert resp.inserted == 100 assert elapsed_ms < 500, f"push 100 records took {elapsed_ms:.1f}ms" @pytest.mark.asyncio async def test_pull_500_records_under_200ms(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push, coord_pull repo = await create_repo(db_session, slug="perf-pull-500") records = [_record("dependency") for _ in range(500)] await coord_push(db_session, repo.repo_id, CoordPushRequest(records=records)) t0 = time.perf_counter() resp = await coord_pull(db_session, repo.repo_id, CoordPollRequest(limit=1000)) elapsed_ms = (time.perf_counter() - t0) * 1000 assert len(resp.records) == 500 assert elapsed_ms < 200, f"pull 500 records took {elapsed_ms:.1f}ms" @pytest.mark.asyncio async def test_conflict_check_50_addresses_under_100ms( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_coord_server import conflict_check from musehub.db import coord_models as _cm repo = await create_repo(db_session, slug="perf-conflict") exp = _now() + timedelta(seconds=300) for i in range(50): db_session.add(_cm.MusehubCoordReservation( reservation_id=fake_id(f"perf-res-{i}"), repo_id=repo.repo_id, symbol_address=f"pkg/file_{i}.py::Fn{i}", agent_id="agent", ttl_s=300, created_at=_now(), expires_at=exp, )) await db_session.commit() addresses = [f"pkg/file_{i}.py::Fn{i}" for i in range(50)] t0 = time.perf_counter() conflicts = await conflict_check(db_session, repo.repo_id, addresses) elapsed_ms = (time.perf_counter() - t0) * 1000 assert len(conflicts) == 50 assert elapsed_ms < 100, f"conflict_check 50 addresses took {elapsed_ms:.1f}ms" @pytest.mark.asyncio async def test_task_queue_list_100_under_100ms(self, db_session: AsyncSession) -> None: from musehub.services.musehub_coord import coord_push from musehub.services.musehub_coord_server import list_tasks repo = await create_repo(db_session, slug="perf-tasklist-100") for _ in range(100): rec_id = _new_id() tid = fake_id(rec_id) await coord_push(db_session, repo.repo_id, CoordPushRequest(records=[_record("task", record_id=rec_id, payload={"task_id": tid, "queue": "perf"})])) t0 = time.perf_counter() tasks = await list_tasks(db_session, repo.repo_id, limit=100) elapsed_ms = (time.perf_counter() - t0) * 1000 assert len(tasks) == 100 assert elapsed_ms < 100, f"list_tasks 100 took {elapsed_ms:.1f}ms"