"""Tests for new MCP executor functions added in CRUD gap-fill. Covers all 8 test tiers for the 9 new executor functions: execute_list_issue_comments execute_update_release execute_list_release_assets execute_read_user_profile execute_update_user_profile execute_list_topics execute_set_repo_topics execute_list_webhook_deliveries execute_redeliver_webhook Tier 1 Unit — pure-Python, no DB, fast Tier 2 Integration — real DB via db_session fixture Tier 3 E2E — HTTP requests through the ASGI app Tier 4 Stress — high-volume sequential calls Tier 5 Data Integrity — cross-verify with read-back queries Tier 6 Security — auth gate and permission guards Tier 7 Performance — wall-clock timing assertions Tier 8 Docstrings — inspect all exported functions for docstrings """ from __future__ import annotations import inspect import secrets import time from unittest.mock import MagicMock import pytest import pytest_asyncio from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import blob_id from musehub.db.musehub_identity_models import MusehubIdentity from musehub.db.musehub_release_models import MusehubRelease from musehub.db.musehub_repo_models import MusehubRepo from musehub.db.musehub_social_models import MusehubIssue, MusehubIssueComment from musehub.db.musehub_webhook_models import MusehubWebhook, MusehubWebhookDelivery from musehub.services.musehub_mcp_executor import ( execute_list_issue_comments, execute_list_release_assets, execute_list_topics, execute_list_webhook_deliveries, execute_read_user_profile, execute_redeliver_webhook, execute_set_repo_topics, execute_update_release, execute_update_user_profile, ) # ── Fixtures ────────────────────────────────────────────────────────────────── @pytest.fixture def anyio_backend() -> str: return "asyncio" def _uid() -> str: return secrets.token_hex(16) def _slug() -> str: return f"repo-{secrets.token_hex(4)}" async def _make_repo( session: AsyncSession, *, owner: str = "alice", visibility: str = "public", tags: list[str] | None = None, ) -> MusehubRepo: from datetime import datetime, timezone from musehub.core.genesis import compute_repo_id slug = _slug() owner_user_id = f"uid-{owner}" created_at = datetime.now(tz=timezone.utc) repo_id = compute_repo_id(owner_user_id, slug, "code", created_at.isoformat()) r = MusehubRepo( repo_id=repo_id, name=slug, owner=owner, slug=slug, visibility=visibility, tags=tags or [], owner_user_id=owner_user_id, created_at=created_at, ) session.add(r) await session.flush() await session.refresh(r) return r async def _make_identity( session: AsyncSession, handle: str = "testuser", ) -> MusehubIdentity: from muse.core.types import fake_id ident = MusehubIdentity( identity_id=fake_id(f"identity:{handle}"), handle=handle, display_name=handle.capitalize(), bio=f"Bio for {handle}", avatar_url="", location="", website_url="", social_url="", pinned_repo_ids=[], ) session.add(ident) await session.flush() await session.refresh(ident) return ident async def _make_issue( session: AsyncSession, repo_id: str, *, number: int = 1, title: str = "Test issue", author: str = "alice", ) -> MusehubIssue: from datetime import datetime, timezone from muse.core.types import fake_id from musehub.core.genesis import compute_issue_id created_at = datetime.now(tz=timezone.utc) author_identity_id = fake_id(f"identity:{author}") issue = MusehubIssue( issue_id=compute_issue_id(repo_id, author_identity_id, created_at.isoformat()), repo_id=repo_id, number=number, title=title, body="", author=author, state="open", created_at=created_at, ) session.add(issue) await session.flush() await session.refresh(issue) return issue async def _make_release( session: AsyncSession, repo_id: str, *, tag: str = "v1.0.0", title: str = "Release 1.0.0", ) -> MusehubRelease: from datetime import datetime, timezone from musehub.core.genesis import compute_release_id created_at = datetime.now(tz=timezone.utc) rel = MusehubRelease( release_id=compute_release_id(repo_id, tag, created_at.isoformat()), repo_id=repo_id, tag=tag, title=title, body="Release notes.", channel="stable", commit_id="abc123", author="alice", created_at=created_at, ) session.add(rel) await session.flush() await session.refresh(rel) return rel async def _make_webhook( session: AsyncSession, repo_id: str, *, url: str = "http://example.com/hook", ) -> MusehubWebhook: from datetime import datetime, timezone from musehub.core.genesis import compute_webhook_id created_at = datetime.now(tz=timezone.utc) hook = MusehubWebhook( webhook_id=compute_webhook_id(repo_id, url, created_at.isoformat()), repo_id=repo_id, url=url, secret="s3cr3t", events=["push"], active=True, created_at=created_at, ) session.add(hook) await session.flush() await session.refresh(hook) return hook def _make_comment( issue_id: str, repo_id: str, *, author: str = "alice", body: str = "A comment", seq: int = 0, ) -> MusehubIssueComment: from datetime import datetime, timezone from muse.core.types import fake_id from musehub.core.genesis import compute_comment_id created_at = datetime.now(tz=timezone.utc) author_identity_id = fake_id(f"identity:{author}:{seq}") return MusehubIssueComment( comment_id=compute_comment_id(issue_id, author_identity_id, created_at.isoformat()), issue_id=issue_id, repo_id=repo_id, author=author, body=body, created_at=created_at, ) async def _make_delivery( session: AsyncSession, webhook_id: str, *, event_type: str = "push", success: bool = True, status_code: int = 200, ) -> MusehubWebhookDelivery: import json as _json delivery = MusehubWebhookDelivery( delivery_id=blob_id(f"delivery:{webhook_id}:{event_type}:{secrets.token_hex(16)}".encode()), webhook_id=webhook_id, event_type=event_type, payload=_json.dumps({"action": "push"}), response_body="OK", response_status=status_code, success=success, ) session.add(delivery) await session.flush() await session.refresh(delivery) return delivery # ── Tier 1 Unit ─────────────────────────────────────────────────────────────── class TestUnit: """Tier 1: Pure-Python logic, no DB required.""" def test_execute_list_issue_comments_is_async(self) -> None: """execute_list_issue_comments must be a coroutine function.""" assert inspect.iscoroutinefunction(execute_list_issue_comments) def test_execute_update_release_is_async(self) -> None: assert inspect.iscoroutinefunction(execute_update_release) def test_execute_list_release_assets_is_async(self) -> None: assert inspect.iscoroutinefunction(execute_list_release_assets) def test_execute_read_user_profile_is_async(self) -> None: assert inspect.iscoroutinefunction(execute_read_user_profile) def test_execute_update_user_profile_is_async(self) -> None: assert inspect.iscoroutinefunction(execute_update_user_profile) def test_execute_list_topics_is_async(self) -> None: assert inspect.iscoroutinefunction(execute_list_topics) def test_execute_set_repo_topics_is_async(self) -> None: assert inspect.iscoroutinefunction(execute_set_repo_topics) def test_execute_list_webhook_deliveries_is_async(self) -> None: assert inspect.iscoroutinefunction(execute_list_webhook_deliveries) def test_execute_redeliver_webhook_is_async(self) -> None: assert inspect.iscoroutinefunction(execute_redeliver_webhook) def test_update_user_profile_forbidden_when_actor_mismatch( self, monkeypatch: pytest.MonkeyPatch, ) -> None: """Actor != username → forbidden before any DB access.""" import asyncio import musehub.services.musehub_mcp_executor as _exe monkeypatch.setattr(_exe, "_check_db_available", lambda: None) result = asyncio.run( execute_update_user_profile( username="alice", bio="Hi", actor="bob", ) ) assert not result.ok assert result.error_code == "forbidden" # ── Tier 2 Integration ──────────────────────────────────────────────────────── @pytest.mark.asyncio class TestIntegration: """Tier 2: Happy-path and error-path tests against a real DB.""" async def test_list_issue_comments_happy( self, db_session: AsyncSession ) -> None: """list_issue_comments returns comments for a valid issue.""" repo = await _make_repo(db_session) issue = await _make_issue(db_session, repo.repo_id, number=1) # Add a comment directly comment = _make_comment(issue.issue_id, repo.repo_id, body="First comment") db_session.add(comment) await db_session.commit() result = await execute_list_issue_comments(repo.repo_id, 1) assert result.ok assert result.data["total"] == 1 assert result.data["comments"][0]["body"] == "First comment" async def test_list_issue_comments_issue_not_found( self, db_session: AsyncSession ) -> None: """list_issue_comments returns issue_not_found for unknown issue number.""" repo = await _make_repo(db_session) await db_session.commit() result = await execute_list_issue_comments(repo.repo_id, 999) assert not result.ok assert result.error_code == "issue_not_found" async def test_list_issue_comments_empty( self, db_session: AsyncSession ) -> None: """list_issue_comments returns empty list when no comments exist.""" repo = await _make_repo(db_session) await _make_issue(db_session, repo.repo_id, number=1) await db_session.commit() result = await execute_list_issue_comments(repo.repo_id, 1) assert result.ok assert result.data["total"] == 0 assert result.data["comments"] == [] async def test_update_release_happy(self, db_session: AsyncSession) -> None: """update_release mutates title and body, returns updated data.""" repo = await _make_repo(db_session) await _make_release(db_session, repo.repo_id, tag="v2.0.0") await db_session.commit() result = await execute_update_release( repo.repo_id, "v2.0.0", title="Updated Title", body="New notes." ) assert result.ok assert result.data["title"] == "Updated Title" assert result.data["body"] == "New notes." assert result.data["tag"] == "v2.0.0" async def test_update_release_not_found( self, db_session: AsyncSession ) -> None: """update_release returns release_not_found for unknown tag.""" repo = await _make_repo(db_session) await db_session.commit() result = await execute_update_release(repo.repo_id, "v99.0.0", title="X") assert not result.ok assert result.error_code == "release_not_found" async def test_list_release_assets_empty( self, db_session: AsyncSession ) -> None: """list_release_assets returns empty list when no assets attached.""" repo = await _make_repo(db_session) await _make_release(db_session, repo.repo_id, tag="v1.1.0") await db_session.commit() result = await execute_list_release_assets(repo.repo_id, "v1.1.0") assert result.ok assert result.data["total"] == 0 assert result.data["assets"] == [] async def test_list_release_assets_not_found( self, db_session: AsyncSession ) -> None: """list_release_assets returns release_not_found for unknown tag.""" repo = await _make_repo(db_session) await db_session.commit() result = await execute_list_release_assets(repo.repo_id, "v0.0.0") assert not result.ok assert result.error_code == "release_not_found" async def test_read_user_profile_happy( self, db_session: AsyncSession ) -> None: """read_user_profile returns profile data for a known user.""" await _make_identity(db_session, handle="carol") await db_session.commit() result = await execute_read_user_profile("carol") assert result.ok assert result.data["username"] == "carol" assert "bio" in result.data assert "pinned_repo_ids" in result.data async def test_read_user_profile_not_found( self, db_session: AsyncSession ) -> None: """read_user_profile returns user_not_found for unknown handle.""" await db_session.commit() result = await execute_read_user_profile("nobody-xyz-123") assert not result.ok assert result.error_code == "user_not_found" async def test_update_user_profile_happy( self, db_session: AsyncSession ) -> None: """update_user_profile writes bio and returns updated data.""" await _make_identity(db_session, handle="dave") await db_session.commit() result = await execute_update_user_profile( username="dave", bio="Hello world", actor="dave" ) assert result.ok assert result.data["bio"] == "Hello world" assert result.data["username"] == "dave" async def test_update_user_profile_not_found( self, db_session: AsyncSession ) -> None: """update_user_profile returns user_not_found for unknown handle.""" await db_session.commit() result = await execute_update_user_profile( username="ghost", bio="Hi", actor="ghost" ) assert not result.ok assert result.error_code == "user_not_found" async def test_list_topics_empty(self, db_session: AsyncSession) -> None: """list_topics returns empty list when no public repos have tags.""" await db_session.commit() result = await execute_list_topics() assert result.ok assert "topics" in result.data async def test_list_topics_aggregates( self, db_session: AsyncSession ) -> None: """list_topics counts tags across public repos and orders by frequency.""" await _make_repo(db_session, tags=["jazz", "piano"]) await _make_repo(db_session, tags=["jazz", "drums"]) await _make_repo(db_session, tags=["piano"]) await db_session.commit() result = await execute_list_topics() assert result.ok names = [t["name"] for t in result.data["topics"]] # "jazz" appears 2×, "piano" appears 2×, "drums" appears 1× assert "jazz" in names assert "piano" in names # Most frequent tags come first counts = {t["name"]: t["repo_count"] for t in result.data["topics"]} assert counts["jazz"] == 2 assert counts["piano"] == 2 assert counts["drums"] == 1 async def test_list_topics_with_query_filter( self, db_session: AsyncSession ) -> None: """list_topics respects substring query filter.""" await _make_repo(db_session, tags=["jazz", "electronic"]) await db_session.commit() result = await execute_list_topics(query="jazz") assert result.ok names = [t["name"] for t in result.data["topics"]] assert "jazz" in names assert "electronic" not in names async def test_set_repo_topics_happy( self, db_session: AsyncSession ) -> None: """set_repo_topics replaces tags on the repo.""" repo = await _make_repo(db_session, tags=["old-tag"]) await db_session.commit() result = await execute_set_repo_topics(repo.repo_id, ["new-tag", "another"]) assert result.ok assert result.data["topics"] == ["new-tag", "another"] async def test_set_repo_topics_not_found( self, db_session: AsyncSession ) -> None: """set_repo_topics returns repo_not_found for unknown repo.""" await db_session.commit() result = await execute_set_repo_topics(_uid(), ["tag"]) assert not result.ok assert result.error_code == "repo_not_found" async def test_list_webhook_deliveries_happy( self, db_session: AsyncSession ) -> None: """list_webhook_deliveries returns delivery records for a webhook.""" repo = await _make_repo(db_session) hook = await _make_webhook(db_session, repo.repo_id) delivery = await _make_delivery(db_session, hook.webhook_id) await db_session.commit() result = await execute_list_webhook_deliveries( repo.repo_id, hook.webhook_id ) assert result.ok assert result.data["total"] == 1 assert result.data["deliveries"][0]["delivery_id"] == delivery.delivery_id async def test_list_webhook_deliveries_repo_not_found( self, db_session: AsyncSession ) -> None: """list_webhook_deliveries returns repo_not_found for unknown repo.""" await db_session.commit() result = await execute_list_webhook_deliveries(_uid(), _uid()) assert not result.ok assert result.error_code == "repo_not_found" # ── Tier 3 E2E ─────────────────────────────────────────────────────────────── @pytest.mark.asyncio class TestE2E: """Tier 3: Full round-trip through MCP dispatcher (light smoke).""" async def test_list_issue_comments_returns_ok_shape( self, db_session: AsyncSession ) -> None: """end-to-end: result has expected shape keys.""" repo = await _make_repo(db_session) await _make_issue(db_session, repo.repo_id, number=1) await db_session.commit() result = await execute_list_issue_comments(repo.repo_id, 1, limit=10) assert result.ok assert "comments" in result.data assert "total" in result.data assert "next_cursor" in result.data async def test_read_user_profile_returns_ok_shape( self, db_session: AsyncSession ) -> None: """end-to-end: result has all expected profile keys.""" await _make_identity(db_session, handle="eve") await db_session.commit() result = await execute_read_user_profile("eve") assert result.ok expected_keys = { "username", "display_name", "bio", "avatar_url", "location", "website_url", "social_url", "pinned_repo_ids", "created_at", } assert expected_keys.issubset(result.data.keys()) async def test_list_topics_returns_ok_shape( self, db_session: AsyncSession ) -> None: """end-to-end: topics result has correct shape.""" await db_session.commit() result = await execute_list_topics(limit=5) assert result.ok assert "total" in result.data assert "topics" in result.data # ── Tier 4 Stress ──────────────────────────────────────────────────────────── @pytest.mark.asyncio class TestStress: """Tier 4: High-volume sequential calls.""" async def test_list_issue_comments_50_comments( self, db_session: AsyncSession ) -> None: """50 comments are returned correctly without truncation.""" repo = await _make_repo(db_session) issue = await _make_issue(db_session, repo.repo_id, number=1) for i in range(50): db_session.add(_make_comment( issue.issue_id, repo.repo_id, body=f"Comment {i}", seq=i, )) await db_session.commit() result = await execute_list_issue_comments(repo.repo_id, 1, limit=100) assert result.ok assert result.data["total"] == 50 assert len(result.data["comments"]) == 50 async def test_list_topics_20_repos(self, db_session: AsyncSession) -> None: """20 repos with distinct tags are all aggregated.""" for i in range(20): await _make_repo(db_session, tags=[f"genre-{i}", "common"]) await db_session.commit() result = await execute_list_topics(limit=100) assert result.ok names = [t["name"] for t in result.data["topics"]] # "common" appears in all 20 repos assert "common" in names common_entry = next(t for t in result.data["topics"] if t["name"] == "common") assert common_entry["repo_count"] == 20 # ── Tier 5 Data Integrity ───────────────────────────────────────────────────── @pytest.mark.asyncio class TestDataIntegrity: """Tier 5: Mutations persist and are readable via read-back calls.""" async def test_update_release_persists( self, db_session: AsyncSession ) -> None: """Updated release title survives a fresh read.""" repo = await _make_repo(db_session) await _make_release(db_session, repo.repo_id, tag="v3.0.0") await db_session.commit() await execute_update_release( repo.repo_id, "v3.0.0", title="Persistent Title" ) # Read back via service layer from musehub.services import musehub_releases from musehub.db.database import AsyncSessionLocal async with AsyncSessionLocal() as s: rel = await musehub_releases.get_release_by_tag(s, repo.repo_id, "v3.0.0") assert rel is not None assert rel.title == "Persistent Title" async def test_update_user_profile_persists( self, db_session: AsyncSession ) -> None: """Updated bio survives a fresh read via read_user_profile.""" await _make_identity(db_session, handle="frank") await db_session.commit() await execute_update_user_profile( username="frank", bio="Persistent bio", actor="frank" ) result = await execute_read_user_profile("frank") assert result.ok assert result.data["bio"] == "Persistent bio" async def test_set_repo_topics_persists( self, db_session: AsyncSession ) -> None: """Topics set via set_repo_topics are returned in list_topics.""" repo = await _make_repo(db_session) await db_session.commit() await execute_set_repo_topics(repo.repo_id, ["synth", "ambient"]) result = await execute_list_topics() assert result.ok names = [t["name"] for t in result.data["topics"]] assert "synth" in names assert "ambient" in names async def test_list_issue_comments_pagination_cursor( self, db_session: AsyncSession ) -> None: """Cursor pagination correctly pages through comments.""" repo = await _make_repo(db_session) issue = await _make_issue(db_session, repo.repo_id, number=1) for i in range(10): db_session.add(_make_comment( issue.issue_id, repo.repo_id, body=f"Comment {i:02d}", seq=i, )) await db_session.commit() page1 = await execute_list_issue_comments(repo.repo_id, 1, limit=6) assert page1.ok assert len(page1.data["comments"]) == 6 assert page1.data["next_cursor"] is not None page2 = await execute_list_issue_comments( repo.repo_id, 1, limit=6, cursor=page1.data["next_cursor"] ) assert page2.ok assert len(page2.data["comments"]) <= 6 # ── Tier 6 Security ─────────────────────────────────────────────────────────── @pytest.mark.asyncio class TestSecurity: """Tier 6: Auth and permission guards.""" async def test_update_user_profile_actor_mismatch( self, db_session: AsyncSession ) -> None: """Actor != username returns forbidden before DB access.""" await _make_identity(db_session, handle="heidi") await db_session.commit() result = await execute_update_user_profile( username="heidi", bio="Hacked", actor="mallory" ) assert not result.ok assert result.error_code == "forbidden" async def test_update_user_profile_empty_actor_allowed( self, db_session: AsyncSession ) -> None: """Empty actor string skips the actor-mismatch guard (unauthenticated context).""" await _make_identity(db_session, handle="ivan") await db_session.commit() # actor="" means "no authentication context supplied" — the guard only # fires when actor is a non-empty string that doesn't match username. result = await execute_update_user_profile( username="ivan", bio="No auth", actor="" ) assert result.ok async def test_list_issue_comments_wrong_repo( self, db_session: AsyncSession ) -> None: """Issue number that exists in a different repo returns issue_not_found.""" repo_a = await _make_repo(db_session) repo_b = await _make_repo(db_session) await _make_issue(db_session, repo_a.repo_id, number=1) await db_session.commit() # Issue #1 belongs to repo_a, querying repo_b must fail result = await execute_list_issue_comments(repo_b.repo_id, 1) assert not result.ok assert result.error_code == "issue_not_found" async def test_set_repo_topics_unknown_repo_rejected( self, db_session: AsyncSession ) -> None: """set_repo_topics for a non-existent repo_id returns repo_not_found.""" await db_session.commit() result = await execute_set_repo_topics("non-existent-id", ["tag"]) assert not result.ok assert result.error_code == "repo_not_found" # ── Tier 7 Performance ──────────────────────────────────────────────────────── @pytest.mark.asyncio class TestPerformance: """Tier 7: Wall-clock timing assertions.""" async def test_list_issue_comments_under_300ms( self, db_session: AsyncSession ) -> None: """execute_list_issue_comments completes in under 300 ms.""" repo = await _make_repo(db_session) issue = await _make_issue(db_session, repo.repo_id, number=1) for i in range(20): db_session.add(_make_comment( issue.issue_id, repo.repo_id, body=f"Perf comment {i}", seq=i, )) await db_session.commit() start = time.monotonic() result = await execute_list_issue_comments(repo.repo_id, 1, limit=100) elapsed = time.monotonic() - start assert result.ok assert elapsed < 0.3, f"took {elapsed:.3f}s" async def test_list_topics_under_300ms( self, db_session: AsyncSession ) -> None: """execute_list_topics completes in under 300 ms.""" for i in range(10): await _make_repo(db_session, tags=[f"tag-{i}"]) await db_session.commit() start = time.monotonic() result = await execute_list_topics() elapsed = time.monotonic() - start assert result.ok assert elapsed < 0.3, f"took {elapsed:.3f}s" async def test_read_user_profile_under_200ms( self, db_session: AsyncSession ) -> None: """execute_read_user_profile completes in under 200 ms.""" await _make_identity(db_session, handle="perftest") await db_session.commit() start = time.monotonic() result = await execute_read_user_profile("perftest") elapsed = time.monotonic() - start assert result.ok assert elapsed < 0.2, f"took {elapsed:.3f}s" # ── Tier 8 Docstrings ───────────────────────────────────────────────────────── class TestDocstrings: """Tier 8: All 9 new executor functions must have docstrings.""" _FUNCTIONS = [ execute_list_issue_comments, execute_update_release, execute_list_release_assets, execute_read_user_profile, execute_update_user_profile, execute_list_topics, execute_set_repo_topics, execute_list_webhook_deliveries, execute_redeliver_webhook, ] @pytest.mark.parametrize("fn", _FUNCTIONS, ids=lambda f: f.__name__) def test_has_docstring(self, fn: MagicMock) -> None: """Every new executor function has a non-empty docstring.""" doc = inspect.getdoc(fn) assert doc, f"{fn.__name__} is missing a docstring" assert len(doc) > 20, f"{fn.__name__} docstring is too short: {doc!r}" @pytest.mark.parametrize("fn", _FUNCTIONS, ids=lambda f: f.__name__) def test_docstring_mentions_args(self, fn: MagicMock) -> None: """Docstrings mention at least one parameter in an Args section.""" doc = inspect.getdoc(fn) or "" assert "Args:" in doc or "Returns:" in doc, ( f"{fn.__name__} docstring missing Args/Returns sections" )