"""Section 15 — MCP Write Tools: 7-layer test suite. Covers ``musehub/mcp/write_tools/`` — issues, proposals, releases, repos. Write tools under test: execute_create_issue, execute_update_issue, execute_create_issue_comment execute_create_proposal, execute_merge_proposal, execute_submit_proposal_review, execute_create_proposal_comment execute_create_release execute_create_repo execute_create_label Bug fixed during intelligence gathering: - _proposal_data referenced the wrong variable name — NameError that would crash every call to execute_create_proposal / execute_merge_proposal. Seven layers: Layer 1 Unit: - MUSEHUB_WRITE_TOOL_NAMES contains all expected write tool names - execute_submit_proposal_review: invalid event → error_code=invalid_mode - execute_create_proposal: same from/to branch → error immediately - _issue_data serialises IssueResponse to correct dict keys Layer 2 Integration: - execute_create_issue: happy path, unknown repo, with labels - execute_update_issue: title change, close state, unknown repo - execute_create_issue_comment: happy path, unknown issue - execute_create_proposal: happy path, unknown repo, same-branch guard - execute_merge_proposal: happy path, unknown proposal - execute_submit_proposal_review: approve / request_changes - execute_create_release: happy path, duplicate tag error - execute_create_repo: happy path returns repo_id + slug - execute_create_label: happy path, duplicate name error Layer 3 E2E (HTTP tools/call): - Anonymous calls to every write tool → isError=True (auth gate) - Authenticated write (mocked _extract_auth): create_issue succeeds - Authenticated write (mocked _extract_auth): create_repo succeeds Layer 4 Stress: - 20 sequential issues → sequential numbers 1..20 Layer 5 Data Integrity: - create_issue entity retrievable via execute_list_issues - create_proposal entity retrievable via execute_list_proposals - create_release tag persisted - update_issue close: state persisted as "closed" Layer 6 Security: - All write tools in MUSEHUB_WRITE_TOOL_NAMES → auth gate in dispatcher - execute_create_proposal same branches → error - Unauthenticated HTTP call to write tool → isError=True Layer 7 Performance: - 10 sequential execute_create_issue under 500 ms - 5 sequential execute_create_repo under 1000 ms """ from __future__ import annotations import json import time import uuid from datetime import datetime, timezone from unittest.mock import AsyncMock, patch import pytest import pytest_asyncio from httpx import AsyncClient, ASGITransport from sqlalchemy.ext.asyncio import AsyncSession from musehub.db import musehub_models as db from musehub.main import app from musehub.mcp.tools.musehub import MUSEHUB_WRITE_TOOL_NAMES from musehub.muse_contracts.json_types import JSONObject, StrDict from musehub.mcp.write_tools.issues import ( _issue_data, execute_create_issue, execute_create_issue_comment, execute_update_issue, ) from musehub.mcp.write_tools.proposals import ( execute_create_proposal, execute_create_proposal_comment, execute_merge_proposal, execute_submit_proposal_review, ) from musehub.mcp.write_tools.releases import execute_create_release from musehub.mcp.write_tools.repos import execute_create_repo from musehub.mcp.write_tools.issues import execute_create_label from musehub.services.musehub_mcp_executor import ( execute_list_issues, execute_list_proposals, ) # ── Fixtures ────────────────────────────────────────────────────────────────── @pytest.fixture def anyio_backend() -> str: return "asyncio" @pytest_asyncio.fixture async def http_client(db_session: AsyncSession) -> AsyncClient: async with AsyncClient( transport=ASGITransport(app=app), base_url="http://localhost", ) as c: yield c # ── Helpers ─────────────────────────────────────────────────────────────────── def _uid() -> str: return str(uuid.uuid4()) def _slug() -> str: return f"repo-{uuid.uuid4().hex[:8]}" async def _repo( session: AsyncSession, slug: str | None = None, visibility: str = "public", owner: str = "alice", ) -> db.MusehubRepo: name = slug or _slug() r = db.MusehubRepo( name=name, owner=owner, slug=name, visibility=visibility, owner_user_id="uid-alice", ) session.add(r) await session.flush() await session.refresh(r) return r async def _commit_and_branch( session: AsyncSession, repo_id: str, branch: str = "main", ) -> db.MusehubCommit: c = db.MusehubCommit( commit_id=uuid.uuid4().hex[:16], repo_id=repo_id, branch=branch, parent_ids=[], message="init", author="alice", timestamp=datetime.now(tz=timezone.utc), ) b = db.MusehubBranch( repo_id=repo_id, name=branch, head_commit_id=c.commit_id, ) session.add(c) session.add(b) await session.flush() return c def _tools_call(name: str, arguments: JSONObject) -> JSONObject: return { "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": {"name": name, "arguments": arguments}, } def _unwrap_tool_text(text: str) -> str: """Strip wrapper tags added by the dispatcher.""" text = text.strip() if text.startswith(""): text = text[len(""):].strip() if text.endswith(""): text = text[: -len("")].strip() return text # ── Layer 1 — Unit ──────────────────────────────────────────────────────────── class TestUnitWriteToolCatalogue: def test_write_tool_names_non_empty(self) -> None: assert len(MUSEHUB_WRITE_TOOL_NAMES) > 0 def test_expected_tools_in_write_set(self) -> None: expected = { "musehub_create_issue", "musehub_create_repo", "musehub_create_proposal", "musehub_merge_proposal", "musehub_create_release", "musehub_create_label", } missing = expected - MUSEHUB_WRITE_TOOL_NAMES assert not missing, f"Tools missing from write set: {missing}" class TestUnitEarlyValidation: @pytest.mark.anyio async def test_submit_review_invalid_event(self, db_session: AsyncSession) -> None: result = await execute_submit_proposal_review( repo_id="any", proposal_id="any", event="lgtm", reviewer="alice" ) assert result.ok is False assert result.error_code == "invalid_args" assert "approve" in (result.error_message or "") @pytest.mark.anyio async def test_create_proposal_same_branches(self, db_session: AsyncSession) -> None: result = await execute_create_proposal( repo_id="any", title="T", from_branch="main", to_branch="main", actor="alice" ) assert result.ok is False assert "different" in (result.error_message or "").lower() class TestUnitIssueData: def test_issue_data_produces_correct_keys(self, db_session: AsyncSession) -> None: from musehub.models.musehub import IssueResponse issue = IssueResponse( issue_id="iid-1", number=7, title="Test issue", body="Body", state="open", labels=["bug"], author="alice", created_at=datetime.now(tz=timezone.utc), ) d = _issue_data(issue) for key in ("issue_id", "number", "title", "body", "state", "labels", "author"): assert key in d, f"Missing key: {key}" assert d["number"] == 7 assert d["labels"] == ["bug"] # ── Layer 2 — Integration ───────────────────────────────────────────────────── class TestIntegrationCreateIssue: @pytest.mark.anyio async def test_happy_path(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_issue( repo_id=r.repo_id, title="Bass too loud", body="Track 4, bar 12.", actor="alice" ) assert result.ok is True assert "issue_id" in result.data assert result.data["number"] >= 1 assert result.data["title"] == "Bass too loud" @pytest.mark.anyio async def test_unknown_repo(self, db_session: AsyncSession) -> None: result = await execute_create_issue( repo_id="ghost-repo", title="T", actor="alice" ) assert result.ok is False assert result.error_code == "repo_not_found" @pytest.mark.anyio async def test_with_labels(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_issue( repo_id=r.repo_id, title="Harmony conflict", labels=["harmony", "critical"], actor="alice", ) assert result.ok is True assert set(result.data["labels"]) == {"harmony", "critical"} class TestIntegrationUpdateIssue: @pytest.mark.anyio async def test_update_title(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_issue( repo_id=r.repo_id, title="Old title", actor="alice" ) num = created.data["number"] result = await execute_update_issue( repo_id=r.repo_id, issue_number=num, title="New title" ) assert result.ok is True assert result.data["title"] == "New title" @pytest.mark.anyio async def test_close_issue(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_issue( repo_id=r.repo_id, title="To close", actor="alice" ) num = created.data["number"] result = await execute_update_issue( repo_id=r.repo_id, issue_number=num, state="closed" ) assert result.ok is True assert result.data["state"] == "closed" @pytest.mark.anyio async def test_unknown_repo(self, db_session: AsyncSession) -> None: result = await execute_update_issue( repo_id="ghost-repo", issue_number=1, title="x" ) assert result.ok is False assert result.error_code == "repo_not_found" class TestIntegrationCreateIssueComment: @pytest.mark.anyio async def test_happy_path(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() issue = await execute_create_issue( repo_id=r.repo_id, title="I", actor="alice" ) num = issue.data["number"] result = await execute_create_issue_comment( repo_id=r.repo_id, issue_number=num, body="LGTM!", actor="bob" ) assert result.ok is True assert "comment_id" in result.data assert result.data["body"] == "LGTM!" @pytest.mark.anyio async def test_unknown_issue(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_issue_comment( repo_id=r.repo_id, issue_number=999, body="x", actor="alice" ) assert result.ok is False assert result.error_code == "issue_not_found" class TestIntegrationCreateProposal: @pytest.mark.anyio async def test_happy_path(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await _commit_and_branch(db_session, r.repo_id, "main") await _commit_and_branch(db_session, r.repo_id, "feat-harmony") await db_session.commit() result = await execute_create_proposal( repo_id=r.repo_id, title="Add harmony layer", from_branch="feat-harmony", to_branch="main", body="Adds new harmonic dimension.", actor="alice", ) assert result.ok is True assert "proposal_id" in result.data assert result.data["state"] == "open" @pytest.mark.anyio async def test_unknown_repo(self, db_session: AsyncSession) -> None: result = await execute_create_proposal( repo_id="ghost-proposal", title="T", from_branch="feat", to_branch="main", actor="alice" ) assert result.ok is False assert result.error_code == "repo_not_found" class TestIntegrationMergeProposal: @pytest.mark.anyio async def test_merge_open_proposal(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await _commit_and_branch(db_session, r.repo_id, "main") await _commit_and_branch(db_session, r.repo_id, "feat-bass") await db_session.commit() proposal_result = await execute_create_proposal( repo_id=r.repo_id, title="Add bass line", from_branch="feat-bass", to_branch="main", actor="alice", ) assert proposal_result.ok is True proposal_id = proposal_result.data["proposal_id"] result = await execute_merge_proposal(repo_id=r.repo_id, proposal_id=proposal_id) assert result.ok is True assert result.data["state"] == "merged" @pytest.mark.anyio async def test_merge_unknown_proposal(self, db_session: AsyncSession) -> None: r = await _repo(db_session) result = await execute_merge_proposal(repo_id=r.repo_id, proposal_id="ghost-proposal-id") assert result.ok is False class TestIntegrationSubmitReview: @pytest.mark.anyio async def test_approve(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await _commit_and_branch(db_session, r.repo_id, "main") await _commit_and_branch(db_session, r.repo_id, "feat-review") await db_session.commit() proposal_result = await execute_create_proposal( repo_id=r.repo_id, title="Review test", from_branch="feat-review", to_branch="main", actor="alice", ) proposal_id = proposal_result.data["proposal_id"] result = await execute_submit_proposal_review( repo_id=r.repo_id, proposal_id=proposal_id, event="approve", reviewer="bob" ) assert result.ok is True assert result.data["state"] == "approved" @pytest.mark.anyio async def test_request_changes(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await _commit_and_branch(db_session, r.repo_id, "main") await _commit_and_branch(db_session, r.repo_id, "feat-chg") await db_session.commit() proposal_result = await execute_create_proposal( repo_id=r.repo_id, title="Changes test", from_branch="feat-chg", to_branch="main", actor="alice", ) proposal_id = proposal_result.data["proposal_id"] result = await execute_submit_proposal_review( repo_id=r.repo_id, proposal_id=proposal_id, event="request_changes", reviewer="carol" ) assert result.ok is True assert result.data["state"] == "changes_requested" class TestIntegrationCreateRelease: @pytest.mark.anyio async def test_happy_path(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_release( repo_id=r.repo_id, tag="v1.0.0", title="First Release", body="Initial stable release.", channel="stable", actor="alice", ) assert result.ok is True assert result.data["tag"] == "v1.0.0" assert result.data["channel"] == "stable" assert "release_id" in result.data @pytest.mark.anyio async def test_duplicate_tag_error(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() await execute_create_release(repo_id=r.repo_id, tag="v1.0.0", actor="alice") result = await execute_create_release(repo_id=r.repo_id, tag="v1.0.0", actor="alice") assert result.ok is False @pytest.mark.anyio async def test_beta_channel(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_release( repo_id=r.repo_id, tag="v2.0.0-beta.1", channel="beta", actor="alice" ) assert result.ok is True assert result.data["channel"] == "beta" class TestIntegrationCreateRepo: @pytest.mark.anyio async def test_happy_path(self, db_session: AsyncSession) -> None: result = await execute_create_repo( name="jazz-standards", owner="alice", owner_user_id="uid-alice", description="My jazz collection", visibility="public", ) assert result.ok is True assert "repo_id" in result.data assert result.data["owner"] == "alice" assert "jazz" in result.data["slug"] @pytest.mark.anyio async def test_private_visibility(self, db_session: AsyncSession) -> None: result = await execute_create_repo( name="private-session", owner="alice", owner_user_id="uid-alice", visibility="private", ) assert result.ok is True assert result.data["visibility"] == "private" class TestIntegrationCreateLabel: @pytest.mark.anyio async def test_happy_path(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_label( repo_id=r.repo_id, name="harmony", color="e11d48", actor="alice" ) assert result.ok is True assert "label_id" in result.data assert result.data["name"] == "harmony" assert result.data["color"] == "e11d48" @pytest.mark.anyio async def test_duplicate_name_rejected(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() await execute_create_label(repo_id=r.repo_id, name="dup-label", color="aabbcc") result = await execute_create_label( repo_id=r.repo_id, name="dup-label", color="112233" ) assert result.ok is False assert "already exists" in (result.error_message or "").lower() # ── Layer 3 — End-to-End ────────────────────────────────────────────────────── class TestE2EAuthGate: """Every write tool call without auth must return 401.""" @pytest.mark.anyio async def test_create_issue_no_auth( self, http_client: AsyncClient, db_session: AsyncSession ) -> None: r = await _repo(db_session) resp = await http_client.post( "/mcp", json=_tools_call("musehub_create_issue", {"repo_id": r.repo_id, "title": "T"}), headers={"Content-Type": "application/json"}, ) assert resp.status_code == 401 @pytest.mark.anyio async def test_create_repo_no_auth( self, http_client: AsyncClient, db_session: AsyncSession ) -> None: resp = await http_client.post( "/mcp", json=_tools_call("musehub_create_repo", {"name": "test-repo"}), headers={"Content-Type": "application/json"}, ) assert resp.status_code == 401 @pytest.mark.anyio async def test_create_proposal_no_auth( self, http_client: AsyncClient, db_session: AsyncSession ) -> None: resp = await http_client.post( "/mcp", json=_tools_call("musehub_create_proposal", { "repo_id": "x", "title": "T", "from_branch": "a", "to_branch": "b" }), headers={"Content-Type": "application/json"}, ) assert resp.status_code == 401 @pytest.mark.anyio async def test_merge_proposal_no_auth( self, http_client: AsyncClient, db_session: AsyncSession ) -> None: resp = await http_client.post( "/mcp", json=_tools_call("musehub_merge_proposal", {"repo_id": "x", "proposal_id": "y"}), headers={"Content-Type": "application/json"}, ) assert resp.status_code == 401 @pytest.mark.anyio async def test_create_release_no_auth( self, http_client: AsyncClient, db_session: AsyncSession ) -> None: resp = await http_client.post( "/mcp", json=_tools_call("musehub_create_release", {"repo_id": "x", "tag": "v1.0", "title": "R"}), headers={"Content-Type": "application/json"}, ) assert resp.status_code == 401 class TestE2EAuthenticatedWrite: """Write tools succeed when auth is present.""" @pytest.mark.anyio async def test_create_issue_with_auth( self, http_client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict ) -> None: r = await _repo(db_session) await db_session.commit() resp = await http_client.post( "/mcp", json=_tools_call("musehub_create_issue", { "repo_id": r.repo_id, "title": "Auth issue" }), headers=auth_headers, ) assert resp.status_code == 200 result = resp.json()["result"] assert result["isError"] is False payload = json.loads(_unwrap_tool_text(result["content"][0]["text"])) assert payload["title"] == "Auth issue" @pytest.mark.anyio async def test_create_repo_with_auth( self, http_client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict ) -> None: resp = await http_client.post( "/mcp", json=_tools_call("musehub_create_repo", { "name": "e2e-authed-repo", "owner": "testuser", }), headers=auth_headers, ) assert resp.status_code == 200 result = resp.json()["result"] assert result["isError"] is False payload = json.loads(_unwrap_tool_text(result["content"][0]["text"])) assert "repo_id" in payload # ── Layer 4 — Stress ────────────────────────────────────────────────────────── class TestStressWriteTools: @pytest.mark.anyio async def test_20_sequential_issues_have_sequential_numbers( self, db_session: AsyncSession ) -> None: r = await _repo(db_session) await db_session.commit() numbers = [] for i in range(20): result = await execute_create_issue( repo_id=r.repo_id, title=f"Issue {i}", actor="alice" ) assert result.ok is True numbers.append(result.data["number"]) assert numbers == list(range(1, 21)) # ── Layer 5 — Data Integrity ────────────────────────────────────────────────── class TestDataIntegrityIssue: @pytest.mark.anyio async def test_created_issue_retrievable(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() await execute_create_issue( repo_id=r.repo_id, title="Retrievable issue", actor="alice" ) issues_result = await execute_list_issues(r.repo_id, state="open") assert issues_result.ok is True titles = [i["title"] for i in issues_result.data["issues"]] assert "Retrievable issue" in titles @pytest.mark.anyio async def test_closed_issue_state_persisted(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_issue( repo_id=r.repo_id, title="Close me", actor="alice" ) num = created.data["number"] await execute_update_issue(repo_id=r.repo_id, issue_number=num, state="closed") issues_result = await execute_list_issues(r.repo_id, state="closed") assert issues_result.ok is True numbers = [i["number"] for i in issues_result.data["issues"]] assert num in numbers class TestDataIntegrityProposal: @pytest.mark.anyio async def test_created_proposal_retrievable(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await _commit_and_branch(db_session, r.repo_id, "main") await _commit_and_branch(db_session, r.repo_id, "feat-di") await db_session.commit() proposal_result = await execute_create_proposal( repo_id=r.repo_id, title="DI proposal", from_branch="feat-di", to_branch="main", actor="alice", ) assert proposal_result.ok is True list_result = await execute_list_proposals(r.repo_id, state="open") assert list_result.ok is True titles = [p["title"] for p in list_result.data["proposals"]] assert "DI proposal" in titles class TestDataIntegrityRelease: @pytest.mark.anyio async def test_release_tag_persisted(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_release( repo_id=r.repo_id, tag="v3.7.2", title="Minor release", actor="alice" ) assert result.ok is True assert result.data["tag"] == "v3.7.2" @pytest.mark.anyio async def test_release_author_persisted(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_release( repo_id=r.repo_id, tag="v0.1.0", actor="carol" ) assert result.ok is True assert result.data["author"] == "carol" # ── Layer 6 — Security ──────────────────────────────────────────────────────── class TestSecurityWriteTools: def test_all_write_tools_in_auth_gate_set(self) -> None: """All MUSEHUB_WRITE_TOOLS must be in MUSEHUB_WRITE_TOOL_NAMES.""" from musehub.mcp.tools.musehub import MUSEHUB_WRITE_TOOLS write_tool_names_from_list = {t["name"] for t in MUSEHUB_WRITE_TOOLS} # All write tools in the list should be in the gate set. uncovered = write_tool_names_from_list - MUSEHUB_WRITE_TOOL_NAMES assert not uncovered, f"Write tools not in auth gate: {uncovered}" @pytest.mark.anyio async def test_create_proposal_same_branch_guard(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_proposal( repo_id=r.repo_id, title="T", from_branch="main", to_branch="main", actor="alice" ) assert result.ok is False @pytest.mark.anyio async def test_duplicate_label_rejected(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() await execute_create_label(repo_id=r.repo_id, name="sec-label", color="aabbcc") result = await execute_create_label( repo_id=r.repo_id, name="sec-label", color="ddeeff" ) assert result.ok is False @pytest.mark.anyio async def test_create_issue_comment_on_wrong_issue_safe( self, db_session: AsyncSession ) -> None: """create_issue_comment on non-existent issue must return not_found, not crash.""" r = await _repo(db_session) await db_session.commit() result = await execute_create_issue_comment( repo_id=r.repo_id, issue_number=9999, body="hack", actor="eve" ) assert result.ok is False assert result.error_code == "issue_not_found" # ── Layer 7 — Performance ───────────────────────────────────────────────────── class TestPerformanceWriteTools: @pytest.mark.anyio async def test_10_sequential_issues_under_500ms( self, db_session: AsyncSession ) -> None: r = await _repo(db_session) await db_session.commit() start = time.perf_counter() for i in range(10): result = await execute_create_issue( repo_id=r.repo_id, title=f"Perf issue {i}", actor="alice" ) assert result.ok is True elapsed_ms = (time.perf_counter() - start) * 1000 assert elapsed_ms < 500, f"10 issues took {elapsed_ms:.1f} ms" @pytest.mark.anyio async def test_5_sequential_repos_under_1000ms( self, db_session: AsyncSession ) -> None: start = time.perf_counter() for i in range(5): result = await execute_create_repo( name=f"perf-repo-{i}", owner="alice", owner_user_id="uid-alice", initialize=False, ) assert result.ok is True elapsed_ms = (time.perf_counter() - start) * 1000 assert elapsed_ms < 1000, f"5 repos took {elapsed_ms:.1f} ms"