memory-provider-file.mjs
382 lines 10.8 KB
Raw
sha256:65ccb454656ea5acdea0a10e559b78bcde1eb6ff753ecc2911bc99d1c3d7cadd feat(calendar): enforce agent context tiers in retrieval AP… Human minor ⚠ breaking 1 day ago
1 /**
2 * File-based memory provider: append-only JSONL event log + latest-value state overlay.
3 * Phase 8 Memory Augmentation.
4 *
5 * Storage layout per vault:
6 * {baseDir}/events.jsonl — one JSON object per line, append-only
7 * {baseDir}/state.json — { [type]: latestEvent } for O(1) latest lookup
8 * {baseDir}/topics/{slug}.jsonl — per-topic mirror (opt-in via topicPartition: true)
9 */
10
11 import fs from 'fs';
12 import path from 'path';
13 import { extractTopicFromEvent, slugify } from './memory-event.mjs';
14
/**
 * File-backed memory provider for a single vault.
 *
 * Events are appended to `events.jsonl`; `state.json` keeps one event per type
 * so the latest event of a type can be read without scanning the log. When
 * `topicPartition` is enabled, every stored event is additionally mirrored
 * into `topics/{slug}.jsonl`.
 *
 * All I/O is synchronous. Timestamps (`event.ts`) are compared as plain
 * strings throughout; this presumably relies on them being ISO-8601 UTC
 * strings (the prune cutoff is produced with `Date#toISOString`) — confirm
 * against the event factory.
 */
export class FileMemoryProvider {
  #baseDir;
  #topicPartition;

  /**
   * @param {string} baseDir — absolute path to the memory directory for one vault
   * @param {{ topicPartition?: boolean }} [opts]
   */
  constructor(baseDir, opts = {}) {
    this.#baseDir = baseDir;
    this.#topicPartition = Boolean(opts?.topicPartition);
  }

  /** @returns {string} directory that holds this vault's memory files */
  get baseDir() {
    return this.#baseDir;
  }

  /** @returns {boolean} whether events are mirrored into per-topic files */
  get topicPartitionEnabled() {
    return this.#topicPartition;
  }

  #eventsPath() {
    return path.join(this.#baseDir, 'events.jsonl');
  }

  #statePath() {
    return path.join(this.#baseDir, 'state.json');
  }

  #topicsDir() {
    return path.join(this.#baseDir, 'topics');
  }

  #topicFilePath(slug) {
    return path.join(this.#topicsDir(), `${slug}.jsonl`);
  }

  #ensureDir() {
    fs.mkdirSync(this.#baseDir, { recursive: true });
  }

  #ensureTopicsDir() {
    fs.mkdirSync(this.#topicsDir(), { recursive: true });
  }

  /**
   * Load the state overlay as a prototype-less dictionary.
   *
   * A missing, unreadable or malformed state file yields an empty overlay
   * (deliberate best-effort). The null prototype matters because keys are
   * arbitrary event types: on a plain object a lookup such as
   * `state['constructor']` would return an inherited member.
   * @returns {Record<string, object>}
   */
  #readState() {
    const overlay = Object.create(null);
    let parsed;
    try {
      parsed = JSON.parse(fs.readFileSync(this.#statePath(), 'utf8'));
    } catch (_) {
      return overlay;
    }
    // Anything but a plain JSON object (null, array, primitive) is unusable.
    if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed)) {
      return overlay;
    }
    return Object.assign(overlay, parsed);
  }

  #writeState(state) {
    this.#ensureDir();
    fs.writeFileSync(this.#statePath(), JSON.stringify(state, null, 2), 'utf8');
  }

  /**
   * Parse a JSONL file into an array of event objects.
   * Lines that are not valid JSON, or whose value is not a plain object
   * (e.g. `null`, a number, an array), are skipped so that callers can read
   * `e.type` / `e.ts` without guarding.
   * @param {string} filePath
   * @returns {object[]}
   */
  #parseJsonlFile(filePath) {
    const events = [];
    for (const line of fs.readFileSync(filePath, 'utf8').split('\n')) {
      if (!line) continue;
      let value;
      try {
        value = JSON.parse(line);
      } catch (_) {
        continue; // skip malformed line
      }
      if (value !== null && typeof value === 'object' && !Array.isArray(value)) {
        events.push(value);
      }
    }
    return events;
  }

  /**
   * Read every event from the main log; an absent log reads as empty.
   * @returns {object[]}
   */
  #readMainLog() {
    const logPath = this.#eventsPath();
    return fs.existsSync(logPath) ? this.#parseJsonlFile(logPath) : [];
  }

  /**
   * Read the events belonging to one topic.
   * With partitioning enabled this reads the topic's own file; otherwise it
   * scans the main log and keeps the events whose extracted topic equals `slug`.
   * @param {string} slug — already-slugified topic
   * @returns {object[]}
   */
  #readTopicEvents(slug) {
    if (this.#topicPartition) {
      const topicPath = this.#topicFilePath(slug);
      return fs.existsSync(topicPath) ? this.#parseJsonlFile(topicPath) : [];
    }
    return this.#readMainLog().filter((e) => extractTopicFromEvent(e) === slug);
  }

  /**
   * Persist the outcome of a removal: rewrite the main log with the surviving
   * events, repair the state overlay for every type that lost an event, and
   * rebuild topic partitions when they are enabled.
   * @param {object[]} allEvents — events as read from the log before removal
   * @param {object[]} kept — subset of `allEvents` (same object identities) that survives
   * @returns {number} how many events were removed
   */
  #commitRemoval(allEvents, kept) {
    this.#ensureDir();
    const body = kept.map((e) => `${JSON.stringify(e)}\n`).join('');
    fs.writeFileSync(this.#eventsPath(), body, 'utf8');

    // Set membership keeps this linear instead of includes() inside a filter.
    const survivors = new Set(kept);
    const removedTypes = new Set();
    for (const e of allEvents) {
      if (!survivors.has(e)) removedTypes.add(e.type);
    }

    // Newest surviving event per type; on equal timestamps the later log entry wins.
    const newestByType = new Map();
    for (const e of kept) {
      const current = newestByType.get(e.type);
      if (current === undefined || current.ts == null || e.ts >= current.ts) {
        newestByType.set(e.type, e);
      }
    }

    const state = this.#readState();
    for (const type of removedTypes) {
      const replacement = newestByType.get(type);
      if (replacement) {
        state[type] = replacement;
      } else {
        delete state[type];
      }
    }
    this.#writeState(state);

    if (this.#topicPartition) {
      this.#rebuildTopicPartitions(kept);
    }
    return allEvents.length - kept.length;
  }

  /**
   * Append event to log, update state overlay, and optionally write to topic partition.
   * @param {object} event — validated memory event from createMemoryEvent
   * @returns {{ id: string, ts: string, topic?: string }}
   * @throws {TypeError} when `event` is not an object; nothing is written in that case
   */
  storeEvent(event) {
    // Reject before touching disk so a bad call cannot leave a junk log line behind.
    if (event === null || typeof event !== 'object') {
      throw new TypeError('storeEvent: event must be an object');
    }

    this.#ensureDir();
    const line = `${JSON.stringify(event)}\n`;
    fs.appendFileSync(this.#eventsPath(), line, 'utf8');

    const state = this.#readState();
    state[event.type] = event;
    this.#writeState(state);

    if (!this.#topicPartition) {
      return { id: event.id, ts: event.ts };
    }

    const topic = extractTopicFromEvent(event);
    this.#ensureTopicsDir();
    fs.appendFileSync(this.#topicFilePath(topic), line, 'utf8');
    return topic ? { id: event.id, ts: event.ts, topic } : { id: event.id, ts: event.ts };
  }

  /**
   * Get latest event for a given type from the state overlay.
   * @param {string} type
   * @returns {object|null}
   */
  getLatest(type) {
    return this.#readState()[type] ?? null;
  }

  /**
   * List events, newest first, with optional filters.
   * When a topic is given the events come from `#readTopicEvents` (topic file
   * if partitioning is on, filtered main log otherwise).
   * @param {{ type?: string, since?: string, until?: string, limit?: number, topic?: string }} [opts]
   *   `since` / `until` are inclusive bounds on `ts`. `limit` defaults to 100,
   *   is capped at 1000, and a negative value yields no events.
   * @returns {object[]}
   */
  listEvents(opts = {}) {
    // Clamp at 0: a negative end index would make slice() drop trailing items
    // rather than return nothing.
    const limit = Math.max(0, Math.min(opts.limit ?? 100, 1000));

    let events = opts.topic
      ? this.#readTopicEvents(slugify(opts.topic))
      : this.#readMainLog();

    if (opts.type) events = events.filter((e) => e.type === opts.type);
    if (opts.since) events = events.filter((e) => e.ts >= opts.since);
    if (opts.until) events = events.filter((e) => e.ts <= opts.until);

    events.sort((a, b) => (b.ts > a.ts ? 1 : b.ts < a.ts ? -1 : 0));
    return events.slice(0, limit);
  }

  /**
   * List all topic slugs that have partition files.
   * @returns {string[]} sorted slugs; empty when the topics directory does not exist
   */
  listTopics() {
    const dir = this.#topicsDir();
    if (!fs.existsSync(dir)) return [];
    return fs
      .readdirSync(dir)
      .filter((name) => name.endsWith('.jsonl'))
      .map((name) => name.replace(/\.jsonl$/, ''))
      .sort();
  }

  /**
   * Get statistics for a specific topic.
   * @param {string} slug — topic name; it is slugified before lookup and echoed back unchanged
   * @returns {{ topic: string, total: number, oldest: string|null, newest: string|null }}
   */
  getTopicStats(slug) {
    const events = this.#readTopicEvents(slugify(slug));
    if (events.length === 0) {
      return { topic: slug, total: 0, oldest: null, newest: null };
    }
    let oldest = events[0].ts;
    let newest = events[0].ts;
    for (const { ts } of events) {
      if (ts < oldest) oldest = ts;
      if (ts > newest) newest = ts;
    }
    return { topic: slug, total: events.length, oldest, newest };
  }

  /**
   * Semantic search is not supported by the file provider.
   * @returns {object[]} always an empty array
   */
  searchEvents(_query, _opts) {
    return [];
  }

  /**
   * @returns {boolean} always false for this provider
   */
  supportsSearch() {
    return false;
  }

  /**
   * Clear events, optionally by type or before a date.
   *
   * With no filter every event is removed. When both filters are supplied an
   * event is removed if it matches EITHER of them (its type equals `type`, or
   * its `ts` is not >= `before`).
   * @param {{ type?: string, before?: string }} [opts]
   * @returns {{ cleared: number }}
   */
  clearEvents(opts = {}) {
    const logPath = this.#eventsPath();
    if (!fs.existsSync(logPath)) return { cleared: 0 };

    const events = this.#parseJsonlFile(logPath);
    const { type, before } = opts;

    let kept;
    if (!type && !before) {
      kept = [];
    } else {
      kept = events.filter((e) => {
        if (type && e.type === type) return false;
        if (before && !(e.ts >= before)) return false;
        return true;
      });
    }

    return { cleared: this.#commitRemoval(events, kept) };
  }

  /**
   * Remove events older than retentionDays from the log and update state.
   * A retention that is not a positive finite number, or that reaches further
   * back than a Date can represent, prunes nothing.
   * @param {number} retentionDays
   * @returns {{ pruned: number }}
   */
  pruneExpired(retentionDays) {
    const days = Number(retentionDays);
    if (!Number.isFinite(days) || days <= 0) return { pruned: 0 };

    const logPath = this.#eventsPath();
    if (!fs.existsSync(logPath)) return { pruned: 0 };

    const cutoffDate = new Date(Date.now() - days * 86_400_000);
    // An out-of-range cutoff would make toISOString() throw; nothing can be that old.
    if (Number.isNaN(cutoffDate.getTime())) return { pruned: 0 };
    const cutoff = cutoffDate.toISOString();

    const events = this.#parseJsonlFile(logPath);
    const kept = events.filter((e) => e.ts >= cutoff);
    if (kept.length === events.length) return { pruned: 0 };

    return { pruned: this.#commitRemoval(events, kept) };
  }

  /**
   * Rebuild all topic partition files from a complete set of surviving events.
   * Removes every existing `.jsonl` topic file, then writes one file per topic
   * that still has events.
   * @param {object[]} events
   */
  #rebuildTopicPartitions(events) {
    const dir = this.#topicsDir();
    if (fs.existsSync(dir)) {
      for (const name of fs.readdirSync(dir)) {
        if (name.endsWith('.jsonl')) {
          fs.unlinkSync(path.join(dir, name));
        }
      }
    }
    if (events.length === 0) return;

    this.#ensureTopicsDir();
    const byTopic = new Map();
    for (const e of events) {
      const slug = extractTopicFromEvent(e);
      if (!byTopic.has(slug)) byTopic.set(slug, []);
      byTopic.get(slug).push(e);
    }
    for (const [slug, topicEvents] of byTopic) {
      const content = topicEvents.map((e) => `${JSON.stringify(e)}\n`).join('');
      fs.writeFileSync(this.#topicFilePath(slug), content, 'utf8');
    }
  }

  /**
   * Get memory statistics.
   * `size_bytes` is the combined size of the event log and the state file;
   * `topics` is only present when topic partitioning is enabled.
   * @returns {{ counts_by_type: Record<string, number>, total: number, oldest: string|null, newest: string|null, size_bytes: number, topics?: string[] }}
   */
  getStats() {
    const logPath = this.#eventsPath();
    if (!fs.existsSync(logPath)) {
      return { counts_by_type: {}, total: 0, oldest: null, newest: null, size_bytes: 0 };
    }

    // Tally on a prototype-less object so a type such as "constructor" starts
    // from zero instead of picking up an inherited member.
    const tally = Object.create(null);
    let oldest = null;
    let newest = null;
    let total = 0;
    for (const e of this.#parseJsonlFile(logPath)) {
      tally[e.type] = (tally[e.type] || 0) + 1;
      total += 1;
      if (!oldest || e.ts < oldest) oldest = e.ts;
      if (!newest || e.ts > newest) newest = e.ts;
    }

    let size = 0;
    try { size = fs.statSync(logPath).size; } catch (_) { /* best effort */ }
    try { size += fs.statSync(this.#statePath()).size; } catch (_) { /* state file may not exist */ }

    const result = {
      counts_by_type: { ...tally }, // hand callers an ordinary object
      total,
      oldest,
      newest,
      size_bytes: size,
    };
    if (this.#topicPartition) {
      result.topics = this.listTopics();
    }
    return result;
  }
}
File History 2 commits
sha256:65ccb454656ea5acdea0a10e559b78bcde1eb6ff753ecc2911bc99d1c3d7cadd feat(calendar): enforce agent context tiers in retrieval AP… Human minor 1 day ago
sha256:9103f98c89257ed2b01c237cea895dabb3e85ea337dccb1161c175e4422355b6 docs: accept Calendar Events v0 spec with Phase 0 security … Human 1 day ago