consolidation-scheduler.mjs
247 lines 9.1 KB
Raw
sha256:65ccb454656ea5acdea0a10e559b78bcde1eb6ff753ecc2911bc99d1c3d7cadd feat(calendar): enforce agent context tiers in retrieval AP… Human minor ⚠ breaking 1 day ago
1 /**
2 * Netlify Scheduled Function — consolidation-scheduler
3 *
4 * Runs hourly. For each hosted user with consolidation_enabled=true whose
5 * (consolidation_last_pass_at + consolidation_interval_minutes) <= now, this
6 * function calls POST /api/v1/memory/consolidate on the bridge via a fresh
7 * short-lived service JWT signed with SESSION_SECRET.
8 *
9 * Each user's effective schedule is controlled by their own
10 * consolidation_interval_minutes setting (e.g. 60, 120, 1440 for daily).
11 * The cron runs hourly and only triggers users who are actually due.
12 *
13 * Env:
14 * SESSION_SECRET required — sign per-user service JWTs (HUB_JWT_SECRET is used as a fallback when unset)
15 * BRIDGE_URL required — bridge origin (no trailing slash)
16 * CONSOLIDATION_SCHEDULER_MAX_USERS_PER_RUN optional — cap per invocation (default 20)
17 * BILLING_ENFORCE optional — 'true' to actually call bridge;
18 * any other value = shadow-log mode (default)
19 *
20 * Safety:
21 * - Never forwards user credentials; always generates a fresh 5-minute JWT per user.
22 * - Per-user errors are caught and logged without aborting the rest of the run.
23 * - In shadow-log mode (BILLING_ENFORCE !== 'true'), logs what would have been triggered
24 * without calling the bridge or updating the billing DB.
25 */
26 import { getStore } from '@netlify/blobs';
27 import jwt from 'jsonwebtoken';
28 import { loadBillingDb, mutateBillingDb } from '../../hub/gateway/billing-store.mjs';
29 import { normalizeBillingUser } from '../../hub/gateway/billing-logic.mjs';
30 import { mergeConsolidateRequestBodyWithBillingDefaults } from '../../lib/hosted-consolidation-advanced.mjs';
31
// Netlify schedule for this function: minute 0 of every hour.
export const config = { schedule: '0 * * * *' };

// Signing secret for service JWTs. HUB_JWT_SECRET is consulted only when
// SESSION_SECRET is unset or empty; an empty string means "not configured".
const SESSION_SECRET =
  process.env.SESSION_SECRET || process.env.HUB_JWT_SECRET || '';

// Bridge origin with any run of trailing slashes removed, so paths can be
// appended with a single leading slash.
const BRIDGE_URL = (process.env.BRIDGE_URL || '').replace(/\/+$/, '');

// Per-invocation user cap. Anything that does not parse to a positive
// integer falls back to 20.
const MAX_USERS_PER_RUN = (() => {
  const parsedCap = parseInt(
    process.env.CONSOLIDATION_SCHEDULER_MAX_USERS_PER_RUN || '20',
    10,
  );
  if (!Number.isFinite(parsedCap) || parsedCap <= 0) return 20;
  return parsedCap;
})();
40
/**
 * Decides whether a billing user should receive a consolidation pass now.
 *
 * The answer is true only when all of the following hold:
 *   - the record exists and consolidation_enabled is truthy
 *   - consolidation_interval_minutes coerces to a finite number > 0
 *   - consolidation_last_pass_at is missing or unparsable (treated as
 *     "never run"), OR last_pass_at + interval_minutes * 60_000 <= nowMs
 *
 * @param {object} user - Billing user record
 * @param {number} nowMs - Current epoch ms (injectable for testing)
 * @returns {boolean} true when a pass should be triggered for this user
 */
export function isUserDue(user, nowMs) {
  const enabled = Boolean(user && user.consolidation_enabled);
  if (!enabled) return false;

  const intervalMinutes = Number(user.consolidation_interval_minutes);
  const intervalUsable = Number.isFinite(intervalMinutes) && intervalMinutes > 0;
  if (!intervalUsable) return false;

  // No recorded pass yet: the user is due immediately.
  const lastPassRaw = user.consolidation_last_pass_at;
  if (!lastPassRaw) return true;

  // An unparsable timestamp is handled the same way as a missing one.
  const lastPassMs = new Date(lastPassRaw).getTime();
  if (!Number.isFinite(lastPassMs)) return true;

  const nextDueMs = lastPassMs + intervalMinutes * 60_000;
  return nextDueMs <= nowMs;
}
63
/**
 * Mints the bearer token used for one server-to-server bridge call.
 *
 * The token carries the user ID as the `sub` claim plus `role: 'service'`,
 * and expires five minutes after signing. The bridge is expected to verify
 * it with the same secret and read `sub` as the user ID.
 *
 * @param {string} userId - Billing user ID (becomes JWT sub claim)
 * @param {string} secret - SESSION_SECRET
 * @returns {string} Signed JWT
 */
export function signServiceJwt(userId, secret) {
  const claims = { sub: userId, role: 'service' };
  const signOptions = { expiresIn: '5m' };
  return jwt.sign(claims, secret, signOptions);
}
75
/**
 * Core scheduler logic — fully dependency-injected for testability.
 *
 * Flow:
 *   1. Fail fast if the signing secret or bridge URL is missing.
 *   2. Load the billing DB and split users into not-enabled, not-due and due
 *      (due-ness is decided by isUserDue).
 *   3. Truncate the due list to maxUsersPerRun; the remainder is reported as
 *      `capped` and left for a later invocation.
 *   4. Process the batch sequentially. In shadow mode each user is only
 *      logged. In enforce mode the bridge is called with a fresh service JWT
 *      and, on a 2xx response, consolidation_last_pass_at is stamped with nowMs.
 *
 * Per-user failures in enforce mode are collected in `summary.errors` and do
 * not abort the remaining users.
 *
 * @param {object} opts
 * @param {string} opts.sessionSecret - JWT signing secret
 * @param {string} opts.bridgeUrl - Bridge origin URL
 * @param {number} opts.maxUsersPerRun - Max users to process per invocation
 * @param {boolean} opts.billingEnforce - If false, shadow-log only (no bridge calls)
 * @param {number} opts.nowMs - Current epoch ms (injectable for testing)
 * @param {Function} opts.loadDb - Loads the billing DB
 * @param {Function} opts.mutateDb - Mutates and saves the billing DB
 * @param {Function} opts.fetchFn - fetch implementation (injectable for testing)
 * @returns {Promise<object>} Summary: pass_count, skipped_not_enabled, skipped_not_due, capped, errors, shadow_mode
 * @throws {Error} When sessionSecret or bridgeUrl is empty, or when loadDb rejects
 */
export async function runScheduler({
  sessionSecret = SESSION_SECRET,
  bridgeUrl = BRIDGE_URL,
  maxUsersPerRun = MAX_USERS_PER_RUN,
  billingEnforce = process.env.BILLING_ENFORCE === 'true',
  nowMs = Date.now(),
  loadDb = loadBillingDb,
  mutateDb = mutateBillingDb,
  fetchFn = globalThis.fetch,
} = {}) {
  const summary = {
    pass_count: 0,
    skipped_not_enabled: 0,
    skipped_not_due: 0,
    capped: 0,
    errors: [],
    shadow_mode: !billingEnforce,
  };

  if (!sessionSecret) throw new Error('SESSION_SECRET is not configured');
  if (!bridgeUrl) throw new Error('BRIDGE_URL is not configured');

  const db = await loadDb();
  // A billing store without a `users` map is treated as "zero users" rather
  // than crashing the whole run.
  const allUsers = Object.values(db?.users ?? {});

  // Null/undefined entries are counted as not enabled instead of throwing.
  const enabledUsers = allUsers.filter((u) => u?.consolidation_enabled);
  summary.skipped_not_enabled = allUsers.length - enabledUsers.length;

  const dueUsers = enabledUsers.filter((u) => isUserDue(u, nowMs));
  summary.skipped_not_due = enabledUsers.length - dueUsers.length;

  // Respect the per-invocation cap to bound total runtime.
  const batch = dueUsers.slice(0, maxUsersPerRun);
  summary.capped = dueUsers.length - batch.length;

  // Every log line in this run carries the same timestamp, derived from nowMs.
  const ts = new Date(nowMs).toISOString();

  for (const user of batch) {
    const userId = user.user_id;

    if (!billingEnforce) {
      // Shadow-log mode: record what would have been triggered, but do not call
      // the bridge and do not update consolidation_last_pass_at.
      summary.pass_count++;
      console.log(
        JSON.stringify({
          type: 'knowtation_billing_shadow',
          source: 'consolidation_scheduler',
          user_id: userId,
          would_trigger: true,
          ts,
        }),
      );
      continue;
    }

    try {
      // Without an ID the JWT would carry no `sub` claim and the stamp below
      // could not find the record, so fail this user before any network call.
      if (!userId) throw new Error('Billing user record has no user_id');

      // Issue a fresh 5-minute JWT per user — never reuse or forward stored credentials.
      const token = signServiceJwt(userId, sessionSecret);

      const uNorm = normalizeBillingUser({ ...user });
      const consolidatePayload = mergeConsolidateRequestBodyWithBillingDefaults(
        { passes: uNorm.consolidation_passes || undefined },
        uNorm,
      );

      const res = await fetchFn(`${bridgeUrl}/api/v1/memory/consolidate`, {
        method: 'POST',
        headers: {
          Authorization: `Bearer ${token}`,
          'Content-Type': 'application/json',
        },
        body: JSON.stringify(consolidatePayload),
        signal: AbortSignal.timeout(25_000),
      });

      if (!res.ok) {
        const body = await res.text().catch(() => '');
        throw new Error(`Bridge responded ${res.status}: ${body.slice(0, 300)}`);
      }

      // The 2xx status is the success signal. The body only feeds the log line
      // below, so an unreadable body must not prevent stamping; otherwise the
      // same user would be consolidated again on every hourly run.
      let data = {};
      try {
        data = (await res.json()) ?? {};
      } catch (parseErr) {
        console.warn(
          JSON.stringify({
            type: 'knowtation_scheduler_warning',
            user_id: userId,
            warning: `Bridge responded ${res.status} with an unreadable JSON body: ${
              parseErr?.message ?? String(parseErr)
            }`,
            ts,
          }),
        );
      }

      // Stamp last_pass_at so this user is not triggered again until their interval elapses.
      await mutateDb((dbMut) => {
        const u = dbMut.users[userId];
        if (u) u.consolidation_last_pass_at = ts;
      });

      summary.pass_count++;
      console.log(
        JSON.stringify({
          type: 'knowtation_scheduler_pass',
          user_id: userId,
          topics: data.topics,
          total_events: data.total_events,
          cost_usd: data.cost_usd,
          pass_id: data.pass_id,
          ts,
        }),
      );
    } catch (e) {
      // Catch per-user errors without aborting the rest of the run.
      const errMsg = e?.message ?? String(e);
      summary.errors.push({ user_id: userId, error: errMsg });
      console.error(
        JSON.stringify({
          type: 'knowtation_scheduler_error',
          user_id: userId,
          error: errMsg,
          ts,
        }),
      );
    }
  }

  return summary;
}
206
/**
 * Netlify Scheduled Function entry point.
 * Runs every hour (config.schedule = '0 * * * *').
 *
 * Installs the billing blob store on globalThis when one can be obtained,
 * runs the scheduler with its default options, logs either a summary line
 * or a fatal line (both with elapsed_ms), removes the blob store again, and
 * always answers with an 'ok' Response. The request argument is not read.
 */
export default async (_req) => {
  const startedAtMs = Date.now();

  // Expose the Netlify Blob store so billing-store.mjs can load/save the
  // billing DB. Per the original note, getStore() works without connectLambda
  // in Functions v2 and throws in local dev, where billing-store.mjs falls
  // back to file-based storage.
  let installedBlobStore = false;
  try {
    const billingStore = getStore({
      name: 'gateway-billing',
      consistency: 'strong',
    });
    globalThis.__knowtation_gateway_blob = billingStore;
    installedBlobStore = true;
  } catch (_) {
    // Local / non-Netlify environment: billing-store.mjs uses data/hosted_billing.json.
  }

  try {
    const summary = await runScheduler();
    summary.elapsed_ms = Date.now() - startedAtMs;
    const summaryLine = { type: 'knowtation_scheduler_summary', ...summary };
    console.log(JSON.stringify(summaryLine));
  } catch (e) {
    // A scheduler-level failure is logged, not rethrown.
    const fatalLine = {
      type: 'knowtation_scheduler_fatal',
      error: e?.message ?? String(e),
      elapsed_ms: Date.now() - startedAtMs,
    };
    console.error(JSON.stringify(fatalLine));
  } finally {
    // Only remove the global if this invocation was the one that set it.
    if (installedBlobStore) delete globalThis.__knowtation_gateway_blob;
  }

  return new Response('ok');
};
File History 2 commits
sha256:65ccb454656ea5acdea0a10e559b78bcde1eb6ff753ecc2911bc99d1c3d7cadd feat(calendar): enforce agent context tiers in retrieval AP… Human minor 1 day ago
sha256:9103f98c89257ed2b01c237cea895dabb3e85ea337dccb1161c175e4422355b6 docs: accept Calendar Events v0 spec with Phase 0 security … Human 2 days ago