"""Full MCP smoke test — exercises every tool in MUSEHUB_TOOLS. This is an integration smoke test, not a unit test. It: 1. Initializes a real MCP session through the ASGI transport 2. Calls every registered tool with minimal valid arguments 3. Asserts no tool returns an RPC-level error or an unexpected 5xx 4. Reports isError=true results as failures (tool logic broken) Run with: python -m pytest tests/test_mcp_smoke.py -v --tb=short """ from __future__ import annotations import json import re import pytest from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession from musehub.db.musehub_identity_models import MusehubIdentity from musehub.db.musehub_repo_models import MusehubRepo from tests.factories import create_issue, create_proposal, create_repo from musehub.types.json_types import JSONObject, StrDict type _McpCtx = tuple[AsyncClient, str, StrDict, MusehubRepo] # ── helpers ────────────────────────────────────────────────────────────────── def _text(result: JSONObject) -> str: """Extract text content from a tools/call result.""" content = result.get("result", {}).get("content", []) return " ".join(c.get("text", "") for c in content if c.get("type") == "text") def _is_error(result: JSONObject) -> bool: return result.get("result", {}).get("isError", False) def _rpc_error(result: JSONObject) -> str | None: if "error" in result: return result["error"].get("message", "unknown RPC error") return None async def _init_session(client: AsyncClient, auth_headers: StrDict) -> str: r = await client.post( "/mcp", json={ "jsonrpc": "2.0", "id": 0, "method": "initialize", "params": { "protocolVersion": "2025-11-25", "capabilities": {}, "clientInfo": {"name": "smoke-test", "version": "1.0"}, }, }, headers=auth_headers, ) assert r.status_code == 200, f"MCP initialize failed: {r.text[:200]}" return r.headers["mcp-session-id"] async def call( client: AsyncClient, sid: str, auth_headers: StrDict, name: str, arguments: JSONObject, rpc_id: int = 1, ) -> JSONObject: r = await client.post( "/mcp", json={ "jsonrpc": "2.0", "id": rpc_id, "method": "tools/call", "params": {"name": name, "arguments": arguments}, }, headers={**auth_headers, "Mcp-Session-Id": sid}, ) assert r.status_code in (200, 202), f"{name} HTTP {r.status_code}: {r.text[:200]}" if r.status_code == 202: return {"result": {"content": [{"type": "text", "text": "(202 accepted)"}]}} return r.json() # ── fixtures ────────────────────────────────────────────────────────────────── @pytest.fixture async def mcp_ctx(client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, test_user: MusehubIdentity) -> _McpCtx: """Provides (client, session_id, auth_headers, repo) ready for tool calls. The repo has a 'main' branch seeded so proposal tools (which validate branch existence) work without extra setup in each test. """ from tests.factories import create_branch repo = await create_repo(db_session, owner=test_user.handle, visibility="public") await create_branch(db_session, repo_id=str(repo.repo_id), name="main") await create_branch(db_session, repo_id=str(repo.repo_id), name="feature/smoke") sid = await _init_session(client, auth_headers) return client, sid, auth_headers, repo # ── READ TOOLS ──────────────────────────────────────────────────────────────── async def test_mcp_whoami(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_whoami", {}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_search_repos(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_search_repos", {"query": repo.slug}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_get_context(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_read_context", {"owner": repo.owner, "slug": repo.slug}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_list_branches(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_list_branches", {"owner": repo.owner, "slug": repo.slug}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_list_commits(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_list_commits", {"owner": repo.owner, "slug": repo.slug}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_list_issues(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_list_issues", {"owner": repo.owner, "slug": repo.slug}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_read_issue(mcp_ctx: _McpCtx, db_session: AsyncSession) -> None: c, sid, hdrs, repo = mcp_ctx issue = await create_issue(db_session, repo_id=str(repo.repo_id), author=repo.owner) r = await call(c, sid, hdrs, "musehub_read_issue", {"owner": repo.owner, "slug": repo.slug, "issue_number": issue.number}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_list_proposals(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_list_proposals", {"owner": repo.owner, "slug": repo.slug}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_read_proposal(mcp_ctx: _McpCtx, db_session: AsyncSession) -> None: c, sid, hdrs, repo = mcp_ctx proposal = await create_proposal(db_session, repo_id=str(repo.repo_id), author=repo.owner) r = await call(c, sid, hdrs, "musehub_read_proposal", {"owner": repo.owner, "slug": repo.slug, "proposal_id": proposal.proposal_id}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_proposal_risk(mcp_ctx: _McpCtx, db_session: AsyncSession) -> None: c, sid, hdrs, repo = mcp_ctx proposal = await create_proposal(db_session, repo_id=str(repo.repo_id), author=repo.owner) r = await call(c, sid, hdrs, "musehub_read_proposal_risk", {"owner": repo.owner, "slug": repo.slug, "proposal_id": proposal.proposal_id}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_proposal_symbol_diff(mcp_ctx: _McpCtx, db_session: AsyncSession) -> None: c, sid, hdrs, repo = mcp_ctx proposal = await create_proposal(db_session, repo_id=str(repo.repo_id), author=repo.owner) r = await call(c, sid, hdrs, "musehub_read_proposal_diff", {"owner": repo.owner, "slug": repo.slug, "proposal_id": proposal.proposal_id}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_proposal_breakage(mcp_ctx: _McpCtx, db_session: AsyncSession) -> None: c, sid, hdrs, repo = mcp_ctx proposal = await create_proposal(db_session, repo_id=str(repo.repo_id), author=repo.owner) r = await call(c, sid, hdrs, "musehub_read_proposal_breakage", {"owner": repo.owner, "slug": repo.slug, "proposal_id": proposal.proposal_id}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_list_releases(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_list_releases", {"owner": repo.owner, "slug": repo.slug}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_list_domains(mcp_ctx: _McpCtx) -> None: """musehub_list_domains accepts cursor (not offset) for pagination.""" c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_list_domains", {}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_list_symbols(mcp_ctx: _McpCtx) -> None: """musehub_list_symbols returns next_cursor (not total with offset) for pagination.""" c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_list_symbols", {"owner": repo.owner, "slug": repo.slug}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) # Response data must include next_cursor (may be null on last page). text = _text(r) assert "next_cursor" in text, f"musehub_list_symbols must include next_cursor in response; got: {text[:200]}" async def test_mcp_list_domains_schema_has_cursor_not_offset(mcp_ctx: _McpCtx) -> None: """musehub_list_domains tool schema must use cursor, not offset.""" c, sid, hdrs, _ = mcp_ctx r = await c.post( "/mcp", json={ "jsonrpc": "2.0", "id": 99, "method": "tools/list", "params": {}, }, headers={**hdrs, "Mcp-Session-Id": await _init_session(c, hdrs)}, ) assert r.status_code == 200 tools = r.json().get("result", {}).get("tools", []) list_domains = next((t for t in tools if t["name"] == "musehub_list_domains"), None) assert list_domains is not None, "musehub_list_domains not found in tools/list" props = list_domains.get("inputSchema", {}).get("properties", {}) assert "offset" not in props, "musehub_list_domains schema must not expose offset" list_symbols = next((t for t in tools if t["name"] == "musehub_list_symbols"), None) assert list_symbols is not None, "musehub_list_symbols not found in tools/list" sym_props = list_symbols.get("inputSchema", {}).get("properties", {}) assert "offset" not in sym_props, "musehub_list_symbols schema must not expose offset" async def test_mcp_intel_index_status(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_read_intel_index_status", {"owner": repo.owner, "slug": repo.slug}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_intel_health_score(mcp_ctx: _McpCtx) -> None: """Health score requires a built symbol index — not_ready is the expected result for a new repo.""" c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_read_intel_health_score", {"owner": repo.owner, "slug": repo.slug}) assert _rpc_error(r) is None, _rpc_error(r) # A repo with no commits will report not_ready — that's correct behaviour, not a crash text = _text(r) assert "not_ready" in text or "health" in text.lower() or not _is_error(r), ( f"Unexpected error from health_score: {text[:200]}" ) async def test_mcp_intel_hotspots(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_read_intel_hotspots", {"owner": repo.owner, "slug": repo.slug}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_intel_dead(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_read_intel_dead", {"owner": repo.owner, "slug": repo.slug}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_intel_blast_risk(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_read_intel_blast_risk", {"owner": repo.owner, "slug": repo.slug}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_coord_swarm(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_read_coord_swarm", {"owner": repo.owner, "slug": repo.slug}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_coord_reservations(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_list_coord_reservations", {"owner": repo.owner, "slug": repo.slug}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_coord_tasks(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_list_coord_tasks", {"owner": repo.owner, "slug": repo.slug}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_coord_check_conflicts(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_read_coord_conflicts", { "owner": repo.owner, "slug": repo.slug, "symbols": ["main.py::MyClass"] }) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_get_prompt(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_read_prompt", {"name": "musehub/orientation"}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_muse_remote(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "muse_remote", {"owner": repo.owner, "slug": repo.slug}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_muse_config(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "muse_config", {"owner": repo.owner, "slug": repo.slug}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_workspace_intel(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_read_workspace_intel", {"owner": repo.owner}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_cross_repo_impact(mcp_ctx: _McpCtx) -> None: """cross_repo_impact requires a built symbol index — not_found is expected for a new repo.""" c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_read_cross_repo_impact", { "owner": repo.owner, "slug": repo.slug, "address": "main.py::App", }) assert _rpc_error(r) is None, _rpc_error(r) # A repo with no commits has no symbol index — not_found is correct, not a crash text = _text(r) assert "not_found" in text or "not_ready" in text or not _is_error(r), ( f"Unexpected error: {text[:200]}" ) async def test_mcp_muse_pull(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "muse_pull", { "owner": repo.owner, "slug": repo.slug, "branch": "main" }) assert _rpc_error(r) is None, _rpc_error(r) # muse_pull may return isError=true if branch has no commits — that's ok # as long as there's no crash # ── WRITE TOOLS ─────────────────────────────────────────────────────────────── async def test_mcp_create_repo(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, _repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_create_repo", { "name": "smoke-new-repo", "description": "mcp smoke test", "visibility": "public" }) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_create_issue(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_create_issue", { "owner": repo.owner, "slug": repo.slug, "title": "Smoke test issue", "body": "created by mcp smoke test", }) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) # Number is in the response assert re.search(r'"number"', _text(r)) or "number" in _text(r).lower() or _text(r) async def test_mcp_update_issue(mcp_ctx: _McpCtx, db_session: AsyncSession) -> None: c, sid, hdrs, repo = mcp_ctx issue = await create_issue(db_session, repo_id=str(repo.repo_id), author=repo.owner) r = await call(c, sid, hdrs, "musehub_update_issue", { "owner": repo.owner, "slug": repo.slug, "issue_number": issue.number, "state": "closed", }) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_create_issue_comment(mcp_ctx: _McpCtx, db_session: AsyncSession) -> None: c, sid, hdrs, repo = mcp_ctx issue = await create_issue(db_session, repo_id=str(repo.repo_id), author=repo.owner) r = await call(c, sid, hdrs, "musehub_create_issue_comment", { "owner": repo.owner, "slug": repo.slug, "issue_number": issue.number, "body": "smoke test comment", }) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_create_proposal(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_create_proposal", { "owner": repo.owner, "slug": repo.slug, "title": "Smoke proposal", "body": "mcp smoke test", "from_branch": "feature/smoke", "to_branch": "main", }) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_create_proposal_comment(mcp_ctx: _McpCtx, db_session: AsyncSession) -> None: c, sid, hdrs, repo = mcp_ctx proposal = await create_proposal(db_session, repo_id=str(repo.repo_id), author=repo.owner) r = await call(c, sid, hdrs, "musehub_create_proposal_comment", { "owner": repo.owner, "slug": repo.slug, "proposal_id": proposal.proposal_id, "body": "smoke proposal comment", }) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_submit_proposal_review(mcp_ctx: _McpCtx, db_session: AsyncSession) -> None: c, sid, hdrs, repo = mcp_ctx proposal = await create_proposal(db_session, repo_id=str(repo.repo_id), author=repo.owner) r = await call(c, sid, hdrs, "musehub_create_proposal_review", { "owner": repo.owner, "slug": repo.slug, "proposal_id": proposal.proposal_id, "verdict": "approve", "body": "lgtm", }) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_create_label(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_create_label", { "owner": repo.owner, "slug": repo.slug, "name": "smoke-label", "color": "#ff5500", }) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_create_release(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_create_release", { "owner": repo.owner, "slug": repo.slug, "tag": "v0.1.0-smoke", "name": "Smoke Release", "body": "mcp smoke", }) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) async def test_mcp_coord_claim_task(mcp_ctx: _McpCtx) -> None: c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_claim_coord_task", { "owner": repo.owner, "slug": repo.slug, "queue": "tasks", "agent_id": "smoke-agent-1", }) assert _rpc_error(r) is None, _rpc_error(r) # No tasks in queue — ok=False is expected here assert r.get("result") is not None async def test_mcp_merge_proposal_no_commits(mcp_ctx: _McpCtx, db_session: AsyncSession) -> None: """merge_proposal on a proposal with no commits should fail gracefully (not 500).""" c, sid, hdrs, repo = mcp_ctx proposal = await create_proposal(db_session, repo_id=str(repo.repo_id), author=repo.owner) r = await call(c, sid, hdrs, "musehub_merge_proposal", { "owner": repo.owner, "slug": repo.slug, "proposal_id": proposal.proposal_id, }) assert _rpc_error(r) is None, _rpc_error(r) # Merge with no commits → isError=True is acceptable; crash is not async def test_mcp_publish_domain_no_manifest(mcp_ctx: _McpCtx) -> None: """publish_domain with missing manifest should fail gracefully.""" c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_publish_domain", { "owner": repo.owner, "slug": repo.slug, "domain_name": "smoke-domain", "manifest": {"name": "smoke", "version": "0.1.0"}, }) assert _rpc_error(r) is None, _rpc_error(r) # ── MULTI-STEP: read-back after write ──────────────────────────────────────── async def test_mcp_issue_roundtrip(mcp_ctx: _McpCtx) -> None: """Create an issue via MCP, then read it back.""" c, sid, hdrs, repo = mcp_ctx # Create cr = await call(c, sid, hdrs, "musehub_create_issue", { "owner": repo.owner, "slug": repo.slug, "title": "Roundtrip issue", "body": "created and read back", }) assert not _is_error(cr), _text(cr) m = re.search(r'"number":\s*(\d+)', _text(cr)) assert m, f"No issue number in response: {_text(cr)[:300]}" number = int(m.group(1)) # Read back gr = await call(c, sid, hdrs, "musehub_read_issue", { "owner": repo.owner, "slug": repo.slug, "issue_number": number, }) assert not _is_error(gr), _text(gr) assert "Roundtrip issue" in _text(gr), f"Issue title missing: {_text(gr)[:300]}" async def test_mcp_label_then_list(mcp_ctx: _McpCtx) -> None: """Create a label, then verify list_issues still works (no crash on label join).""" c, sid, hdrs, repo = mcp_ctx await call(c, sid, hdrs, "musehub_create_label", { "owner": repo.owner, "slug": repo.slug, "name": "bug", "color": "#d73a4a", }) r = await call(c, sid, hdrs, "musehub_list_issues", {"owner": repo.owner, "slug": repo.slug}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) # ── SECURITY: write re-verification ────────────────────────────────────────── async def test_mcp_write_without_msign_rejected( client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, test_user: MusehubIdentity, ) -> None: """Write tools must be rejected with 401 when no MSign is present on the request. Simulates a stolen session ID: the session was created with auth (initialize carries MSign), but subsequent write calls arrive with no Authorization header. Read tools on the same unauthenticated session must still succeed. The auth_headers fixture sets a global DI override that bypasses real MSign verification. We temporarily lift it for the anon calls so the real optional_signed_request runs (returning None for headerless requests). """ from musehub.main import app from musehub.auth.request_signing import optional_signed_request, require_signed_request repo = await create_repo(db_session, owner=test_user.handle, visibility="public") # Initialize with auth (normal flow — DI override active). sid = await _init_session(client, auth_headers) # Headers that carry the session ID but NO Authorization header. anon_headers = {"Mcp-Session-Id": sid, "MCP-Protocol-Version": "2025-11-25"} # Lift the DI overrides so real optional_signed_request runs on the next calls. saved_opt = app.dependency_overrides.pop(optional_signed_request, None) saved_req = app.dependency_overrides.pop(require_signed_request, None) try: # Read tool — must succeed anonymously (no MSign needed for reads). r_read = await client.post( "/mcp", json={ "jsonrpc": "2.0", "id": 10, "method": "tools/call", "params": {"name": "musehub_search_repos", "arguments": {"query": "test"}}, }, headers=anon_headers, ) assert r_read.status_code == 200, f"Read tool should pass: {r_read.text[:200]}" read_body = r_read.json() assert "error" not in read_body or read_body.get("error") is None, ( f"Read tool returned RPC error: {read_body}" ) # Write tool — must be rejected with 401. r_write = await client.post( "/mcp", json={ "jsonrpc": "2.0", "id": 11, "method": "tools/call", "params": { "name": "musehub_create_issue", "arguments": { "owner": repo.owner, "slug": repo.slug, "title": "Should be blocked", "body": "", }, }, }, headers=anon_headers, ) assert r_write.status_code == 401, ( f"Write tool without MSign should return 401, got {r_write.status_code}: {r_write.text[:200]}" ) finally: if saved_opt is not None: app.dependency_overrides[optional_signed_request] = saved_opt if saved_req is not None: app.dependency_overrides[require_signed_request] = saved_req # ── SESSION CONTEXT ─────────────────────────────────────────────────────────── async def test_mcp_set_context(mcp_ctx: _McpCtx) -> None: """musehub_set_context stores session focus and returns confirmation.""" c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_set_context", { "owner": repo.owner, "slug": repo.slug, }) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) assert repo.slug in _text(r), f"slug missing from confirmation: {_text(r)[:300]}" async def test_mcp_context_inheritance(mcp_ctx: _McpCtx) -> None: """After set_context, tool calls with no owner/slug use session focus.""" c, sid, hdrs, repo = mcp_ctx # Set session focus. sr = await call(c, sid, hdrs, "musehub_set_context", { "owner": repo.owner, "slug": repo.slug, }) assert not _is_error(sr), _text(sr) # list_branches with NO owner/slug — should resolve via session focus. r = await call(c, sid, hdrs, "musehub_list_branches", {}) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), f"list_branches without args failed: {_text(r)[:300]}" # list_issues with NO owner/slug. ri = await call(c, sid, hdrs, "musehub_list_issues", {}) assert _rpc_error(ri) is None, _rpc_error(ri) assert not _is_error(ri), f"list_issues without args failed: {_text(ri)[:300]}" # ── COORD TOOLS ─────────────────────────────────────────────────────────────── async def test_mcp_coord_reserve_and_release(mcp_ctx: _McpCtx) -> None: """Reserve symbols, then release them.""" c, sid, hdrs, repo = mcp_ctx # Reserve rr = await call(c, sid, hdrs, "musehub_create_coord_reservation", { "owner": repo.owner, "slug": repo.slug, "addresses": ["src/engine.py::AudioEngine"], "agent_id": "smoke-agent-1", "ttl_s": 60, }) assert _rpc_error(rr) is None, _rpc_error(rr) assert not _is_error(rr), _text(rr) assert "reservation_id" in _text(rr), f"No reservation_id: {_text(rr)[:300]}" import re match = re.search(r'"reservation_id":\s*"([^"]+)"', _text(rr)) assert match, f"Could not parse reservation_id: {_text(rr)[:300]}" reservation_id = match.group(1) # Release rl = await call(c, sid, hdrs, "musehub_delete_coord_reservation", { "owner": repo.owner, "slug": repo.slug, "reservation_id": reservation_id, "agent_id": "smoke-agent-1", }) assert _rpc_error(rl) is None, _rpc_error(rl) assert not _is_error(rl), _text(rl) async def test_mcp_coord_enqueue_and_claim(mcp_ctx: _McpCtx) -> None: """Enqueue a task, then claim it.""" c, sid, hdrs, repo = mcp_ctx # Enqueue eq = await call(c, sid, hdrs, "musehub_enqueue_coord_task", { "owner": repo.owner, "slug": repo.slug, "queue": "smoke-queue", "payload": {"action": "analyse", "target": "main.py"}, "agent_id": "orchestrator", "priority": 75, }) assert _rpc_error(eq) is None, _rpc_error(eq) assert not _is_error(eq), _text(eq) import re match = re.search(r'"task_id":\s*"([^"]+)"', _text(eq)) assert match, f"No task_id in response: {_text(eq)[:300]}" task_id = match.group(1) # Claim cl = await call(c, sid, hdrs, "musehub_claim_coord_task", { "owner": repo.owner, "slug": repo.slug, "task_id": task_id, "agent_id": "worker-1", }) assert _rpc_error(cl) is None, _rpc_error(cl) assert not _is_error(cl), _text(cl) async def test_mcp_coord_check_conflicts_clear(mcp_ctx: _McpCtx) -> None: """check_conflicts returns no conflicts for symbols with no reservations.""" c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_read_coord_conflicts", { "owner": repo.owner, "slug": repo.slug, "addresses": ["src/free_symbol.py::FreeClass"], }) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), _text(r) assert '"has_conflicts": false' in _text(r) or "has_conflicts" in _text(r) # ── AGENT-TO-AGENT SIGNALING ────────────────────────────────────────────────── async def test_mcp_agent_notify_no_target_session(mcp_ctx: _McpCtx) -> None: """notify returns not_ready when target has no active sessions.""" c, sid, hdrs, repo = mcp_ctx r = await call(c, sid, hdrs, "musehub_agent_notify", { "target_handle": "ghost-agent-that-does-not-exist", "event": "ping", "payload": {"msg": "hello"}, }) assert _rpc_error(r) is None, _rpc_error(r) # Target has no sessions — isError=True with not_ready is correct assert _is_error(r), f"Expected isError=True for unknown target: {_text(r)[:300]}" assert "not_ready" in _text(r), f"Expected not_ready: {_text(r)[:300]}" async def test_mcp_agent_broadcast_no_focus(mcp_ctx: _McpCtx) -> None: """broadcast without set_context returns missing_args.""" c, sid, hdrs, repo = mcp_ctx # Fresh session with no repo focus new_sid = await _init_session(c, hdrs) anon_hdrs = {**hdrs, "Mcp-Session-Id": new_sid, "MCP-Protocol-Version": "2025-11-25"} # Remove Mcp-Session-Id from hdrs and use new_sid call_hdrs = {k: v for k, v in hdrs.items() if k != "Mcp-Session-Id"} call_hdrs["Mcp-Session-Id"] = new_sid r = await call(c, new_sid, call_hdrs, "musehub_agent_broadcast", { "event": "ping", "payload": {}, }) assert _rpc_error(r) is None, _rpc_error(r) assert _is_error(r), f"Expected isError=True without focus: {_text(r)[:300]}" assert "missing_args" in _text(r), f"Expected missing_args: {_text(r)[:300]}" async def test_mcp_agent_broadcast_with_focus_no_peers(mcp_ctx: _McpCtx) -> None: """broadcast with set_context succeeds with 0 peers (no error, sessions_reached=0).""" c, sid, hdrs, repo = mcp_ctx # Set context first sc = await call(c, sid, hdrs, "musehub_set_context", { "owner": repo.owner, "slug": repo.slug, }) assert not _is_error(sc), _text(sc) r = await call(c, sid, hdrs, "musehub_agent_broadcast", { "event": "phase_complete", "payload": {"phase": 1, "result": "ok"}, }) assert _rpc_error(r) is None, _rpc_error(r) assert not _is_error(r), f"broadcast should succeed even with 0 peers: {_text(r)[:300]}" assert "sessions_reached" in _text(r), f"Missing sessions_reached: {_text(r)[:300]}"