"""Section 15 — MCP Write Tools: 7-layer test suite. Covers ``musehub/mcp/write_tools/`` — issues, proposals, releases, repos, labels. Write tools under test: execute_create_issue, execute_update_issue, execute_create_issue_comment execute_create_proposal, execute_merge_proposal, execute_create_proposal_review, execute_create_proposal_comment execute_create_release execute_create_repo execute_create_label, execute_update_label, execute_delete_label Bug fixed during intelligence gathering: - _proposal_data referenced the wrong variable name — NameError that would crash every call to execute_create_proposal / execute_merge_proposal. Seven layers: Layer 1 Unit: - MUSEHUB_WRITE_TOOL_NAMES contains all expected write tool names - execute_create_proposal_review: invalid event → error_code=invalid_mode - execute_create_proposal: same from/to branch → error immediately - _issue_data serialises IssueResponse to correct dict keys Layer 2 Integration: - execute_create_issue: happy path, unknown repo, with labels - execute_update_issue: title change, close state, unknown repo - execute_create_issue_comment: happy path, unknown issue - execute_create_proposal: happy path, unknown repo, same-branch guard - execute_merge_proposal: happy path, unknown proposal - execute_create_proposal_review: approve / request_changes - execute_create_release: happy path, duplicate tag error - execute_create_repo: happy path returns repo_id + slug - execute_create_label: happy path, duplicate name error Layer 3 E2E (HTTP tools/call): - Anonymous calls to every write tool → isError=True (auth gate) - Authenticated write (mocked _extract_auth): create_issue succeeds - Authenticated write (mocked _extract_auth): create_repo succeeds Layer 4 Stress: - 20 sequential issues → sequential numbers 1..20 Layer 5 Data Integrity: - create_issue entity retrievable via execute_list_issues - create_proposal entity retrievable via execute_list_proposals - create_release tag persisted - update_issue close: state persisted as "closed" Layer 6 Security: - All write tools in MUSEHUB_WRITE_TOOL_NAMES → auth gate in dispatcher - execute_create_proposal same branches → error - Unauthenticated HTTP call to write tool → isError=True Layer 7 Performance: - 10 sequential execute_create_issue under 500 ms - 5 sequential execute_create_repo under 1000 ms """ from __future__ import annotations import json import secrets import time from datetime import datetime, timezone from unittest.mock import AsyncMock, patch import pytest import pytest_asyncio from httpx import AsyncClient, ASGITransport from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import fake_id from musehub.core.genesis import compute_branch_id, compute_identity_id, compute_issue_id, compute_repo_id from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubCommitRef, MusehubRepo from musehub.main import app from musehub.mcp.tools.musehub import MUSEHUB_WRITE_TOOL_NAMES from musehub.types.json_types import JSONObject, StrDict from musehub.mcp.write_tools.issues import ( _issue_data, execute_close_issue, execute_create_issue, execute_create_issue_comment, execute_reopen_issue, execute_assign_issue, execute_update_issue_labels, execute_remove_issue_label, execute_update_issue, ) from musehub.mcp.write_tools.proposals import ( execute_create_proposal, execute_create_proposal_comment, execute_merge_proposal, execute_create_proposal_review, execute_list_proposal_comments, execute_request_proposal_reviewers, execute_remove_proposal_reviewer, execute_list_proposal_reviews, ) from musehub.mcp.write_tools.releases import ( execute_create_release, execute_attach_release_asset, execute_delete_release_asset, ) from musehub.mcp.write_tools.repos import ( execute_create_repo, execute_delete_repo, execute_update_repo, execute_transfer_repo_ownership, ) from musehub.mcp.write_tools.labels import ( execute_create_label, execute_delete_label, execute_update_label, ) from musehub.mcp.write_tools.collaborators import ( execute_invite_collaborator, execute_list_collaborators, execute_remove_collaborator, execute_update_collaborator_permission, ) from musehub.mcp.write_tools.webhooks import ( execute_create_webhook, execute_delete_webhook, execute_list_webhooks, ) from musehub.mcp.write_tools.issues import execute_delete_issue_comment from musehub.services.musehub_mcp_executor import ( execute_list_issues, execute_list_labels, execute_list_proposals, ) # ── Fixtures ────────────────────────────────────────────────────────────────── @pytest.fixture def anyio_backend() -> str: return "asyncio" @pytest_asyncio.fixture async def http_client(db_session: AsyncSession) -> AsyncClient: async with AsyncClient( transport=ASGITransport(app=app), base_url="http://localhost", ) as c: yield c # ── Helpers ─────────────────────────────────────────────────────────────────── def _uid() -> str: return secrets.token_hex(16) def _slug() -> str: return f"repo-{secrets.token_hex(4)}" async def _repo( session: AsyncSession, slug: str | None = None, visibility: str = "public", owner: str = "alice", ) -> MusehubRepo: name = slug or _slug() created_at = datetime.now(tz=timezone.utc) owner_id = compute_identity_id(owner.encode()) r = MusehubRepo( repo_id=compute_repo_id(owner_id, name, "code", created_at.isoformat()), name=name, owner=owner, slug=name, visibility=visibility, owner_user_id=owner_id, created_at=created_at, updated_at=created_at, ) session.add(r) await session.flush() await session.refresh(r) return r async def _commit_and_branch( session: AsyncSession, repo_id: str, branch: str = "main", ) -> MusehubCommit: c = MusehubCommit( commit_id=fake_id(f"{repo_id}{branch}{secrets.token_hex(4)}"), branch=branch, parent_ids=[], message="init", author="alice", timestamp=datetime.now(tz=timezone.utc), ) b = MusehubBranch( branch_id=compute_branch_id(repo_id, branch), repo_id=repo_id, name=branch, head_commit_id=c.commit_id, ) session.add(c) session.add(MusehubCommitRef(repo_id=repo_id, commit_id=c.commit_id)) session.add(b) await session.flush() return c def _tools_call(name: str, arguments: JSONObject) -> JSONObject: return { "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": {"name": name, "arguments": arguments}, } def _unwrap_tool_text(text: str) -> str: """Strip wrapper tags added by the dispatcher.""" text = text.strip() if text.startswith(""): text = text[len(""):].strip() if text.endswith(""): text = text[: -len("")].strip() return text # ── Layer 1 — Unit ──────────────────────────────────────────────────────────── class TestUnitWriteToolCatalogue: def test_write_tool_names_non_empty(self) -> None: assert len(MUSEHUB_WRITE_TOOL_NAMES) > 0 def test_expected_tools_in_write_set(self) -> None: expected = { "musehub_create_issue", "musehub_create_repo", "musehub_create_proposal", "musehub_merge_proposal", "musehub_create_release", "musehub_create_label", "musehub_update_label", "musehub_delete_label", } missing = expected - MUSEHUB_WRITE_TOOL_NAMES assert not missing, f"Tools missing from write set: {missing}" class TestUnitEarlyValidation: async def test_submit_review_invalid_event(self, db_session: AsyncSession) -> None: result = await execute_create_proposal_review( repo_id="any", proposal_id="any", verdict="lgtm", reviewer="alice" ) assert result.ok is False assert result.error_code == "invalid_args" assert "approve" in (result.error_message or "") async def test_create_proposal_same_branches(self, db_session: AsyncSession) -> None: result = await execute_create_proposal( repo_id="any", title="T", from_branch="main", to_branch="main", actor="alice" ) assert result.ok is False assert "different" in (result.error_message or "").lower() class TestUnitIssueData: def test_issue_data_produces_correct_keys(self, db_session: AsyncSession) -> None: from musehub.models.musehub import IssueResponse _now = datetime.now(tz=timezone.utc) issue = IssueResponse( issue_id=compute_issue_id(fake_id("repo"), compute_identity_id(b"alice"), _now.isoformat()), number=7, title="Test issue", body="Body", state="open", labels=["bug"], author="alice", created_at=_now, ) d = _issue_data(issue) for key in ("issue_id", "number", "title", "body", "state", "labels", "author"): assert key in d, f"Missing key: {key}" assert d["number"] == 7 assert d["labels"] == ["bug"] # ── Layer 2 — Integration ───────────────────────────────────────────────────── class TestIntegrationCreateIssue: async def test_happy_path(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_issue( repo_id=r.repo_id, title="Bass too loud", body="Track 4, bar 12.", actor="alice" ) assert result.ok is True assert "issue_id" in result.data assert result.data["number"] >= 1 assert result.data["title"] == "Bass too loud" async def test_any_user_can_create_issue_on_public_repo(self, db_session: AsyncSession) -> None: r = await _repo(db_session, visibility="public") await db_session.commit() result = await execute_create_issue( repo_id=r.repo_id, title="Public comment", actor="carol" ) assert result.ok is True async def test_forbidden_on_private_repo_for_non_owner(self, db_session: AsyncSession) -> None: r = await _repo(db_session, visibility="private") await db_session.commit() result = await execute_create_issue( repo_id=r.repo_id, title="Private issue", actor="carol" ) assert result.ok is False assert result.error_code == "forbidden" async def test_forbidden_without_auth(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_issue( repo_id=r.repo_id, title="T", actor="" ) assert result.ok is False assert result.error_code == "forbidden" async def test_unknown_repo(self, db_session: AsyncSession) -> None: result = await execute_create_issue( repo_id=fake_id("ghost-repo"), title="T", actor="alice" ) assert result.ok is False assert result.error_code == "repo_not_found" async def test_with_labels(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_issue( repo_id=r.repo_id, title="Harmony conflict", labels=["harmony", "critical"], actor="alice", ) assert result.ok is True assert set(result.data["labels"]) == {"harmony", "critical"} class TestIntegrationUpdateIssue: async def test_update_title(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_issue( repo_id=r.repo_id, title="Old title", actor="alice" ) num = created.data["number"] result = await execute_update_issue( repo_id=r.repo_id, issue_number=num, title="New title", actor="alice" ) assert result.ok is True assert result.data["title"] == "New title" async def test_close_issue(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_issue( repo_id=r.repo_id, title="To close", actor="alice" ) num = created.data["number"] result = await execute_update_issue( repo_id=r.repo_id, issue_number=num, state="closed", actor="alice" ) assert result.ok is True assert result.data["state"] == "closed" async def test_unknown_repo(self, db_session: AsyncSession) -> None: result = await execute_update_issue( repo_id=fake_id("ghost-repo"), issue_number=1, title="x", actor="alice" ) assert result.ok is False assert result.error_code == "repo_not_found" async def test_forbidden_without_auth(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_update_issue( repo_id=r.repo_id, issue_number=1, title="x", actor="" ) assert result.ok is False assert result.error_code == "forbidden" async def test_forbidden_non_collaborator(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_issue( repo_id=r.repo_id, title="Some issue", actor="alice" ) num = created.data["number"] result = await execute_update_issue( repo_id=r.repo_id, issue_number=num, title="x", actor="carol" ) assert result.ok is False assert result.error_code == "forbidden" class TestIntegrationCreateIssueComment: async def test_happy_path(self, db_session: AsyncSession) -> None: r = await _repo(db_session, visibility="public") await db_session.commit() issue = await execute_create_issue( repo_id=r.repo_id, title="I", actor="alice" ) num = issue.data["number"] # Any authenticated user can comment on a public repo result = await execute_create_issue_comment( repo_id=r.repo_id, issue_number=num, body="LGTM!", actor="bob" ) assert result.ok is True assert "comment_id" in result.data assert result.data["body"] == "LGTM!" async def test_forbidden_without_auth(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_issue_comment( repo_id=r.repo_id, issue_number=1, body="x", actor="" ) assert result.ok is False assert result.error_code == "forbidden" async def test_forbidden_on_private_repo_for_non_owner(self, db_session: AsyncSession) -> None: r = await _repo(db_session, visibility="private") await db_session.commit() issue = await execute_create_issue(repo_id=r.repo_id, title="I", actor="alice") num = issue.data["number"] result = await execute_create_issue_comment( repo_id=r.repo_id, issue_number=num, body="x", actor="carol" ) assert result.ok is False assert result.error_code == "forbidden" async def test_unknown_issue(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_issue_comment( repo_id=r.repo_id, issue_number=999, body="x", actor="alice" ) assert result.ok is False assert result.error_code == "issue_not_found" class TestIntegrationCreateProposal: async def test_happy_path(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await _commit_and_branch(db_session, r.repo_id, "main") await _commit_and_branch(db_session, r.repo_id, "feat-harmony") await db_session.commit() result = await execute_create_proposal( repo_id=r.repo_id, title="Add harmony layer", from_branch="feat-harmony", to_branch="main", body="Adds new harmonic dimension.", actor="alice", ) assert result.ok is True assert "proposal_id" in result.data assert result.data["state"] == "open" async def test_any_user_can_create_proposal_on_public_repo(self, db_session: AsyncSession) -> None: r = await _repo(db_session, visibility="public") await _commit_and_branch(db_session, r.repo_id, "main") await _commit_and_branch(db_session, r.repo_id, "feat-pub") await db_session.commit() result = await execute_create_proposal( repo_id=r.repo_id, title="Public PR", from_branch="feat-pub", to_branch="main", actor="carol" ) assert result.ok is True async def test_forbidden_on_private_repo_for_non_owner(self, db_session: AsyncSession) -> None: r = await _repo(db_session, visibility="private") await _commit_and_branch(db_session, r.repo_id, "main") await _commit_and_branch(db_session, r.repo_id, "feat-prv") await db_session.commit() result = await execute_create_proposal( repo_id=r.repo_id, title="Private PR", from_branch="feat-prv", to_branch="main", actor="carol" ) assert result.ok is False assert result.error_code == "forbidden" async def test_forbidden_without_auth(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await _commit_and_branch(db_session, r.repo_id, "main") await _commit_and_branch(db_session, r.repo_id, "feat-noauth") await db_session.commit() result = await execute_create_proposal( repo_id=r.repo_id, title="T", from_branch="feat-noauth", to_branch="main", actor="" ) assert result.ok is False assert result.error_code == "forbidden" async def test_unknown_repo(self, db_session: AsyncSession) -> None: result = await execute_create_proposal( repo_id=fake_id("ghost-proposal"), title="T", from_branch="feat", to_branch="main", actor="alice" ) assert result.ok is False assert result.error_code == "repo_not_found" class TestProposalMergeAccessGuard: """Ownership checks on execute_merge_proposal.""" async def test_merge_forbidden_for_non_owner(self, db_session: AsyncSession) -> None: """Non-owner actor receives error_code='forbidden' from execute_merge_proposal.""" r = await _repo(db_session, owner="alice") await _commit_and_branch(db_session, r.repo_id, "main") await _commit_and_branch(db_session, r.repo_id, "feat-x") await db_session.commit() proposal_result = await execute_create_proposal( repo_id=r.repo_id, title="Non-owner merge attempt", from_branch="feat-x", to_branch="main", actor="alice", ) assert proposal_result.ok is True proposal_id = proposal_result.data["proposal_id"] # "carol" is not the owner and has no collaborator entry. result = await execute_merge_proposal(repo_id=r.repo_id, proposal_id=proposal_id, actor="carol") assert result.ok is False assert result.error_code == "forbidden" async def test_merge_forbidden_when_unauthenticated(self, db_session: AsyncSession) -> None: """Empty actor string (unauthenticated) receives error_code='forbidden'.""" r = await _repo(db_session, owner="alice") await _commit_and_branch(db_session, r.repo_id, "main") await _commit_and_branch(db_session, r.repo_id, "feat-anon") await db_session.commit() proposal_result = await execute_create_proposal( repo_id=r.repo_id, title="Unauthenticated merge attempt", from_branch="feat-anon", to_branch="main", actor="alice", ) assert proposal_result.ok is True proposal_id = proposal_result.data["proposal_id"] result = await execute_merge_proposal(repo_id=r.repo_id, proposal_id=proposal_id, actor="") assert result.ok is False assert result.error_code == "forbidden" class TestIntegrationMergeProposal: async def test_merge_open_proposal(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await _commit_and_branch(db_session, r.repo_id, "main") await _commit_and_branch(db_session, r.repo_id, "feat-bass") await db_session.commit() proposal_result = await execute_create_proposal( repo_id=r.repo_id, title="Add bass line", from_branch="feat-bass", to_branch="main", actor="alice", ) assert proposal_result.ok is True proposal_id = proposal_result.data["proposal_id"] result = await execute_merge_proposal(repo_id=r.repo_id, proposal_id=proposal_id, actor="alice") assert result.ok is True assert result.data["state"] == "merged" async def test_merge_unknown_proposal(self, db_session: AsyncSession) -> None: r = await _repo(db_session) result = await execute_merge_proposal(repo_id=r.repo_id, proposal_id="ghost-proposal-id", actor="alice") assert result.ok is False class TestIntegrationSubmitReview: async def test_approve(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await _commit_and_branch(db_session, r.repo_id, "main") await _commit_and_branch(db_session, r.repo_id, "feat-review") await db_session.commit() proposal_result = await execute_create_proposal( repo_id=r.repo_id, title="Review test", from_branch="feat-review", to_branch="main", actor="alice", ) proposal_id = proposal_result.data["proposal_id"] result = await execute_create_proposal_review( repo_id=r.repo_id, proposal_id=proposal_id, verdict="approve", reviewer="bob" ) assert result.ok is True assert result.data["state"] == "approved" async def test_request_changes(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await _commit_and_branch(db_session, r.repo_id, "main") await _commit_and_branch(db_session, r.repo_id, "feat-chg") await db_session.commit() proposal_result = await execute_create_proposal( repo_id=r.repo_id, title="Changes test", from_branch="feat-chg", to_branch="main", actor="alice", ) proposal_id = proposal_result.data["proposal_id"] result = await execute_create_proposal_review( repo_id=r.repo_id, proposal_id=proposal_id, verdict="request_changes", reviewer="carol" ) assert result.ok is True assert result.data["state"] == "changes_requested" async def test_forbidden_without_auth(self, db_session: AsyncSession) -> None: """Empty reviewer (unauthenticated) must be rejected before any DB access.""" r = await _repo(db_session) await db_session.commit() result = await execute_create_proposal_review( repo_id=r.repo_id, proposal_id="any-id", verdict="approve", reviewer="" ) assert result.ok is False assert result.error_code == "forbidden" async def test_forbidden_on_private_repo_for_non_member(self, db_session: AsyncSession) -> None: """Non-member carol cannot review proposals on a private repo.""" r = await _repo(db_session, visibility="private") await _commit_and_branch(db_session, r.repo_id, "main") await _commit_and_branch(db_session, r.repo_id, "feat-prv") await db_session.commit() proposal_result = await execute_create_proposal( repo_id=r.repo_id, title="Private review test", from_branch="feat-prv", to_branch="main", actor="alice", ) proposal_id = proposal_result.data["proposal_id"] result = await execute_create_proposal_review( repo_id=r.repo_id, proposal_id=proposal_id, verdict="approve", reviewer="carol" ) assert result.ok is False assert result.error_code == "forbidden" async def test_any_user_can_review_public_repo(self, db_session: AsyncSession) -> None: """Any authenticated user may submit a review on a public repo.""" r = await _repo(db_session, visibility="public") await _commit_and_branch(db_session, r.repo_id, "main") await _commit_and_branch(db_session, r.repo_id, "feat-pub") await db_session.commit() proposal_result = await execute_create_proposal( repo_id=r.repo_id, title="Public review test", from_branch="feat-pub", to_branch="main", actor="alice", ) proposal_id = proposal_result.data["proposal_id"] result = await execute_create_proposal_review( repo_id=r.repo_id, proposal_id=proposal_id, verdict="request_changes", reviewer="dave" ) assert result.ok is True assert result.data["state"] in ("pending", "changes_requested") class TestIntegrationCreateRelease: async def test_happy_path(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_release( repo_id=r.repo_id, tag="v1.0.0", title="First Release", body="Initial stable release.", channel="stable", actor="alice", ) assert result.ok is True assert result.data["tag"] == "v1.0.0" assert result.data["channel"] == "stable" assert "release_id" in result.data async def test_duplicate_tag_error(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() await execute_create_release(repo_id=r.repo_id, tag="v1.0.0", actor="alice") result = await execute_create_release(repo_id=r.repo_id, tag="v1.0.0", actor="alice") assert result.ok is False async def test_beta_channel(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_release( repo_id=r.repo_id, tag="v2.0.0-beta.1", channel="beta", actor="alice" ) assert result.ok is True assert result.data["channel"] == "beta" class TestIntegrationCreateRepo: async def test_happy_path(self, db_session: AsyncSession) -> None: result = await execute_create_repo( name="jazz-standards", owner="alice", owner_user_id=compute_identity_id(b"alice"), description="My jazz collection", visibility="public", ) assert result.ok is True assert "repo_id" in result.data assert result.data["owner"] == "alice" assert "jazz" in result.data["slug"] async def test_private_visibility(self, db_session: AsyncSession) -> None: result = await execute_create_repo( name="private-session", owner="alice", owner_user_id=compute_identity_id(b"alice"), visibility="private", ) assert result.ok is True assert result.data["visibility"] == "private" async def test_forbidden_when_unauthenticated(self, db_session: AsyncSession) -> None: """Empty owner_user_id (unauthenticated caller) must be rejected.""" result = await execute_create_repo( name="hacked-repo", owner="", owner_user_id="", ) assert result.ok is False assert result.error_code == "forbidden" class TestIntegrationCreateLabel: async def test_happy_path(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_label( repo_id=r.repo_id, name="bug", color="#d73a4a", actor="alice" ) assert result.ok is True assert "label_id" in result.data assert result.data["name"] == "bug" assert result.data["color"] == "#d73a4a" async def test_duplicate_name_rejected(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() await execute_create_label(repo_id=r.repo_id, name="dup-label", color="#aabbcc", actor="alice") result = await execute_create_label( repo_id=r.repo_id, name="dup-label", color="#112233", actor="alice" ) assert result.ok is False assert "already exists" in (result.error_message or "").lower() async def test_invalid_color_rejected(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_label( repo_id=r.repo_id, name="bad-color", color="d73a4a" # missing # ) assert result.ok is False assert result.error_code == "invalid_args" async def test_empty_name_rejected(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_label(repo_id=r.repo_id, name=" ", color="#d73a4a") assert result.ok is False assert result.error_code == "invalid_args" async def test_unknown_repo_returns_error(self, db_session: AsyncSession) -> None: result = await execute_create_label( repo_id="00000000-0000-0000-0000-000000000000", name="bug", color="#d73a4a", ) assert result.ok is False assert result.error_code == "repo_not_found" class TestIntegrationListLabels: async def test_returns_all_labels(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() await execute_create_label(repo_id=r.repo_id, name="bug", color="#d73a4a", actor="alice") await execute_create_label(repo_id=r.repo_id, name="enhancement", color="#a2eeef", actor="alice") result = await execute_list_labels(r.repo_id) assert result.ok is True assert result.data["total"] == 2 names = {lbl["name"] for lbl in result.data["labels"]} assert names == {"bug", "enhancement"} async def test_empty_repo_returns_empty_list(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_list_labels(r.repo_id) assert result.ok is True assert result.data["total"] == 0 assert result.data["labels"] == [] async def test_unknown_repo_returns_error(self, db_session: AsyncSession) -> None: result = await execute_list_labels("00000000-0000-0000-0000-000000000000") assert result.ok is False assert result.error_code == "repo_not_found" class TestIntegrationUpdateLabel: async def test_rename_label(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_label(repo_id=r.repo_id, name="old-name", color="#d73a4a", actor="alice") label_id: str = created.data["label_id"] result = await execute_update_label( repo_id=r.repo_id, label_id=label_id, name="new-name", actor="alice" ) assert result.ok is True assert result.data["name"] == "new-name" assert result.data["color"] == "#d73a4a" # unchanged async def test_change_color(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_label(repo_id=r.repo_id, name="bug", color="#d73a4a", actor="alice") label_id: str = created.data["label_id"] result = await execute_update_label( repo_id=r.repo_id, label_id=label_id, color="#b60205", actor="alice" ) assert result.ok is True assert result.data["color"] == "#b60205" assert result.data["name"] == "bug" # unchanged async def test_no_fields_returns_error(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_label(repo_id=r.repo_id, name="bug", color="#d73a4a", actor="alice") result = await execute_update_label( repo_id=r.repo_id, label_id=created.data["label_id"] ) assert result.ok is False assert result.error_code == "invalid_args" async def test_rename_conflict_rejected(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_label(repo_id=r.repo_id, name="bug", color="#d73a4a", actor="alice") await execute_create_label(repo_id=r.repo_id, name="enhancement", color="#a2eeef", actor="alice") result = await execute_update_label( repo_id=r.repo_id, label_id=created.data["label_id"], name="enhancement", actor="alice" ) assert result.ok is False assert result.error_code == "already_exists" async def test_not_found_returns_error(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_update_label( repo_id=r.repo_id, label_id="00000000-0000-0000-0000-000000000000", name="new-name", actor="alice", ) assert result.ok is False assert result.error_code == "not_found" async def test_invalid_color_rejected(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_label(repo_id=r.repo_id, name="bug", color="#d73a4a", actor="alice") result = await execute_update_label( repo_id=r.repo_id, label_id=created.data["label_id"], color="b60205" # missing # ) assert result.ok is False assert result.error_code == "invalid_args" class TestIntegrationDeleteLabel: async def test_deletes_label(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_label(repo_id=r.repo_id, name="bug", color="#d73a4a", actor="alice") label_id: str = created.data["label_id"] result = await execute_delete_label(repo_id=r.repo_id, label_id=label_id, actor="alice") assert result.ok is True assert result.data["deleted"] is True assert result.data["name"] == "bug" # Confirm it no longer exists. listed = await execute_list_labels(r.repo_id) assert listed.data["total"] == 0 async def test_not_found_returns_error(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_delete_label( repo_id=r.repo_id, label_id="00000000-0000-0000-0000-000000000000", actor="alice", ) assert result.ok is False assert result.error_code == "not_found" class TestLabelWriteAccessGuard: """Non-owners must be rejected when managing labels.""" async def test_create_label_forbidden_for_non_owner(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_label( repo_id=r.repo_id, name="bug", color="#d73a4a", actor="eve" ) assert result.ok is False assert result.error_code == "forbidden" async def test_update_label_forbidden_for_non_owner(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_label(repo_id=r.repo_id, name="bug", color="#d73a4a", actor="alice") result = await execute_update_label( repo_id=r.repo_id, label_id=created.data["label_id"], name="hacked", actor="eve" ) assert result.ok is False assert result.error_code == "forbidden" async def test_delete_label_forbidden_for_non_owner(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_label(repo_id=r.repo_id, name="bug", color="#d73a4a", actor="alice") result = await execute_delete_label( repo_id=r.repo_id, label_id=created.data["label_id"], actor="eve" ) assert result.ok is False assert result.error_code == "forbidden" async def test_create_label_forbidden_when_unauthenticated(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_label(repo_id=r.repo_id, name="bug", color="#d73a4a") assert result.ok is False assert result.error_code == "forbidden" # ── Layer 3 — End-to-End ────────────────────────────────────────────────────── class TestE2EAuthGate: """Every write tool call without auth must return 401.""" async def test_create_issue_no_auth( self, http_client: AsyncClient, db_session: AsyncSession ) -> None: r = await _repo(db_session) resp = await http_client.post( "/mcp", json=_tools_call("musehub_create_issue", {"repo_id": r.repo_id, "title": "T"}), headers={"Content-Type": "application/json"}, ) assert resp.status_code == 401 async def test_create_repo_no_auth( self, http_client: AsyncClient, db_session: AsyncSession ) -> None: resp = await http_client.post( "/mcp", json=_tools_call("musehub_create_repo", {"name": "test-repo"}), headers={"Content-Type": "application/json"}, ) assert resp.status_code == 401 async def test_create_proposal_no_auth( self, http_client: AsyncClient, db_session: AsyncSession ) -> None: resp = await http_client.post( "/mcp", json=_tools_call("musehub_create_proposal", { "repo_id": "x", "title": "T", "from_branch": "a", "to_branch": "b" }), headers={"Content-Type": "application/json"}, ) assert resp.status_code == 401 async def test_merge_proposal_no_auth( self, http_client: AsyncClient, db_session: AsyncSession ) -> None: resp = await http_client.post( "/mcp", json=_tools_call("musehub_merge_proposal", {"repo_id": "x", "proposal_id": "y"}), headers={"Content-Type": "application/json"}, ) assert resp.status_code == 401 async def test_create_release_no_auth( self, http_client: AsyncClient, db_session: AsyncSession ) -> None: resp = await http_client.post( "/mcp", json=_tools_call("musehub_create_release", {"repo_id": "x", "tag": "v1.0", "title": "R"}), headers={"Content-Type": "application/json"}, ) assert resp.status_code == 401 class TestE2EAuthenticatedWrite: """Write tools succeed when auth is present.""" async def test_create_issue_with_auth( self, http_client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict ) -> None: r = await _repo(db_session) await db_session.commit() resp = await http_client.post( "/mcp", json=_tools_call("musehub_create_issue", { "repo_id": r.repo_id, "title": "Auth issue" }), headers=auth_headers, ) assert resp.status_code == 200 result = resp.json()["result"] assert result["isError"] is False payload = json.loads(_unwrap_tool_text(result["content"][0]["text"])) assert payload["title"] == "Auth issue" async def test_create_repo_with_auth( self, http_client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict ) -> None: resp = await http_client.post( "/mcp", json=_tools_call("musehub_create_repo", { "name": "e2e-authed-repo", "owner": "testuser", }), headers=auth_headers, ) assert resp.status_code == 200 result = resp.json()["result"] assert result["isError"] is False payload = json.loads(_unwrap_tool_text(result["content"][0]["text"])) assert "repo_id" in payload # ── Layer 4 — Stress ────────────────────────────────────────────────────────── class TestStressWriteTools: async def test_20_sequential_issues_have_sequential_numbers( self, db_session: AsyncSession ) -> None: r = await _repo(db_session) await db_session.commit() numbers = [] for i in range(20): result = await execute_create_issue( repo_id=r.repo_id, title=f"Issue {i}", actor="alice" ) assert result.ok is True numbers.append(result.data["number"]) assert numbers == list(range(1, 21)) # ── Layer 5 — Data Integrity ────────────────────────────────────────────────── class TestDataIntegrityIssue: async def test_created_issue_retrievable(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() await execute_create_issue( repo_id=r.repo_id, title="Retrievable issue", actor="alice" ) issues_result = await execute_list_issues(r.repo_id, state="open") assert issues_result.ok is True titles = [i["title"] for i in issues_result.data["issues"]] assert "Retrievable issue" in titles async def test_closed_issue_state_persisted(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_issue( repo_id=r.repo_id, title="Close me", actor="alice" ) num = created.data["number"] await execute_update_issue(repo_id=r.repo_id, issue_number=num, state="closed", actor="alice") issues_result = await execute_list_issues(r.repo_id, state="closed") assert issues_result.ok is True numbers = [i["number"] for i in issues_result.data["issues"]] assert num in numbers class TestDataIntegrityProposal: async def test_created_proposal_retrievable(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await _commit_and_branch(db_session, r.repo_id, "main") await _commit_and_branch(db_session, r.repo_id, "feat-di") await db_session.commit() proposal_result = await execute_create_proposal( repo_id=r.repo_id, title="DI proposal", from_branch="feat-di", to_branch="main", actor="alice", ) assert proposal_result.ok is True list_result = await execute_list_proposals(r.repo_id, state="open") assert list_result.ok is True titles = [p["title"] for p in list_result.data["pulls"]] assert "DI proposal" in titles class TestDataIntegrityRelease: async def test_release_tag_persisted(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_release( repo_id=r.repo_id, tag="v3.7.2", title="Minor release", actor="alice" ) assert result.ok is True assert result.data["tag"] == "v3.7.2" async def test_release_author_persisted(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_release( repo_id=r.repo_id, tag="v0.1.0", actor="alice" ) assert result.ok is True assert result.data["author"] == "alice" # ── Layer 6 — Security ──────────────────────────────────────────────────────── class TestSecurityWriteTools: def test_all_write_tools_in_auth_gate_set(self) -> None: """All MUSEHUB_WRITE_TOOLS must be in MUSEHUB_WRITE_TOOL_NAMES.""" from musehub.mcp.tools.musehub import MUSEHUB_WRITE_TOOLS write_tool_names_from_list = {t["name"] for t in MUSEHUB_WRITE_TOOLS} # All write tools in the list should be in the gate set. uncovered = write_tool_names_from_list - MUSEHUB_WRITE_TOOL_NAMES assert not uncovered, f"Write tools not in auth gate: {uncovered}" async def test_create_proposal_same_branch_guard(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_create_proposal( repo_id=r.repo_id, title="T", from_branch="main", to_branch="main", actor="alice" ) assert result.ok is False async def test_duplicate_label_rejected(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() await execute_create_label(repo_id=r.repo_id, name="sec-label", color="aabbcc") result = await execute_create_label( repo_id=r.repo_id, name="sec-label", color="ddeeff" ) assert result.ok is False async def test_create_issue_comment_on_wrong_issue_safe( self, db_session: AsyncSession ) -> None: """create_issue_comment on non-existent issue must return not_found, not crash.""" r = await _repo(db_session) await db_session.commit() result = await execute_create_issue_comment( repo_id=r.repo_id, issue_number=9999, body="hack", actor="eve" ) assert result.ok is False assert result.error_code == "issue_not_found" # ── Layer 7 — Performance ───────────────────────────────────────────────────── class TestPerformanceWriteTools: async def test_10_sequential_issues_under_500ms( self, db_session: AsyncSession ) -> None: r = await _repo(db_session) await db_session.commit() start = time.perf_counter() for i in range(10): result = await execute_create_issue( repo_id=r.repo_id, title=f"Perf issue {i}", actor="alice" ) assert result.ok is True elapsed_ms = (time.perf_counter() - start) * 1000 assert elapsed_ms < 2000, f"10 issues took {elapsed_ms:.1f} ms" async def test_5_sequential_repos_under_1000ms( self, db_session: AsyncSession ) -> None: start = time.perf_counter() for i in range(5): result = await execute_create_repo( name=f"perf-repo-{i}", owner="alice", owner_user_id=compute_identity_id(b"alice"), initialize=False, ) assert result.ok is True elapsed_ms = (time.perf_counter() - start) * 1000 assert elapsed_ms < 1000, f"5 repos took {elapsed_ms:.1f} ms" # ── Issue state transition tests ─────────────────────────────────────────────── class TestIntegrationCloseIssue: async def test_close_open_issue(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_issue(repo_id=r.repo_id, title="Bug to close", actor="alice") num: int = created.data["number"] result = await execute_close_issue(repo_id=r.repo_id, issue_number=num, actor="alice") assert result.ok is True assert result.data["state"] == "closed" assert result.data["number"] == num async def test_close_already_closed_is_idempotent(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_issue(repo_id=r.repo_id, title="Double close", actor="alice") num: int = created.data["number"] await execute_close_issue(repo_id=r.repo_id, issue_number=num, actor="alice") result = await execute_close_issue(repo_id=r.repo_id, issue_number=num, actor="alice") assert result.ok is True assert result.data["state"] == "closed" async def test_close_unknown_issue_returns_error(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_close_issue(repo_id=r.repo_id, issue_number=9999, actor="alice") assert result.ok is False assert result.error_code == "issue_not_found" async def test_close_unknown_repo_returns_error(self, db_session: AsyncSession) -> None: nonexistent_repo_id = compute_repo_id(compute_identity_id(b"ghost"), "ghost-repo", "code", "2025-01-01T00:00:00") result = await execute_close_issue(repo_id=nonexistent_repo_id, issue_number=1, actor="alice") assert result.ok is False # close_issue returns None for unknown repo (issue not found path) assert result.error_code in ("issue_not_found", "repo_not_found", "invalid_args") class TestIntegrationReopenIssue: async def test_reopen_closed_issue(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_issue(repo_id=r.repo_id, title="Reopen me", actor="alice") num: int = created.data["number"] await execute_close_issue(repo_id=r.repo_id, issue_number=num, actor="alice") result = await execute_reopen_issue(repo_id=r.repo_id, issue_number=num, actor="alice") assert result.ok is True assert result.data["state"] == "open" assert result.data["number"] == num async def test_reopen_already_open_is_idempotent(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_issue(repo_id=r.repo_id, title="Already open", actor="alice") num: int = created.data["number"] result = await execute_reopen_issue(repo_id=r.repo_id, issue_number=num, actor="alice") assert result.ok is True assert result.data["state"] == "open" async def test_reopen_unknown_issue_returns_error(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_reopen_issue(repo_id=r.repo_id, issue_number=9999, actor="alice") assert result.ok is False assert result.error_code == "issue_not_found" class TestIntegrationAssignIssue: async def test_assign_sets_assignee(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_issue(repo_id=r.repo_id, title="Assignable", actor="alice") num: int = created.data["number"] result = await execute_assign_issue( repo_id=r.repo_id, issue_number=num, assignee="bob", actor="alice" ) assert result.ok is True assert result.data["assignee"] == "bob" async def test_unassign_with_empty_string(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_issue(repo_id=r.repo_id, title="Unassignable", actor="alice") num: int = created.data["number"] await execute_assign_issue(repo_id=r.repo_id, issue_number=num, assignee="bob", actor="alice") result = await execute_assign_issue( repo_id=r.repo_id, issue_number=num, assignee="", actor="alice" ) assert result.ok is True assert result.data["assignee"] is None or result.data["assignee"] == "" async def test_assign_unknown_issue_returns_error(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_assign_issue( repo_id=r.repo_id, issue_number=9999, assignee="bob", actor="alice" ) assert result.ok is False assert result.error_code == "issue_not_found" class TestIntegrationSetIssueLabels: async def test_set_labels_replaces_all(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_issue( repo_id=r.repo_id, title="Label me", labels=["bug"], actor="alice" ) num: int = created.data["number"] result = await execute_update_issue_labels( repo_id=r.repo_id, issue_number=num, labels=["enhancement", "help-wanted"], actor="alice" ) assert result.ok is True assert set(result.data["labels"]) == {"enhancement", "help-wanted"} async def test_set_empty_labels_clears_all(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_issue( repo_id=r.repo_id, title="Clear labels", labels=["bug"], actor="alice" ) num: int = created.data["number"] result = await execute_update_issue_labels( repo_id=r.repo_id, issue_number=num, labels=[], actor="alice" ) assert result.ok is True assert result.data["labels"] == [] async def test_set_labels_unknown_issue_returns_error(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_update_issue_labels( repo_id=r.repo_id, issue_number=9999, labels=["bug"], actor="alice" ) assert result.ok is False assert result.error_code == "issue_not_found" class TestIntegrationRemoveIssueLabel: async def test_remove_existing_label(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_issue( repo_id=r.repo_id, title="Multi-label", labels=["bug", "enhancement"], actor="alice" ) num: int = created.data["number"] result = await execute_remove_issue_label( repo_id=r.repo_id, issue_number=num, label="bug", actor="alice" ) assert result.ok is True assert "bug" not in result.data["labels"] assert "enhancement" in result.data["labels"] async def test_remove_absent_label_is_idempotent(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_issue( repo_id=r.repo_id, title="No label", labels=[], actor="alice" ) num: int = created.data["number"] result = await execute_remove_issue_label( repo_id=r.repo_id, issue_number=num, label="nonexistent", actor="alice" ) assert result.ok is True assert result.data["labels"] == [] async def test_remove_label_unknown_issue_returns_error(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() result = await execute_remove_issue_label( repo_id=r.repo_id, issue_number=9999, label="bug", actor="alice" ) assert result.ok is False assert result.error_code == "issue_not_found" class TestDataIntegrityIssueStateTransitions: async def test_close_then_reopen_cycle(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_issue(repo_id=r.repo_id, title="Cycle issue", actor="alice") num: int = created.data["number"] await execute_close_issue(repo_id=r.repo_id, issue_number=num, actor="alice") closed_result = await execute_list_issues(r.repo_id, state="closed") assert num in [i["number"] for i in closed_result.data["issues"]] await execute_reopen_issue(repo_id=r.repo_id, issue_number=num, actor="alice") open_result = await execute_list_issues(r.repo_id, state="open") assert num in [i["number"] for i in open_result.data["issues"]] async def test_assign_then_unassign_persisted(self, db_session: AsyncSession) -> None: r = await _repo(db_session) await db_session.commit() created = await execute_create_issue(repo_id=r.repo_id, title="Assign cycle", actor="alice") num: int = created.data["number"] await execute_assign_issue(repo_id=r.repo_id, issue_number=num, assignee="carol", actor="alice") await execute_assign_issue(repo_id=r.repo_id, issue_number=num, assignee="", actor="alice") open_list = await execute_list_issues(r.repo_id, state="open") issue = next((i for i in open_list.data["issues"] if i["number"] == num), None) assert issue is not None assert issue.get("assignee") is None or issue.get("assignee") == "" class TestIntegrationDeleteIssueComment: """Tests for execute_delete_issue_comment.""" async def test_delete_existing_comment(self, db_session: AsyncSession) -> None: """Deleting an existing comment returns ok=True and deleted=True.""" from musehub.mcp.write_tools.issues import execute_create_issue_comment r = await _repo(db_session) await db_session.commit() issue_result = await execute_create_issue(repo_id=r.repo_id, title="Issue with comment", actor="alice") num: int = issue_result.data["number"] comment_result = await execute_create_issue_comment( repo_id=r.repo_id, issue_number=num, body="A comment", actor="alice" ) assert comment_result.ok is True comment_id: str = comment_result.data["comment_id"] delete_result = await execute_delete_issue_comment( repo_id=r.repo_id, issue_number=num, comment_id=comment_id, actor="alice" ) assert delete_result.ok is True assert delete_result.data["deleted"] is True assert delete_result.data["comment_id"] == comment_id async def test_delete_unknown_comment_returns_error(self, db_session: AsyncSession) -> None: """Deleting a non-existent comment returns ok=False with comment_not_found.""" r = await _repo(db_session) await db_session.commit() issue_result = await execute_create_issue(repo_id=r.repo_id, title="Commentless issue", actor="alice") num: int = issue_result.data["number"] result = await execute_delete_issue_comment( repo_id=r.repo_id, issue_number=num, comment_id="ghost-comment-id", actor="alice" ) assert result.ok is False assert result.error_code == "comment_not_found" async def test_delete_comment_forbidden_for_non_owner(self, db_session: AsyncSession) -> None: """Non-owner cannot delete a comment — returns forbidden.""" from musehub.mcp.write_tools.issues import execute_create_issue_comment r = await _repo(db_session, owner="alice") await db_session.commit() issue_result = await execute_create_issue(repo_id=r.repo_id, title="Protected issue", actor="alice") num: int = issue_result.data["number"] comment_result = await execute_create_issue_comment( repo_id=r.repo_id, issue_number=num, body="Comment to protect", actor="alice" ) comment_id: str = comment_result.data["comment_id"] result = await execute_delete_issue_comment( repo_id=r.repo_id, issue_number=num, comment_id=comment_id, actor="carol" ) assert result.ok is False assert result.error_code == "forbidden" class TestIntegrationCollaborators: """Tests for execute_list/invite/update/remove_collaborator.""" async def test_invite_and_list_collaborator(self, db_session: AsyncSession) -> None: """Inviting a collaborator then listing shows the new entry.""" r = await _repo(db_session, owner="alice") await db_session.commit() invite = await execute_invite_collaborator( repo_id=r.repo_id, handle="carol", permission="write", actor="alice" ) assert invite.ok is True assert invite.data["handle"] == "carol" assert invite.data["permission"] == "write" listing = await execute_list_collaborators(repo_id=r.repo_id, actor="alice") assert listing.ok is True handles = [c["handle"] for c in listing.data["collaborators"]] assert "carol" in handles async def test_invite_duplicate_returns_conflict(self, db_session: AsyncSession) -> None: """Inviting the same handle twice returns error_code='conflict'.""" r = await _repo(db_session, owner="alice") await db_session.commit() await execute_invite_collaborator(repo_id=r.repo_id, handle="carol", actor="alice") result = await execute_invite_collaborator(repo_id=r.repo_id, handle="carol", actor="alice") assert result.ok is False assert result.error_code == "conflict" async def test_update_collaborator_permission(self, db_session: AsyncSession) -> None: """Updating a collaborator's permission is reflected immediately.""" r = await _repo(db_session, owner="alice") await db_session.commit() await execute_invite_collaborator(repo_id=r.repo_id, handle="carol", permission="read", actor="alice") update = await execute_update_collaborator_permission( repo_id=r.repo_id, handle="carol", permission="admin", actor="alice" ) assert update.ok is True assert update.data["permission"] == "admin" async def test_remove_collaborator(self, db_session: AsyncSession) -> None: """Removing a collaborator causes them to disappear from the list.""" r = await _repo(db_session, owner="alice") await db_session.commit() await execute_invite_collaborator(repo_id=r.repo_id, handle="carol", actor="alice") remove = await execute_remove_collaborator(repo_id=r.repo_id, handle="carol", actor="alice") assert remove.ok is True assert remove.data["removed"] is True listing = await execute_list_collaborators(repo_id=r.repo_id, actor="alice") handles = [c["handle"] for c in listing.data["collaborators"]] assert "carol" not in handles async def test_invite_forbidden_for_non_admin(self, db_session: AsyncSession) -> None: """A non-admin collaborator cannot invite others.""" r = await _repo(db_session, owner="alice") await db_session.commit() result = await execute_invite_collaborator( repo_id=r.repo_id, handle="dave", permission="write", actor="carol" ) assert result.ok is False assert result.error_code == "forbidden" async def test_list_collaborators_unknown_repo(self, db_session: AsyncSession) -> None: """Listing collaborators for a non-existent repo returns repo_not_found.""" result = await execute_list_collaborators(repo_id=fake_id("ghost-repo"), actor="alice") assert result.ok is False assert result.error_code == "repo_not_found" class TestIntegrationWebhooksMCP: """Tests for execute_create/list/delete_webhook via MCP layer.""" async def test_create_and_list_webhook(self, db_session: AsyncSession) -> None: """Creating a webhook then listing shows the new subscription.""" r = await _repo(db_session, owner="alice") await db_session.commit() created = await execute_create_webhook( repo_id=r.repo_id, url="https://example.com/hook", events=["push", "issue"], actor="alice", ) assert created.ok is True assert created.data["url"] == "https://example.com/hook" assert set(created.data["events"]) == {"push", "issue"} webhook_id: str = created.data["webhook_id"] listing = await execute_list_webhooks(repo_id=r.repo_id, actor="alice") assert listing.ok is True ids = [w["webhook_id"] for w in listing.data["webhooks"]] assert webhook_id in ids async def test_delete_webhook(self, db_session: AsyncSession) -> None: """Deleting a webhook removes it from the listing.""" r = await _repo(db_session, owner="alice") await db_session.commit() created = await execute_create_webhook( repo_id=r.repo_id, url="https://example.com/delete-hook", events=["push"], actor="alice", ) webhook_id: str = created.data["webhook_id"] deleted = await execute_delete_webhook(repo_id=r.repo_id, webhook_id=webhook_id, actor="alice") assert deleted.ok is True assert deleted.data["deleted"] is True listing = await execute_list_webhooks(repo_id=r.repo_id, actor="alice") ids = [w["webhook_id"] for w in listing.data["webhooks"]] assert webhook_id not in ids async def test_create_webhook_invalid_event_type(self, db_session: AsyncSession) -> None: """Unknown event types are rejected with invalid_args.""" r = await _repo(db_session, owner="alice") await db_session.commit() result = await execute_create_webhook( repo_id=r.repo_id, url="https://example.com/hook", events=["not_a_real_event"], actor="alice", ) assert result.ok is False assert result.error_code == "invalid_args" async def test_create_webhook_forbidden_for_non_owner(self, db_session: AsyncSession) -> None: """Non-owner cannot create webhooks.""" r = await _repo(db_session, owner="alice") await db_session.commit() result = await execute_create_webhook( repo_id=r.repo_id, url="https://example.com/hook", events=["push"], actor="carol", ) assert result.ok is False assert result.error_code == "forbidden" async def test_delete_unknown_webhook_returns_error(self, db_session: AsyncSession) -> None: """Deleting a non-existent webhook returns webhook_not_found.""" r = await _repo(db_session, owner="alice") await db_session.commit() result = await execute_delete_webhook( repo_id=r.repo_id, webhook_id="ghost-webhook-id", actor="alice" ) assert result.ok is False assert result.error_code == "webhook_not_found" async def test_list_webhooks_forbidden_without_auth(self, db_session: AsyncSession) -> None: """Empty actor (unauthenticated) is forbidden from listing webhooks.""" r = await _repo(db_session, owner="alice") await db_session.commit() result = await execute_list_webhooks(repo_id=r.repo_id, actor="") assert result.ok is False assert result.error_code == "forbidden" async def test_list_webhooks_forbidden_for_non_owner(self, db_session: AsyncSession) -> None: """Non-owner carol cannot list webhooks — they may contain sensitive URLs.""" r = await _repo(db_session, owner="alice") await db_session.commit() result = await execute_list_webhooks(repo_id=r.repo_id, actor="carol") assert result.ok is False assert result.error_code == "forbidden" class TestIntegrationReleaseAssets: """Tests for execute_attach/delete_release_asset via MCP layer.""" async def test_attach_and_delete_asset(self, db_session: AsyncSession) -> None: """Attaching an asset then deleting it returns deleted=True.""" r = await _repo(db_session, owner="alice") await db_session.commit() release_result = await execute_create_release( repo_id=r.repo_id, tag="v1.0.0", title="First release", actor="alice" ) assert release_result.ok is True attach = await execute_attach_release_asset( repo_id=r.repo_id, tag="v1.0.0", name="myapp-v1.0.0.tar.gz", download_url="https://cdn.example.com/myapp-v1.0.0.tar.gz", actor="alice", ) assert attach.ok is True assert attach.data["name"] == "myapp-v1.0.0.tar.gz" asset_id: str = attach.data["asset_id"] delete = await execute_delete_release_asset( repo_id=r.repo_id, tag="v1.0.0", asset_id=asset_id, actor="alice" ) assert delete.ok is True assert delete.data["deleted"] is True async def test_attach_asset_release_not_found(self, db_session: AsyncSession) -> None: """Attaching an asset to a non-existent release returns release_not_found.""" r = await _repo(db_session, owner="alice") await db_session.commit() result = await execute_attach_release_asset( repo_id=r.repo_id, tag="v9.9.9", name="ghost.tar.gz", download_url="https://cdn.example.com/ghost.tar.gz", actor="alice", ) assert result.ok is False assert result.error_code == "release_not_found" async def test_attach_asset_forbidden_for_non_owner(self, db_session: AsyncSession) -> None: """Non-owner cannot attach assets.""" r = await _repo(db_session, owner="alice") await db_session.commit() await execute_create_release(repo_id=r.repo_id, tag="v1.0.0", actor="alice") result = await execute_attach_release_asset( repo_id=r.repo_id, tag="v1.0.0", name="evil.tar.gz", download_url="https://cdn.example.com/evil.tar.gz", actor="carol", ) assert result.ok is False assert result.error_code == "forbidden" class TestIntegrationProposalComments: """Integration tests for execute_list_proposal_comments.""" async def test_list_proposal_comments_happy_path(self, db_session: AsyncSession) -> None: """list_proposal_comments returns threaded comments for a proposal.""" r = await _repo(db_session, owner="alice") await _commit_and_branch(db_session, r.repo_id, "main") await _commit_and_branch(db_session, r.repo_id, "feat-pc") await db_session.commit() proposal = await execute_create_proposal( repo_id=r.repo_id, title="Test proposal", from_branch="feat-pc", to_branch="main", actor="alice", ) assert proposal.ok is True proposal_id: str = proposal.data["proposal_id"] comment = await execute_create_proposal_comment( repo_id=r.repo_id, proposal_id=proposal_id, body="Looks good!", actor="alice", ) assert comment.ok is True result = await execute_list_proposal_comments( repo_id=r.repo_id, proposal_id=proposal_id, actor="alice", ) assert result.ok is True assert result.data["total"] == 1 assert len(result.data["comments"]) == 1 assert result.data["comments"][0]["body"] == "Looks good!" async def test_list_proposal_comments_requires_auth(self, db_session: AsyncSession) -> None: """list_proposal_comments requires authentication.""" r = await _repo(db_session, owner="alice") await db_session.commit() result = await execute_list_proposal_comments( repo_id=r.repo_id, proposal_id="any-id", actor="", ) assert result.ok is False assert result.error_code == "forbidden" class TestIntegrationProposalReviewers: """Integration tests for request/remove proposal reviewers and list reviews.""" async def test_request_and_list_reviewers(self, db_session: AsyncSession) -> None: """request_proposal_reviewers creates pending rows; list_proposal_reviews returns them.""" r = await _repo(db_session, owner="alice") await _commit_and_branch(db_session, r.repo_id, "main") await _commit_and_branch(db_session, r.repo_id, "feat-rv") await db_session.commit() proposal = await execute_create_proposal( repo_id=r.repo_id, title="Review me", from_branch="feat-rv", to_branch="main", actor="alice", ) assert proposal.ok is True proposal_id: str = proposal.data["proposal_id"] req = await execute_request_proposal_reviewers( repo_id=r.repo_id, proposal_id=proposal_id, reviewers=["bob", "carol"], actor="alice", ) assert req.ok is True assert req.data["total"] == 2 reviewer_handles = {rv["reviewer"] for rv in req.data["reviews"]} assert "bob" in reviewer_handles assert "carol" in reviewer_handles lst = await execute_list_proposal_reviews( repo_id=r.repo_id, proposal_id=proposal_id, actor="alice", ) assert lst.ok is True assert lst.data["total"] == 2 async def test_remove_reviewer(self, db_session: AsyncSession) -> None: """remove_proposal_reviewer removes a pending reviewer.""" r = await _repo(db_session, owner="alice") await _commit_and_branch(db_session, r.repo_id, "main") await _commit_and_branch(db_session, r.repo_id, "feat-rm") await db_session.commit() proposal = await execute_create_proposal( repo_id=r.repo_id, title="Remove reviewer", from_branch="feat-rm", to_branch="main", actor="alice", ) proposal_id: str = proposal.data["proposal_id"] await execute_request_proposal_reviewers( repo_id=r.repo_id, proposal_id=proposal_id, reviewers=["bob"], actor="alice", ) remove = await execute_remove_proposal_reviewer( repo_id=r.repo_id, proposal_id=proposal_id, reviewer="bob", actor="alice", ) assert remove.ok is True assert remove.data["total"] == 0 async def test_request_reviewers_forbidden_for_non_write(self, db_session: AsyncSession) -> None: """Unauthenticated user cannot request reviewers.""" r = await _repo(db_session, owner="alice") await _commit_and_branch(db_session, r.repo_id, "main") await _commit_and_branch(db_session, r.repo_id, "feat-fw") await db_session.commit() proposal = await execute_create_proposal( repo_id=r.repo_id, title="Forbidden", from_branch="feat-fw", to_branch="main", actor="alice", ) proposal_id: str = proposal.data["proposal_id"] result = await execute_request_proposal_reviewers( repo_id=r.repo_id, proposal_id=proposal_id, reviewers=["bob"], actor="", ) assert result.ok is False assert result.error_code == "forbidden" async def test_list_reviews_filtered_by_state(self, db_session: AsyncSession) -> None: """list_proposal_reviews state filter returns only matching rows.""" r = await _repo(db_session, owner="alice") await _commit_and_branch(db_session, r.repo_id, "main") await _commit_and_branch(db_session, r.repo_id, "feat-flt") await db_session.commit() proposal = await execute_create_proposal( repo_id=r.repo_id, title="Filter test", from_branch="feat-flt", to_branch="main", actor="alice", ) proposal_id: str = proposal.data["proposal_id"] await execute_request_proposal_reviewers( repo_id=r.repo_id, proposal_id=proposal_id, reviewers=["bob"], actor="alice", ) lst = await execute_list_proposal_reviews( repo_id=r.repo_id, proposal_id=proposal_id, state="pending", actor="alice", ) assert lst.ok is True assert lst.data["total"] == 1 lst_approved = await execute_list_proposal_reviews( repo_id=r.repo_id, proposal_id=proposal_id, state="approved", actor="alice", ) assert lst_approved.ok is True assert lst_approved.data["total"] == 0 class TestIntegrationRepoManagement: """Integration tests for delete_repo, update_repo, transfer_repo_ownership.""" async def test_delete_repo_happy_path(self, db_session: AsyncSession) -> None: """Owner can delete their own repo.""" r = await _repo(db_session, owner="alice") await db_session.commit() result = await execute_delete_repo(repo_id=r.repo_id, actor="alice") assert result.ok is True assert result.data["deleted"] is True assert result.data["repo_id"] == r.repo_id async def test_delete_repo_forbidden_for_non_owner(self, db_session: AsyncSession) -> None: """Non-owner cannot delete a repo.""" r = await _repo(db_session, owner="alice") await db_session.commit() result = await execute_delete_repo(repo_id=r.repo_id, actor="carol") assert result.ok is False assert result.error_code == "forbidden" async def test_delete_repo_requires_auth(self, db_session: AsyncSession) -> None: """Unauthenticated user cannot delete a repo.""" r = await _repo(db_session, owner="alice") await db_session.commit() result = await execute_delete_repo(repo_id=r.repo_id, actor="") assert result.ok is False assert result.error_code == "forbidden" async def test_update_repo_description(self, db_session: AsyncSession) -> None: """Owner can update the repo description.""" from musehub.mcp.write_tools.repos import execute_update_repo r = await _repo(db_session, owner="alice") await db_session.commit() result = await execute_update_repo( repo_id=r.repo_id, actor="alice", description="Updated description", ) assert result.ok is True assert result.data["description"] == "Updated description" async def test_update_repo_visibility(self, db_session: AsyncSession) -> None: """Owner can change visibility from public to private.""" from musehub.mcp.write_tools.repos import execute_update_repo r = await _repo(db_session, owner="alice", visibility="public") await db_session.commit() result = await execute_update_repo( repo_id=r.repo_id, actor="alice", visibility="private", ) assert result.ok is True assert result.data["visibility"] == "private" async def test_update_repo_forbidden_for_non_owner(self, db_session: AsyncSession) -> None: """Non-owner cannot update repo settings.""" from musehub.mcp.write_tools.repos import execute_update_repo r = await _repo(db_session, owner="alice") await db_session.commit() result = await execute_update_repo( repo_id=r.repo_id, actor="carol", description="Sneaky update", ) assert result.ok is False assert result.error_code == "forbidden" async def test_update_repo_requires_auth(self, db_session: AsyncSession) -> None: """Unauthenticated caller cannot update repo settings.""" from musehub.mcp.write_tools.repos import execute_update_repo r = await _repo(db_session, owner="alice") await db_session.commit() result = await execute_update_repo( repo_id=r.repo_id, actor="", description="Should fail", ) assert result.ok is False assert result.error_code == "forbidden" async def test_update_repo_not_found(self, db_session: AsyncSession) -> None: """Updating a non-existent repo returns repo_not_found.""" from musehub.mcp.write_tools.repos import execute_update_repo result = await execute_update_repo( repo_id="00000000-0000-0000-0000-000000000000", actor="alice", name="ghost", ) assert result.ok is False assert result.error_code == "repo_not_found" async def test_patch_repo_settings_happy_path(self, db_session: AsyncSession) -> None: """Owner can patch repo settings.""" r = await _repo(db_session, owner="alice") await db_session.commit() result = await execute_update_repo( repo_id=r.repo_id, actor="alice", description="New description", visibility="private", ) assert result.ok is True assert result.data["description"] == "New description" assert result.data["visibility"] == "private" async def test_patch_repo_settings_forbidden_for_non_admin(self, db_session: AsyncSession) -> None: """Non-admin cannot patch repo settings.""" r = await _repo(db_session, owner="alice") await db_session.commit() result = await execute_update_repo( repo_id=r.repo_id, actor="carol", description="Evil update", ) assert result.ok is False assert result.error_code == "forbidden" async def test_transfer_repo_ownership_happy_path(self, db_session: AsyncSession) -> None: """Owner can transfer ownership to another user.""" r = await _repo(db_session, owner="alice") await db_session.commit() result = await execute_transfer_repo_ownership( repo_id=r.repo_id, new_owner="bob", actor="alice", ) assert result.ok is True assert result.data["owner_user_id"] == "bob" async def test_transfer_repo_ownership_forbidden_for_non_owner(self, db_session: AsyncSession) -> None: """Non-owner cannot transfer ownership.""" r = await _repo(db_session, owner="alice") await db_session.commit() result = await execute_transfer_repo_ownership( repo_id=r.repo_id, new_owner="evil", actor="carol", ) assert result.ok is False assert result.error_code == "forbidden"