consolidation-scheduler.mjs
sha256:65ccb454656ea5acdea0a10e559b78bcde1eb6ff753ecc2911bc99d1c3d7cadd
feat(calendar): enforce agent context tiers in retrieval AP…
Human
minor
⚠ breaking
1 day ago
| 1 | /** |
| 2 | * Netlify Scheduled Function — consolidation-scheduler |
| 3 | * |
| 4 | * Runs hourly. For each hosted user with consolidation_enabled=true whose |
| 5 | * (consolidation_last_pass_at + consolidation_interval_minutes) <= now, this |
| 6 | * function calls POST /api/v1/memory/consolidate on the bridge via a fresh |
| 7 | * short-lived service JWT signed with SESSION_SECRET. |
| 8 | * |
| 9 | * Each user's effective schedule is controlled by their own |
| 10 | * consolidation_interval_minutes setting (e.g. 60, 120, 1440 for daily). |
| 11 | * The cron runs hourly and only triggers users who are actually due. |
| 12 | * |
| 13 | * Env: |
| 14 | * SESSION_SECRET required — sign per-user service JWTs |
| 15 | * BRIDGE_URL required — bridge origin (no trailing slash) |
| 16 | * CONSOLIDATION_SCHEDULER_MAX_USERS_PER_RUN optional — cap per invocation (default 20) |
| 17 | * BILLING_ENFORCE optional — 'true' to actually call bridge; |
| 18 | * any other value = shadow-log mode (default) |
| 19 | * |
| 20 | * Safety: |
| 21 | * - Never forwards user credentials; always generates a fresh 5-minute JWT per user. |
| 22 | * - Per-user errors are caught and logged without aborting the rest of the run. |
| 23 | * - In shadow-log mode (BILLING_ENFORCE !== 'true'), logs what would have been triggered |
| 24 | * without calling the bridge or updating the billing DB. |
| 25 | */ |
| 26 | import { getStore } from '@netlify/blobs'; |
| 27 | import jwt from 'jsonwebtoken'; |
| 28 | import { loadBillingDb, mutateBillingDb } from '../../hub/gateway/billing-store.mjs'; |
| 29 | import { normalizeBillingUser } from '../../hub/gateway/billing-logic.mjs'; |
| 30 | import { mergeConsolidateRequestBodyWithBillingDefaults } from '../../lib/hosted-consolidation-advanced.mjs'; |
| 31 | |
| 32 | export const config = { schedule: '0 * * * *' }; |
| 33 | |
| 34 | const SESSION_SECRET = process.env.SESSION_SECRET || process.env.HUB_JWT_SECRET || ''; |
| 35 | const BRIDGE_URL = (process.env.BRIDGE_URL || '').replace(/\/+$/, ''); |
| 36 | const MAX_USERS_PER_RUN = (() => { |
| 37 | const v = parseInt(process.env.CONSOLIDATION_SCHEDULER_MAX_USERS_PER_RUN || '20', 10); |
| 38 | return Number.isFinite(v) && v > 0 ? v : 20; |
| 39 | })(); |
| 40 | |
| 41 | /** |
| 42 | * Returns true if the user is due for a consolidation pass. |
| 43 | * |
| 44 | * A user is due when: |
| 45 | * - consolidation_enabled = true |
| 46 | * - consolidation_interval_minutes is a positive number |
| 47 | * - consolidation_last_pass_at is null (never run) OR |
| 48 | * (last_pass_at + interval_minutes * 60_000) <= nowMs |
| 49 | * |
| 50 | * @param {object} user - Billing user record |
| 51 | * @param {number} nowMs - Current epoch ms (injectable for testing) |
| 52 | * @returns {boolean} |
| 53 | */ |
| 54 | export function isUserDue(user, nowMs) { |
| 55 | if (!user || !user.consolidation_enabled) return false; |
| 56 | const interval = Number(user.consolidation_interval_minutes); |
| 57 | if (!interval || !Number.isFinite(interval) || interval <= 0) return false; |
| 58 | if (!user.consolidation_last_pass_at) return true; |
| 59 | const lastPass = new Date(user.consolidation_last_pass_at).getTime(); |
| 60 | if (!Number.isFinite(lastPass)) return true; |
| 61 | return lastPass + interval * 60_000 <= nowMs; |
| 62 | } |
| 63 | |
| 64 | /** |
| 65 | * Signs a short-lived (5-minute) service JWT for server-to-server bridge calls. |
| 66 | * The bridge reads payload.sub as the user ID via jwt.verify(token, SESSION_SECRET). |
| 67 | * |
| 68 | * @param {string} userId - Billing user ID (becomes JWT sub claim) |
| 69 | * @param {string} secret - SESSION_SECRET |
| 70 | * @returns {string} Signed JWT |
| 71 | */ |
| 72 | export function signServiceJwt(userId, secret) { |
| 73 | return jwt.sign({ sub: userId, role: 'service' }, secret, { expiresIn: '5m' }); |
| 74 | } |
| 75 | |
| 76 | /** |
| 77 | * Core scheduler logic — fully dependency-injected for testability. |
| 78 | * |
| 79 | * @param {object} opts |
| 80 | * @param {string} opts.sessionSecret - JWT signing secret |
| 81 | * @param {string} opts.bridgeUrl - Bridge origin URL |
| 82 | * @param {number} opts.maxUsersPerRun - Max users to process per invocation |
| 83 | * @param {boolean} opts.billingEnforce - If false, shadow-log only (no bridge calls) |
| 84 | * @param {number} opts.nowMs - Current epoch ms (injectable for testing) |
| 85 | * @param {Function} opts.loadDb - Loads the billing DB |
| 86 | * @param {Function} opts.mutateDb - Mutates and saves the billing DB |
| 87 | * @param {Function} opts.fetchFn - fetch implementation (injectable for testing) |
| 88 | * @returns {Promise<object>} Summary: pass_count, skipped_not_enabled, skipped_not_due, capped, errors, shadow_mode |
| 89 | */ |
| 90 | export async function runScheduler({ |
| 91 | sessionSecret = SESSION_SECRET, |
| 92 | bridgeUrl = BRIDGE_URL, |
| 93 | maxUsersPerRun = MAX_USERS_PER_RUN, |
| 94 | billingEnforce = process.env.BILLING_ENFORCE === 'true', |
| 95 | nowMs = Date.now(), |
| 96 | loadDb = loadBillingDb, |
| 97 | mutateDb = mutateBillingDb, |
| 98 | fetchFn = globalThis.fetch, |
| 99 | } = {}) { |
| 100 | const summary = { |
| 101 | pass_count: 0, |
| 102 | skipped_not_enabled: 0, |
| 103 | skipped_not_due: 0, |
| 104 | capped: 0, |
| 105 | errors: [], |
| 106 | shadow_mode: !billingEnforce, |
| 107 | }; |
| 108 | |
| 109 | if (!sessionSecret) throw new Error('SESSION_SECRET is not configured'); |
| 110 | if (!bridgeUrl) throw new Error('BRIDGE_URL is not configured'); |
| 111 | |
| 112 | const db = await loadDb(); |
| 113 | const allUsers = Object.values(db.users); |
| 114 | |
| 115 | const enabledUsers = allUsers.filter(u => u.consolidation_enabled); |
| 116 | summary.skipped_not_enabled = allUsers.length - enabledUsers.length; |
| 117 | |
| 118 | const dueUsers = enabledUsers.filter(u => isUserDue(u, nowMs)); |
| 119 | summary.skipped_not_due = enabledUsers.length - dueUsers.length; |
| 120 | |
| 121 | // Respect the per-invocation cap to bound total runtime. |
| 122 | const batch = dueUsers.slice(0, maxUsersPerRun); |
| 123 | summary.capped = dueUsers.length - batch.length; |
| 124 | |
| 125 | for (const user of batch) { |
| 126 | const userId = user.user_id; |
| 127 | |
| 128 | if (!billingEnforce) { |
| 129 | // Shadow-log mode: record what would have been triggered, but do not call |
| 130 | // the bridge and do not update consolidation_last_pass_at. |
| 131 | summary.pass_count++; |
| 132 | console.log( |
| 133 | JSON.stringify({ |
| 134 | type: 'knowtation_billing_shadow', |
| 135 | source: 'consolidation_scheduler', |
| 136 | user_id: userId, |
| 137 | would_trigger: true, |
| 138 | ts: new Date(nowMs).toISOString(), |
| 139 | }), |
| 140 | ); |
| 141 | continue; |
| 142 | } |
| 143 | |
| 144 | try { |
| 145 | // Issue a fresh 5-minute JWT per user — never reuse or forward stored credentials. |
| 146 | const token = signServiceJwt(userId, sessionSecret); |
| 147 | |
| 148 | const uNorm = normalizeBillingUser({ ...user }); |
| 149 | const consolidatePayload = mergeConsolidateRequestBodyWithBillingDefaults( |
| 150 | { passes: uNorm.consolidation_passes || undefined }, |
| 151 | uNorm, |
| 152 | ); |
| 153 | |
| 154 | const res = await fetchFn(`${bridgeUrl}/api/v1/memory/consolidate`, { |
| 155 | method: 'POST', |
| 156 | headers: { |
| 157 | Authorization: `Bearer ${token}`, |
| 158 | 'Content-Type': 'application/json', |
| 159 | }, |
| 160 | body: JSON.stringify(consolidatePayload), |
| 161 | signal: AbortSignal.timeout(25_000), |
| 162 | }); |
| 163 | |
| 164 | if (!res.ok) { |
| 165 | const body = await res.text().catch(() => ''); |
| 166 | throw new Error(`Bridge responded ${res.status}: ${body.slice(0, 300)}`); |
| 167 | } |
| 168 | |
| 169 | const data = await res.json(); |
| 170 | |
| 171 | // Stamp last_pass_at so this user is not triggered again until their interval elapses. |
| 172 | await mutateDb(dbMut => { |
| 173 | const u = dbMut.users[userId]; |
| 174 | if (u) u.consolidation_last_pass_at = new Date(nowMs).toISOString(); |
| 175 | }); |
| 176 | |
| 177 | summary.pass_count++; |
| 178 | console.log( |
| 179 | JSON.stringify({ |
| 180 | type: 'knowtation_scheduler_pass', |
| 181 | user_id: userId, |
| 182 | topics: data.topics, |
| 183 | total_events: data.total_events, |
| 184 | cost_usd: data.cost_usd, |
| 185 | pass_id: data.pass_id, |
| 186 | ts: new Date(nowMs).toISOString(), |
| 187 | }), |
| 188 | ); |
| 189 | } catch (e) { |
| 190 | // Catch per-user errors without aborting the rest of the run. |
| 191 | const errMsg = e?.message ?? String(e); |
| 192 | summary.errors.push({ user_id: userId, error: errMsg }); |
| 193 | console.error( |
| 194 | JSON.stringify({ |
| 195 | type: 'knowtation_scheduler_error', |
| 196 | user_id: userId, |
| 197 | error: errMsg, |
| 198 | ts: new Date(nowMs).toISOString(), |
| 199 | }), |
| 200 | ); |
| 201 | } |
| 202 | } |
| 203 | |
| 204 | return summary; |
| 205 | } |
| 206 | |
| 207 | /** |
| 208 | * Netlify Scheduled Function entry point. |
| 209 | * Runs every hour (config.schedule = '0 * * * *'). |
| 210 | * Scheduled functions on Netlify are invoked without an HTTP request; |
| 211 | * the req parameter is a synthetic Request object. |
| 212 | */ |
| 213 | export default async (_req) => { |
| 214 | const startMs = Date.now(); |
| 215 | |
| 216 | // Set up Netlify Blob store so billing-store.mjs can load/save the billing DB. |
| 217 | // In Netlify Functions v2 (export default), getStore() works without connectLambda. |
| 218 | // In local dev, this throws; billing-store.mjs falls back to file-based storage. |
| 219 | let blobStoreSet = false; |
| 220 | try { |
| 221 | globalThis.__knowtation_gateway_blob = getStore({ |
| 222 | name: 'gateway-billing', |
| 223 | consistency: 'strong', |
| 224 | }); |
| 225 | blobStoreSet = true; |
| 226 | } catch (_) { |
| 227 | // Local / non-Netlify environment: billing-store.mjs uses data/hosted_billing.json. |
| 228 | } |
| 229 | |
| 230 | try { |
| 231 | const summary = await runScheduler(); |
| 232 | summary.elapsed_ms = Date.now() - startMs; |
| 233 | console.log(JSON.stringify({ type: 'knowtation_scheduler_summary', ...summary })); |
| 234 | } catch (e) { |
| 235 | console.error( |
| 236 | JSON.stringify({ |
| 237 | type: 'knowtation_scheduler_fatal', |
| 238 | error: e?.message ?? String(e), |
| 239 | elapsed_ms: Date.now() - startMs, |
| 240 | }), |
| 241 | ); |
| 242 | } finally { |
| 243 | if (blobStoreSet) delete globalThis.__knowtation_gateway_blob; |
| 244 | } |
| 245 | |
| 246 | return new Response('ok'); |
| 247 | }; |
File History
2 commits
sha256:65ccb454656ea5acdea0a10e559b78bcde1eb6ff753ecc2911bc99d1c3d7cadd
feat(calendar): enforce agent context tiers in retrieval AP…
Human
minor
⚠
1 day ago
sha256:9103f98c89257ed2b01c237cea895dabb3e85ea337dccb1161c175e4422355b6
docs: accept Calendar Events v0 spec with Phase 0 security …
Human
2 days ago