parallel-embed-pool.mjs
129 lines 5.2 KB
Raw
sha256:65ccb454656ea5acdea0a10e559b78bcde1eb6ff753ecc2911bc99d1c3d7cadd feat(calendar): enforce agent context tiers in retrieval AP… Human minor ⚠ breaking 2 days ago
/**
 * Bounded-concurrency worker pool for embedding (or any other) batches.
 *
 * Why this exists: hub/bridge/server.mjs `POST /api/v1/index` historically embedded
 * batches strictly sequentially. After the OpenAI → DeepInfra (BAAI/bge-large-en-v1.5)
 * switch, per-batch latency roughly doubled (median 2.5s, tails 5–8.5s) and the
 * sequential loop on a 251-chunk vault exceeded Netlify's 60s synchronous-function
 * cap (see `hub/bridge/index-timing.mjs` post-mortem).
 *
 * `runWithConcurrency` lets the bridge fan out N requests to the embedding provider
 * in parallel while preserving:
 * - input order in the returned array (so we can zip back to chunks);
 * - "fail fast" semantics: as soon as one task throws, no new tasks are scheduled,
 *   and the returned promise rejects with the first observed error once the
 *   already-started tasks have settled;
 * - bounded concurrency (never more than `concurrency` tasks in flight).
 *
 * No third-party dependency: this lives on the cold-start path of the bridge
 * Netlify Function, where every extra dep adds bundle size and load time.
 */
20
/**
 * Run `tasks` with at most `concurrency` in flight. Returns results in the same
 * order as `tasks`. If any task throws (or its promise rejects), no further tasks
 * are started. The returned promise rejects with the first observed error, but
 * only after the already-started tasks have settled, so none of their rejections
 * are left unhandled.
 *
 * Failure is tracked with a dedicated flag rather than by the truthiness of the
 * error value. A task that rejects with a falsy reason (`Promise.reject()`,
 * `throw null`, `throw 0`) therefore still stops scheduling and rejects the
 * returned promise, instead of silently yielding an `undefined` result slot.
 *
 * @template T
 * @param {Array<() => Promise<T>>} tasks - Each task is a thunk that returns a Promise.
 *   A plain return value or a synchronous throw from the thunk is also handled, since
 *   the call is awaited inside a try block. Using thunks (not pre-started promises) is
 *   what enables true concurrency capping. Pre-started promises would all already be
 *   running, so a non-function entry is rejected with a TypeError.
 * @param {{ concurrency?: number, onSettled?: (info: { index: number, ok: boolean, error?: unknown, ms: number }) => void }} [options]
 *   - `concurrency`: max workers in flight. A finite number is floored, then clamped
 *     to [1, tasks.length]. Anything else (omitted, NaN, Infinity, a numeric string)
 *     falls back to 1, i.e. sequential execution.
 *   - `onSettled`: optional per-task callback invoked after each task settles. Used by the
 *     bridge to feed per-batch timing into `createIndexTimer.step('embed_batch', ...)`.
 *     Anything it throws is swallowed: observability must never fail the run.
 *   `null` is accepted and treated like `{}`.
 * @returns {Promise<T[]>} Results in `tasks`-input order.
 * @throws {TypeError} (as a rejection) if `tasks` is not an array or holds a non-function.
 */
export async function runWithConcurrency(tasks, options = {}) {
  if (!Array.isArray(tasks)) {
    throw new TypeError('runWithConcurrency: tasks must be an array of thunks');
  }
  for (let i = 0; i < tasks.length; i++) {
    if (typeof tasks[i] !== 'function') {
      throw new TypeError(
        `runWithConcurrency: tasks[${i}] must be a thunk (() => Promise), got ${typeof tasks[i]}. ` +
          'If you have an array of pre-started promises, wrap each: tasks.map((p) => () => p).',
      );
    }
  }
  if (tasks.length === 0) return [];

  const opts = options == null ? {} : options;

  // Number.isFinite does not coerce, so only real finite numbers get past here.
  const rawConcurrency = opts.concurrency;
  let concurrency = Number.isFinite(rawConcurrency) ? Math.floor(rawConcurrency) : 1;
  if (concurrency < 1) concurrency = 1;
  if (concurrency > tasks.length) concurrency = tasks.length;

  const onSettled = typeof opts.onSettled === 'function' ? opts.onSettled : null;

  // Best-effort observability hook: a buggy logger must never fail the index.
  const report = (info) => {
    if (!onSettled) return;
    try {
      onSettled(info);
    } catch (_) {
      // Intentionally ignored (see above).
    }
  };

  const results = new Array(tasks.length);
  let nextIndex = 0;
  // Kept separate from `firstError` so that a falsy error value still counts as a failure.
  let failed = false;
  let firstError;

  async function worker() {
    // `nextIndex++` runs synchronously between awaits, so each worker claims a
    // distinct index without any locking.
    while (!failed && nextIndex < tasks.length) {
      const myIndex = nextIndex++;
      const startedAt = Date.now();
      try {
        results[myIndex] = await tasks[myIndex]();
        report({ index: myIndex, ok: true, ms: Date.now() - startedAt });
      } catch (err) {
        const ms = Date.now() - startedAt;
        if (!failed) {
          failed = true;
          firstError = err;
        }
        report({ index: myIndex, ok: false, error: err, ms });
      }
    }
  }

  // Workers never reject (every task error is caught above), so this resolves
  // only once all in-flight tasks have settled.
  await Promise.all(Array.from({ length: concurrency }, () => worker()));
  if (failed) throw firstError;
  return results;
}
98
/**
 * Resolve the embedding fan-out width from `INDEXER_EMBED_CONCURRENCY` (the raw env
 * string) or from an explicit numeric override.
 *
 * - Missing input (`null`, `undefined`, `''`) yields the default of 5. It is chosen to
 *   balance DeepInfra rate-limit headroom against wall-clock speedup under the bridge's
 *   60s sync cap.
 * - Strings are trimmed and read with base-10 `parseInt`, so trailing junk is ignored
 *   (`'12abc'` → 12) and fractions are truncated (`'7.9'` → 7).
 * - Anything non-finite or below 1 (`'abc'`, `'0'`, `-3`, `NaN`, `Infinity`) also
 *   yields the default of 5.
 * - Values above 16 are capped at 16 so an env typo can't open 1000 sockets. Otherwise
 *   the value is floored to an integer.
 *
 * @param {string|number|null|undefined} raw
 * @returns {number} An integer in [1, 16].
 */
export function parseEmbedConcurrency(raw) {
  const fallback = 5;
  const ceiling = 16;

  if (raw === null || raw === undefined || raw === '') {
    return fallback;
  }

  const parsed = typeof raw === 'number' ? raw : Number.parseInt(String(raw).trim(), 10);
  const isUsable = Number.isFinite(parsed) && parsed >= 1;
  if (!isUsable) {
    return fallback;
  }
  return parsed > ceiling ? ceiling : Math.floor(parsed);
}
114
/**
 * Resolve the number of inputs per `/v1/embeddings` request from
 * `INDEXER_EMBED_BATCH_SIZE` (the raw env string) or from an explicit numeric override.
 *
 * Default 50: DeepInfra/OpenAI both accept ≥50 inputs per `/v1/embeddings` request
 * without payload-size issues for 2KB chunks. The default is returned for missing input
 * (`null`, `undefined`, `''`) and for anything that does not parse to a finite
 * number ≥ 1. Strings are trimmed and read with base-10 `parseInt`. Usable values are
 * floored, then capped at 256 so an accidental "1000" doesn't blow past provider
 * per-request limits.
 *
 * @param {string|number|null|undefined} raw
 * @returns {number} An integer in [1, 256].
 */
export function parseEmbedBatchSize(raw) {
  const fallback = 50;
  const ceiling = 256;

  const isBlank = raw == null || raw === '';
  if (isBlank) return fallback;

  const candidate = typeof raw === 'number' ? raw : Number.parseInt(String(raw).trim(), 10);
  const isUsable = Number.isFinite(candidate) && candidate >= 1;
  // Flooring before the cap is equivalent to capping first, since the cap is an integer.
  return isUsable ? Math.min(Math.floor(candidate), ceiling) : fallback;
}
File History 2 commits
sha256:65ccb454656ea5acdea0a10e559b78bcde1eb6ff753ecc2911bc99d1c3d7cadd feat(calendar): enforce agent context tiers in retrieval AP… Human minor 2 days ago
sha256:9103f98c89257ed2b01c237cea895dabb3e85ea337dccb1161c175e4422355b6 docs: accept Calendar Events v0 spec with Phase 0 security … Human 2 days ago