parallel-embed-pool.test.mjs
184 lines 6.1 KB
Raw
sha256:65ccb454656ea5acdea0a10e559b78bcde1eb6ff753ecc2911bc99d1c3d7cadd feat(calendar): enforce agent context tiers in retrieval AP… Human minor ⚠ breaking 1 day ago
/**
 * Tests for `lib/parallel-embed-pool.mjs`. The bridge `POST /api/v1/index` will call
 * `runWithConcurrency` with embed-batch thunks, so the contract that matters here is:
 *   1. Order: results[i] === <return value of tasks[i]()>.
 *   2. Concurrency cap: never more than N in flight at once (verified with a counter).
 *   3. Fail-fast: first error rejects; remaining unstarted tasks are skipped.
 *   4. onSettled callback fires per task with index, ok, ms.
 *   5. Edge cases: empty array, concurrency 0/NaN/string env values, and pre-started
 *      promises passed instead of thunks (rejected with a descriptive error).
 */
10
11 import { describe, it } from 'node:test';
12 import assert from 'node:assert/strict';
13 import {
14 runWithConcurrency,
15 parseEmbedConcurrency,
16 parseEmbedBatchSize,
17 } from '../lib/parallel-embed-pool.mjs';
18
/**
 * Test helper: wait for a timer before continuing.
 *
 * @param {number} ms - Delay handed straight to `setTimeout`.
 * @returns {Promise<undefined>} Promise that fulfils (with no value) once the timer fires.
 */
function delay(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}
22
// Contract tests for `runWithConcurrency(tasks, { concurrency, onSettled })`:
// result ordering, the in-flight cap, fail-fast behaviour, the onSettled hook,
// and handling of invalid `concurrency` / non-thunk inputs.
describe('runWithConcurrency', () => {
  it('preserves input order in results', async () => {
    // Delays are deliberately out of order so completion order differs from input order.
    const tasks = [10, 5, 15, 1, 20].map((n, i) => async () => {
      await delay(n);
      return i;
    });
    const out = await runWithConcurrency(tasks, { concurrency: 3 });
    assert.deepEqual(out, [0, 1, 2, 3, 4]);
  });

  it('respects the concurrency cap (never more than N in flight)', async () => {
    // `inFlight` counts tasks between start and finish; `maxObserved` is its high-water mark.
    let inFlight = 0;
    let maxObserved = 0;
    const tasks = Array.from({ length: 20 }, (_, i) => async () => {
      inFlight++;
      if (inFlight > maxObserved) maxObserved = inFlight;
      await delay(15);
      inFlight--;
      return i;
    });
    const concurrency = 4;
    const out = await runWithConcurrency(tasks, { concurrency });
    assert.equal(out.length, 20);
    assert.ok(
      maxObserved <= concurrency,
      `maxObserved=${maxObserved} exceeded cap=${concurrency}`,
    );
    // Guards against an implementation that silently degrades to one-at-a-time.
    assert.ok(maxObserved >= 2, 'expected actual parallelism, not just sequential');
  });

  it('returns immediately on empty tasks array', async () => {
    const out = await runWithConcurrency([], { concurrency: 5 });
    assert.deepEqual(out, []);
  });

  it('fails fast on first error and does not start tasks scheduled after the failure', async () => {
    let startedAfterFailure = 0;
    const failureIndex = 1;
    const tasks = Array.from({ length: 10 }, (_, i) => async () => {
      if (i === failureIndex) {
        throw new Error(`boom-${i}`);
      }
      // Tasks 0 + 1 should start (concurrency=2). The counter below only covers
      // tasks 3..9 (i > failureIndex + 1); task 2 is not checked.
      // NOTE(review): presumably slack for a task picked up before the failure is
      // observed — confirm against the pool implementation.
      if (i > failureIndex + 1) {
        startedAfterFailure++;
      }
      await delay(20);
      return i;
    });
    await assert.rejects(
      () => runWithConcurrency(tasks, { concurrency: 2 }),
      /boom-1/,
    );
    assert.equal(
      startedAfterFailure,
      0,
      'tasks scheduled after the failure should be skipped',
    );
  });

  it('invokes onSettled per task with index, ok, ms', async () => {
    const events = [];
    const tasks = [
      async () => {
        await delay(5);
        return 'a';
      },
      async () => {
        await delay(5);
        throw new Error('nope');
      },
    ];
    await assert.rejects(
      () =>
        runWithConcurrency(tasks, {
          concurrency: 2,
          onSettled: (info) => events.push(info),
        }),
      /nope/,
    );
    assert.equal(events.length, 2);
    // Sort a copy: callback order is not asserted, and `events` stays as recorded.
    const byIndex = [...events].sort((a, b) => a.index - b.index);
    assert.equal(byIndex[0].ok, true);
    assert.equal(byIndex[0].index, 0);
    assert.ok(typeof byIndex[0].ms === 'number' && byIndex[0].ms >= 0);
    assert.equal(byIndex[1].ok, false);
    assert.match(String(byIndex[1].error?.message || ''), /nope/);
  });

  it('does not crash if onSettled throws (observability never fails the index)', async () => {
    const tasks = [async () => 1, async () => 2];
    const out = await runWithConcurrency(tasks, {
      concurrency: 2,
      onSettled: () => {
        throw new Error('logger crash');
      },
    });
    assert.deepEqual(out, [1, 2]);
  });

  it('clamps invalid concurrency: NaN, 0, negative → 1; > tasks.length → tasks.length', async () => {
    // Only the results are asserted here; each call must still complete all tasks in order.
    const tasks = [async () => 1, async () => 2, async () => 3];
    assert.deepEqual(await runWithConcurrency(tasks, { concurrency: 0 }), [1, 2, 3]);
    assert.deepEqual(await runWithConcurrency(tasks, { concurrency: -5 }), [1, 2, 3]);
    assert.deepEqual(await runWithConcurrency(tasks, { concurrency: NaN }), [1, 2, 3]);
    assert.deepEqual(await runWithConcurrency(tasks, { concurrency: 9999 }), [1, 2, 3]);
  });

  it('rejects non-thunk inputs with a descriptive error pointing at the bad index', async () => {
    // tasks[1] is an already-created promise rather than a function returning one.
    await assert.rejects(
      () => runWithConcurrency([async () => 1, Promise.resolve(2)], { concurrency: 2 }),
      /tasks\[1\] must be a thunk/,
    );
  });
});
138
// `parseEmbedConcurrency` turns a raw (typically env-var) value into a usable
// concurrency: these tests pin the default of 5 and the ceiling of 16.
describe('parseEmbedConcurrency', () => {
  it('defaults to 5 when missing or empty', () => {
    assert.equal(parseEmbedConcurrency(null), 5);
    assert.equal(parseEmbedConcurrency(undefined), 5);
    assert.equal(parseEmbedConcurrency(''), 5);
  });

  it('parses string env values', () => {
    assert.equal(parseEmbedConcurrency('3'), 3);
    // Surrounding whitespace is tolerated.
    assert.equal(parseEmbedConcurrency(' 8 '), 8);
  });

  it('rejects garbage and falls back to default 5', () => {
    // Non-numeric, negative and zero inputs all yield the default rather than a clamp.
    assert.equal(parseEmbedConcurrency('abc'), 5);
    assert.equal(parseEmbedConcurrency('-3'), 5);
    assert.equal(parseEmbedConcurrency('0'), 5);
  });

  it('clamps absurdly large values to ceiling 16', () => {
    assert.equal(parseEmbedConcurrency('1000'), 16);
    // Numeric (non-string) input is accepted too.
    assert.equal(parseEmbedConcurrency(17), 16);
  });
});
162
// `parseEmbedBatchSize` turns a raw value into an embed batch size:
// these tests pin the default of 50 and the ceiling of 256.
describe('parseEmbedBatchSize', () => {
  it('defaults to 50 when missing/empty', () => {
    assert.equal(parseEmbedBatchSize(null), 50);
    assert.equal(parseEmbedBatchSize(''), 50);
  });

  it('parses valid values', () => {
    // Both string and numeric inputs are accepted.
    assert.equal(parseEmbedBatchSize('20'), 20);
    assert.equal(parseEmbedBatchSize(75), 75);
  });

  it('rejects garbage and falls back to default 50', () => {
    // Non-numeric, negative and zero inputs all yield the default rather than a clamp.
    assert.equal(parseEmbedBatchSize('xyz'), 50);
    assert.equal(parseEmbedBatchSize('-1'), 50);
    assert.equal(parseEmbedBatchSize('0'), 50);
  });

  it('clamps to 256 to keep within provider per-request limits', () => {
    assert.equal(parseEmbedBatchSize('500'), 256);
    assert.equal(parseEmbedBatchSize(1024), 256);
  });
});
File History 2 commits
sha256:65ccb454656ea5acdea0a10e559b78bcde1eb6ff753ecc2911bc99d1c3d7cadd feat(calendar): enforce agent context tiers in retrieval AP… Human minor 1 day ago
sha256:9103f98c89257ed2b01c237cea895dabb3e85ea337dccb1161c175e4422355b6 docs: accept Calendar Events v0 spec with Phase 0 security … Human 1 day ago