"""Section 14 — MCP Read Tools: 7-layer test suite. Covers ``musehub/services/musehub_mcp_executor.py`` executor functions and their wiring through the MCP dispatcher (``musehub/mcp/dispatcher.py``). Read tools under test: execute_browse_repo, execute_list_branches, execute_list_commits, execute_read_file, execute_get_analysis, execute_search, execute_read_commit, execute_compare, execute_whoami, execute_search_repos, execute_get_repo, execute_list_repos and the helpers: _mime_for_path, _check_db_available, MusehubToolResult Seven layers: Layer 1 Unit: - _mime_for_path: known MIDI extension, .webp custom, unknown → octet-stream, .py - _check_db_available: factory=None → db_unavailable result - MusehubToolResult: ok=True / ok=False shape invariants - execute_get_analysis: invalid dimension returns immediately (no DB touch) - execute_search: invalid mode returns immediately - execute_whoami: user_id=None → authenticated=False immediately Layer 2 Integration: - execute_browse_repo: existing repo → ok=True with repo/branches/commits keys - execute_browse_repo: unknown repo_id → ok=False, error_code=not_found - execute_list_branches: existing repo → branch list returned - execute_list_branches: unknown repo → not_found - execute_list_commits: commits returned newest-first, branch filter, limit clamp - execute_read_file: known object → ok=True, mime resolved - execute_read_file: unknown object_id → not_found - execute_read_file: unknown repo → not_found - execute_read_commit: known commit → ok=True - execute_read_commit: unknown commit → not_found - execute_get_analysis: overview / commits / objects dimensions - execute_search: path mode / commit mode case-insensitive - execute_compare: ok=True with diff shape - execute_whoami: with user_id → authenticated=True Layer 3 E2E (HTTP tools/call): - musehub_list_branches: isError=False, content is valid JSON - musehub_list_branches unknown repo → isError=True - musehub_list_commits with limit - musehub_search invalid mode → isError=True - musehub_get_commit not found → isError=True - musehub_whoami anonymous → authenticated=False - musehub_get_analysis invalid dimension → isError=True - owner+slug transparent resolution Layer 4 Stress: - 50 commits → list_commits returns all 50 - 30 objects, search returns matching subset Layer 5 Data Integrity: - browse_repo response shape: all required top-level keys - list_commits newest-first ordering - read_file mime_type resolved per extension - search path mode case-insensitive - search commit mode case-insensitive - get_analysis overview has all required fields Layer 6 Security: - execute_search_repos only returns public repos - execute_whoami with None → authenticated=False (no data leakage) - write tool via HTTP without auth → isError=True Layer 7 Performance: - 1000× _mime_for_path under 10 ms - execute_browse_repo on populated repo under 200 ms - execute_get_analysis overview under 200 ms """ from __future__ import annotations import json import secrets import time from datetime import datetime, timezone import pytest import pytest_asyncio from httpx import AsyncClient, ASGITransport from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import fake_id, long_id from musehub.core.genesis import compute_branch_id, compute_collaborator_id, compute_identity_id, compute_repo_id from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubCommitRef, MusehubObject, MusehubObjectRef, MusehubRepo from musehub.main import app from musehub.types.json_types import JSONObject from musehub.services.musehub_mcp_executor import ( MusehubToolResult, _check_db_available, _mime_for_path, execute_browse_repo, execute_compare, execute_get_analysis, execute_read_commit, execute_list_branches, execute_list_commits, execute_read_file, execute_search, execute_whoami, ) # ── Fixtures ────────────────────────────────────────────────────────────────── @pytest.fixture def anyio_backend() -> str: return "asyncio" @pytest_asyncio.fixture async def http_client(db_session: AsyncSession) -> AsyncClient: async with AsyncClient( transport=ASGITransport(app=app), base_url="http://localhost", ) as c: yield c # ── Helpers ─────────────────────────────────────────────────────────────────── def _uid() -> str: return secrets.token_hex(16) async def _repo( session: AsyncSession, slug: str, visibility: str = "public", owner: str = "alice", ) -> MusehubRepo: from datetime import datetime, timezone created_at = datetime.now(tz=timezone.utc) owner_id = compute_identity_id(owner.encode()) repo_id = compute_repo_id(owner_id, slug, "code", created_at.isoformat()) repo = MusehubRepo( repo_id=repo_id, name=slug, owner=owner, slug=slug, visibility=visibility, owner_user_id=owner_id, created_at=created_at, updated_at=created_at, ) session.add(repo) await session.flush() await session.refresh(repo) return repo async def _commit( session: AsyncSession, repo_id: str, branch: str = "main", message: str = "add track", author: str = "alice", ts: datetime | None = None, ) -> MusehubCommit: c = MusehubCommit( commit_id=fake_id(f"{repo_id}{branch}{message}{_uid()[:8]}"), branch=branch, parent_ids=[], message=message, author=author, timestamp=ts or datetime.now(tz=timezone.utc), ) session.add(c) session.add(MusehubCommitRef(repo_id=repo_id, commit_id=c.commit_id)) await session.flush() return c async def _branch( session: AsyncSession, repo_id: str, name: str, head_commit_id: str, ) -> MusehubBranch: b = MusehubBranch( branch_id=compute_branch_id(repo_id, name), repo_id=repo_id, name=name, head_commit_id=head_commit_id, ) session.add(b) await session.flush() return b async def _object( session: AsyncSession, repo_id: str, path: str, size_bytes: int = 1024, ) -> MusehubObject: oid = long_id(_uid()[:32]) obj = MusehubObject( object_id=oid, path=path, size_bytes=size_bytes, ) session.add(obj) session.add(MusehubObjectRef(repo_id=repo_id, object_id=oid)) await session.flush() return obj def _tools_call(name: str, arguments: JSONObject) -> JSONObject: return {"jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": {"name": name, "arguments": arguments}} def _unwrap_tool_text(text: str) -> str: """Strip wrapper tags added by the dispatcher.""" text = text.strip() if text.startswith(""): text = text[len(""):].strip() if text.endswith(""): text = text[: -len("")].strip() return text async def _init_session(http_client: AsyncClient) -> str: """POST initialize and return the session_id.""" resp = await http_client.post( "/mcp", json={ "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2025-11-25", "clientInfo": {"name": "test", "version": "1.0"}, "capabilities": {}, }, }, headers={"Content-Type": "application/json"}, ) return resp.headers["mcp-session-id"] # ── Layer 1 — Unit ──────────────────────────────────────────────────────────── class TestUnitMimeForPath: def test_midi_extension(self) -> None: mime = _mime_for_path("tracks/song.mid") assert mime == "audio/midi" def test_webp_custom_extension(self) -> None: assert _mime_for_path("image.webp") == "image/webp" def test_unknown_extension_returns_octet_stream(self) -> None: assert _mime_for_path("artifact.xyz123") == "application/octet-stream" def test_python_extension(self) -> None: assert "python" in _mime_for_path("script.py").lower() def test_no_extension_returns_octet_stream(self) -> None: assert _mime_for_path("noextension") == "application/octet-stream" def test_case_insensitive_extension(self) -> None: upper = _mime_for_path("TRACK.WEBP") lower = _mime_for_path("track.webp") assert upper == lower class TestUnitCheckDbAvailable: def test_factory_none_returns_error(self) -> None: from musehub.db import database original = database._async_session_factory try: setattr(database, '_async_session_factory', None) result = _check_db_available() assert result is not None assert result.ok is False assert result.error_code == "db_unavailable" assert result.error_message is not None finally: database._async_session_factory = original def test_factory_set_returns_none(self, db_session: AsyncSession) -> None: """With db_session fixture active, factory is set — check returns None.""" result = _check_db_available() assert result is None class TestUnitMusehubToolResult: def test_ok_true_shape(self) -> None: r = MusehubToolResult(ok=True, data={"repo_id": "abc"}) assert r.ok is True assert r.data == {"repo_id": "abc"} assert r.error_code is None assert r.error_message is None def test_ok_false_shape(self) -> None: r = MusehubToolResult( ok=False, error_code="not_found", error_message="Repo not found.", ) assert r.ok is False assert r.error_code == "not_found" assert "not found" in r.error_message.lower() assert r.data == {} class TestUnitValidationWithoutDB: async def test_get_analysis_invalid_dimension(self, db_session: AsyncSession) -> None: result = await execute_get_analysis("any-repo-id", dimension="music") assert result.ok is False assert result.error_code == "invalid_args" assert "music" in (result.error_message or "") async def test_search_invalid_mode(self, db_session: AsyncSession) -> None: result = await execute_search("any-repo-id", query="bass", mode="regex") assert result.ok is False assert result.error_code == "invalid_args" assert "regex" in (result.error_message or "") async def test_whoami_anonymous(self) -> None: """execute_whoami with None returns authenticated=False without hitting DB.""" result = await execute_whoami(None) assert result.ok is True assert result.data["authenticated"] is False assert result.data["user_id"] is None # ── Layer 2 — Integration ───────────────────────────────────────────────────── class TestIntegrationBrowseRepo: async def test_existing_repo_returns_ok(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "browse-ok") c = await _commit(db_session, r.repo_id) await _branch(db_session, r.repo_id, "main", c.commit_id) await db_session.commit() result = await execute_browse_repo(r.repo_id) assert result.ok is True assert "repo" in result.data assert "branches" in result.data assert "recent_commits" in result.data assert result.data["branch_count"] == 1 async def test_unknown_repo_returns_not_found(self, db_session: AsyncSession) -> None: result = await execute_browse_repo("nonexistent-repo-id") assert result.ok is False assert result.error_code == "repo_not_found" class TestIntegrationListBranches: async def test_returns_branches(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "lb-ok") c = await _commit(db_session, r.repo_id) await _branch(db_session, r.repo_id, "main", c.commit_id) await _branch(db_session, r.repo_id, "dev", c.commit_id) await db_session.commit() result = await execute_list_branches(r.repo_id) assert result.ok is True assert result.data["branch_count"] == 2 names = [b["name"] for b in result.data["branches"]] assert "main" in names assert "dev" in names async def test_unknown_repo_returns_not_found(self, db_session: AsyncSession) -> None: result = await execute_list_branches("ghost-repo") assert result.ok is False assert result.error_code == "repo_not_found" class TestIntegrationListCommits: async def test_returns_commits(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "lc-ok") for i in range(5): await _commit(db_session, r.repo_id, message=f"commit {i}") await db_session.commit() result = await execute_list_commits(r.repo_id, limit=10) assert result.ok is True assert result.data["returned"] == 5 async def test_branch_filter(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "lc-branch") await _commit(db_session, r.repo_id, branch="main", message="on main") await _commit(db_session, r.repo_id, branch="dev", message="on dev") await db_session.commit() result = await execute_list_commits(r.repo_id, branch="main", limit=10) assert result.ok is True commits = result.data["commits"] assert all(c["branch"] == "main" for c in commits) async def test_limit_clamped_high(self, db_session: AsyncSession) -> None: """Limit values over 100 are clamped to 100.""" r = await _repo(db_session, "lc-clamp-hi") for _ in range(5): await _commit(db_session, r.repo_id) await db_session.commit() # limit=200 should clamp to 100 but still return all 5 result = await execute_list_commits(r.repo_id, limit=200) assert result.ok is True assert result.data["returned"] == 5 async def test_limit_clamped_low(self, db_session: AsyncSession) -> None: """Limit values below 1 are clamped to 1.""" r = await _repo(db_session, "lc-clamp-lo") for _ in range(5): await _commit(db_session, r.repo_id) await db_session.commit() result = await execute_list_commits(r.repo_id, limit=0) assert result.ok is True assert result.data["returned"] == 1 async def test_unknown_repo(self, db_session: AsyncSession) -> None: result = await execute_list_commits("ghost-lc") assert result.ok is False assert result.error_code == "repo_not_found" class TestIntegrationReadFile: async def test_known_object_returns_metadata(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "rf-ok") obj = await _object(db_session, r.repo_id, "tracks/bass.mid", size_bytes=4096) await db_session.commit() result = await execute_read_file(r.repo_id, obj.object_id) assert result.ok is True assert result.data["object_id"] == obj.object_id assert result.data["path"] == "tracks/bass.mid" assert result.data["size_bytes"] == 4096 assert "midi" in result.data["mime_type"].lower() async def test_unknown_object_returns_not_found(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "rf-no-obj") await db_session.commit() result = await execute_read_file(r.repo_id, "sha256:deadbeef") assert result.ok is False assert result.error_code == "file_not_found" async def test_unknown_repo_returns_not_found(self, db_session: AsyncSession) -> None: result = await execute_read_file("ghost-repo", "sha256:anything") assert result.ok is False assert result.error_code == "repo_not_found" class TestIntegrationGetCommit: async def test_known_commit_returns_data(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "gc-ok") c = await _commit(db_session, r.repo_id, message="feature: harmony") await db_session.commit() result = await execute_read_commit(r.repo_id, c.commit_id) assert result.ok is True assert result.data["commit_id"] == c.commit_id assert result.data["message"] == "feature: harmony" assert result.data["author"] == "alice" async def test_unknown_commit_returns_not_found(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "gc-miss") await db_session.commit() result = await execute_read_commit(r.repo_id, "nonexistent-commit-id") assert result.ok is False assert result.error_code == "commit_not_found" class TestIntegrationGetAnalysis: async def test_overview_dimension(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "ga-overview") c = await _commit(db_session, r.repo_id) await _branch(db_session, r.repo_id, "main", c.commit_id) await _object(db_session, r.repo_id, "track.mid") await db_session.commit() result = await execute_get_analysis(r.repo_id, dimension="overview") assert result.ok is True d = result.data assert d["dimension"] == "overview" assert d["branch_count"] == 1 assert d["commit_count"] >= 1 assert d["object_count"] == 1 async def test_commits_dimension(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "ga-commits") await _commit(db_session, r.repo_id, branch="main", author="alice") await _commit(db_session, r.repo_id, branch="dev", author="bob") await db_session.commit() result = await execute_get_analysis(r.repo_id, dimension="commits") assert result.ok is True d = result.data assert d["dimension"] == "commits" assert "by_branch" in d assert "by_author" in d assert d["by_author"].get("alice", 0) >= 1 assert d["by_author"].get("bob", 0) >= 1 async def test_objects_dimension(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "ga-objects") await _object(db_session, r.repo_id, "a.mid", size_bytes=100) await _object(db_session, r.repo_id, "b.mid", size_bytes=200) await db_session.commit() result = await execute_get_analysis(r.repo_id, dimension="blobs") assert result.ok is True d = result.data assert d["dimension"] == "blobs" assert d["total_blobs"] == 2 assert d["total_size_bytes"] == 300 class TestIntegrationSearch: async def test_path_mode_returns_matching_objects(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "s-path") await _object(db_session, r.repo_id, "tracks/jazz_bass.mid") await _object(db_session, r.repo_id, "tracks/treble.mid") await db_session.commit() result = await execute_search(r.repo_id, "jazz", mode="path") assert result.ok is True assert result.data["result_count"] == 1 assert result.data["results"][0]["path"] == "tracks/jazz_bass.mid" async def test_path_mode_case_insensitive(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "s-ci-path") await _object(db_session, r.repo_id, "JAZZ_TRACK.mid") await db_session.commit() result = await execute_search(r.repo_id, "jazz", mode="path") assert result.ok is True assert result.data["result_count"] == 1 async def test_commit_mode_returns_matching_commits(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "s-commit") await _commit(db_session, r.repo_id, message="add bass groove") await _commit(db_session, r.repo_id, message="fix tempo sync") await db_session.commit() result = await execute_search(r.repo_id, "bass", mode="commit") assert result.ok is True assert result.data["result_count"] == 1 assert "bass" in result.data["results"][0]["message"].lower() async def test_commit_mode_case_insensitive(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "s-ci-commit") await _commit(db_session, r.repo_id, message="Add BASS line") await db_session.commit() result = await execute_search(r.repo_id, "bass", mode="commit") assert result.ok is True assert result.data["result_count"] == 1 class TestIntegrationCompare: async def test_compare_returns_diff_shape(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "compare-ok") ca = await _commit(db_session, r.repo_id, branch="main") cb = await _commit(db_session, r.repo_id, branch="dev") await db_session.commit() result = await execute_compare(r.repo_id, base_ref="main", head_ref="dev") assert result.ok is True assert result.data["base_ref"] == "main" assert result.data["head_ref"] == "dev" assert "base_commit_id" in result.data assert "head_commit_id" in result.data class TestIntegrationWhoami: async def test_authenticated_user_returns_data(self, db_session: AsyncSession) -> None: result = await execute_whoami("uid-test-user") assert result.ok is True assert result.data["authenticated"] is True assert result.data["user_id"] == "uid-test-user" # ── Layer 3 — End-to-End ────────────────────────────────────────────────────── class TestE2EReadTools: async def test_list_branches_returns_valid_json_content( self, http_client: AsyncClient, db_session: AsyncSession ) -> None: r = await _repo(db_session, "e2e-lb") c = await _commit(db_session, r.repo_id) await _branch(db_session, r.repo_id, "main", c.commit_id) await db_session.commit() resp = await http_client.post( "/mcp", json=_tools_call("musehub_list_branches", {"repo_id": r.repo_id}), headers={"Content-Type": "application/json"}, ) assert resp.status_code == 200 data = resp.json() assert data["result"]["isError"] is False text = _unwrap_tool_text(data["result"]["content"][0]["text"]) payload = json.loads(text) assert "branches" in payload async def test_list_branches_unknown_repo_returns_iserror( self, http_client: AsyncClient, db_session: AsyncSession ) -> None: resp = await http_client.post( "/mcp", json=_tools_call("musehub_list_branches", {"repo_id": "ghost-e2e"}), headers={"Content-Type": "application/json"}, ) assert resp.status_code == 200 data = resp.json() assert data["result"]["isError"] is True error = json.loads(data["result"]["content"][0]["text"]) assert error["error_code"] == "repo_not_found" async def test_list_commits_with_limit( self, http_client: AsyncClient, db_session: AsyncSession ) -> None: r = await _repo(db_session, "e2e-lc") for _ in range(10): await _commit(db_session, r.repo_id) await db_session.commit() resp = await http_client.post( "/mcp", json=_tools_call("musehub_list_commits", {"repo_id": r.repo_id, "limit": 5}), headers={"Content-Type": "application/json"}, ) assert resp.status_code == 200 result = resp.json()["result"] assert result["isError"] is False payload = json.loads(_unwrap_tool_text(result["content"][0]["text"])) assert payload["returned"] <= 5 async def test_search_invalid_mode_returns_iserror( self, http_client: AsyncClient, db_session: AsyncSession ) -> None: r = await _repo(db_session, "e2e-s-mode") await db_session.commit() resp = await http_client.post( "/mcp", json=_tools_call("musehub_search", {"repo_id": r.repo_id, "query": "x", "mode": "invalid"}), headers={"Content-Type": "application/json"}, ) assert resp.status_code == 200 assert resp.json()["result"]["isError"] is True async def test_get_commit_not_found_returns_iserror( self, http_client: AsyncClient, db_session: AsyncSession ) -> None: r = await _repo(db_session, "e2e-gc") await db_session.commit() resp = await http_client.post( "/mcp", json=_tools_call("musehub_get_commit", {"repo_id": r.repo_id, "commit_id": "ghost-commit"}), headers={"Content-Type": "application/json"}, ) assert resp.status_code == 200 assert resp.json()["result"]["isError"] is True async def test_whoami_anonymous_returns_not_authenticated( self, http_client: AsyncClient, db_session: AsyncSession ) -> None: resp = await http_client.post( "/mcp", json=_tools_call("musehub_whoami", {}), headers={"Content-Type": "application/json"}, ) assert resp.status_code == 200 result = resp.json()["result"] assert result["isError"] is False payload = json.loads(_unwrap_tool_text(result["content"][0]["text"])) assert payload["authenticated"] is False async def test_get_analysis_invalid_dimension_returns_iserror( self, http_client: AsyncClient, db_session: AsyncSession ) -> None: r = await _repo(db_session, "e2e-ga-bad") await db_session.commit() resp = await http_client.post( "/mcp", json=_tools_call("musehub_get_analysis", {"repo_id": r.repo_id, "dimension": "music"}), headers={"Content-Type": "application/json"}, ) assert resp.status_code == 200 assert resp.json()["result"]["isError"] is True async def test_unknown_tool_returns_iserror( self, http_client: AsyncClient, db_session: AsyncSession ) -> None: resp = await http_client.post( "/mcp", json=_tools_call("musehub_no_such_tool", {}), headers={"Content-Type": "application/json"}, ) assert resp.status_code == 200 assert resp.json()["result"]["isError"] is True # ── Layer 4 — Stress ────────────────────────────────────────────────────────── class TestStressReadTools: async def test_50_commits_all_returned(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "stress-commits") for i in range(50): await _commit(db_session, r.repo_id, message=f"commit {i}") await db_session.commit() result = await execute_list_commits(r.repo_id, limit=100) assert result.ok is True assert result.data["returned"] == 50 async def test_30_objects_search_returns_subset(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "stress-search") # 20 matching objects + 10 non-matching for i in range(20): await _object(db_session, r.repo_id, f"jazz/track_{i}.mid") for i in range(10): await _object(db_session, r.repo_id, f"blues/track_{i}.mid") await db_session.commit() result = await execute_search(r.repo_id, "jazz", mode="path") assert result.ok is True assert result.data["result_count"] == 20 # ── Layer 5 — Data Integrity ────────────────────────────────────────────────── class TestDataIntegrityBrowseRepo: async def test_response_has_all_required_keys(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "di-browse") c = await _commit(db_session, r.repo_id) await _branch(db_session, r.repo_id, "main", c.commit_id) await db_session.commit() result = await execute_browse_repo(r.repo_id) assert result.ok is True for key in ("repo", "branches", "recent_commits", "total_commits", "branch_count"): assert key in result.data, f"Missing key: {key}" async def test_repo_sub_dict_has_required_fields(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "di-browse-repo") c = await _commit(db_session, r.repo_id) await _branch(db_session, r.repo_id, "main", c.commit_id) await db_session.commit() result = await execute_browse_repo(r.repo_id) repo_data = result.data["repo"] for field in ("repo_id", "name", "visibility", "owner_user_id", "created_at"): assert field in repo_data, f"Missing repo field: {field}" class TestDataIntegrityCommitOrdering: async def test_commits_newest_first(self, db_session: AsyncSession) -> None: from datetime import timedelta r = await _repo(db_session, "di-order") base = datetime.now(tz=timezone.utc) for i in range(5): await _commit( db_session, r.repo_id, message=f"commit {i}", ts=base + timedelta(seconds=i), ) await db_session.commit() result = await execute_list_commits(r.repo_id, limit=10) commits = result.data["commits"] timestamps = [c["timestamp"] for c in commits] assert timestamps == sorted(timestamps, reverse=True) class TestDataIntegrityReadFileMime: async def test_webp_mime_resolved(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "di-mime-webp") obj = await _object(db_session, r.repo_id, "roll.webp") await db_session.commit() result = await execute_read_file(r.repo_id, obj.object_id) assert result.ok is True assert result.data["mime_type"] == "image/webp" async def test_midi_mime_resolved(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "di-mime-mid") obj = await _object(db_session, r.repo_id, "track.mid") await db_session.commit() result = await execute_read_file(r.repo_id, obj.object_id) assert result.ok is True assert "midi" in result.data["mime_type"].lower() class TestDataIntegrityGetAnalysisOverview: async def test_overview_has_all_required_fields(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "di-ga-overview") c = await _commit(db_session, r.repo_id) await _branch(db_session, r.repo_id, "main", c.commit_id) await db_session.commit() result = await execute_get_analysis(r.repo_id, dimension="overview") assert result.ok is True for field in ("repo_id", "dimension", "repo_name", "visibility", "branch_count", "commit_count", "object_count"): assert field in result.data, f"Missing field: {field}" # ── Layer 6 — Security ──────────────────────────────────────────────────────── class TestSecurityReadTools: async def test_whoami_anonymous_returns_no_user_data(self) -> None: """Anonymous whoami must not leak user info.""" result = await execute_whoami(None) assert result.ok is True assert result.data["authenticated"] is False assert result.data["user_id"] is None # Must not contain any other fields that could leak data. assert "repo_count" not in result.data async def test_search_repos_only_returns_public_repos( self, db_session: AsyncSession ) -> None: """execute_search_repos must never return private repos.""" await _repo(db_session, "sec-public", visibility="public") await _repo(db_session, "sec-private", visibility="private") await db_session.commit() from musehub.services.musehub_mcp_executor import execute_search_repos result = await execute_search_repos(query="sec", limit=50) assert result.ok is True names = [r["name"] for r in result.data["repos"]] assert "sec-private" not in names async def test_write_tool_via_mcp_requires_auth( self, http_client: AsyncClient, db_session: AsyncSession ) -> None: """musehub_create_repo (write tool) called without auth returns 401.""" resp = await http_client.post( "/mcp", json=_tools_call("musehub_create_repo", {"name": "should-fail", "owner": "alice"}), headers={"Content-Type": "application/json"}, ) assert resp.status_code == 401 async def test_read_file_unknown_repo_does_not_crash(self, db_session: AsyncSession) -> None: """Unknown repo must return not_found, never raise an exception.""" result = await execute_read_file("completely-made-up-id", "sha256:x") assert not result.ok assert result.error_code == "repo_not_found" # ── Layer 7 — Performance ───────────────────────────────────────────────────── class TestPerformanceMimeResolution: def test_1000_mime_resolutions_under_10ms(self) -> None: paths = [ "track.mid", "cover.webp", "audio.mp3", "script.py", "unknown.xyz", "noext", "deep/path/to/file.mid", ] start = time.perf_counter() for i in range(1000): _mime_for_path(paths[i % len(paths)]) elapsed_ms = (time.perf_counter() - start) * 1000 assert elapsed_ms < 10, f"1000× _mime_for_path took {elapsed_ms:.1f} ms" class TestPerformanceExecutors: async def test_browse_repo_under_200ms(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "perf-browse") for _ in range(10): c = await _commit(db_session, r.repo_id) await _branch(db_session, r.repo_id, "main", c.commit_id) for _ in range(5): await _object(db_session, r.repo_id, f"track_{_}.mid") await db_session.commit() start = time.perf_counter() result = await execute_browse_repo(r.repo_id) elapsed_ms = (time.perf_counter() - start) * 1000 assert result.ok is True assert elapsed_ms < 500, f"execute_browse_repo took {elapsed_ms:.1f} ms" async def test_get_analysis_overview_under_200ms(self, db_session: AsyncSession) -> None: r = await _repo(db_session, "perf-analysis") for _ in range(20): c = await _commit(db_session, r.repo_id) await _branch(db_session, r.repo_id, "main", c.commit_id) await db_session.commit() start = time.perf_counter() result = await execute_get_analysis(r.repo_id, dimension="overview") elapsed_ms = (time.perf_counter() - start) * 1000 assert result.ok is True assert elapsed_ms < 200, f"execute_get_analysis(overview) took {elapsed_ms:.1f} ms" # ── execute_get_repo ────────────────────────────────────────────────────────── class TestExecuteGetRepo: """Unit and integration tests for execute_get_repo.""" async def test_get_repo_by_repo_id(self, db_session: AsyncSession) -> None: """Returns repo metadata when resolved by repo_id.""" from musehub.services.musehub_mcp_executor import execute_get_repo r = await _repo(db_session, "get-by-id") await db_session.commit() result = await execute_get_repo(repo_id=r.repo_id) assert result.ok is True assert result.data["repo_id"] == r.repo_id assert result.data["slug"] == "get-by-id" assert result.data["owner"] == "alice" async def test_get_repo_by_owner_slug(self, db_session: AsyncSession) -> None: """Returns repo metadata when resolved by owner+slug.""" from musehub.services.musehub_mcp_executor import execute_get_repo r = await _repo(db_session, "get-by-slug") await db_session.commit() result = await execute_get_repo(owner="alice", slug="get-by-slug") assert result.ok is True assert result.data["repo_id"] == r.repo_id assert result.data["name"] == "get-by-slug" async def test_get_repo_returns_expected_fields(self, db_session: AsyncSession) -> None: """Result data contains all documented fields.""" from musehub.services.musehub_mcp_executor import execute_get_repo r = await _repo(db_session, "field-check") await db_session.commit() result = await execute_get_repo(repo_id=r.repo_id) assert result.ok is True for field in ("repo_id", "name", "owner", "slug", "visibility", "description", "tags", "default_branch", "clone_url", "created_at", "updated_at", "pushed_at"): assert field in result.data, f"missing field: {field}" async def test_get_repo_not_found_returns_error(self, db_session: AsyncSession) -> None: """Unknown repo_id returns ok=False with repo_not_found error code.""" from musehub.services.musehub_mcp_executor import execute_get_repo result = await execute_get_repo(repo_id="00000000-0000-0000-0000-000000000000") assert result.ok is False assert result.error_code == "repo_not_found" async def test_get_repo_missing_args_returns_invalid(self, db_session: AsyncSession) -> None: """Calling with no identifier returns invalid_args, not a crash.""" from musehub.services.musehub_mcp_executor import execute_get_repo result = await execute_get_repo() assert result.ok is False assert result.error_code == "invalid_args" async def test_get_repo_private_accessible_by_owner(self, db_session: AsyncSession) -> None: """Owner can access their own private repo via execute_get_repo.""" from musehub.services.musehub_mcp_executor import execute_get_repo r = await _repo(db_session, "priv-repo", visibility="private", owner="alice") await db_session.commit() result = await execute_get_repo(repo_id=r.repo_id, actor="alice") assert result.ok is True assert result.data["visibility"] == "private" async def test_get_repo_via_mcp_dispatcher( self, http_client: AsyncClient, db_session: AsyncSession ) -> None: """musehub_get_repo is dispatched correctly through the MCP endpoint.""" r = await _repo(db_session, "dispatch-get-repo") await db_session.commit() session_id = await _init_session(http_client) resp = await http_client.post( "/mcp", json=_tools_call("musehub_get_repo", {"repo_id": r.repo_id}), headers={"Content-Type": "application/json", "mcp-session-id": session_id}, ) assert resp.status_code == 200 body = resp.json() assert body["result"]["isError"] is False data = json.loads(_unwrap_tool_text(body["result"]["content"][0]["text"])) assert data["slug"] == "dispatch-get-repo" async def test_get_repo_via_mcp_owner_slug_dispatch( self, http_client: AsyncClient, db_session: AsyncSession ) -> None: """musehub_get_repo resolves by owner+slug through the dispatcher.""" r = await _repo(db_session, "owner-slug-dispatch") await db_session.commit() session_id = await _init_session(http_client) resp = await http_client.post( "/mcp", json=_tools_call("musehub_get_repo", {"owner": "alice", "slug": "owner-slug-dispatch"}), headers={"Content-Type": "application/json", "mcp-session-id": session_id}, ) assert resp.status_code == 200 body = resp.json() assert body["result"]["isError"] is False data = json.loads(_unwrap_tool_text(body["result"]["content"][0]["text"])) assert data["repo_id"] == r.repo_id # ── execute_list_repos ──────────────────────────────────────────────────────── class TestExecuteListRepos: """Unit and integration tests for execute_list_repos.""" async def test_list_repos_returns_owned_repos(self, db_session: AsyncSession) -> None: """Returns repos owned by the actor.""" from musehub.services.musehub_mcp_executor import execute_list_repos await _repo(db_session, "owned-1", owner="bob") await _repo(db_session, "owned-2", owner="bob") await db_session.commit() result = await execute_list_repos(actor="bob") assert result.ok is True slugs = [r["slug"] for r in result.data["repos"]] assert "owned-1" in slugs assert "owned-2" in slugs async def test_list_repos_empty_for_unknown_user(self, db_session: AsyncSession) -> None: """Returns ok=True with empty list for a user with no repos.""" from musehub.services.musehub_mcp_executor import execute_list_repos result = await execute_list_repos(actor="nobody-at-all") assert result.ok is True assert result.data["repos"] == [] assert result.data["total"] == 0 async def test_list_repos_requires_actor(self, db_session: AsyncSession) -> None: """Empty actor returns forbidden error.""" from musehub.services.musehub_mcp_executor import execute_list_repos result = await execute_list_repos(actor="") assert result.ok is False assert result.error_code == "forbidden" async def test_list_repos_returns_expected_fields(self, db_session: AsyncSession) -> None: """Each repo in the list has all documented fields.""" from musehub.services.musehub_mcp_executor import execute_list_repos await _repo(db_session, "fields-repo", owner="carol") await db_session.commit() result = await execute_list_repos(actor="carol") assert result.ok is True assert len(result.data["repos"]) >= 1 repo = result.data["repos"][0] for field in ("repo_id", "name", "owner", "slug", "visibility", "description", "tags", "default_branch", "created_at", "pushed_at"): assert field in repo, f"missing field: {field}" async def test_list_repos_respects_limit(self, db_session: AsyncSession) -> None: """limit parameter caps the number of repos returned.""" from musehub.services.musehub_mcp_executor import execute_list_repos for i in range(5): await _repo(db_session, f"limit-repo-{i}", owner="dave") await db_session.commit() result = await execute_list_repos(actor="dave", limit=2) assert result.ok is True assert len(result.data["repos"]) <= 2 async def test_list_repos_next_cursor_when_more(self, db_session: AsyncSession) -> None: """next_cursor is set when there are more repos beyond the page.""" from musehub.services.musehub_mcp_executor import execute_list_repos for i in range(5): await _repo(db_session, f"cursor-repo-{i}", owner="eve") await db_session.commit() result = await execute_list_repos(actor="eve", limit=2) assert result.ok is True assert result.data["next_cursor"] is not None async def test_list_repos_no_cursor_on_last_page(self, db_session: AsyncSession) -> None: """next_cursor is None when the page is the last one.""" from musehub.services.musehub_mcp_executor import execute_list_repos await _repo(db_session, "only-repo", owner="frank") await db_session.commit() result = await execute_list_repos(actor="frank", limit=100) assert result.ok is True assert result.data["next_cursor"] is None async def test_list_repos_does_not_return_other_users_repos( self, db_session: AsyncSession ) -> None: """Repos owned by other users are not visible.""" from musehub.services.musehub_mcp_executor import execute_list_repos await _repo(db_session, "grace-repo", owner="grace") await _repo(db_session, "other-repo", owner="other-person") await db_session.commit() result = await execute_list_repos(actor="grace") assert result.ok is True owners = {r["owner"] for r in result.data["repos"]} assert "other-person" not in owners async def test_list_repos_via_mcp_dispatcher( self, http_client: AsyncClient, db_session: AsyncSession ) -> None: """musehub_list_repos is dispatched and auth is threaded correctly.""" session_id = await _init_session(http_client) resp = await http_client.post( "/mcp", json=_tools_call("musehub_list_repos", {"limit": 10}), headers={"Content-Type": "application/json", "mcp-session-id": session_id}, ) assert resp.status_code == 200 body = resp.json() # Unauthenticated MCP session → actor is empty → forbidden assert body["result"]["isError"] is True error = json.loads(body["result"]["content"][0]["text"]) assert error["error_code"] == "forbidden" # ── Security, integrity, and stress tests ───────────────────────────────────── class TestGetRepoVisibilityEnforcement: """execute_get_repo enforces visibility for private repos.""" async def test_private_repo_accessible_by_owner(self, db_session: AsyncSession) -> None: """Owner can read their own private repo.""" from musehub.services.musehub_mcp_executor import execute_get_repo r = await _repo(db_session, "owner-priv", visibility="private", owner="alice") await db_session.commit() result = await execute_get_repo(repo_id=r.repo_id, actor="alice") assert result.ok is True assert result.data["slug"] == "owner-priv" async def test_private_repo_denied_to_non_owner(self, db_session: AsyncSession) -> None: """Non-owner gets repo_not_found (not forbidden) for private repo — no existence leak.""" from musehub.services.musehub_mcp_executor import execute_get_repo r = await _repo(db_session, "secret-repo", visibility="private", owner="alice") await db_session.commit() result = await execute_get_repo(repo_id=r.repo_id, actor="bob") assert result.ok is False assert result.error_code == "repo_not_found" async def test_private_repo_denied_to_unauthenticated(self, db_session: AsyncSession) -> None: """Unauthenticated caller (actor='') cannot read a private repo.""" from musehub.services.musehub_mcp_executor import execute_get_repo r = await _repo(db_session, "anon-denied", visibility="private", owner="alice") await db_session.commit() result = await execute_get_repo(repo_id=r.repo_id, actor="") assert result.ok is False assert result.error_code == "repo_not_found" async def test_private_repo_accessible_by_collaborator(self, db_session: AsyncSession) -> None: """Accepted collaborator can read a private repo.""" from musehub.services.musehub_mcp_executor import execute_get_repo from musehub.db.musehub_collaborator_models import MusehubCollaborator from datetime import timezone r = await _repo(db_session, "collab-priv", visibility="private", owner="alice") _at = datetime.now(tz=timezone.utc) collab = MusehubCollaborator( id=compute_collaborator_id(r.repo_id, compute_identity_id(b"bob"), _at.isoformat()), repo_id=r.repo_id, identity_handle="bob", permission="read", accepted_at=_at, ) db_session.add(collab) await db_session.commit() result = await execute_get_repo(repo_id=r.repo_id, actor="bob") assert result.ok is True assert result.data["slug"] == "collab-priv" async def test_private_repo_denied_to_pending_collaborator(self, db_session: AsyncSession) -> None: """Invited-but-not-accepted collaborator cannot read a private repo.""" from musehub.services.musehub_mcp_executor import execute_get_repo from musehub.db.musehub_collaborator_models import MusehubCollaborator r = await _repo(db_session, "pending-collab", visibility="private", owner="alice") _invited = datetime.now(tz=timezone.utc) collab = MusehubCollaborator( id=compute_collaborator_id(r.repo_id, compute_identity_id(b"carol"), _invited.isoformat()), repo_id=r.repo_id, identity_handle="carol", permission="read", accepted_at=None, # not yet accepted ) db_session.add(collab) await db_session.commit() result = await execute_get_repo(repo_id=r.repo_id, actor="carol") assert result.ok is False assert result.error_code == "repo_not_found" async def test_public_repo_accessible_without_auth(self, db_session: AsyncSession) -> None: """Public repo is readable by any caller including unauthenticated.""" from musehub.services.musehub_mcp_executor import execute_get_repo r = await _repo(db_session, "public-open", visibility="public", owner="alice") await db_session.commit() result = await execute_get_repo(repo_id=r.repo_id, actor="") assert result.ok is True assert result.data["visibility"] == "public" async def test_private_repo_by_owner_slug_denied_to_non_owner(self, db_session: AsyncSession) -> None: """Visibility check applies to owner+slug resolution path too.""" from musehub.services.musehub_mcp_executor import execute_get_repo r = await _repo(db_session, "slug-priv", visibility="private", owner="alice") await db_session.commit() result = await execute_get_repo(owner="alice", slug="slug-priv", actor="eve") assert result.ok is False assert result.error_code == "repo_not_found" async def test_visibility_error_message_does_not_reveal_existence(self, db_session: AsyncSession) -> None: """Error message for private repo access denial is same as repo_not_found — no oracle.""" from musehub.services.musehub_mcp_executor import execute_get_repo r = await _repo(db_session, "oracle-test", visibility="private", owner="alice") await db_session.commit() # Existing private repo — denied to non-owner result_denied = await execute_get_repo(repo_id=r.repo_id, actor="eve") # Non-existent repo result_missing = await execute_get_repo(repo_id="00000000-0000-0000-0000-000000000099") assert result_denied.error_code == result_missing.error_code assert result_denied.error_message == result_missing.error_message class TestListReposComprehensive: """Comprehensive tests for execute_list_repos — collaboration, soft-delete, pagination.""" async def test_list_repos_includes_collaboration_repos(self, db_session: AsyncSession) -> None: """Repos the actor collaborates on (but doesn't own) appear in the list.""" from musehub.services.musehub_mcp_executor import execute_list_repos from musehub.db.musehub_collaborator_models import MusehubCollaborator from datetime import timezone r = await _repo(db_session, "collab-listed", owner="alice") _listed_at = datetime.now(tz=timezone.utc) collab = MusehubCollaborator( id=compute_collaborator_id(r.repo_id, compute_identity_id(b"bob"), _listed_at.isoformat()), repo_id=r.repo_id, identity_handle="bob", permission="write", accepted_at=_listed_at, ) db_session.add(collab) await db_session.commit() result = await execute_list_repos(actor="bob") assert result.ok is True slugs = [repo["slug"] for repo in result.data["repos"]] assert "collab-listed" in slugs async def test_list_repos_excludes_pending_collab(self, db_session: AsyncSession) -> None: """Repos where the invitation is not yet accepted do NOT appear.""" from musehub.services.musehub_mcp_executor import execute_list_repos from musehub.db.musehub_collaborator_models import MusehubCollaborator r = await _repo(db_session, "pending-invisible", owner="alice") _pending_at = datetime.now(tz=timezone.utc) collab = MusehubCollaborator( id=compute_collaborator_id(r.repo_id, compute_identity_id(b"carol"), _pending_at.isoformat()), repo_id=r.repo_id, identity_handle="carol", permission="read", accepted_at=None, ) db_session.add(collab) await db_session.commit() result = await execute_list_repos(actor="carol") assert result.ok is True slugs = [repo["slug"] for repo in result.data["repos"]] assert "pending-invisible" not in slugs async def test_list_repos_excludes_deleted(self, db_session: AsyncSession) -> None: """Hard-deleted repos do not appear in the list.""" from musehub.services.musehub_mcp_executor import execute_list_repos r_live = await _repo(db_session, "live-repo", owner="dave") r_dead = await _repo(db_session, "deleted-repo", owner="dave") await db_session.delete(r_dead) await db_session.flush() await db_session.commit() result = await execute_list_repos(actor="dave") assert result.ok is True slugs = [r["slug"] for r in result.data["repos"]] assert "live-repo" in slugs assert "deleted-repo" not in slugs async def test_list_repos_total_excludes_deleted(self, db_session: AsyncSession) -> None: """total count does not include hard-deleted repos.""" from musehub.services.musehub_mcp_executor import execute_list_repos for i in range(3): await _repo(db_session, f"count-live-{i}", owner="eve") r_dead = await _repo(db_session, "count-dead", owner="eve") await db_session.delete(r_dead) await db_session.flush() await db_session.commit() result = await execute_list_repos(actor="eve") assert result.ok is True assert result.data["total"] == 3 async def test_list_repos_pagination_stress(self, db_session: AsyncSession) -> None: """Paginating through 120 repos returns all without duplicates or misses.""" from musehub.services.musehub_mcp_executor import execute_list_repos for i in range(120): await _repo(db_session, f"stress-{i:03d}", owner="frank") await db_session.commit() all_repos: list[dict] = [] cursor: str | None = None pages = 0 while True: result = await execute_list_repos(actor="frank", limit=20, cursor=cursor) assert result.ok is True batch = result.data["repos"] all_repos.extend(batch) pages += 1 cursor = result.data["next_cursor"] if cursor is None: break assert len(all_repos) == 120, f"Expected 120, got {len(all_repos)}" ids = [r["repo_id"] for r in all_repos] assert len(ids) == len(set(ids)), "Duplicate repos found across pages" # With 120 repos and page size 20, the last full page sets a cursor; the # subsequent empty page terminates iteration. Accept 6 or 7 pages. assert pages <= 7, f"Unexpected page count: {pages}" async def test_list_repos_cursor_is_stable_across_requests(self, db_session: AsyncSession) -> None: """Re-using the same cursor yields the same next page.""" from musehub.services.musehub_mcp_executor import execute_list_repos for i in range(30): await _repo(db_session, f"stable-{i:02d}", owner="grace") await db_session.commit() page1 = await execute_list_repos(actor="grace", limit=10) cursor = page1.data["next_cursor"] assert cursor is not None page2a = await execute_list_repos(actor="grace", limit=10, cursor=cursor) page2b = await execute_list_repos(actor="grace", limit=10, cursor=cursor) assert page2a.data["repos"] == page2b.data["repos"] async def test_list_repos_malformed_cursor_returns_first_page(self, db_session: AsyncSession) -> None: """A malformed cursor is silently ignored and returns from the first page.""" from musehub.services.musehub_mcp_executor import execute_list_repos for i in range(5): await _repo(db_session, f"cursor-test-{i}", owner="heidi") await db_session.commit() result = await execute_list_repos(actor="heidi", cursor="not-a-timestamp") assert result.ok is True assert len(result.data["repos"]) == 5 async def test_list_repos_limit_clamped_at_100(self, db_session: AsyncSession) -> None: """limit values above 100 are clamped to 100.""" from musehub.services.musehub_mcp_executor import execute_list_repos for i in range(5): await _repo(db_session, f"clamp-{i}", owner="ivan") await db_session.commit() result = await execute_list_repos(actor="ivan", limit=9999) assert result.ok is True # Only 5 repos exist; result set is smaller than the clamped limit assert len(result.data["repos"]) == 5