mcp-proxy.mjs
302 lines 9.6 KB
Raw
sha256:6f47d53a6adbcf105ba1b9cfc126c788d6a0f461d197f84f78794914305b4bd5 fix(mcp): bound hosted discovery context Human patch 16 hours ago
1 /**
2 * Issue #1 Phase D2 — MCP gateway proxy for hosted MCP.
3 * Express router that handles /mcp with JWT auth, session pool, rate limiting, and cleanup.
4 */
5
6 import { randomUUID } from 'node:crypto';
7 import express from 'express';
8 import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
9 import { createHostedMcpServer } from './mcp-hosted-server.mjs';
10
/** Default number of requests a single user may make within one rate window. */
const DEFAULT_RATE_LIMIT = 60;
/** Default length of the per-user rate-limiting window, in milliseconds. */
const RATE_WINDOW_MS = 60 * 1000;
/** Period of the timer that sweeps idle MCP sessions, in milliseconds. */
const CLEANUP_INTERVAL_MS = 60 * 1000;
14
/**
 * Default idle TTL for hosted MCP HTTP sessions (gateway RAM). Longer than the
 * historical 30m default so Cursor does not lose tools/resources after a coffee break.
 * Override with **`MCP_SESSION_TTL_MS`** (milliseconds), clamped 5m–24h.
 */
const DEFAULT_MCP_SESSION_TTL_MS = 8 * 60 * 60 * 1000;
const MIN_MCP_SESSION_TTL_MS = 5 * 60 * 1000;
const MAX_MCP_SESSION_TTL_MS = 24 * 60 * 60 * 1000;

/**
 * Max concurrent MCP sessions per user id before the oldest is evicted.
 * Override with **`MCP_MAX_SESSIONS_PER_USER`**, clamped 2–20.
 */
const DEFAULT_MCP_MAX_SESSIONS_PER_USER = 8;
const MIN_MCP_MAX_SESSIONS_PER_USER = 2;
const MAX_MCP_MAX_SESSIONS_PER_USER = 20;

/**
 * Shared parser for integer environment overrides.
 *
 * The whole (trimmed) string must be numeric: `parseInt` would accept `"30m"`
 * as 30 or `"1e7"` as 1 and the clamp would then silently turn the typo into
 * the minimum. Such values fall back to the default instead. Fractions are
 * truncated toward zero before clamping.
 *
 * @param {unknown} raw Raw environment value (usually a string or undefined).
 * @param {number} fallback Value returned when `raw` is missing, blank, or not numeric.
 * @param {number} min Inclusive lower bound.
 * @param {number} max Inclusive upper bound.
 * @returns {number}
 */
function readClampedIntEnv(raw, fallback, min, max) {
  if (raw == null) return fallback;
  const text = String(raw).trim();
  if (text === '') return fallback;
  const value = Number(text);
  if (!Number.isFinite(value)) return fallback;
  return Math.min(max, Math.max(min, Math.trunc(value)));
}

/**
 * Resolve the idle session TTL from `MCP_SESSION_TTL_MS`.
 *
 * @param {NodeJS.ProcessEnv} [env] Environment to read; defaults to `process.env`.
 * @returns {number} TTL in milliseconds, clamped to 5m–24h; the 8h default when
 *   the variable is unset, blank, or not a number.
 */
export function parseMcpSessionTtlMs(env = process.env) {
  return readClampedIntEnv(
    env.MCP_SESSION_TTL_MS,
    DEFAULT_MCP_SESSION_TTL_MS,
    MIN_MCP_SESSION_TTL_MS,
    MAX_MCP_SESSION_TTL_MS,
  );
}

/**
 * Resolve the per-user session cap from `MCP_MAX_SESSIONS_PER_USER`.
 *
 * @param {NodeJS.ProcessEnv} [env] Environment to read; defaults to `process.env`.
 * @returns {number} Cap clamped to 2–20; the default of 8 when the variable is
 *   unset, blank, or not a number.
 */
export function parseMcpMaxSessionsPerUser(env = process.env) {
  return readClampedIntEnv(
    env.MCP_MAX_SESSIONS_PER_USER,
    DEFAULT_MCP_MAX_SESSIONS_PER_USER,
    MIN_MCP_MAX_SESSIONS_PER_USER,
    MAX_MCP_MAX_SESSIONS_PER_USER,
  );
}
55
/**
 * Tell whether a parsed JSON-RPC body is (or contains) an `initialize` call.
 *
 * A new Streamable HTTP session may only be opened by `initialize`; callers use
 * this to reject sessionless follow-ups (e.g. tools/list) up front, before any
 * hosted bridge/canister context work is started on their behalf.
 *
 * @param {unknown} body Parsed request body: a single message or a batch array.
 * @returns {boolean} `true` when the message, or any element of a batch
 *   (checked recursively), is an object whose `method` is `'initialize'`.
 */
export function isMcpInitializeRequest(body) {
  if (!Array.isArray(body)) {
    const isObject = body !== null && typeof body === 'object';
    return isObject && body.method === 'initialize';
  }
  for (const entry of body) {
    if (isMcpInitializeRequest(entry)) return true;
  }
  return false;
}
68
69 /**
70 * @typedef {{
71 * transport: StreamableHTTPServerTransport,
72 * server: import('@modelcontextprotocol/sdk/server/mcp.js').McpServer,
73 * userId: string,
74 * vaultId: string,
75 * lastActive: number,
76 * }} McpSession
77 */
78
/**
 * Simple sliding-window rate limiter per user.
 *
 * Each user id maps to the timestamps of its accepted requests inside the
 * current window. Buckets whose newest timestamp has left the window are swept
 * at most once per window, so the map does not keep an entry for every user id
 * ever seen.
 *
 * @param {number} [maxReqs] Requests allowed per user per window.
 * @param {number} [windowMs] Window length in milliseconds.
 * @returns {(userId: string) => boolean} Returns `true` and records the hit when
 *   the user is under the limit; returns `false` (recording nothing) otherwise.
 */
function createRateLimiter(maxReqs = DEFAULT_RATE_LIMIT, windowMs = RATE_WINDOW_MS) {
  /** @type {Map<string, number[]>} */
  const hits = new Map();
  // Earliest time at which the next stale-bucket sweep may run.
  let nextSweepAt = 0;

  return (userId) => {
    const now = Date.now();
    const cutoff = now - windowMs;

    if (now >= nextSweepAt) {
      // Timestamps are appended in call order, so the last element is the newest.
      for (const [id, stamps] of hits) {
        if (stamps.length === 0 || stamps[stamps.length - 1] <= cutoff) hits.delete(id);
      }
      nextSweepAt = now + windowMs;
    }

    const recent = (hits.get(userId) || []).filter((t) => t > cutoff);
    if (recent.length >= maxReqs) return false;
    recent.push(now);
    hits.set(userId, recent);
    return true;
  };
}
97
/**
 * Create the MCP proxy Express router.
 *
 * The router authenticates and rate-limits every request, then serves:
 * - `POST /`   — JSON-RPC traffic; `initialize` opens a new session, anything
 *                else must carry an `mcp-session-id` owned by the caller.
 * - `GET /`    — handed to the transport of an existing, owned session.
 * - `DELETE /` — tears down an owned session; always answers `{ ok: true }`.
 *
 * Sessions live in memory, are capped per user (the least recently active one
 * is evicted when a new session registers at the cap) and are swept once idle
 * for longer than `sessionTtlMs`.
 *
 * `deps.gatewayApiBaseUrl` is the public gateway base (no path), e.g.
 * HUB_BASE_URL; it enables hub_create_proposal. `deps.sessionSecret` is part of
 * the accepted shape but is not read by this function.
 *
 * The returned router also exposes `_sessions`, `_userSessions`,
 * `_destroySession` and `_cleanup` (the sweep timer).
 *
 * @param {{
 *   getUserId: (req: import('express').Request) => string | null,
 *   getHostedAccessContext: (req: import('express').Request) => Promise<Record<string, unknown> | null>,
 *   canisterUrl: string,
 *   canisterAuthSecret?: string,
 *   bridgeUrl: string,
 *   gatewayApiBaseUrl?: string,
 *   sessionSecret: string,
 *   rateLimit?: number,
 *   sessionTtlMs?: number,
 *   maxSessionsPerUser?: number,
 * }} deps
 * @returns {import('express').Router}
 */
export function createMcpProxyRouter(deps) {
  const {
    getUserId,
    getHostedAccessContext,
    canisterUrl,
    canisterAuthSecret,
    bridgeUrl,
    gatewayApiBaseUrl,
    rateLimit = DEFAULT_RATE_LIMIT,
    sessionTtlMs = parseMcpSessionTtlMs(),
    maxSessionsPerUser = parseMcpMaxSessionsPerUser(),
  } = deps;

  const router = express.Router();

  /** @type {Map<string, McpSession>} */
  const sessions = new Map();

  /** @type {Map<string, Set<string>>} */
  const userSessions = new Map();

  const checkRate = createRateLimiter(rateLimit);

  // Gate every route: resolve the caller, then apply the per-user rate limit.
  router.use((req, res, next) => {
    const uid = getUserId(req);
    if (!uid) return res.status(401).json({ jsonrpc: '2.0', error: { code: -32600, message: 'Unauthorized' } });
    if (!checkRate(uid)) {
      return res.status(429).json({ jsonrpc: '2.0', error: { code: -32600, message: 'Rate limit exceeded' } });
    }
    req.mcpUserId = uid;
    next();
  });

  /**
   * Best-effort transport shutdown. `close()` may throw synchronously or return
   * a promise that rejects; both are swallowed so teardown never surfaces as an
   * unhandled rejection.
   */
  function closeQuietly(transport) {
    try {
      const closing = transport.close?.();
      if (closing && typeof closing.catch === 'function') closing.catch(() => {});
    } catch (_) {}
  }

  /** Answer with a JSON-RPC internal error unless a response has already started. */
  function sendInternalError(res, e) {
    if (res.headersSent) return;
    const message = (e && e.message) || 'Internal error';
    res.status(500).json({ jsonrpc: '2.0', error: { code: -32603, message } });
  }

  /** Return the session named by `mcp-session-id` if it belongs to the caller, else null. */
  function findOwnedSession(req) {
    const sessionId = req.headers['mcp-session-id'];
    if (!sessionId) return null;
    const session = sessions.get(String(sessionId));
    return session && session.userId === req.mcpUserId ? session : null;
  }

  /** Close a session's transport and drop it from both indexes. No-op for unknown ids. */
  function destroySession(sessionId) {
    const session = sessions.get(sessionId);
    if (!session) return;
    closeQuietly(session.transport);
    sessions.delete(sessionId);
    const owned = userSessions.get(session.userId);
    if (owned) {
      owned.delete(sessionId);
      if (owned.size === 0) userSessions.delete(session.userId);
    }
  }

  /**
   * Destroy the least recently active session of `uid`.
   * @returns {boolean} `true` when a session was destroyed.
   */
  function evictOldestSession(uid) {
    const owned = userSessions.get(uid);
    if (!owned) return false;
    let oldestId = null;
    let oldestTime = Infinity;
    for (const sid of owned) {
      const candidate = sessions.get(sid);
      if (candidate && candidate.lastActive < oldestTime) {
        oldestId = sid;
        oldestTime = candidate.lastActive;
      }
    }
    if (oldestId === null) return false;
    destroySession(oldestId);
    return true;
  }

  /**
   * Resolve the caller's session, or start a new one for an `initialize` request.
   *
   * On failure the response has already been sent (404 when a non-initialize
   * request has no owned session, 403 when there is no hosted access) and
   * `null` is returned. A newly created session is returned with `_pending`
   * reading `true` until the transport reports it initialized; only then is it
   * stored in `sessions` / `userSessions`.
   */
  async function getOrCreateSession(req, res) {
    const existing = findOwnedSession(req);
    if (existing) {
      existing.lastActive = Date.now();
      return existing;
    }

    if (!isMcpInitializeRequest(req.body)) {
      res.status(404).json({ jsonrpc: '2.0', error: { code: -32600, message: 'Session not found' } });
      return null;
    }

    const uid = req.mcpUserId;

    const ctx = await getHostedAccessContext(req);
    if (!ctx) {
      res.status(403).json({ jsonrpc: '2.0', error: { code: -32600, message: 'No hosted access' } });
      return null;
    }

    const vaultId = String(req.headers['x-vault-id'] || 'default');
    const role = ctx.role || 'viewer';
    const token = (req.headers.authorization || '').replace(/^Bearer\s+/i, '');
    /** Match `gatewayProxyGetNotesList` / `proxyToCanister`: canister reads use effective workspace user when set. */
    const canisterUserId =
      typeof ctx.effective_canister_user_id === 'string' && ctx.effective_canister_user_id.trim() !== ''
        ? ctx.effective_canister_user_id.trim()
        : uid;

    let registered = false;

    const transport = new StreamableHTTPServerTransport({
      sessionIdGenerator: () => randomUUID(),
      onsessioninitialized: (id) => {
        // Enforce the per-user cap only now that a session really exists, so a
        // rejected or failed initialize never costs the user a healthy session.
        let owned = userSessions.get(uid);
        while (owned && owned.size >= maxSessionsPerUser && evictOldestSession(uid)) {
          owned = userSessions.get(uid);
        }

        sessions.set(id, {
          transport,
          server: mcpServer,
          userId: uid,
          vaultId,
          lastActive: Date.now(),
        });
        const ids = userSessions.get(uid) || new Set();
        ids.add(id);
        userSessions.set(uid, ids);
        registered = true;
      },
    });

    const mcpServer = createHostedMcpServer({
      userId: uid,
      canisterUserId,
      vaultId,
      role,
      token,
      canisterUrl,
      canisterAuthSecret: canisterAuthSecret || '',
      bridgeUrl,
      scope: ctx.scope || {},
      ...(gatewayApiBaseUrl && String(gatewayApiBaseUrl).trim()
        ? { gatewayApiBaseUrl: String(gatewayApiBaseUrl).trim().replace(/\/$/, '') }
        : {}),
    });

    try {
      await mcpServer.connect(transport);
    } catch (e) {
      // Nothing else holds a reference to this transport; release it before propagating.
      closeQuietly(transport);
      throw e;
    }

    return {
      transport,
      server: mcpServer,
      userId: uid,
      vaultId,
      lastActive: Date.now(),
      get _pending() {
        return !registered;
      },
    };
  }

  router.post('/', async (req, res) => {
    let session = null;
    try {
      session = await getOrCreateSession(req, res);
      if (!session) return;
      await session.transport.handleRequest(req, res, req.body);
    } catch (e) {
      sendInternalError(res, e);
    } finally {
      // A new transport that never registered (the initialize was rejected or
      // threw) is unreachable from the session maps, so close it here.
      // NOTE(review): assumes the SDK invokes `onsessioninitialized` before
      // `handleRequest` settles for an accepted initialize — confirm on SDK upgrades.
      if (session && session._pending) closeQuietly(session.transport);
    }
  });

  router.get('/', async (req, res) => {
    const session = findOwnedSession(req);
    if (!session) {
      return res.status(404).json({ jsonrpc: '2.0', error: { code: -32600, message: 'Session not found' } });
    }
    session.lastActive = Date.now();
    try {
      await session.transport.handleRequest(req, res, req.body);
    } catch (e) {
      // Without this, a rejected async handler would be an unhandled rejection.
      sendInternalError(res, e);
    }
  });

  router.delete('/', async (req, res) => {
    if (findOwnedSession(req)) {
      destroySession(String(req.headers['mcp-session-id']));
    }
    res.status(200).json({ ok: true });
  });

  // Periodically drop sessions that have been idle for longer than the TTL.
  const cleanup = setInterval(() => {
    const now = Date.now();
    for (const [id, session] of sessions) {
      if (now - session.lastActive > sessionTtlMs) {
        destroySession(id);
      }
    }
  }, CLEANUP_INTERVAL_MS);
  // Do not let the sweep timer keep the process alive on its own.
  cleanup.unref?.();

  router._sessions = sessions;
  router._userSessions = userSessions;
  router._destroySession = destroySession;
  router._cleanup = cleanup;

  return router;
}
File History 1 commit
sha256:6f47d53a6adbcf105ba1b9cfc126c788d6a0f461d197f84f78794914305b4bd5 fix(mcp): bound hosted discovery context Human patch 16 hours ago