mcp-proxy.mjs
sha256:6f47d53a6adbcf105ba1b9cfc126c788d6a0f461d197f84f78794914305b4bd5
fix(mcp): bound hosted discovery context
Human
patch
16 hours ago
| 1 | /** |
| 2 | * Issue #1 Phase D2 — MCP gateway proxy for hosted MCP. |
| 3 | * Express router that handles /mcp with JWT auth, session pool, rate limiting, and cleanup. |
| 4 | */ |
| 5 | |
| 6 | import { randomUUID } from 'node:crypto'; |
| 7 | import express from 'express'; |
| 8 | import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; |
| 9 | import { createHostedMcpServer } from './mcp-hosted-server.mjs'; |
| 10 | |
/** Default maximum number of requests a single user may make within one rate window. */
const DEFAULT_RATE_LIMIT = 60;
/** Default length of the per-user rate-limit window, in milliseconds. */
const RATE_WINDOW_MS = 60_000;
/** Interval, in milliseconds, between sweeps that destroy idle MCP sessions. */
const CLEANUP_INTERVAL_MS = 60_000;
| 14 | |
/**
 * Default idle TTL for hosted MCP HTTP sessions (gateway RAM). Longer than the
 * historical 30m default so Cursor does not lose tools/resources after a coffee break.
 * Override with **`MCP_SESSION_TTL_MS`** (milliseconds), clamped 5m–24h.
 */
const DEFAULT_MCP_SESSION_TTL_MS = 8 * 60 * 60 * 1000;
const MIN_MCP_SESSION_TTL_MS = 5 * 60 * 1000;
const MAX_MCP_SESSION_TTL_MS = 24 * 60 * 60 * 1000;

/**
 * Max concurrent MCP sessions per user id before the oldest is evicted.
 * Override with **`MCP_MAX_SESSIONS_PER_USER`**, clamped 2–20.
 */
const DEFAULT_MCP_MAX_SESSIONS_PER_USER = 8;
const MIN_MCP_MAX_SESSIONS_PER_USER = 2;
const MAX_MCP_MAX_SESSIONS_PER_USER = 20;

/** Matches a string that is, in its entirety, an optionally signed base-10 integer. */
const INTEGER_ENV_PATTERN = /^[+-]?\d+$/;

/**
 * Parse an integer setting and clamp it to `[min, max]`.
 *
 * The whole trimmed value must be an integer. `parseInt` alone would accept a
 * numeric prefix, so `"30m"` would silently become 30 and `"1e7"` would become 1;
 * such values are treated as malformed and yield `fallback` instead.
 *
 * @param {unknown} raw - Raw setting value; `null`/`undefined`/blank mean "not set".
 * @param {number} fallback - Returned when the value is unset or malformed.
 * @param {number} min - Inclusive lower bound.
 * @param {number} max - Inclusive upper bound.
 * @returns {number}
 */
function parseClampedIntegerEnv(raw, fallback, min, max) {
  if (raw == null) return fallback;
  const text = String(raw).trim();
  if (!INTEGER_ENV_PATTERN.test(text)) return fallback;
  const parsed = Number.parseInt(text, 10);
  // Absurdly long digit strings parse to Infinity; treat them as malformed.
  if (!Number.isFinite(parsed)) return fallback;
  return Math.min(max, Math.max(min, parsed));
}

/**
 * Resolve the idle session TTL from `MCP_SESSION_TTL_MS`.
 *
 * @param {NodeJS.ProcessEnv} [env] - Environment to read; defaults to `process.env`.
 * @returns {number} TTL in milliseconds: the default when unset or malformed,
 *   otherwise the configured value clamped to 5m–24h.
 */
export function parseMcpSessionTtlMs(env = process.env) {
  return parseClampedIntegerEnv(
    env.MCP_SESSION_TTL_MS,
    DEFAULT_MCP_SESSION_TTL_MS,
    MIN_MCP_SESSION_TTL_MS,
    MAX_MCP_SESSION_TTL_MS,
  );
}

/**
 * Resolve the per-user session cap from `MCP_MAX_SESSIONS_PER_USER`.
 *
 * @param {NodeJS.ProcessEnv} [env] - Environment to read; defaults to `process.env`.
 * @returns {number} Session cap: the default when unset or malformed, otherwise
 *   the configured value clamped to 2–20.
 */
export function parseMcpMaxSessionsPerUser(env = process.env) {
  return parseClampedIntegerEnv(
    env.MCP_MAX_SESSIONS_PER_USER,
    DEFAULT_MCP_MAX_SESSIONS_PER_USER,
    MIN_MCP_MAX_SESSIONS_PER_USER,
    MAX_MCP_MAX_SESSIONS_PER_USER,
  );
}
| 55 | |
/**
 * Streamable HTTP session creation is only valid during JSON-RPC initialize.
 * Sessionless follow-up calls such as tools/list must fail fast instead of
 * triggering hosted bridge/canister context work before the SDK can reject them.
 *
 * Accepts a single JSON-RPC message or a batch (array) of messages. Only the
 * top level of a batch is inspected: JSON-RPC batches are flat, and recursing
 * into arbitrarily nested arrays would let a client-supplied body drive
 * unbounded recursion.
 *
 * @param {unknown} body - Parsed request body.
 * @returns {boolean} `true` when the body is, or directly contains, a message
 *   whose `method` is `'initialize'`.
 */
export function isMcpInitializeRequest(body) {
  const isInitializeMessage = (message) =>
    Boolean(message) && typeof message === 'object' && message.method === 'initialize';

  if (Array.isArray(body)) return body.some((item) => isInitializeMessage(item));
  return isInitializeMessage(body);
}
| 68 | |
| 69 | /** |
| 70 | * @typedef {{ |
| 71 | * transport: StreamableHTTPServerTransport, |
| 72 | * server: import('@modelcontextprotocol/sdk/server/mcp.js').McpServer, |
| 73 | * userId: string, |
| 74 | * vaultId: string, |
| 75 | * lastActive: number, |
| 76 | * }} McpSession |
| 77 | */ |
| 78 | |
/**
 * Simple sliding-window rate limiter per user.
 *
 * Each user id maps to the timestamps of its accepted requests inside the
 * current window. A request is accepted while fewer than `maxReqs` timestamps
 * remain in the window; rejected requests are not recorded.
 *
 * Entries of users whose newest request has left the window are removed by a
 * sweep that runs at most once per window, so the map does not keep one entry
 * for every user id ever seen.
 *
 * @param {number} [maxReqs] - Maximum accepted requests per user per window.
 * @param {number} [windowMs] - Window length in milliseconds.
 * @returns {(userId: string) => boolean} Returns `true` when the request is
 *   allowed (and records it), `false` when the user is over the limit.
 */
function createRateLimiter(maxReqs = DEFAULT_RATE_LIMIT, windowMs = RATE_WINDOW_MS) {
  /** @type {Map<string, number[]>} */
  const hits = new Map();
  let nextSweepAt = 0;

  return (userId) => {
    const now = Date.now();
    const cutoff = now - windowMs;

    if (now >= nextSweepAt) {
      // Timestamps are appended in order, so the last one is the newest.
      for (const [id, stamps] of hits) {
        if (stamps.length === 0 || stamps[stamps.length - 1] <= cutoff) hits.delete(id);
      }
      nextSweepAt = now + windowMs;
    }

    const recent = (hits.get(userId) ?? []).filter((t) => t > cutoff);
    if (recent.length >= maxReqs) {
      // Keep the pruned list so expired timestamps are not re-filtered forever.
      hits.set(userId, recent);
      return false;
    }
    recent.push(now);
    hits.set(userId, recent);
    return true;
  };
}
| 97 | |
/**
 * Create the MCP proxy Express router.
 *
 * Router middleware resolves the caller through `getUserId` (401 when it
 * returns nothing) and applies the per-user rate limit (429) before any route
 * runs. POST `/` serves an existing session or creates one for an `initialize`
 * request, GET `/` serves an existing session only, and DELETE `/` tears a
 * session down. Idle sessions are swept every `CLEANUP_INTERVAL_MS` once they
 * have been inactive for longer than `sessionTtlMs`.
 *
 * The returned router also carries `_sessions`, `_userSessions`,
 * `_destroySession` and `_cleanup` (the sweep timer).
 *
 * `gatewayApiBaseUrl` is the public gateway base (no path), e.g. HUB_BASE_URL;
 * it enables hub_create_proposal. `sessionSecret` is part of the options type
 * but is not read by this function.
 *
 * @param {{
 *   getUserId: (req: import('express').Request) => string | null,
 *   getHostedAccessContext: (req: import('express').Request) => Promise<Record<string, unknown> | null>,
 *   canisterUrl: string,
 *   canisterAuthSecret?: string,
 *   bridgeUrl: string,
 *   gatewayApiBaseUrl?: string,
 *   sessionSecret: string,
 *   rateLimit?: number,
 *   sessionTtlMs?: number,
 *   maxSessionsPerUser?: number,
 * }} deps
 * @returns {import('express').Router}
 */
export function createMcpProxyRouter(deps) {
  const {
    getUserId,
    getHostedAccessContext,
    canisterUrl,
    canisterAuthSecret,
    bridgeUrl,
    gatewayApiBaseUrl,
    rateLimit = DEFAULT_RATE_LIMIT,
    sessionTtlMs = parseMcpSessionTtlMs(),
    maxSessionsPerUser = parseMcpMaxSessionsPerUser(),
  } = deps;

  const router = express.Router();

  /** @type {Map<string, McpSession>} */
  const sessions = new Map();

  /** @type {Map<string, Set<string>>} */
  const userSessions = new Map();

  const checkRate = createRateLimiter(rateLimit);

  /** Send a JSON-RPC shaped error response. */
  function sendError(res, status, code, message) {
    return res.status(status).json({ jsonrpc: '2.0', error: { code, message } });
  }

  /** Report an unexpected failure, unless the response has already started. */
  function sendInternalError(res, e) {
    if (res.headersSent) return;
    sendError(res, 500, -32603, e?.message || 'Internal error');
  }

  /**
   * Look up the session named by the `mcp-session-id` header.
   * Returns it only when it belongs to the authenticated user, else `null`.
   */
  function findOwnedSession(req) {
    const sessionId = req.headers['mcp-session-id'];
    if (!sessionId) return null;
    const session = sessions.get(String(sessionId));
    return session && session.userId === req.mcpUserId ? session : null;
  }

  /** Close a session's transport and drop it from both indexes. */
  function destroySession(sessionId) {
    const session = sessions.get(sessionId);
    if (!session) return;
    try {
      // Closing is best-effort. close() may return a promise (the SDK transport's
      // close is async), so a rejection is swallowed as well as a synchronous throw.
      const closing = session.transport.close?.();
      if (closing && typeof closing.catch === 'function') closing.catch(() => {});
    } catch (_) {
      // Ignored on purpose: the session is removed from the pool regardless.
    }
    sessions.delete(sessionId);
    const ids = userSessions.get(session.userId);
    if (ids) {
      ids.delete(sessionId);
      if (ids.size === 0) userSessions.delete(session.userId);
    }
  }

  /**
   * Destroy the least recently active session of a user.
   * Returns `false` when the user has no session that could be evicted.
   */
  function evictOldestSession(uid) {
    const ids = userSessions.get(uid);
    if (!ids) return false;
    let oldestId = null;
    let oldestTime = Infinity;
    for (const sid of ids) {
      const candidate = sessions.get(sid);
      if (candidate && candidate.lastActive < oldestTime) {
        oldestId = sid;
        oldestTime = candidate.lastActive;
      }
    }
    if (oldestId === null) return false;
    destroySession(oldestId);
    return true;
  }

  router.use((req, res, next) => {
    const uid = getUserId(req);
    if (!uid) return sendError(res, 401, -32600, 'Unauthorized');
    if (!checkRate(uid)) return sendError(res, 429, -32600, 'Rate limit exceeded');
    req.mcpUserId = uid;
    next();
  });

  /**
   * Return the caller's existing session, or build a new transport + server for
   * an `initialize` request. Sends the error response itself and resolves to
   * `null` when no session can be provided.
   */
  async function getOrCreateSession(req, res) {
    const existing = findOwnedSession(req);
    if (existing) {
      existing.lastActive = Date.now();
      return existing;
    }

    if (!isMcpInitializeRequest(req.body)) {
      sendError(res, 404, -32600, 'Session not found');
      return null;
    }

    const ctx = await getHostedAccessContext(req);
    if (!ctx) {
      sendError(res, 403, -32600, 'No hosted access');
      return null;
    }

    const uid = req.mcpUserId;
    const vaultId = String(req.headers['x-vault-id'] || 'default');
    const role = ctx.role || 'viewer';
    const token = (req.headers.authorization || '').replace(/^Bearer\s+/i, '');
    /** Match `gatewayProxyGetNotesList` / `proxyToCanister`: canister reads use effective workspace user when set. */
    const canisterUserId =
      typeof ctx.effective_canister_user_id === 'string' && ctx.effective_canister_user_id.trim() !== ''
        ? ctx.effective_canister_user_id.trim()
        : uid;

    const transport = new StreamableHTTPServerTransport({
      sessionIdGenerator: () => randomUUID(),
      onsessioninitialized: (id) => {
        // Enforce the per-user cap at the moment a session is actually registered,
        // so a rejected initialize never costs the user a live session and
        // concurrent initializes cannot push the user over the cap.
        while ((userSessions.get(uid)?.size ?? 0) >= maxSessionsPerUser) {
          if (!evictOldestSession(uid)) break;
        }
        sessions.set(id, {
          transport,
          server: mcpServer,
          userId: uid,
          vaultId,
          lastActive: Date.now(),
        });
        const ids = userSessions.get(uid) ?? new Set();
        ids.add(id);
        userSessions.set(uid, ids);
      },
    });

    const mcpServer = createHostedMcpServer({
      userId: uid,
      canisterUserId,
      vaultId,
      role,
      token,
      canisterUrl,
      canisterAuthSecret: canisterAuthSecret || '',
      bridgeUrl,
      scope: ctx.scope || {},
      ...(gatewayApiBaseUrl && String(gatewayApiBaseUrl).trim()
        ? { gatewayApiBaseUrl: String(gatewayApiBaseUrl).trim().replace(/\/$/, '') }
        : {}),
    });

    await mcpServer.connect(transport);

    // Not yet in `sessions`: registration happens in `onsessioninitialized`.
    return { transport, server: mcpServer, userId: uid, vaultId, lastActive: Date.now(), _pending: true };
  }

  router.post('/', async (req, res) => {
    try {
      const session = await getOrCreateSession(req, res);
      if (!session) return;
      await session.transport.handleRequest(req, res, req.body);
    } catch (e) {
      sendInternalError(res, e);
    }
  });

  router.get('/', async (req, res) => {
    // Express does not observe a rejected promise from an async handler here,
    // so failures are caught and reported explicitly.
    try {
      const session = findOwnedSession(req);
      if (!session) {
        sendError(res, 404, -32600, 'Session not found');
        return;
      }
      session.lastActive = Date.now();
      await session.transport.handleRequest(req, res, req.body);
    } catch (e) {
      sendInternalError(res, e);
    }
  });

  router.delete('/', (req, res) => {
    try {
      const sessionId = req.headers['mcp-session-id'];
      // Only the owner may destroy a session; anything else is a silent no-op.
      if (sessionId && findOwnedSession(req)) {
        destroySession(String(sessionId));
      }
      res.status(200).json({ ok: true });
    } catch (e) {
      sendInternalError(res, e);
    }
  });

  const cleanup = setInterval(() => {
    const now = Date.now();
    for (const [id, session] of sessions) {
      if (now - session.lastActive > sessionTtlMs) {
        destroySession(id);
      }
    }
  }, CLEANUP_INTERVAL_MS);
  // Do not keep the process alive just for the sweep timer.
  cleanup.unref?.();

  router._sessions = sessions;
  router._userSessions = userSessions;
  router._destroySession = destroySession;
  router._cleanup = cleanup;

  return router;
}
File History
1 commit
sha256:6f47d53a6adbcf105ba1b9cfc126c788d6a0f461d197f84f78794914305b4bd5
fix(mcp): bound hosted discovery context
Human
patch
16 hours ago