parallel-embed-pool.mjs
sha256:65ccb454656ea5acdea0a10e559b78bcde1eb6ff753ecc2911bc99d1c3d7cadd
feat(calendar): enforce agent context tiers in retrieval AP…
Human
minor
⚠ breaking
2 days ago
| 1 | /** |
| 2 | * Bounded-concurrency worker pool for embedding (or any other) batches. |
| 3 | * |
| 4 | * Why this exists: hub/bridge/server.mjs `POST /api/v1/index` historically embedded |
| 5 | * batches strictly sequentially. After the OpenAI → DeepInfra (BAAI/bge-large-en-v1.5) |
| 6 | * switch, per-batch latency roughly doubled (median 2.5s, tails 5–8.5s) and the |
| 7 | * sequential loop on a 251-chunk vault exceeded Netlify's 60s synchronous-function |
| 8 | * cap (see `hub/bridge/index-timing.mjs` post-mortem). |
| 9 | * |
| 10 | * `runWithConcurrency` lets the bridge fan out N requests to the embedding provider |
| 11 | * in parallel while preserving: |
| 12 | * - input order in the returned array (so we can zip back to chunks); |
| 13 | * - "fail fast" semantics: as soon as one task throws, no new tasks are scheduled |
| 14 | * and the returned promise rejects with the first observed error once the tasks already in flight have settled; |
| 15 | * - stable concurrency (never more than `concurrency` workers in flight). |
| 16 | * |
| 17 | * No third-party dependency: this lives at the cold-start path of the bridge |
| 18 | * Netlify Function, where every extra dep adds bundle size and load time. |
| 19 | */ |
| 20 | |
/**
 * Run `tasks` with at most `concurrency` in flight and resolve with their results
 * in the same order as `tasks`.
 *
 * Fail-fast: once any task throws or rejects, no further tasks are started. The
 * returned promise then rejects with the first observed error. It waits until every
 * already-started task has settled, so no in-flight rejection is left unhandled.
 * A task that rejects with a falsy value (`Promise.reject()`, `throw 0`) also counts
 * as a failure, and that falsy value is what the returned promise rejects with.
 *
 * @template T
 * @param {Array<() => Promise<T>>} tasks - Each task is a thunk that returns a Promise
 *   (a plain value also works; it is awaited). Thunks rather than pre-started promises
 *   are required because they are what make the concurrency cap possible. An entry
 *   that is not a function is rejected with a TypeError.
 * @param {{ concurrency?: number, onSettled?: (info: { index: number, ok: boolean, error?: unknown, ms: number }) => void }} [options]
 *   - `concurrency`: max workers in flight. Must be a finite number. Anything else,
 *     including a numeric string, falls back to 1. The value is floored, then clamped
 *     to `[1, tasks.length]`.
 *   - `onSettled`: optional per-task callback invoked after each task settles. It
 *     receives the task index, the outcome, the error on failure, and the wall-clock
 *     duration in ms. Used by the bridge to feed per-batch timing into
 *     `createIndexTimer.step('embed_batch', ...)`. Exceptions it throws are swallowed.
 *   Passing `null` is treated the same as omitting `options`.
 * @returns {Promise<T[]>} Results in `tasks`-input order.
 * @throws {TypeError} (as a rejection) if `tasks` is not an array or any entry is not a function.
 */
export async function runWithConcurrency(tasks, options = {}) {
  if (!Array.isArray(tasks)) {
    throw new TypeError('runWithConcurrency: tasks must be an array of thunks');
  }
  for (let i = 0; i < tasks.length; i++) {
    if (typeof tasks[i] !== 'function') {
      throw new TypeError(
        `runWithConcurrency: tasks[${i}] must be a thunk (() => Promise), got ${typeof tasks[i]}. ` +
          'If you have an array of pre-started promises, wrap each: tasks.map((p) => () => p).',
      );
    }
  }
  if (tasks.length === 0) return [];

  const opts = options == null ? {} : options;

  const rawConcurrency = opts.concurrency;
  const requested = Number.isFinite(rawConcurrency) ? Math.floor(rawConcurrency) : 1;
  const concurrency = Math.min(Math.max(requested, 1), tasks.length);

  const onSettled = typeof opts.onSettled === 'function' ? opts.onSettled : null;

  // onSettled is observability-only; never let a logger bug fail the index.
  const report = (info) => {
    if (!onSettled) return;
    try {
      onSettled(info);
    } catch (_) {
      // Intentionally ignored.
    }
  };

  const results = new Array(tasks.length);
  let nextIndex = 0;
  // Tracked separately from `firstError`: a task may reject with a falsy value.
  // A truthiness check on `firstError` would miss that failure, keep scheduling,
  // and resolve with a hole in `results`.
  let failed = false;
  let firstError;

  async function worker() {
    // `nextIndex++` needs no locking. Each worker's synchronous slice runs to
    // completion before another resumes, so no two workers claim the same index.
    while (!failed && nextIndex < tasks.length) {
      const index = nextIndex++;
      const startedAt = Date.now();
      try {
        const value = await tasks[index]();
        const ms = Date.now() - startedAt;
        results[index] = value;
        report({ index, ok: true, ms });
      } catch (err) {
        const ms = Date.now() - startedAt;
        if (!failed) {
          failed = true;
          firstError = err;
        }
        report({ index, ok: false, error: err, ms });
      }
    }
  }

  // Workers never reject (every task error is caught above), so this waits for all
  // in-flight tasks to settle before we surface the first error.
  await Promise.all(Array.from({ length: concurrency }, () => worker()));
  if (failed) throw firstError;
  return results;
}
| 98 | |
/**
 * Resolve how many embedding requests may be in flight at once. The input is
 * `INDEXER_EMBED_CONCURRENCY` (an env string) or a numeric override.
 *
 * - null / undefined / '' → 5, the default. It balances DeepInfra rate-limit
 *   headroom against wall-clock speedup under the bridge's 60s sync cap.
 * - Strings are trimmed and read with base-10 `parseInt`, so only the leading
 *   integer counts ('8 workers' → 8).
 * - Non-finite results and values below 1 → 5.
 * - Values above 16 → 16, a hard ceiling so an env typo can't fork 1000 sockets.
 * - Anything else is floored to an integer.
 *
 * @param {string|number|null|undefined} raw
 * @returns {number} An integer in [1, 16].
 */
export function parseEmbedConcurrency(raw) {
  const fallback = 5;
  const ceiling = 16;
  if (raw == null || raw === '') return fallback;

  const candidate = typeof raw === 'number' ? raw : Number.parseInt(String(raw).trim(), 10);
  const usable = Number.isFinite(candidate) && candidate >= 1;
  return usable ? Math.min(Math.floor(candidate), ceiling) : fallback;
}
| 114 | |
/**
 * Resolve the number of inputs sent per `/v1/embeddings` request. The input is
 * `INDEXER_EMBED_BATCH_SIZE` (an env string) or a numeric override.
 *
 * - null / undefined / '' → 50, the default. DeepInfra and OpenAI both accept at
 *   least 50 inputs per request without payload-size issues for 2KB chunks.
 * - Strings are trimmed and read with base-10 `parseInt`, so only the leading
 *   integer counts ('64 inputs' → 64, '1e3' → 1).
 * - Non-finite results and values below 1 → 50.
 * - Values above 256 → 256, a hard ceiling so an accidental "1000" doesn't blow
 *   past provider per-request limits.
 * - Anything else is floored to an integer.
 *
 * @param {string|number|null|undefined} raw
 * @returns {number} An integer in [1, 256].
 */
export function parseEmbedBatchSize(raw) {
  const fallback = 50;
  const ceiling = 256;
  if (raw == null || raw === '') return fallback;

  const candidate = typeof raw === 'number' ? raw : Number.parseInt(String(raw).trim(), 10);
  const usable = Number.isFinite(candidate) && candidate >= 1;
  return usable ? Math.min(Math.floor(candidate), ceiling) : fallback;
}
File History
2 commits
sha256:65ccb454656ea5acdea0a10e559b78bcde1eb6ff753ecc2911bc99d1c3d7cadd
feat(calendar): enforce agent context tiers in retrieval AP…
Human
minor
⚠
2 days ago
sha256:9103f98c89257ed2b01c237cea895dabb3e85ea337dccb1161c175e4422355b6
docs: accept Calendar Events v0 spec with Phase 0 security …
Human
2 days ago