gabriel / musehub public
mcp.py python
625 lines 23.9 KB
Raw
sha256:3c58668648c7323bb9f5c6881cfe6a3f14fc93fcb73b537d253732952a5bf8bf chore: bump version to 0.2.0rc12 Sonnet 4.6 patch 8 days ago
1 """MuseHub MCP — Full Streamable HTTP transport (MCP 2025-11-25).
2
3 Implements the complete Streamable HTTP transport spec:
4
5 POST /mcp — client → server messages (requests, notifications, responses).
6 - Returns ``application/json`` for most requests (resources, prompts, ping).
7 - Returns ``text/event-stream`` when a tool needs SSE (elicitation, progress).
8 - Returns 202 Accepted for notifications (no body).
9 - Issues ``Mcp-Session-Id`` header on successful ``initialize``.
10 - Validates ``Mcp-Session-Id`` on all subsequent requests.
11 - Validates ``MCP-Protocol-Version`` header on non-initialize requests.
12 - Validates ``Origin`` header to prevent DNS-rebinding attacks.
13
14 GET /mcp — server → client SSE push channel.
15 - Opens a persistent ``text/event-stream`` for server-initiated messages.
16 - Supports ``Last-Event-ID`` for reconnection replay.
17 - Injects heartbeat comments every 15 s to keep proxies alive.
18
19 DELETE /mcp — client-initiated session termination.
20 - Returns 200 on success, 404 if session unknown.
21
22 Auth model:
23 - ``Authorization: MSign handle="..." ts=... sig="..."`` → authenticated; handle from MSign context.
24 - No token → anonymous; read-only tools work, write tools return isError=true.
25 - Invalid signature → 401.
26
27 Security:
28 - Origin header validated on all POST/GET/DELETE requests.
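29 - Allowed origins configured via ``MUSEHUB_ALLOWED_ORIGINS`` env var
30 (comma-separated). When unset, defaults to ``http://localhost``, ``http://127.0.0.1`` and ``https://musehub.ai``; the two loopback origins are always allowed.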
31 """
32
33 import json
34 import logging
35 import os
36 from collections.abc import AsyncIterator
37 from urllib.parse import urlparse
38
39 from fastapi import APIRouter, Depends, Request, Response
40 from musehub.auth.request_signing import MSignContext, optional_signed_request
41 from fastapi.responses import JSONResponse, StreamingResponse
42 from musehub.rate_limits import limiter, MCP_LIMIT
43
44 from musehub.types.json_types import JSONObject, JSONValue
45 from musehub.mcp.dispatcher import handle_batch, handle_request
46 from musehub.mcp.tools import MUSEHUB_WRITE_TOOL_NAMES
47 from musehub.mcp.session import (
48 MCPSession,
49 SessionCapacityError,
50 create_session,
51 delete_session,
52 get_session,
53 push_to_session,
54 register_sse_queue,
55 resolve_elicitation,
56 )
57 from musehub.mcp.sse import SSE_CONTENT_TYPE, heartbeat_stream, sse_heartbeat
58
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Router that carries every ``/mcp`` route defined in this module; routes are
# grouped under the "MCP" tag.
router = APIRouter(tags=["MCP"])

# Protocol revision reported by the ``/mcp/docs.json`` manifest and handed to
# the ``/mcp/docs`` page template.
_PROTOCOL_VERSION = "2025-11-25"
64
65 # ── Origin validation ─────────────────────────────────────────────────────────
66
# Origins accepted by ``_validate_origin``. Read once, at import time, from the
# comma-separated ``MUSEHUB_ALLOWED_ORIGINS`` environment variable. Each entry
# is stripped of surrounding whitespace and empty entries are dropped. The
# second argument is the default used when the variable is unset.
_ALLOWED_ORIGINS: frozenset[str] = frozenset(
    o.strip()
    for o in os.environ.get(
        "MUSEHUB_ALLOWED_ORIGINS",
        "http://localhost,http://127.0.0.1,https://musehub.ai",
    ).split(",")
    if o.strip()
)

# Loopback origins that ``_validate_origin`` accepts even when
# ``MUSEHUB_ALLOWED_ORIGINS`` does not list them. Note these are exact
# strings without a port component.
_ALWAYS_ALLOW_ORIGINS: frozenset[str] = frozenset({
    "http://localhost",
    "http://127.0.0.1",
})
80
81
def _validate_origin(request: Request) -> bool:
    """Decide whether the request's ``Origin`` header is acceptable.

    The Streamable HTTP spec requires servers to validate ``Origin`` as a
    defence against DNS-rebinding attacks.

    Args:
        request: Incoming request whose ``Origin`` header is inspected.

    Returns:
        ``True`` when no ``Origin`` header is present, or when its
        ``scheme://netloc`` form is a member of ``_ALLOWED_ORIGINS`` or
        ``_ALWAYS_ALLOW_ORIGINS``; ``False`` otherwise, including when the
        header cannot be parsed.
    """
    header_value = request.headers.get("Origin")
    if header_value is None:
        # Non-browser clients (e.g. curl, Postman, stdio bridge tools) do not
        # send an Origin header, so its absence is not treated as an attack.
        return True

    try:
        parts = urlparse(header_value)
        # Keep scheme + host[:port] only; any path component is discarded.
        candidate = f"{parts.scheme}://{parts.netloc}"
    except Exception:
        return False

    for accepted in (_ALLOWED_ORIGINS, _ALWAYS_ALLOW_ORIGINS):
        if candidate in accepted:
            return True
    return False
101
102
103 # ── Auth helper ───────────────────────────────────────────────────────────────
104
105
106
107 # ── POST /mcp ─────────────────────────────────────────────────────────────────
108
109
def _first_write_tool_call(raw: JSONValue) -> JSONObject | None:
    """Return the first ``tools/call`` message in *raw* that targets a write tool.

    *raw* may be a single JSON-RPC message or a batch (list) of messages;
    anything that is not a ``tools/call`` object is skipped.

    Returns:
        The offending message, or ``None`` when no write-tool call is present.
    """
    messages = raw if isinstance(raw, list) else [raw]
    for message in messages:
        if not isinstance(message, dict) or message.get("method") != "tools/call":
            continue
        params = message.get("params")
        tool_name = params.get("name") if isinstance(params, dict) else None
        # Only strings can name a tool. The isinstance guard also keeps a
        # malformed (list/object) name from raising TypeError when hashed.
        if isinstance(tool_name, str) and tool_name in MUSEHUB_WRITE_TOOL_NAMES:
            return message
    return None


@router.post(
    "/mcp",
    operation_id="mcpEndpoint",
    summary="MCP Streamable HTTP — POST endpoint (2025-11-25)",
    include_in_schema=True,
)
@limiter.limit(MCP_LIMIT)
async def mcp_post(
    request: Request,
    ctx: MSignContext | None = Depends(optional_signed_request),
) -> Response:
    """MCP Streamable HTTP POST endpoint.

    Handles all client→server JSON-RPC messages. Returns ``application/json``
    for most requests, ``text/event-stream`` when the response requires SSE
    (tool calls listed in ``_SSE_TOOL_NAMES`` made within a session), and
    ``202 Accepted`` when the dispatcher produces no response.

    The ``initialize`` method issues an ``Mcp-Session-Id`` response header.
    A request that sends an ``Mcp-Session-Id`` the server does not know gets
    a 404.

    Args:
        request: The incoming HTTP request.
        ctx: MSign context resolved by ``optional_signed_request``; ``None``
            for anonymous callers.

    Returns:
        One of:

        - 403 — ``Origin`` header rejected.
        - 401 — an ``MSign`` header was sent but did not verify, or a write
          tool was called (alone or inside a batch) without a signature on
          this request.
        - 400 — unparseable body, unsupported ``MCP-Protocol-Version``, or a
          body that is neither an object nor an array.
        - 404 — unknown ``Mcp-Session-Id``.
        - 503 — session capacity exhausted during ``initialize``.
        - 500 — unexpected error during dispatch.
        - 202 — notification / client response, no body.
        - 200 — JSON or SSE response.
    """
    # ── Security: Origin validation ───────────────────────────────────────────
    if not _validate_origin(request):
        return JSONResponse(
            status_code=403,
            content={
                "jsonrpc": "2.0",
                "id": None,
                "error": {"code": -32600, "message": "Forbidden: invalid Origin header"},
            },
        )

    # ── Auth ──────────────────────────────────────────────────────────────────
    # ctx is None for anonymous requests. If an MSign header IS present but
    # ctx is None, the signature failed — return 401 instead of silently
    # downgrading the caller to anonymous.
    auth_header = request.headers.get("Authorization", "")
    if auth_header.startswith("MSign ") and ctx is None:
        return JSONResponse(
            status_code=401,
            content={"error": "Invalid MSign signature."},
            headers={"WWW-Authenticate": 'MSign realm="musehub"'},
        )
    user_id: str | None = ctx.handle if ctx else None
    is_agent: bool = ctx.is_agent if ctx else False
    agent_name: str | None = (ctx.handle if ctx.is_agent else None) if ctx else None

    # ── Parse body ────────────────────────────────────────────────────────────
    try:
        body = await request.body()
        raw = json.loads(body)
    except (json.JSONDecodeError, ValueError) as exc:
        return JSONResponse(
            status_code=400,
            content={
                "jsonrpc": "2.0",
                "id": None,
                "error": {"code": -32700, "message": f"Parse error: {exc}"},
            },
        )

    # ── Determine if this is initialize or a subsequent request ──────────────
    # For batches, use the method of the first request.
    def _first_method(r: JSONObject | list[JSONObject]) -> str | None:
        if isinstance(r, dict):
            m = r.get("method")
            return m if isinstance(m, str) else None
        if isinstance(r, list) and r:
            return r[0].get("method") if isinstance(r[0], dict) else None
        return None

    first_method = _first_method(raw)
    is_initialize = first_method == "initialize"

    # ── Session management ────────────────────────────────────────────────────
    session: MCPSession | None = None
    session_id_header = request.headers.get("Mcp-Session-Id")

    if not is_initialize:
        # Validate MCP-Protocol-Version on non-initialize requests. A missing
        # header is tolerated; only an explicit unsupported value is rejected.
        proto_ver = request.headers.get("MCP-Protocol-Version")
        if proto_ver and proto_ver not in ("2025-11-25", "2025-03-26"):
            return JSONResponse(
                status_code=400,
                content={
                    "jsonrpc": "2.0",
                    "id": None,
                    "error": {
                        "code": -32600,
                        "message": f"Unsupported MCP-Protocol-Version: {proto_ver!r}",
                    },
                },
            )

        if session_id_header:
            session = get_session(session_id_header)
            if session is None:
                # Session expired or unknown → client must re-initialize.
                # NOTE: multi-worker deployments using in-process session storage
                # require sticky sessions at the load-balancer level so that all
                # requests for a given Mcp-Session-Id land on the same worker.
                # See nginx upstream hash $http_mcp_session_id consistent for
                # a production-ready solution.
                return JSONResponse(
                    status_code=404,
                    content={
                        "jsonrpc": "2.0",
                        "id": None,
                        "error": {
                            "code": -32600,
                            "message": "Session not found. Send a new InitializeRequest without Mcp-Session-Id.",
                        },
                    },
                )

    # ── Write tool re-verification ────────────────────────────────────────────
    # Session identity is established at initialize, but a stolen session ID
    # must not allow write access. Every tools/call that targets a write tool
    # requires a fresh MSign signature on *this* request (ctx != None).
    # The scan covers batches as well as single messages so that wrapping a
    # write call in an array cannot bypass the check. Read tools, resource
    # reads, prompts, and notifications pass through.
    if ctx is None:
        unsigned_call = _first_write_tool_call(raw)
        if unsigned_call is not None:
            return JSONResponse(
                status_code=401,
                content={
                    "jsonrpc": "2.0",
                    "id": unsigned_call.get("id"),
                    "error": {
                        "code": -32001,
                        "message": (
                            "Write tools require a fresh MSign signature on each request. "
                            "Include Authorization: MSign ... on this call."
                        ),
                    },
                },
                headers={"WWW-Authenticate": 'MSign realm="musehub"'},
            )

    # ── Check if this is an elicitation response (client→server) ─────────────
    # When the client sends the result of an elicitation/create back, it's a
    # JSON-RPC *response* (has "result" or "error" but no "method"). Route it
    # to the session's pending Future resolver.
    if session and isinstance(raw, dict) and "method" not in raw and "id" in raw:
        req_id = raw.get("id")
        if "result" in raw and req_id is not None:
            resolved = resolve_elicitation(session, req_id, raw["result"])
            if resolved:
                return Response(status_code=202)
        # Unknown response — ignore per spec.
        return Response(status_code=202)

    # ── Dispatch ──────────────────────────────────────────────────────────────
    try:
        if isinstance(raw, list):
            responses = await handle_batch(
                raw, user_id=user_id, session=session,
                is_agent=is_agent, agent_name=agent_name,
            )
            if not responses:
                return Response(status_code=202)
            return JSONResponse(content=responses)

        elif isinstance(raw, dict):
            # SSE streaming is only possible inside a session; without one the
            # call falls through to the plain JSON path below.
            if session is not None and _method_needs_sse(raw):
                return _make_sse_tool_response(
                    raw, user_id=user_id, session=session,
                    is_agent=is_agent, agent_name=agent_name,
                )

            resp = await handle_request(
                raw, user_id=user_id, session=session,
                is_agent=is_agent, agent_name=agent_name,
            )
            if resp is None:
                return Response(status_code=202)

            # Attach Mcp-Session-Id on initialize.
            if is_initialize:
                try:
                    new_session = _create_session_from_initialize(raw, user_id)
                except SessionCapacityError as cap_exc:
                    logger.warning("MCP session capacity exceeded: %s", cap_exc)
                    return JSONResponse(
                        status_code=503,
                        content={
                            "jsonrpc": "2.0",
                            "id": None,
                            "error": {
                                "code": -32000,
                                "message": str(cap_exc),
                            },
                        },
                        headers={"Retry-After": "5"},
                    )
                return JSONResponse(
                    content=resp,
                    headers={"Mcp-Session-Id": new_session.session_id},
                )

            return JSONResponse(content=resp)

        else:
            return JSONResponse(
                status_code=400,
                content={
                    "jsonrpc": "2.0",
                    "id": None,
                    "error": {
                        "code": -32600,
                        "message": "Request must be an object or array",
                    },
                },
            )

    except Exception as exc:
        # Top-level boundary: log with traceback and convert to a JSON-RPC
        # internal error so the client always receives a well-formed reply.
        # NOTE(review): the exception text is echoed to the client — confirm
        # this cannot expose internals you would rather keep private.
        logger.exception("Unhandled error in POST /mcp: %s", exc)
        return JSONResponse(
            status_code=500,
            content={
                "jsonrpc": "2.0",
                "id": None,
                "error": {"code": -32603, "message": f"Internal error: {exc}"},
            },
        )
340
341
342 # ── GET /mcp ──────────────────────────────────────────────────────────────────
343
344
@router.get(
    "/mcp",
    operation_id="mcpSseStream",
    summary="MCP Streamable HTTP — GET SSE push channel (2025-11-25)",
    include_in_schema=True,
)
async def mcp_get(request: Request) -> Response:
    """Open a persistent SSE stream for server-initiated messages.

    The client must send ``Accept: text/event-stream`` and a known
    ``Mcp-Session-Id``. The optional ``Last-Event-ID`` header is forwarded to
    ``register_sse_queue`` so a reconnecting client can resume; the resulting
    stream is wrapped by ``heartbeat_stream`` with a 15 s interval.

    Args:
        request: The incoming HTTP request.

    Returns:
        - 403 when the ``Origin`` header is rejected.
        - 405 when ``Accept`` does not contain ``text/event-stream``.
        - 400 when ``Mcp-Session-Id`` is missing.
        - 404 when the session is unknown.
        - Otherwise a ``StreamingResponse`` with the SSE content type.
    """
    if not _validate_origin(request):
        return Response(status_code=403)

    headers = request.headers

    if "text/event-stream" not in headers.get("Accept", ""):
        return Response(status_code=405, content="SSE requires Accept: text/event-stream")

    sid = headers.get("Mcp-Session-Id")
    if not sid:
        return JSONResponse(
            status_code=400,
            content={"error": "Mcp-Session-Id header required for GET /mcp SSE stream"},
        )

    mcp_session = get_session(sid)
    if mcp_session is None:
        return Response(status_code=404, content="Session not found or expired")

    resume_from = headers.get("Last-Event-ID")

    async def _stream() -> AsyncIterator[str]:
        # The queue is registered only once the response body starts being
        # consumed, i.e. when this generator is first iterated.
        source = heartbeat_stream(
            register_sse_queue(mcp_session, resume_from),
            interval_seconds=15.0,
        )
        async for chunk in source:
            yield chunk

    sse_headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",  # Disable nginx buffering.
    }
    return StreamingResponse(
        _stream(),
        media_type=SSE_CONTENT_TYPE,
        headers=sse_headers,
    )
400
401
402 # ── DELETE /mcp ───────────────────────────────────────────────────────────────
403
404
@router.delete(
    "/mcp",
    operation_id="mcpDeleteSession",
    summary="MCP Streamable HTTP — DELETE session (2025-11-25)",
    include_in_schema=True,
)
async def mcp_delete(request: Request) -> Response:
    """Terminate a session at the client's request.

    Teardown is delegated to ``delete_session`` (presumably closing the
    session's SSE streams and cancelling pending elicitation futures —
    confirm in ``musehub.mcp.session``).

    Args:
        request: The incoming HTTP request; must carry ``Mcp-Session-Id``.

    Returns:
        - 403 when the ``Origin`` header is rejected.
        - 400 when ``Mcp-Session-Id`` is missing.
        - 404 when ``delete_session`` reports nothing was deleted.
        - 200 on success.
    """
    if not _validate_origin(request):
        return Response(status_code=403)

    sid = request.headers.get("Mcp-Session-Id")
    if not sid:
        return JSONResponse(
            status_code=400,
            content={"error": "Mcp-Session-Id header required"},
        )

    if delete_session(sid):
        # Only the first 8 characters of the id are logged.
        logger.info("MCP session terminated by client: %.8s...", sid)
        return Response(status_code=200)

    return Response(status_code=404, content="Session not found")
433
434
435 # ── GET /mcp/docs.json ────────────────────────────────────────────────────────
436
437
@router.get(
    "/mcp/docs.json",
    operation_id="mcpDocsJson",
    summary="MCP capability manifest — machine-readable JSON",
    include_in_schema=True,
)
async def mcp_docs_json() -> JSONResponse:
    """Return a machine-readable JSON manifest of all MCP capabilities.

    The manifest contains:
      - ``server``: name, version, protocol version, endpoint and docs URL
      - ``tools``: every entry of ``MCP_TOOLS`` minus its ``server_side`` key
      - ``resources`` / ``resourceTemplates``: URI (or URI template), name,
        description and MIME type of each static resource / template
      - ``prompts``: name, description and arguments of each prompt

    The handler takes no auth dependency, so the manifest is public; the
    response is marked cacheable for 300 seconds.
    """
    # Imported lazily, inside the handler, as in the sibling docs route.
    from musehub.mcp.tools import MCP_TOOLS
    from musehub.mcp.resources import STATIC_RESOURCES, RESOURCE_TEMPLATES
    from musehub.mcp.prompts import PROMPT_CATALOGUE

    resource_fields = ("uri", "name", "description", "mimeType")
    template_fields = ("uriTemplate", "name", "description", "mimeType")

    tools_out = []
    for tool in MCP_TOOLS:
        public_view = dict(tool.items())
        # ``server_side`` is an internal flag and is not published.
        public_view.pop("server_side", None)
        tools_out.append(public_view)

    resources_out = [
        {field: resource.get(field) for field in resource_fields}
        for resource in STATIC_RESOURCES
    ]
    templates_out = [
        {field: template.get(field) for field in template_fields}
        for template in RESOURCE_TEMPLATES
    ]

    prompts_out = []
    for prompt in PROMPT_CATALOGUE:
        prompts_out.append(
            {
                # ``name`` and ``description`` are mandatory (KeyError if
                # absent); ``arguments`` falls back to an empty list.
                "name": prompt["name"],
                "description": prompt["description"],
                "arguments": prompt.get("arguments", []),
            }
        )

    manifest = {
        "server": {
            "name": "musehub-mcp",
            "version": "0.2.0",
            "protocolVersion": _PROTOCOL_VERSION,
            "endpoint": "/mcp",
            "docsUrl": "/mcp/docs",
        },
        "tools": tools_out,
        "resources": resources_out,
        "resourceTemplates": templates_out,
        "prompts": prompts_out,
    }
    return JSONResponse(
        content=manifest,
        headers={"Cache-Control": "public, max-age=300"},
    )
508
509
510 # ── GET /mcp/docs ─────────────────────────────────────────────────────────────
511
512
@router.get(
    "/mcp/docs",
    operation_id="mcpDocs",
    summary="MCP reference — human-readable documentation page",
    include_in_schema=True,
)
async def mcp_docs(request: Request) -> Response:
    """Render the human-readable reference page for the MuseHub MCP server.

    The ``musehub/pages/mcp_docs.html`` template receives the tool list, the
    static resources, the resource templates, the prompt catalogue and the
    protocol version.

    If anything in the import-and-render step raises, the failure is logged
    at warning level and the client is redirected to ``/mcp/docs.json``.

    The handler takes no auth dependency.

    Args:
        request: The incoming HTTP request, passed through to the template.

    Returns:
        The rendered template response, or a redirect to the JSON manifest.
    """
    try:
        from musehub.api.routes.musehub._templates import templates
        from musehub.mcp.tools import MCP_TOOLS
        from musehub.mcp.resources import STATIC_RESOURCES, RESOURCE_TEMPLATES
        from musehub.mcp.prompts import PROMPT_CATALOGUE

        page_context = dict(
            request=request,
            tools=MCP_TOOLS,
            static_resources=STATIC_RESOURCES,
            resource_templates=RESOURCE_TEMPLATES,
            prompts=PROMPT_CATALOGUE,
            protocol_version=_PROTOCOL_VERSION,
        )
        return templates.TemplateResponse(
            request, "musehub/pages/mcp_docs.html", page_context
        )
    except Exception as exc:
        # Best-effort page: any failure above degrades to the JSON manifest
        # rather than surfacing an error to the visitor.
        logger.warning("MCP docs template missing, falling back to JSON redirect: %s", exc)
        from fastapi.responses import RedirectResponse

        return RedirectResponse(url="/mcp/docs.json")
549
550
551 # ── Helpers ───────────────────────────────────────────────────────────────────
552
553
def _create_session_from_initialize(
    raw: JSONObject,
    user_id: str | None,
) -> MCPSession:
    """Create a session from an ``initialize`` request.

    Args:
        raw: The JSON-RPC ``initialize`` message.
        user_id: Handle of the authenticated caller, or ``None`` if anonymous.

    Returns:
        The session returned by ``create_session``. Its client capabilities
        are ``params.capabilities`` when that value is an object, otherwise
        an empty mapping.
    """
    params = raw.get("params")
    declared = params.get("capabilities") if isinstance(params, dict) else None
    # Anything other than an object (missing, null, wrong type) is treated as
    # "no capabilities declared".
    client_caps: JSONObject = declared if isinstance(declared, dict) else {}
    return create_session(user_id, client_capabilities=client_caps)
566
567
# Tools that may use elicitation or progress streaming (SSE required).
# ``_method_needs_sse`` checks the ``name`` of each ``tools/call`` request
# against this set to decide whether the reply is streamed as SSE.
_SSE_TOOL_NAMES: frozenset[str] = frozenset({
    "musehub_review_proposal_interactive",
    "musehub_create_release_interactive",
})
573
574
def _method_needs_sse(raw: JSONObject) -> bool:
    """Return True if this request should be streamed as SSE.

    Args:
        raw: A single JSON-RPC request object.

    Returns:
        ``True`` only for a ``tools/call`` whose ``params.name`` is a string
        listed in ``_SSE_TOOL_NAMES``; ``False`` for every other message,
        including malformed ones.
    """
    if raw.get("method") != "tools/call":
        return False
    params = raw.get("params")
    if not isinstance(params, dict):
        return False
    name = params.get("name")
    # A malformed request may carry a list or object as ``name``; hashing
    # those for the frozenset lookup would raise TypeError, so only strings
    # reach the membership test.
    return isinstance(name, str) and name in _SSE_TOOL_NAMES
584
585
def _make_sse_tool_response(
    raw: JSONObject,
    *,
    user_id: str | None,
    session: MCPSession,
    is_agent: bool = False,
    agent_name: str | None = None,
) -> StreamingResponse:
    """Return a StreamingResponse that runs the tool and streams results via SSE.

    The tool is not executed here: ``handle_request`` is awaited inside the
    response generator, so dispatch happens when the response body is
    consumed.

    Args:
        raw: The JSON-RPC ``tools/call`` request.
        user_id: Handle of the authenticated caller, or ``None`` if anonymous.
        session: The MCP session the call belongs to.
        is_agent: Whether the caller is an agent; forwarded to the dispatcher.
        agent_name: Agent handle, if any; forwarded to the dispatcher.

    Returns:
        A ``StreamingResponse`` with the SSE content type. It emits one event
        carrying the dispatcher's result (nothing if the result is ``None``),
        or one JSON-RPC error event with code -32603 if dispatch raises.
    """
    # Both encoders are imported up front so that an import problem surfaces
    # immediately instead of inside the error handler, mid-stream.
    from musehub.mcp.sse import sse_event, sse_response

    raw_id = raw.get("id")
    # Only string and integer ids are echoed back; anything else becomes null.
    req_id: str | int | None = raw_id if isinstance(raw_id, (str, int)) else None

    async def generator() -> AsyncIterator[str]:
        try:
            result = await handle_request(
                raw, user_id=user_id, session=session,
                is_agent=is_agent, agent_name=agent_name,
            )
            if result is not None:
                yield sse_response(req_id, result)
        except Exception as exc:
            # The HTTP status line has already been sent by the time this
            # runs, so the failure is reported in-band as a JSON-RPC error.
            logger.exception("SSE tool call error: %s", exc)
            error_payload: JSONObject = {
                "jsonrpc": "2.0",
                "id": req_id,
                "error": {"code": -32603, "message": str(exc)},
            }
            yield sse_event(error_payload)

    return StreamingResponse(
        generator(),
        media_type=SSE_CONTENT_TYPE,
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )
File History 1 commit
sha256:3c58668648c7323bb9f5c6881cfe6a3f14fc93fcb73b537d253732952a5bf8bf chore: bump version to 0.2.0rc12 Sonnet 4.6 patch 8 days ago