resource-subscriptions.mjs
210 lines 6.3 KB
Raw
sha256:65ccb454656ea5acdea0a10e559b78bcde1eb6ff753ecc2911bc99d1c3d7cadd feat(calendar): enforce agent context tiers in retrieval AP… Human minor ⚠ breaking 1 day ago
1 /**
2 * Issue #1 Phase E — resource subscriptions + vault watcher.
3 * Tracks resources/subscribe URIs and emits notifications/resources/updated when allowed.
4 */
5
6 import path from 'path';
7 import chokidar from 'chokidar';
8 import { SubscribeRequestSchema, UnsubscribeRequestSchema } from '@modelcontextprotocol/sdk/types.js';
9
/** Debounce window, in milliseconds, before queued notifications are flushed. */
const FLUSH_MS = 150;

/**
 * URIs that clients asked to be notified about via resources/subscribe.
 * @type {Set<string>}
 */
const subscribedUris = new Set();

/**
 * URIs queued for a resources/updated notification on the next flush.
 * @type {Set<string>}
 */
const pendingUpdatedUris = new Set();

/** True while a resource-list-changed notification is queued for the next flush. */
let pendingListChanged = false;

/**
 * Handle of the pending debounce timer, or null when no flush is scheduled.
 * @type {ReturnType<typeof setTimeout> | null}
 */
let flushTimer = null;
20
/**
 * Coerce a URI-like value to a string with surrounding whitespace removed.
 * Any falsy input (null, undefined, '', 0, ...) yields the empty string.
 * @param {unknown} u
 * @returns {string}
 */
function normalizeUri(u) {
  const text = u ? String(u) : '';
  return text.trim();
}
24
/**
 * Decide whether a subscription to `subscriberUri` should be told about a change to `changedUri`.
 *
 * Both URIs are normalized first. A subscription covers a change when the two
 * URIs are identical, or when the changed URI starts with the subscriber URI
 * (trailing slashes removed) followed by a `/`.
 * @param {string} subscriberUri
 * @param {string} changedUri
 * @returns {boolean}
 */
export function subscriptionCoversUri(subscriberUri, changedUri) {
  const subscription = normalizeUri(subscriberUri);
  const changed = normalizeUri(changedUri);
  if (subscription === '' || changed === '') return false;
  if (subscription === changed) return true;

  // A subscriber made only of slashes would strip to '', so keep it as-is in that case.
  const withoutTrailingSlashes = subscription.replace(/\/+$/, '');
  const prefix = withoutTrailingSlashes === '' ? subscription : withoutTrailingSlashes;
  return changed.startsWith(`${prefix}/`);
}
38
/**
 * True when at least one stored subscription covers `changedUri`.
 * @param {string} changedUri
 * @returns {boolean}
 */
function shouldNotifyUpdated(changedUri) {
  const changed = normalizeUri(changedUri);
  return [...subscribedUris].some((subscription) => subscriptionCoversUri(subscription, changed));
}
46
/**
 * Send a resources/updated notification for `uri`, but only while the server
 * is connected and some subscription covers the URI.
 * Delivery is best-effort: a failure while sending is ignored.
 * @param {{ isConnected: () => boolean, server: { sendResourceUpdated: (params: { uri: string }) => Promise<unknown> } }} mcpServer
 * @param {string} uri
 * @returns {Promise<void>}
 */
async function emitResourceUpdated(mcpServer, uri) {
  if (!mcpServer.isConnected()) return;
  if (!shouldNotifyUpdated(uri)) return;

  try {
    await mcpServer.server.sendResourceUpdated({ uri });
  } catch {
    // Intentionally ignored: the transport may be closing.
  }
}
55
/**
 * Deliver everything queued since the last flush: first the list-changed
 * notification (if one is pending), then one updated notification per queued URI.
 * @param {{ sendResourceListChanged: () => void }} mcpServer also passed through to emitResourceUpdated
 * @returns {Promise<void>}
 */
async function flushPending(mcpServer) {
  if (pendingListChanged) {
    mcpServer.sendResourceListChanged();
    pendingListChanged = false;
  }

  // Snapshot and clear before awaiting so URIs queued during delivery wait for the next flush.
  const queued = Array.from(pendingUpdatedUris);
  pendingUpdatedUris.clear();
  for (let i = 0; i < queued.length; i += 1) {
    await emitResourceUpdated(mcpServer, queued[i]);
  }
}
67
/**
 * (Re)start the debounce timer: any timer already pending is cancelled and a
 * new one is armed for FLUSH_MS. When it fires, the queued notifications are flushed.
 * @param {object} mcpServer passed through to flushPending
 */
function scheduleFlush(mcpServer) {
  if (flushTimer) {
    clearTimeout(flushTimer);
  }

  const onTimeout = () => {
    flushTimer = null;
    // Fire-and-forget: the timer callback does not wait for delivery to finish.
    void flushPending(mcpServer);
  };
  flushTimer = setTimeout(onTimeout, FLUSH_MS);
}
75
/**
 * Listing resources that may need a refresh when a vault-relative path changes.
 *
 * The vault root listing is always included. At most one further listing is
 * added, chosen from the first (and for `media`/`projects` the second) path segment.
 * @param {string} relPosix vault-relative path with forward slashes
 * @returns {string[]}
 */
export function listingUrisForRelPath(relPosix) {
  const rootListing = 'knowtation://vault/';
  if (!relPosix) return [rootListing];

  const [top, second] = relPosix.split('/');
  let folderListing = null;
  switch (top) {
    case 'inbox':
      folderListing = 'knowtation://vault/inbox';
      break;
    case 'captures':
      folderListing = 'knowtation://vault/captures';
      break;
    case 'imports':
      folderListing = 'knowtation://vault/imports';
      break;
    case 'templates':
      folderListing = 'knowtation://vault/templates';
      break;
    case 'media':
      if (second === 'audio') {
        folderListing = 'knowtation://vault/media/audio';
      } else if (second === 'video') {
        folderListing = 'knowtation://vault/media/video';
      }
      break;
    case 'projects':
      // Only paths that name something below projects/ get a per-project listing.
      if (second) {
        folderListing = `knowtation://vault/projects/${second}`;
      }
      break;
    default:
      break;
  }

  return folderListing === null ? [rootListing] : [rootListing, folderListing];
}
96
/**
 * Convert an absolute path reported by the watcher into a vault-relative path
 * that uses forward slashes.
 *
 * Returns null when `absPath` is the vault root itself or lies outside the vault.
 * Only a leading `..` path segment counts as "outside"; an entry inside the
 * vault whose name merely begins with two dots (e.g. `..drafts/a.md`) is kept.
 * @param {string} vaultPath absolute
 * @param {string} absPath absolute path from watcher
 * @returns {string | null} vault-relative posix path
 */
export function vaultRelativePosix(vaultPath, absPath) {
  const rel = path.relative(vaultPath, absPath);
  if (!rel) return null;

  const climbsOut = rel === '..' || rel.startsWith(`..${path.sep}`);
  // path.relative returns an absolute path when there is no relative route
  // (e.g. a different drive on Windows); that is outside the vault too.
  if (climbsOut || path.isAbsolute(rel)) return null;

  return rel.split(path.sep).join('/');
}
107
/**
 * Translate one watcher event into queued notifications and (re)arm the flush timer.
 *
 * Paths outside the vault and events other than add / change / addDir /
 * unlink / unlinkDir are ignored. For a handled event, the note URI is queued
 * when the path ends in `.md`, followed by the listing URIs for that path.
 * Removals additionally queue a list-changed notification.
 * NOTE(review): additions do not queue list-changed — confirm that is intended.
 * @param {object} mcpServer passed through to scheduleFlush
 * @param {string} event watcher event name
 * @param {string} absFilePath absolute path of the changed entry
 * @param {string} vaultPath absolute vault root
 */
function queueVaultFsChange(mcpServer, event, absFilePath, vaultPath) {
  const rel = vaultRelativePosix(vaultPath, absFilePath);
  if (rel === null) return;

  const isRemoval = event === 'unlink' || event === 'unlinkDir';
  const isAddOrChange = event === 'add' || event === 'change' || event === 'addDir';
  if (!isRemoval && !isAddOrChange) return;

  if (isRemoval) {
    pendingListChanged = true;
  }
  // Queue the note itself first so it is emitted before its listings.
  if (rel.endsWith('.md')) {
    pendingUpdatedUris.add(`knowtation://vault/${rel}`);
  }
  listingUrisForRelPath(rel).forEach((uri) => {
    pendingUpdatedUris.add(uri);
  });
  scheduleFlush(mcpServer);
}
132
/** Metadata resource URIs announced, in this order, by notifyIndexMetadataResources. */
const INDEX_METADATA_URIS = [
  'knowtation://index/stats',
  'knowtation://tags',
  'knowtation://projects',
  'knowtation://index/graph',
];

/**
 * After an indexer run: announce each metadata resource as updated (Issue #1 E3).
 * Does nothing while the server is disconnected; each URI is handed to
 * emitResourceUpdated, one at a time.
 * @param {import('@modelcontextprotocol/sdk/server/mcp.js').McpServer} mcpServer
 * @returns {Promise<void>}
 */
export async function notifyIndexMetadataResources(mcpServer) {
  if (mcpServer.isConnected()) {
    for (let i = 0; i < INDEX_METADATA_URIS.length; i += 1) {
      await emitResourceUpdated(mcpServer, INDEX_METADATA_URIS[i]);
    }
  }
}
150
/**
 * Install the resources/subscribe and resources/unsubscribe request handlers
 * on the underlying Server.
 * Call after resources are registered, before connect().
 *
 * Subscribe stores the normalized URI only when it starts with `knowtation://`;
 * any other URI is accepted but not stored. Both handlers reply with `{}`.
 * @param {import('@modelcontextprotocol/sdk/server/mcp.js').McpServer} mcpServer
 */
export function registerResourceSubscriptionHandlers(mcpServer) {
  const { server } = mcpServer;

  const onSubscribe = async (request) => {
    const uri = normalizeUri(request.params?.uri);
    if (uri.startsWith('knowtation://')) {
      subscribedUris.add(uri);
    }
    return {};
  };

  const onUnsubscribe = async (request) => {
    subscribedUris.delete(normalizeUri(request.params?.uri));
    return {};
  };

  server.setRequestHandler(SubscribeRequestSchema, onSubscribe);
  server.setRequestHandler(UnsubscribeRequestSchema, onUnsubscribe);
}
174
/**
 * Start watching the vault; emit debounced list/updated notifications.
 *
 * Watching is skipped entirely (a no-op handle is returned) when the
 * environment variable KNOWTATION_MCP_NO_WATCH is set to `1`.
 * @param {import('@modelcontextprotocol/sdk/server/mcp.js').McpServer} mcpServer
 * @param {string} vaultPath absolute
 * @returns {{ close: () => Promise<void> }}
 */
export function startVaultResourceWatcher(mcpServer, vaultPath) {
  if (process.env.KNOWTATION_MCP_NO_WATCH === '1') {
    return { close: async () => {} };
  }

  const watcher = chokidar.watch(vaultPath, {
    ignoreInitial: true,
    awaitWriteFinish: { stabilityThreshold: 200, pollInterval: 100 },
    // Skip anything that has a `.git` segment in its vault-relative path.
    ignored: (fp) => {
      const rel = path.relative(vaultPath, fp);
      return rel.split(path.sep).includes('.git');
    },
  });

  watcher.on('all', (event, fp) => {
    if (typeof fp !== 'string') return;
    queueVaultFsChange(mcpServer, event, path.resolve(fp), vaultPath);
  });

  // An 'error' event with no listener is thrown by the emitter and would take
  // the whole process down; report it on stderr and keep watching instead.
  watcher.on('error', (err) => {
    console.error('[knowtation] vault watcher error:', err);
  });

  return {
    close: async () => {
      try {
        // Deliver whatever is still waiting on the debounce timer before shutting down.
        if (flushTimer) {
          clearTimeout(flushTimer);
          flushTimer = null;
          await flushPending(mcpServer);
        }
      } finally {
        // Release the watcher even if the final flush failed.
        await watcher.close();
      }
    },
  };
}
File History 2 commits
sha256:65ccb454656ea5acdea0a10e559b78bcde1eb6ff753ecc2911bc99d1c3d7cadd feat(calendar): enforce agent context tiers in retrieval AP… Human minor 1 day ago
sha256:9103f98c89257ed2b01c237cea895dabb3e85ea337dccb1161c175e4422355b6 docs: accept Calendar Events v0 spec with Phase 0 security … Human 1 day ago