daemon.mjs
508 lines 17.7 KB
Raw
sha256:65ccb454656ea5acdea0a10e559b78bcde1eb6ff753ecc2911bc99d1c3d7cadd feat(calendar): enforce agent context tiers in retrieval AP… Human minor ⚠ breaking 1 day ago
1 /**
2 * Daemon process lifecycle: start, stop, status, scheduling, idle detection.
3 * Phase B of the Daemon Consolidation Spec.
4 * Phase F adds cost tracking and daily cap enforcement.
5 *
6 * Exports:
7 * getPidPath, getLogPath — resolve file paths from config
8 * writePidFile, readPidFile,
9 * removePidFile, isProcessAlive,
10 * detectStalePid — PID file management
11 * appendDaemonLog, readDaemonLog — structured JSONL log
12 * isIdle — idle detection via mtime
13 * validateLlmConnectivity — LLM health check (trivial test prompt)
14 * getDaemonStatus — running state, last pass, next pass, cost
15 * stopDaemon — SIGTERM → SIGKILL with timeout
16 * startDaemon — full foreground lifecycle
17 */
18
19 import fs from 'fs';
20 import path from 'path';
21 import { consolidateMemory } from './memory-consolidate.mjs';
22 import { completeChat } from './llm-complete.mjs';
23 import { resolveMemoryDir } from './memory.mjs';
24 import { computeCallCost, getDailyCost, recordCallCost } from './daemon-cost.mjs';
25
26 // ── Path helpers ──────────────────────────────────────────────────────────────
27
/**
 * Resolve the location of the daemon's PID file.
 *
 * The file always sits directly inside `config.data_dir` under the fixed name
 * `daemon.pid`; the result is absolute only if `data_dir` is.
 *
 * @param {object} config — loadConfig() result (only `data_dir` is read)
 * @returns {string} path of the PID file
 */
export function getPidPath(config) {
  const { data_dir: dataDir } = config;
  return path.join(dataDir, 'daemon.pid');
}
36
/**
 * Resolve the location of the daemon's JSONL log file.
 *
 * A truthy `config.daemon.log_file` is returned verbatim; otherwise (absent,
 * empty string, or no `daemon` section) the log defaults to
 * `{data_dir}/daemon.log`. `data_dir` is only consulted for the default.
 *
 * @param {object} config — loadConfig() result
 * @returns {string} path of the daemon log
 */
export function getLogPath(config) {
  const { log_file: configuredLogFile } = config.daemon ?? {};
  return configuredLogFile || path.join(config.data_dir, 'daemon.log');
}
48
49 // ── PID file management ───────────────────────────────────────────────────────
50
/**
 * Persist a PID to disk, overwriting any previous contents.
 *
 * Missing parent directories are created first. The PID is written as its
 * plain decimal string form with no trailing newline.
 *
 * @param {string} pidPath — destination file
 * @param {number} pid — process id to record
 */
export function writePidFile(pidPath, pid) {
  const parentDir = path.dirname(pidPath);
  fs.mkdirSync(parentDir, { recursive: true });

  const contents = String(pid);
  fs.writeFileSync(pidPath, contents, 'utf8');
}
60
/**
 * Read the PID recorded in the PID file.
 *
 * The file must hold a single positive decimal integer (surrounding
 * whitespace is allowed). Anything else is treated as corrupt. parseInt()
 * would silently truncate "12abc" to 12 or "1e3" to 1, and a damaged file
 * must never make callers probe or signal an unrelated process: stopDaemon
 * sends SIGTERM/SIGKILL to whatever PID this returns.
 *
 * @param {string} pidPath
 * @returns {number|null} the PID, or null if the file is missing, unreadable,
 *   or does not contain a valid PID
 */
export function readPidFile(pidPath) {
  let raw;
  try {
    raw = fs.readFileSync(pidPath, 'utf8').trim();
  } catch {
    // Missing or unreadable file: no PID recorded.
    return null;
  }

  if (!/^\d+$/.test(raw)) return null;
  const pid = Number(raw);
  return Number.isSafeInteger(pid) && pid > 0 ? pid : null;
}
75
/**
 * Best-effort deletion of the PID file.
 *
 * Every unlink failure is swallowed, including a missing file (ENOENT) and
 * permission or filesystem errors, so this function never throws.
 *
 * @param {string} pidPath
 * @returns {void}
 */
export function removePidFile(pidPath) {
  try {
    fs.unlinkSync(pidPath);
  } catch (_ignored) {
    // Intentionally ignored: cleanup must not fail the caller.
  }
}
87
/**
 * Check whether a process with the given PID exists.
 *
 * Sends signal 0, which performs the existence/permission check without
 * delivering anything. If the call fails with EPERM, the process exists but
 * is owned by another user, so it is reported as alive. Treating it as dead
 * would let stale-PID recovery delete a live daemon's PID file. Any other
 * failure (ESRCH, invalid PID) means not alive.
 *
 * @param {number} pid
 * @returns {boolean} false for non-integer or non-positive input
 */
export function isProcessAlive(pid) {
  if (!Number.isInteger(pid) || pid < 1) return false;
  try {
    process.kill(pid, 0);
    return true;
  } catch (err) {
    return err?.code === 'EPERM';
  }
}
102
/**
 * Classify the PID file: does it name a process that is no longer running?
 *
 * @param {string} pidPath
 * @returns {{ stale: boolean, pid: number|null }}
 *   - `pid` is the recorded PID, or null when the file is absent or
 *     unparseable (in which case `stale` is false and liveness is not probed).
 *   - `stale` is true only when a PID was recorded and that process is not
 *     alive.
 */
export function detectStalePid(pidPath) {
  const recordedPid = readPidFile(pidPath);
  const hasPid = recordedPid !== null;
  return {
    stale: hasPid ? !isProcessAlive(recordedPid) : false,
    pid: hasPid ? recordedPid : null,
  };
}
113
114 // ── Daemon log ────────────────────────────────────────────────────────────────
115
/**
 * Append one JSON object as a single line to the daemon log (JSONL).
 *
 * Parent directories are created on demand. The record is a shallow copy of
 * `entry` whose `ts` is always set to the current time as an ISO-8601 string,
 * replacing any `ts` the caller supplied.
 *
 * @param {string} logPath
 * @param {object} entry — fields to record
 */
export function appendDaemonLog(logPath, entry) {
  const logDir = path.dirname(logPath);
  fs.mkdirSync(logDir, { recursive: true });

  const record = { ...entry };
  // The write time always wins over a caller-supplied `ts`.
  record.ts = new Date().toISOString();

  fs.appendFileSync(logPath, `${JSON.stringify(record)}\n`, 'utf8');
}
128
/**
 * Load the daemon log as an array of parsed JSONL records.
 *
 * Blank lines and lines that are not valid JSON are dropped. Any other
 * failure (e.g. the file cannot be read) yields an empty array.
 *
 * @param {string} logPath
 * @param {{ tail?: number }} [opts] — when `tail` is a positive number, only
 *   the last `tail` records are returned
 * @returns {object[]} records in file order
 */
export function readDaemonLog(logPath, { tail } = {}) {
  try {
    const records = fs
      .readFileSync(logPath, 'utf8')
      .split('\n')
      .filter((line) => line.trim() !== '')
      .flatMap((line) => {
        try {
          return [JSON.parse(line)];
        } catch {
          return []; // malformed line: drop it
        }
      });
    return tail && tail > 0 ? records.slice(-tail) : records;
  } catch {
    return [];
  }
}
153
154 // ── Idle detection ────────────────────────────────────────────────────────────
155
/**
 * Report whether the vault has been quiet long enough for a background pass.
 *
 * Activity is inferred from the newest modification time among
 * `events.jsonl` and `state.json` in the 'default' memory directory.
 * Files that cannot be stat'ed contribute nothing.
 *
 * @param {object} config — loadConfig() result; reads `data_dir` and
 *   `daemon.idle_threshold_minutes` (default 15)
 * @returns {boolean} true when neither file yields an mtime (no activity
 *   signal) or when the newest mtime is at least the threshold in the past
 */
export function isIdle(config) {
  const idleMinutes = config.daemon?.idle_threshold_minutes ?? 15;
  const thresholdMs = idleMinutes * 60_000;
  const now = Date.now();

  const memoryDir = resolveMemoryDir(config.data_dir, 'default');
  const candidates = ['events.jsonl', 'state.json'].map((fileName) => path.join(memoryDir, fileName));

  const newestMtime = candidates.reduce((newest, file) => {
    try {
      const { mtimeMs } = fs.statSync(file);
      return mtimeMs > newest ? mtimeMs : newest;
    } catch {
      return newest; // absent file: no activity evidence from it
    }
  }, 0);

  // With no timestamp at all there is no sign of activity, so let the daemon run.
  if (newestMtime === 0) return true;
  return now - newestMtime >= thresholdMs;
}
191
192 // ── LLM connectivity validation ───────────────────────────────────────────────
193
/**
 * Send a trivial test prompt to the LLM and verify the response is non-empty.
 * Fails fast with a descriptive error if the LLM is unreachable.
 *
 * @param {object} config — loadConfig() result, passed through to llmFn
 * @param {Function} [llmFn] — injectable LLM function (defaults to completeChat);
 *   called as llmFn(config, { system, user, maxTokens: 10 })
 * @returns {Promise<true>}
 * @throws {Error} 'LLM unreachable: …' when llmFn throws or rejects. The
 *   original failure is attached as `cause`.
 * @throws {Error} 'LLM connectivity check returned empty response' when the
 *   response is falsy or whitespace-only.
 */
export async function validateLlmConnectivity(config, llmFn = completeChat) {
  let response;
  try {
    response = await llmFn(config, {
      system: 'You are a health check. Respond with exactly: OK',
      user: 'Health check. Respond with OK.',
      maxTokens: 10,
    });
  } catch (err) {
    // Keep the underlying failure reachable via `cause` (stack, error code).
    // Describe non-Error throwables (strings, null) instead of crashing on
    // `err.message`.
    const detail = typeof err?.message === 'string' ? err.message : String(err);
    throw new Error(`LLM unreachable: ${detail}`, { cause: err });
  }
  if (!response || !String(response).trim()) {
    throw new Error('LLM connectivity check returned empty response');
  }
  return true;
}
219
220 // ── Status ────────────────────────────────────────────────────────────────────
221
/**
 * Return the current daemon status without side effects.
 *
 * The daemon log is append-only and spans every run. Fields describing the
 * current run are therefore anchored on the most recent `startup` entry,
 * preferring one whose `pid` matches the live PID file, rather than the first
 * startup ever logged:
 *   - started_at / uptime_ms come from that startup entry;
 *   - next_pass_at is the latest scheduling checkpoint of this run plus
 *     interval_minutes (default 120). A checkpoint is startup, or the end of a
 *     loop iteration: pass_complete, pass_error, skip_not_idle or
 *     cost_cap_reached. startDaemon sleeps one full interval after each.
 * last_pass is the most recent pass_complete across all runs and is reported
 * even when the daemon is not running. Non-object log lines and entries with
 * an unparseable `ts` are ignored rather than throwing.
 *
 * @param {object} config — loadConfig() result
 * @returns {{
 *   running: boolean,
 *   pid: number|null,
 *   started_at: string|null,
 *   uptime_ms: number|null,
 *   last_pass: { ts: string, events_processed: number, topics: number }|null,
 *   next_pass_at: string|null,
 *   log_path: string,
 *   pid_path: string,
 *   cost_today_usd: number,
 *   cost_cap_usd: number|null,
 * }}
 */
export function getDaemonStatus(config) {
  const pidPath = getPidPath(config);
  const logPath = getLogPath(config);

  const pid = readPidFile(pidPath);
  const running = pid !== null && isProcessAlive(pid);

  const log = readDaemonLog(logPath);

  // Events after which startDaemon's scheduling loop starts a full interval sleep.
  const scheduleCheckpoints = new Set([
    'startup',
    'pass_complete',
    'pass_error',
    'skip_not_idle',
    'cost_cap_reached',
  ]);

  // Index of the newest object entry at or after `from` satisfying `pred`, or -1.
  const findLastIndex = (pred, from = 0) => {
    for (let i = log.length - 1; i >= from; i -= 1) {
      const entry = log[i];
      if (entry !== null && typeof entry === 'object' && pred(entry)) return i;
    }
    return -1;
  };

  // Epoch milliseconds of an entry's `ts`, or null when missing/unparseable.
  const tsMs = (entry) => {
    const ms = new Date(entry.ts).getTime();
    return Number.isFinite(ms) ? ms : null;
  };

  const lastPassIdx = findLastIndex((e) => e.event === 'pass_complete');
  const lastPassEntry = lastPassIdx === -1 ? null : log[lastPassIdx];

  let startedAt = null;
  let uptimeMs = null;
  let nextPassAt = null;

  if (running) {
    let startupIdx = findLastIndex((e) => e.event === 'startup' && e.pid === pid);
    if (startupIdx === -1) startupIdx = findLastIndex((e) => e.event === 'startup');

    if (startupIdx !== -1) {
      const startupEntry = log[startupIdx];
      startedAt = startupEntry.ts ?? null;
      const startedMs = tsMs(startupEntry);
      uptimeMs = startedMs === null ? null : Date.now() - startedMs;
    }

    // Without a startup entry, fall back to the last completed pass.
    const anchorIdx = startupIdx === -1
      ? lastPassIdx
      : findLastIndex((e) => scheduleCheckpoints.has(e.event), startupIdx);
    const anchorMs = anchorIdx === -1 ? null : tsMs(log[anchorIdx]);
    if (anchorMs !== null) {
      const intervalMs = (config.daemon?.interval_minutes ?? 120) * 60_000;
      nextPassAt = new Date(anchorMs + intervalMs).toISOString();
    }
  }

  return {
    running,
    pid: running ? pid : null,
    started_at: startedAt,
    uptime_ms: uptimeMs,
    last_pass: lastPassEntry
      ? {
          ts: lastPassEntry.ts,
          events_processed: lastPassEntry.events_processed ?? 0,
          topics: lastPassEntry.topics ?? 0,
        }
      : null,
    next_pass_at: nextPassAt,
    log_path: logPath,
    pid_path: pidPath,
    cost_today_usd: getDailyCost(config),
    cost_cap_usd: config.daemon?.max_cost_per_day_usd ?? null,
  };
}
279
280 // ── Stop ──────────────────────────────────────────────────────────────────────
281
/**
 * Stop a running daemon.
 *
 * Reads the PID from the PID file and sends SIGTERM. It then polls every
 * 200 ms, for up to killTimeoutMs, for the process to exit, and escalates to
 * SIGKILL if it is still alive. The PID file is removed on every non-throwing
 * path that found a PID.
 *
 * If the process exits on its own between the liveness probe and the SIGTERM,
 * the signal fails with ESRCH. That case is reported like any other stale PID
 * file instead of throwing. Other signalling errors (e.g. EPERM) propagate.
 *
 * @param {object} config — loadConfig() result
 * @param {{
 *   killTimeoutMs?: number,
 *   _signalFn?: (pid: number, signal: string) => void,
 * }} [opts] — _signalFn replaces process.kill (tests)
 * @returns {Promise<{ stopped: boolean, pid?: number, signal?: string, reason?: string }>}
 *   After SIGKILL, `stopped: true` is reported without re-checking liveness.
 */
export async function stopDaemon(config, { killTimeoutMs = 10_000, _signalFn } = {}) {
  const pidPath = getPidPath(config);
  const pid = readPidFile(pidPath);

  if (!pid) {
    return { stopped: false, reason: 'no PID file found' };
  }

  // Shared outcome for "the PID file names a process that is already gone".
  const cleanUpStale = () => {
    removePidFile(pidPath);
    return { stopped: false, reason: 'process not running (stale PID file cleaned up)', pid };
  };

  if (!isProcessAlive(pid)) {
    return cleanUpStale();
  }

  const sendSignal = _signalFn ?? ((p, sig) => process.kill(p, sig));
  try {
    sendSignal(pid, 'SIGTERM');
  } catch (err) {
    // The process may have exited after the liveness probe above.
    if (err?.code === 'ESRCH') return cleanUpStale();
    throw err;
  }

  const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
  const deadline = Date.now() + killTimeoutMs;
  while (Date.now() < deadline) {
    await pause(200);
    if (!isProcessAlive(pid)) {
      removePidFile(pidPath);
      return { stopped: true, pid, signal: 'SIGTERM' };
    }
  }

  // Still alive after the grace period: escalate to SIGKILL.
  try {
    sendSignal(pid, 'SIGKILL');
  } catch {
    // The process may have exited just before the kill; nothing left to do.
  }
  await pause(200);
  removePidFile(pidPath);
  return { stopped: true, pid, signal: 'SIGKILL' };
}
331
332 // ── Start (foreground) ────────────────────────────────────────────────────────
333
/**
 * Start the daemon in the foreground. Resolves once the daemon shuts down.
 *
 * Lifecycle:
 *   1. Crash recovery: a PID file naming a dead process is logged
 *      (`stale_pid_cleanup`) and removed.
 *   2. Refuses to start (throws) if the PID file names a live process.
 *   3. Validates LLM connectivity with the raw llmFn, so it is not cost-tracked.
 *   4. Writes the PID file and logs a `startup` event.
 *   5. If run_on_start: runs one consolidation pass immediately.
 *   6. Scheduling loop: sleeps interval_minutes (default 120). It skips the
 *      pass when idle_only (default true) is set and the vault is not idle, or
 *      when the daily cost cap is reached. Otherwise it calls consolidateFn.
 *   7. On SIGTERM/SIGINT: logs `shutdown`, removes the PID file, cancels the
 *      pending sleep (clearing its timer) and returns.
 *
 * Signal listeners are always removed before returning or throwing. If an
 * exception escapes after the PID file was written, the PID file is removed
 * so that this (still-alive) process does not block a later start.
 *
 * Injectable options for testing (prefixed with _):
 *   - _sleep(ms): replaces the interval sleep (useful for fast test cycles);
 *     an injected sleep is not cancelled by shutdown
 *   - _signalTarget: EventEmitter to listen on instead of process (avoids
 *     polluting process handlers)
 *   - consolidateFn: replaces consolidateMemory (mock in tests)
 *   - llmFn: replaces completeChat (mock in tests)
 *   - costRates: overrides the default rates for cost computation
 *
 * @param {object} config — loadConfig() result
 * @param {{
 *   consolidateFn?: Function,
 *   llmFn?: Function,
 *   costRates?: { input_per_token?: number, output_per_token?: number },
 *   _sleep?: (ms: number) => Promise<void>,
 *   _signalTarget?: import('events').EventEmitter,
 * }} [opts]
 * @returns {Promise<{ stopped: boolean }>}
 * @throws {Error} 'Daemon already running (PID n)', or the connectivity error
 *   from validateLlmConnectivity
 */
export async function startDaemon(config, opts = {}) {
  const {
    consolidateFn = consolidateMemory,
    llmFn = completeChat,
    costRates,
    _sleep: injectedSleep,
    _signalTarget = process,
  } = opts;

  // Every LLM call made during consolidation is billed to the daily cost
  // accumulator. The health check deliberately uses the raw llmFn, so
  // startup overhead is not billed.
  const trackedLlmFn = async (cfg, llmOpts) => {
    const response = await llmFn(cfg, llmOpts);
    const cost = computeCallCost(llmOpts, String(response ?? ''), costRates);
    recordCallCost(config, cost);
    return response;
  };

  const pidPath = getPidPath(config);
  const logPath = getLogPath(config);
  const daemonCfg = config.daemon ?? {};

  // Resolve defaults once so the startup log and the scheduling loop describe
  // the same behaviour (idle_only defaults to true in both).
  const intervalMinutes = daemonCfg.interval_minutes ?? 120;
  const intervalMs = intervalMinutes * 60_000;
  const idleOnly = daemonCfg.idle_only ?? true;
  const maxCostPerDay = daemonCfg.max_cost_per_day_usd ?? null;

  // ── 1. Crash recovery ─────────────────────────────────────────────────────
  const { stale, pid: stalePid } = detectStalePid(pidPath);
  if (stale) {
    appendDaemonLog(logPath, { event: 'stale_pid_cleanup', stale_pid: stalePid });
    removePidFile(pidPath);
  }

  // Refuse to start a second instance.
  const existingPid = readPidFile(pidPath);
  if (existingPid && isProcessAlive(existingPid)) {
    throw new Error(`Daemon already running (PID ${existingPid})`);
  }

  // ── 2. Validate LLM ───────────────────────────────────────────────────────
  await validateLlmConnectivity(config, llmFn);

  // ── 3. Write PID ──────────────────────────────────────────────────────────
  writePidFile(pidPath, process.pid);

  let running = true;
  // Wakes the in-flight cancellableSleep early and clears its timer; null when
  // no cancellable sleep is pending.
  let cancelSleep = null;

  const shutdown = (signal) => {
    if (!running) return;
    running = false;
    cancelSleep?.();
    appendDaemonLog(logPath, { event: 'shutdown', signal, pid: process.pid });
    removePidFile(pidPath);
  };
  const sigtermHandler = () => shutdown('SIGTERM');
  const sigintHandler = () => shutdown('SIGINT');

  // The timer must be cleared on wake-up. A pending setTimeout keeps Node's
  // event loop alive, so an abandoned interval-long timer would keep the
  // process running after shutdown unless the caller exits explicitly.
  const cancellableSleep = (ms) =>
    new Promise((resolve) => {
      const timer = setTimeout(() => {
        cancelSleep = null;
        resolve();
      }, ms);
      cancelSleep = () => {
        clearTimeout(timer);
        cancelSleep = null;
        resolve();
      };
    });
  const sleepFn = injectedSleep ?? cancellableSleep;

  // One consolidation pass. A consolidation failure is logged as pass_error
  // rather than thrown, so the daemon keeps running.
  const runPass = async (trigger) => {
    try {
      const result = await consolidateFn(config, { llmFn: trackedLlmFn });
      appendDaemonLog(logPath, {
        event: 'pass_complete',
        trigger,
        events_processed: result.total_events,
        topics: result.topics.length,
      });
    } catch (err) {
      appendDaemonLog(logPath, {
        event: 'pass_error',
        // Non-Error rejections (e.g. undefined) must not crash the loop here.
        error: err?.message ?? String(err),
        trigger,
      });
    }
  };

  try {
    // ── 4. Log startup ──────────────────────────────────────────────────────
    appendDaemonLog(logPath, {
      event: 'startup',
      pid: process.pid,
      interval_minutes: intervalMinutes,
      idle_only: idleOnly,
      dry_run: daemonCfg.dry_run ?? false,
    });

    _signalTarget.on('SIGTERM', sigtermHandler);
    _signalTarget.on('SIGINT', sigintHandler);

    // ── 5. run_on_start ─────────────────────────────────────────────────────
    if (daemonCfg.run_on_start) {
      // NOTE(review): unlike scheduled passes, this pass is not gated by
      // max_cost_per_day_usd, so repeated restarts can exceed the daily cap.
      // Confirm that this is intended.
      await runPass('run_on_start');
    }

    // ── 6. Scheduling loop ──────────────────────────────────────────────────
    while (running) {
      await sleepFn(intervalMs);
      if (!running) break;

      if (idleOnly && !isIdle(config)) {
        appendDaemonLog(logPath, { event: 'skip_not_idle' });
        continue;
      }

      // Cost cap guard: skip the pass (but keep the daemon running) if the
      // daily budget has been met or exceeded. A null cap means no limit.
      if (maxCostPerDay !== null) {
        const costToday = getDailyCost(config);
        if (costToday >= maxCostPerDay) {
          appendDaemonLog(logPath, {
            event: 'cost_cap_reached',
            cost_today_usd: costToday,
            cap_usd: maxCostPerDay,
          });
          continue;
        }
      }

      await runPass('scheduled');
    }
  } finally {
    // Always detach listeners so handlers do not accumulate (e.g. in tests).
    _signalTarget.off('SIGTERM', sigtermHandler);
    _signalTarget.off('SIGINT', sigintHandler);
    if (running) {
      // An exception escaped the lifecycle: the PID file would otherwise keep
      // naming this live process and block the next start.
      running = false;
      removePidFile(pidPath);
    }
  }

  return { stopped: true };
}
File History 2 commits
sha256:65ccb454656ea5acdea0a10e559b78bcde1eb6ff753ecc2911bc99d1c3d7cadd feat(calendar): enforce agent context tiers in retrieval AP… Human minor 1 day ago
sha256:9103f98c89257ed2b01c237cea895dabb3e85ea337dccb1161c175e4422355b6 docs: accept Calendar Events v0 spec with Phase 0 security … Human 1 day ago