bridge-index-job-lock.mjs
184 lines 7.4 KB
Raw
sha256:94ec65bd2b200240ac785a97cf14c5db066832bd608a24d6a9c151f17b918b02 feat(calendar): hosted bridge/gateway route parity and time… Human minor ⚠ breaking 1 day ago
1 /**
2 * Background-index job lock for `hub/bridge/server.mjs POST /api/v1/index`.
3 *
4 * When the preflight (`lib/bridge-index-preflight-estimate.mjs`) decides a re-index
5 * is too big for the synchronous path, the bridge writes a lock record to Netlify
6 * Blobs, kicks off the `bridge-index-background` Netlify Function, and returns 202
7 * to the client. The lock prevents:
8 * 1. **Double-clicks** — a second `POST /api/v1/index` arriving while the
9 * background job is still running returns 409 instead of starting another
10 * DeepInfra-billed re-embed of the same vault.
11 * 2. **Stale locks blocking forever** — if the background function crashes,
12 * every lock has a TTL (default 16 min, just past the 15-min Netlify
13 * background-function cap) after which `acquireJobLock` will overwrite it.
14 *
15 * Storage shape (Netlify Blob): one record per `(canisterUid, vaultId)` pair at
16 * key `index-jobs/${canisterUid}/${vaultId}.json`. Keep the schema small and
17 * append-only so older bridge deploys can still read newer lock records during
18 * a rolling deploy.
19 *
20 * Pure-ish module: takes a `blobStore` arg shaped like the Netlify `@netlify/blobs`
21 * `getStore({...})` return value (`get(key, { type })`, `set(key, value)`,
22 * `delete(key)`). Tests inject an in-memory implementation; production uses the
23 * real Netlify store mounted by `netlify/functions/bridge.mjs`.
24 */
25
26 import crypto from 'crypto';
27
/**
 * Lifetime of a lock record in milliseconds (16 minutes).
 *
 * Netlify background functions are capped at 15 minutes. The one extra minute
 * of headroom lets a slow finalize-on-success path still find and release its
 * own lock before a fresh re-index attempt is allowed to overwrite it.
 */
export const JOB_LOCK_TTL_MS = (15 + 1) * 60 * 1000;
34
/**
 * Compute the canonical Blob key under which a vault's job lock is stored.
 *
 * Every reader and writer goes through this one function, so they cannot drift
 * apart on key spelling (e.g. `index-jobs/foo/bar` vs `/index-jobs/foo/bar`) or
 * pick up a stale legacy key.
 *
 * Both ids are interpolated verbatim, with no escaping. Callers must pass
 * already-sanitized ids; an id containing `/` would add extra path segments.
 *
 * @param {string} canisterUid - Sanitized canister user id.
 * @param {string} vaultId - Sanitized vault id.
 * @returns {string} A key of the form `index-jobs/<canisterUid>/<vaultId>.json`.
 * @throws {TypeError} If either id is not a non-empty string.
 */
export function jobLockKey(canisterUid, vaultId) {
  const isNonEmptyString = (value) => typeof value === 'string' && value.length > 0;

  if (!isNonEmptyString(canisterUid)) {
    throw new TypeError('jobLockKey: canisterUid must be a non-empty string');
  }
  if (!isNonEmptyString(vaultId)) {
    throw new TypeError('jobLockKey: vaultId must be a non-empty string');
  }
  return ['index-jobs', canisterUid, `${vaultId}.json`].join('/');
}
53
/**
 * Attempt to take the background-index lock for `(canisterUid, vaultId)`.
 *
 * Resolves to `{ acquired: true, jobId, record }` once a new lock record has
 * been written, or `{ acquired: false, existing }` when an unexpired lock is
 * already stored.
 *
 * A stored record counts as "held" only while its `expiresAt` is a finite
 * number strictly greater than the current time. Anything else is treated as
 * free and overwritten:
 * - an expired lock,
 * - a record without a numeric `expiresAt`,
 * - nothing readable at the key.
 *
 * Overwrite-on-stale is the only recovery path when a previous background
 * function crashed before calling `releaseJobLock`. Without it, a single crash
 * would block every future re-index until manual intervention.
 *
 * NOTE: acquisition is a read followed by an unconditional `set`, with no
 * compare-and-swap. Two calls that both pass the read before either writes can
 * both report `acquired: true`; the later `set` determines the stored record.
 *
 * Optional record fields (`actorUid`, `chunksToEmbed`, `estimatedSeconds`,
 * `reason`) are stored as `null` when absent or of the wrong type. A missing,
 * non-finite or non-positive `ttlMs` falls back to `JOB_LOCK_TTL_MS`.
 *
 * @param {{ get: Function, set: Function, delete: Function }} blobStore
 * @param {{
 *   canisterUid: string,
 *   vaultId: string,
 *   actorUid?: string,
 *   chunksToEmbed?: number,
 *   estimatedSeconds?: number,
 *   reason?: string,
 *   ttlMs?: number,
 *   now?: () => number,
 *   randomUUID?: () => string,
 * }} opts
 * @returns {Promise<
 *   | { acquired: true, jobId: string, record: object }
 *   | { acquired: false, existing: object }
 * >}
 * @throws {TypeError} If `blobStore` lacks `get`/`set`, `opts` is missing, or
 *   either id is invalid (via `jobLockKey`).
 */
export async function acquireJobLock(blobStore, opts) {
  const storeUsable =
    Boolean(blobStore) && typeof blobStore.set === 'function' && typeof blobStore.get === 'function';
  if (!storeUsable) {
    throw new TypeError('acquireJobLock: blobStore with get/set is required');
  }
  if (typeof opts !== 'object' || opts === null) {
    throw new TypeError('acquireJobLock: opts is required');
  }

  const { canisterUid, vaultId, ttlMs: requestedTtl } = opts;
  const ttl = Number.isFinite(requestedTtl) && requestedTtl > 0 ? requestedTtl : JOB_LOCK_TTL_MS;
  const clock = typeof opts.now === 'function' ? opts.now : Date.now;
  const makeJobId =
    typeof opts.randomUUID === 'function' ? opts.randomUUID : () => crypto.randomUUID();

  const key = jobLockKey(canisterUid, vaultId);
  const startedAt = clock();
  const existing = await readLockRecord(blobStore, key);

  // An unreadable or absent record yields `undefined`, which is never finite.
  const heldUntil = existing ? existing.expiresAt : undefined;
  if (Number.isFinite(heldUntil) && heldUntil > startedAt) {
    return { acquired: false, existing };
  }

  const stringOrNull = (value) => (typeof value === 'string' ? value : null);
  const finiteOrNull = (value) => (Number.isFinite(value) ? value : null);

  const jobId = makeJobId();
  // Key order matters: the record is serialized with JSON.stringify below.
  const record = {
    jobId,
    canisterUid,
    vaultId,
    actorUid: stringOrNull(opts.actorUid),
    chunksToEmbed: finiteOrNull(opts.chunksToEmbed),
    estimatedSeconds: finiteOrNull(opts.estimatedSeconds),
    reason: stringOrNull(opts.reason),
    startedAt,
    expiresAt: startedAt + ttl,
  };

  await blobStore.set(key, JSON.stringify(record));
  return { acquired: true, jobId, record };
}
116
/**
 * Drop the lock for `(canisterUid, vaultId)`.
 *
 * With a non-empty string `expectedJobId`, the call deletes the lock only if
 * the stored record still carries that `jobId`. A different `jobId` means a
 * fresher acquire has replaced it, and deleting would clobber that in-flight
 * job. In that case, or when no readable record exists, nothing is deleted.
 *
 * Any other `expectedJobId` (omitted, `null`, `''`, or a non-string) deletes
 * the key unconditionally. This is used by admin/operator recovery paths.
 *
 * @param {{ get: Function, set: Function, delete: Function }} blobStore
 * @param {{ canisterUid: string, vaultId: string, expectedJobId?: string }} opts
 * @returns {Promise<{ released: boolean, reason?: string }>}
 *   `reason` is `'lock_already_gone'` or `'lock_owned_by_other_job'` when
 *   `released` is false.
 * @throws {TypeError} If `blobStore` lacks `delete`, `opts` is missing, or
 *   either id is invalid (via `jobLockKey`).
 */
export async function releaseJobLock(blobStore, opts) {
  const canDelete = Boolean(blobStore) && typeof blobStore.delete === 'function';
  if (!canDelete) {
    throw new TypeError('releaseJobLock: blobStore with delete is required');
  }
  if (typeof opts !== 'object' || opts === null) {
    throw new TypeError('releaseJobLock: opts is required');
  }

  const { canisterUid, vaultId, expectedJobId } = opts;
  const key = jobLockKey(canisterUid, vaultId);

  const mustOwnLock = typeof expectedJobId === 'string' && expectedJobId.length > 0;
  if (mustOwnLock) {
    const current = await readLockRecord(blobStore, key);
    if (!current) {
      return { released: false, reason: 'lock_already_gone' };
    }
    if (current.jobId !== expectedJobId) {
      return { released: false, reason: 'lock_owned_by_other_job' };
    }
  }

  await blobStore.delete(key);
  return { released: true };
}
149
/**
 * Return the current lock record for `(canisterUid, vaultId)` without
 * changing it.
 *
 * Resolves to `null` when no readable lock record exists. Expired records are
 * returned as-is; clearing stale locks is left to `acquireJobLock` on the write
 * path, so this read stays side-effect-free.
 *
 * @param {{ get: Function }} blobStore
 * @param {{ canisterUid: string, vaultId: string }} opts
 * @returns {Promise<object|null>}
 * @throws {TypeError} If `blobStore` lacks `get`, `opts` is missing, or either
 *   id is invalid (via `jobLockKey`).
 */
export async function peekJobLock(blobStore, opts) {
  if (!(blobStore && typeof blobStore.get === 'function')) {
    throw new TypeError('peekJobLock: blobStore with get is required');
  }
  if (typeof opts !== 'object' || opts === null) {
    throw new TypeError('peekJobLock: opts is required');
  }
  return readLockRecord(blobStore, jobLockKey(opts.canisterUid, opts.vaultId));
}
170
/**
 * Fetch the blob at `key` as text and parse it as a lock record.
 *
 * This is best-effort by design: it never throws. It resolves to `null` when:
 * - the store read fails,
 * - nothing is stored,
 * - the stored text is empty or not valid JSON,
 * - the JSON is not a plain object. Arrays and primitives count as malformed
 *   lock records.
 *
 * NOTE: a failed read is indistinguishable from "no lock" to callers.
 * `acquireJobLock` treats a `null` result as free and writes a new lock, so
 * read failures fail open.
 *
 * @param {{ get: Function }} blobStore
 * @param {string} key
 * @returns {Promise<object|null>}
 */
async function readLockRecord(blobStore, key) {
  let raw;
  try {
    raw = await blobStore.get(key, { type: 'text' });
  } catch (_) {
    // Deliberately swallowed: an unreadable lock is reported like a missing one.
    return null;
  }
  if (typeof raw !== 'string' || raw === '') return null;

  let parsed;
  try {
    parsed = JSON.parse(raw);
  } catch (_) {
    return null;
  }
  // Lock records are always JSON objects. Arrays also report typeof 'object'
  // but have no lock fields, so reject them as malformed.
  if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed)) return null;
  return parsed;
}
File History 1 commit
sha256:94ec65bd2b200240ac785a97cf14c5db066832bd608a24d6a9c151f17b918b02 feat(calendar): hosted bridge/gateway route parity and time… Human minor 1 day ago