flow-store.mjs
797 lines 24.9 KB
Raw
sha256:8915fe406161f95c1681f9469375e7bae5b28c884f00bedbdef65e4b0cd0738d docs(flow): commit FLOW-V0-SPEC.md hygiene for 7A-INT merge Human 13 hours ago
1 /**
2 * Local file-backed Flow store (Flow v0 — Phase 7A-10b, Option A calendar parity).
3 *
4 * Persists flow definitions, steps, runs, candidates, and projections per vault under
5 * data_dir. Read-only list/get in v0; idempotent starter seed on first read.
6 *
7 * @see docs/FLOW-STORE-CONTRACT-7A-10.md
8 * @see docs/FLOW-V0-SPEC.md
9 */
10
11 import fs from 'fs';
12 import path from 'path';
13 import { randomUUID } from 'crypto';
14 import { getRepoRoot } from '../repo-root.mjs';
15
16 export const FLOW_STORE_FILENAME = 'hub_flow_store.json';
17 export const STARTER_FLOWS_DIRNAME = 'flows/starter';
18 export const MAX_FLOW_SUMMARIES = 200;
19 export const MAX_STEPS_PER_FLOW = 100;
20
21 export const FLOW_ID_RE = /^flow_[a-z0-9_]{1,64}$/;
22 export const FLOW_STEP_ID_RE = /^flow_[a-z0-9_]{1,64}#[1-9][0-9]*$/;
23 export const FLOW_RUN_ID_RE = /^run_[a-z0-9_]{1,48}$/;
24 export const FLOW_CANDIDATE_ID_RE = /^cand_[a-z0-9]{4,32}$/;
25 export const SEMVER_RE = /^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)$/;
26
27 /** @typedef {'personal'|'project'|'org'} FlowScope */
28
29 /**
30 * @typedef {Object} StoredFlow
31 * @property {'knowtation.flow/v0'} schema
32 * @property {string} flow_id
33 * @property {string} title
34 * @property {string} version
35 * @property {FlowScope} scope
36 * @property {string} summary
37 * @property {string[]} [tags]
38 * @property {string[]} steps
39 * @property {{ name: string, type: string, required: boolean }[]} [inputs]
40 * @property {string|null} [vault_mirror_path]
41 * @property {string} updated
42 * @property {boolean} truncated
43 */
44
45 /**
46 * @typedef {Object} StoredFlowStep
47 * @property {'knowtation.flow_step/v0'} schema
48 * @property {string} step_id
49 * @property {string} flow_id
50 * @property {number} ordinal
51 * @property {string} owned_job
52 * @property {string} instruction
53 * @property {string} trigger
54 * @property {string} when_not_to_run
55 * @property {{ kind: string, id: string }[]} [requires]
56 * @property {string[]} boundaries
57 * @property {{ kind: string, id: string }[]} [skill_refs]
58 * @property {{ name: string, from: string }[]} [inputs]
59 * @property {{ name: string, type: string }[]} [outputs]
60 * @property {string} output_shape
61 * @property {{ kind: string, evidence_required: boolean, description: string }} verification
62 * @property {'manual'|'agent_assisted'|'automatable'} automatable
63 */
64
65 /**
66 * @typedef {Object} VaultFlowStore
67 * @property {StoredFlow[]} flows
68 * @property {StoredFlowStep[]} steps
69 * @property {object[]} runs
70 * @property {object[]} candidates
71 * @property {object[]} projections
72 */
73
74 /**
75 * @typedef {Object} FlowStoreFile
76 * @property {Record<string, VaultFlowStore>} vaults
77 */
78
79 /**
80 * @param {string} dataDir
81 * @returns {string}
82 */
83 export function getFlowStorePath(dataDir) {
84 return path.join(dataDir, FLOW_STORE_FILENAME);
85 }
86
87 /**
88 * @param {string} dataDir
89 * @returns {FlowStoreFile}
90 */
91 export function loadFlowStore(dataDir) {
92 const filePath = getFlowStorePath(dataDir);
93 if (!fs.existsSync(filePath)) {
94 return { vaults: {} };
95 }
96 try {
97 const raw = fs.readFileSync(filePath, 'utf8');
98 const parsed = JSON.parse(raw);
99 if (!parsed || typeof parsed !== 'object' || !parsed.vaults || typeof parsed.vaults !== 'object') {
100 return { vaults: {} };
101 }
102 return /** @type {FlowStoreFile} */ (parsed);
103 } catch {
104 return { vaults: {} };
105 }
106 }
107
108 /**
109 * @param {string} dataDir
110 * @param {FlowStoreFile} store
111 */
112 export function saveFlowStore(dataDir, store) {
113 const filePath = getFlowStorePath(dataDir);
114 const dir = path.dirname(filePath);
115 if (!fs.existsSync(dir)) {
116 fs.mkdirSync(dir, { recursive: true });
117 }
118 const tmp = `${filePath}.${process.pid}.${randomUUID()}.tmp`;
119 fs.writeFileSync(tmp, JSON.stringify(store, null, 2), 'utf8');
120 fs.renameSync(tmp, filePath);
121 }
122
123 /**
124 * @param {string} dataDir
125 * @param {string} vaultId
126 * @returns {VaultFlowStore}
127 */
128 export function getVaultFlowStore(dataDir, vaultId) {
129 const store = loadFlowStore(dataDir);
130 if (!store.vaults[vaultId]) {
131 store.vaults[vaultId] = {
132 flows: [],
133 steps: [],
134 runs: [],
135 candidates: [],
136 projections: [],
137 };
138 }
139 return store.vaults[vaultId];
140 }
141
142 /**
143 * @param {string} flowId
144 * @param {number} ordinal
145 * @returns {string}
146 */
147 export function buildFlowStepId(flowId, ordinal) {
148 return `${flowId}#${ordinal}`;
149 }
150
151 /**
152 * @param {string} version
153 * @returns {[number, number, number]|null}
154 */
155 export function parseSemver(version) {
156 const m = SEMVER_RE.exec(version);
157 if (!m) return null;
158 return [parseInt(m[1], 10), parseInt(m[2], 10), parseInt(m[3], 10)];
159 }
160
161 /**
162 * @param {[number, number, number]} a
163 * @param {[number, number, number]} b
164 * @returns {number}
165 */
166 export function compareSemver(a, b) {
167 for (let i = 0; i < 3; i += 1) {
168 if (a[i] !== b[i]) return a[i] - b[i];
169 }
170 return 0;
171 }
172
173 /**
174 * @param {unknown} scope
175 * @returns {scope is FlowScope}
176 */
177 function isFlowScope(scope) {
178 return scope === 'personal' || scope === 'project' || scope === 'org';
179 }
180
181 /**
182 * Validate a starter bundle against FLOW-V0-SPEC §1 anatomy rules.
183 *
184 * @param {{ flow?: unknown, steps?: unknown }} bundle
185 * @returns {{ ok: true, flow: StoredFlow, steps: StoredFlowStep[] } | { ok: false, reason: string }}
186 */
187 export function validateFlowBundle(bundle) {
188 if (!bundle || typeof bundle !== 'object') {
189 return { ok: false, reason: 'bundle must be an object' };
190 }
191 const flow = /** @type {Record<string, unknown>} */ (bundle.flow);
192 const stepsRaw = bundle.steps;
193 if (!flow || typeof flow !== 'object') {
194 return { ok: false, reason: 'bundle.flow is required' };
195 }
196 if (!Array.isArray(stepsRaw) || stepsRaw.length === 0) {
197 return { ok: false, reason: 'bundle.steps must be a non-empty array' };
198 }
199
200 const flowId = flow.flow_id;
201 if (typeof flowId !== 'string' || !FLOW_ID_RE.test(flowId)) {
202 return { ok: false, reason: 'invalid flow_id' };
203 }
204 if (flow.schema !== 'knowtation.flow/v0') {
205 return { ok: false, reason: 'flow.schema must be knowtation.flow/v0' };
206 }
207 if (typeof flow.title !== 'string' || !flow.title.trim()) {
208 return { ok: false, reason: 'flow.title is required' };
209 }
210 if (typeof flow.version !== 'string' || !SEMVER_RE.test(flow.version)) {
211 return { ok: false, reason: 'flow.version must be semver' };
212 }
213 if (!isFlowScope(flow.scope)) {
214 return { ok: false, reason: 'flow.scope must be personal|project|org' };
215 }
216 if (typeof flow.summary !== 'string') {
217 return { ok: false, reason: 'flow.summary is required' };
218 }
219 if (!Array.isArray(flow.steps) || flow.steps.length === 0) {
220 return { ok: false, reason: 'flow.steps must be a non-empty array' };
221 }
222 if (typeof flow.updated !== 'string' || !flow.updated.trim()) {
223 return { ok: false, reason: 'flow.updated is required' };
224 }
225 if (typeof flow.truncated !== 'boolean') {
226 return { ok: false, reason: 'flow.truncated must be boolean' };
227 }
228
229 /** @type {StoredFlowStep[]} */
230 const steps = [];
231 const stepIds = new Set();
232
233 for (const raw of stepsRaw) {
234 if (!raw || typeof raw !== 'object') {
235 return { ok: false, reason: 'each step must be an object' };
236 }
237 const step = /** @type {Record<string, unknown>} */ (raw);
238 if (step.schema !== 'knowtation.flow_step/v0') {
239 return { ok: false, reason: 'step.schema must be knowtation.flow_step/v0' };
240 }
241 if (typeof step.step_id !== 'string' || !FLOW_STEP_ID_RE.test(step.step_id)) {
242 return { ok: false, reason: 'invalid step_id' };
243 }
244 if (step.flow_id !== flowId) {
245 return { ok: false, reason: 'step.flow_id must match flow.flow_id' };
246 }
247 if (typeof step.ordinal !== 'number' || !Number.isInteger(step.ordinal) || step.ordinal < 1) {
248 return { ok: false, reason: 'step.ordinal must be a 1-based integer' };
249 }
250 if (buildFlowStepId(flowId, step.ordinal) !== step.step_id) {
251 return { ok: false, reason: 'step_id must equal flow_id#ordinal' };
252 }
253 if (typeof step.owned_job !== 'string' || !step.owned_job.trim()) {
254 return { ok: false, reason: 'step.owned_job is required' };
255 }
256 if (typeof step.instruction !== 'string' || !step.instruction.trim()) {
257 return { ok: false, reason: 'step.instruction is required' };
258 }
259 if (typeof step.trigger !== 'string' || !step.trigger.trim()) {
260 return { ok: false, reason: 'step.trigger is required (anatomy completeness)' };
261 }
262 if (typeof step.when_not_to_run !== 'string' || !step.when_not_to_run.trim()) {
263 return { ok: false, reason: 'step.when_not_to_run is required (anatomy completeness)' };
264 }
265 if (!Array.isArray(step.boundaries)) {
266 return { ok: false, reason: 'step.boundaries must be an array' };
267 }
268 if (typeof step.output_shape !== 'string' || !step.output_shape.trim()) {
269 return { ok: false, reason: 'step.output_shape is required (anatomy completeness)' };
270 }
271 const verification = step.verification;
272 if (!verification || typeof verification !== 'object') {
273 return { ok: false, reason: 'step.verification is required (anatomy completeness)' };
274 }
275 const ver = /** @type {Record<string, unknown>} */ (verification);
276 if (typeof ver.kind !== 'string' || !ver.kind.trim()) {
277 return { ok: false, reason: 'step.verification.kind is required' };
278 }
279 if (typeof ver.evidence_required !== 'boolean') {
280 return { ok: false, reason: 'step.verification.evidence_required must be boolean' };
281 }
282 if (typeof ver.description !== 'string' || !ver.description.trim()) {
283 return { ok: false, reason: 'step.verification.description is required' };
284 }
285 if (step.automatable !== 'manual' && step.automatable !== 'agent_assisted' && step.automatable !== 'automatable') {
286 return { ok: false, reason: 'step.automatable must be manual|agent_assisted|automatable' };
287 }
288 if (stepIds.has(step.step_id)) {
289 return { ok: false, reason: 'duplicate step_id' };
290 }
291 stepIds.add(step.step_id);
292 steps.push(/** @type {StoredFlowStep} */ (step));
293 }
294
295 for (const ref of flow.steps) {
296 if (typeof ref !== 'string' || !stepIds.has(ref)) {
297 return { ok: false, reason: 'flow.steps references missing step_id' };
298 }
299 }
300
301 const orderedStepIds = [...steps].sort((a, b) => a.ordinal - b.ordinal).map((s) => s.step_id);
302 if (JSON.stringify(flow.steps) !== JSON.stringify(orderedStepIds)) {
303 return { ok: false, reason: 'flow.steps must list step ids in ascending ordinal order' };
304 }
305
306 /** @type {StoredFlow} */
307 const storedFlow = {
308 schema: 'knowtation.flow/v0',
309 flow_id: flowId,
310 title: flow.title,
311 version: flow.version,
312 scope: flow.scope,
313 summary: flow.summary,
314 tags: Array.isArray(flow.tags) ? flow.tags.filter((t) => typeof t === 'string') : [],
315 steps: flow.steps,
316 inputs: Array.isArray(flow.inputs) ? flow.inputs : [],
317 vault_mirror_path: typeof flow.vault_mirror_path === 'string' ? flow.vault_mirror_path : null,
318 updated: flow.updated,
319 truncated: flow.truncated,
320 };
321
322 return { ok: true, flow: storedFlow, steps };
323 }
324
325 /**
326 * Idempotently seed canonical starter flows from flows/starter/.
327 *
328 * @param {string} dataDir
329 * @param {string} vaultId
330 * @param {{ starterDir?: string, onReject?: (name: string, reason: string) => void }} [options]
331 * @returns {{ seeded: number, skipped: number }}
332 */
333 export function seedStarterFlows(dataDir, vaultId, options = {}) {
334 const starterDir = options.starterDir ?? path.join(getRepoRoot(), STARTER_FLOWS_DIRNAME);
335 const onReject = options.onReject ?? ((name, reason) => {
336 console.warn(`[flow-store] rejected starter bundle ${name}: ${reason}`);
337 });
338
339 if (!fs.existsSync(starterDir)) {
340 return { seeded: 0, skipped: 0 };
341 }
342
343 const store = loadFlowStore(dataDir);
344 if (!store.vaults[vaultId]) {
345 store.vaults[vaultId] = {
346 flows: [],
347 steps: [],
348 runs: [],
349 candidates: [],
350 projections: [],
351 };
352 }
353 const vault = store.vaults[vaultId];
354
355 let seeded = 0;
356 let skipped = 0;
357
358 const files = fs.readdirSync(starterDir).filter((f) => f.startsWith('flow_') && f.endsWith('.json')).sort();
359 for (const file of files) {
360 let bundle;
361 try {
362 bundle = JSON.parse(fs.readFileSync(path.join(starterDir, file), 'utf8'));
363 } catch {
364 onReject(file, 'invalid JSON');
365 continue;
366 }
367
368 const validated = validateFlowBundle(bundle);
369 if (!validated.ok) {
370 onReject(file, validated.reason);
371 continue;
372 }
373
374 const { flow, steps } = validated;
375 const exists = vault.flows.some((f) => f.flow_id === flow.flow_id && f.version === flow.version);
376 if (exists) {
377 skipped += 1;
378 continue;
379 }
380
381 vault.flows.push(flow);
382 for (const step of steps) {
383 const stepExists = vault.steps.some((s) => s.step_id === step.step_id);
384 if (!stepExists) {
385 vault.steps.push(step);
386 }
387 }
388 seeded += 1;
389 }
390
391 if (seeded > 0) {
392 saveFlowStore(dataDir, store);
393 }
394
395 return { seeded, skipped };
396 }
397
398 /**
399 * Lazy seed when vault has no flows.
400 *
401 * @param {string} dataDir
402 * @param {string} vaultId
403 * @param {{ starterDir?: string }} [options]
404 */
405 function ensureStarterSeed(dataDir, vaultId, options = {}) {
406 const store = loadFlowStore(dataDir);
407 const vault = store.vaults[vaultId];
408 if (vault && vault.flows.length > 0) return;
409 seedStarterFlows(dataDir, vaultId, options);
410 }
411
412 /**
413 * @param {StoredFlow} flow
414 * @param {number} stepCount
415 * @returns {object}
416 */
417 export function flowSummaryForClient(flow, stepCount) {
418 return {
419 schema: 'knowtation.flow/v0',
420 flow_id: flow.flow_id,
421 title: flow.title,
422 version: flow.version,
423 scope: flow.scope,
424 summary: flow.summary,
425 tags: flow.tags ?? [],
426 step_count: stepCount,
427 updated: flow.updated,
428 truncated: flow.truncated,
429 };
430 }
431
432 /**
433 * @param {StoredFlow} flow
434 * @param {StoredFlowStep[]} steps
435 * @returns {{ flow: object, steps: object[] }}
436 */
437 export function flowDefinitionForClient(flow, steps) {
438 return {
439 flow: {
440 schema: flow.schema,
441 flow_id: flow.flow_id,
442 title: flow.title,
443 version: flow.version,
444 scope: flow.scope,
445 summary: flow.summary,
446 tags: flow.tags ?? [],
447 steps: flow.steps,
448 inputs: flow.inputs ?? [],
449 vault_mirror_path: flow.vault_mirror_path ?? null,
450 updated: flow.updated,
451 truncated: flow.truncated,
452 },
453 steps: steps.map((step) => ({
454 schema: step.schema,
455 step_id: step.step_id,
456 flow_id: step.flow_id,
457 ordinal: step.ordinal,
458 owned_job: step.owned_job,
459 instruction: step.instruction,
460 trigger: step.trigger,
461 when_not_to_run: step.when_not_to_run,
462 requires: step.requires ?? [],
463 boundaries: step.boundaries,
464 skill_refs: step.skill_refs ?? [],
465 inputs: step.inputs ?? [],
466 outputs: step.outputs ?? [],
467 output_shape: step.output_shape,
468 verification: step.verification,
469 automatable: step.automatable,
470 })),
471 };
472 }
473
474 /**
475 * @param {VaultFlowStore} vault
476 * @param {string} flowId
477 * @returns {number}
478 */
479 function countStepsForFlow(vault, flowId) {
480 return vault.steps.filter((s) => s.flow_id === flowId).length;
481 }
482
483 /**
484 * List scope-visible flows (content-minimized).
485 *
486 * @param {string} dataDir
487 * @param {string} vaultId
488 * @param {{
489 * visibleScopes?: Set<FlowScope>,
490 * filterScopes?: Set<FlowScope>,
491 * effectiveScope: FlowScope,
492 * tag?: string,
493 * limit?: number,
494 * starterDir?: string,
495 * }} query
496 * @returns {{ schema: 'knowtation.flow_list/v0', vault_id: string, effective_scope: FlowScope, flows: object[], truncated: boolean }}
497 */
498 export function listFlows(dataDir, vaultId, query) {
499 ensureStarterSeed(dataDir, vaultId, { starterDir: query.starterDir });
500
501 const visibleScopes = query.visibleScopes ?? query.filterScopes ?? new Set(['personal']);
502 const filterScopes = query.filterScopes ?? visibleScopes;
503 const tag = typeof query.tag === 'string' && query.tag.trim() ? query.tag.trim() : '';
504 let limit = typeof query.limit === 'number' ? query.limit : MAX_FLOW_SUMMARIES;
505 if (!Number.isInteger(limit) || limit < 1) limit = MAX_FLOW_SUMMARIES;
506 if (limit > MAX_FLOW_SUMMARIES) limit = MAX_FLOW_SUMMARIES;
507
508 const store = loadFlowStore(dataDir);
509 const vault = store.vaults[vaultId] ?? {
510 flows: [],
511 steps: [],
512 runs: [],
513 candidates: [],
514 projections: [],
515 };
516
517 /** @type {Map<string, StoredFlow>} */
518 const latestById = new Map();
519 for (const flow of vault.flows) {
520 if (!filterScopes.has(flow.scope)) continue;
521 if (tag && !(flow.tags ?? []).includes(tag)) continue;
522 const parsed = parseSemver(flow.version);
523 if (!parsed) continue;
524 const existing = latestById.get(flow.flow_id);
525 if (!existing) {
526 latestById.set(flow.flow_id, flow);
527 continue;
528 }
529 const existingParsed = parseSemver(existing.version);
530 if (existingParsed && compareSemver(parsed, existingParsed) > 0) {
531 latestById.set(flow.flow_id, flow);
532 }
533 }
534
535 let candidates = [...latestById.values()].sort((a, b) => {
536 const t = Date.parse(b.updated) - Date.parse(a.updated);
537 if (t !== 0) return t;
538 return a.flow_id.localeCompare(b.flow_id);
539 });
540
541 const totalMatching = candidates.length;
542 let truncated = totalMatching > limit;
543 if (candidates.length > limit) {
544 candidates = candidates.slice(0, limit);
545 }
546
547 const flows = candidates.map((flow) => flowSummaryForClient(flow, countStepsForFlow(vault, flow.flow_id)));
548
549 return {
550 schema: 'knowtation.flow_list/v0',
551 vault_id: vaultId,
552 effective_scope: query.effectiveScope,
553 flows,
554 truncated,
555 };
556 }
557
558 /**
559 * Resolve the latest stored version of a flow **regardless of reader scope**.
560 *
561 * Used by the authoring write-back path (approve→apply reconcile and the
562 * propose-time concurrency precheck), where the server compares against the
563 * actual canonical state, not a reader-filtered projection.
564 *
565 * @param {VaultFlowStore} vault
566 * @param {string} flowId
567 * @returns {{ flow: StoredFlow, steps: StoredFlowStep[] } | null}
568 */
569 export function latestStoredFlow(vault, flowId) {
570 if (!vault) return null;
571 const matching = vault.flows.filter((f) => f.flow_id === flowId);
572 if (matching.length === 0) return null;
573 let flow = matching[0];
574 for (const candidate of matching) {
575 const a = parseSemver(candidate.version);
576 const b = parseSemver(flow.version);
577 if (a && b && compareSemver(a, b) > 0) flow = candidate;
578 }
579 const steps = vault.steps
580 .filter((s) => s.flow_id === flowId)
581 .sort((a, b) => a.ordinal - b.ordinal);
582 return { flow, steps };
583 }
584
585 /**
586 * Reconcile a validated bundle into the Flow index as a **new (flow_id, version)
587 * row** (Phase 7A-L1b; the only index write besides seed).
588 *
589 * Carry-forward constraint (FLOW-AUTHORING-WRITEBACK-CONTRACT-7A-L1 §4): an edit
590 * is reconciled as a new version record — an existing version row is never
591 * mutated in place. The flow row is upserted by `(flow_id, version)` so prior
592 * versions stay pinnable. Because the 7A-10b store keys step bodies by `step_id`
593 * (= `flow_id#ordinal`) only — not `(step_id, version)` — step bodies for a
594 * `flow_id` are shared across versions; the new version's steps replace that
595 * shared set (a versioned-step-keying slice is recommended at 7A-10c). Writes
596 * atomically (tmp + rename) so a failed reconcile leaves zero partial state.
597 *
598 * @param {string} dataDir
599 * @param {string} vaultId
600 * @param {StoredFlow} flow - validated flow record (from `validateFlowBundle`).
601 * @param {StoredFlowStep[]} steps - validated ordered steps.
602 * @returns {{ created: boolean, version: string }}
603 */
604 export function upsertFlowVersion(dataDir, vaultId, flow, steps) {
605 const store = loadFlowStore(dataDir);
606 if (!store.vaults[vaultId]) {
607 store.vaults[vaultId] = { flows: [], steps: [], runs: [], candidates: [], projections: [] };
608 }
609 const vault = store.vaults[vaultId];
610
611 const idx = vault.flows.findIndex((f) => f.flow_id === flow.flow_id && f.version === flow.version);
612 const created = idx === -1;
613 if (created) {
614 vault.flows.push(flow);
615 } else {
616 vault.flows[idx] = flow;
617 }
618
619 vault.steps = vault.steps.filter((s) => s.flow_id !== flow.flow_id);
620 for (const step of steps) {
621 vault.steps.push(step);
622 }
623
624 saveFlowStore(dataDir, store);
625 return { created, version: flow.version };
626 }
627
628 /**
629 * Get one flow definition + ordered steps, or null when missing/invisible.
630 *
631 * @param {string} dataDir
632 * @param {string} vaultId
633 * @param {string} flowId
634 * @param {{
635 * visibleScopes?: Set<FlowScope>,
636 * filterScopes?: Set<FlowScope>,
637 * version?: string,
638 * starterDir?: string,
639 * }} query
640 * @returns {{ schema: 'knowtation.flow_get/v0', vault_id: string, flow: object, steps: object[] } | null}
641 */
642 export function getFlow(dataDir, vaultId, flowId, query) {
643 if (!FLOW_ID_RE.test(flowId)) {
644 return null;
645 }
646
647 ensureStarterSeed(dataDir, vaultId, { starterDir: query.starterDir });
648
649 const filterScopes = query.filterScopes ?? query.visibleScopes ?? new Set(['personal']);
650 const pinnedVersion = typeof query.version === 'string' && query.version.trim() ? query.version.trim() : '';
651
652 if (pinnedVersion && !SEMVER_RE.test(pinnedVersion)) {
653 return null;
654 }
655
656 const store = loadFlowStore(dataDir);
657 const vault = store.vaults[vaultId];
658 if (!vault) return null;
659
660 const matching = vault.flows.filter((f) => {
661 if (f.flow_id !== flowId) return false;
662 if (!filterScopes.has(f.scope)) return false;
663 if (pinnedVersion) return f.version === pinnedVersion;
664 return true;
665 });
666
667 if (matching.length === 0) return null;
668
669 let flow = matching[0];
670 if (!pinnedVersion) {
671 for (const candidate of matching) {
672 const a = parseSemver(candidate.version);
673 const b = parseSemver(flow.version);
674 if (a && b && compareSemver(a, b) > 0) {
675 flow = candidate;
676 }
677 }
678 }
679
680 let steps = vault.steps
681 .filter((s) => s.flow_id === flowId)
682 .sort((a, b) => a.ordinal - b.ordinal);
683
684 let truncated = false;
685 if (steps.length > MAX_STEPS_PER_FLOW) {
686 steps = steps.slice(0, MAX_STEPS_PER_FLOW);
687 truncated = true;
688 }
689
690 const client = flowDefinitionForClient(
691 truncated ? { ...flow, truncated: true } : flow,
692 steps,
693 );
694
695 return {
696 schema: 'knowtation.flow_get/v0',
697 vault_id: vaultId,
698 flow: client.flow,
699 steps: client.steps,
700 };
701 }
702
703 /**
704 * Upsert a `knowtation.flow_candidate/v0` record (latest row wins by candidate_id).
705 *
706 * @param {string} dataDir
707 * @param {string} vaultId
708 * @param {object} candidate
709 * @returns {object}
710 */
711 export function upsertCandidate(dataDir, vaultId, candidate) {
712 const store = loadFlowStore(dataDir);
713 if (!store.vaults[vaultId]) {
714 store.vaults[vaultId] = { flows: [], steps: [], runs: [], candidates: [], projections: [] };
715 }
716 const vault = store.vaults[vaultId];
717 const idx = vault.candidates.findIndex((c) => c.candidate_id === candidate.candidate_id);
718 const row = { ...candidate, updated: candidate.updated ?? new Date().toISOString() };
719 if (idx === -1) {
720 vault.candidates.push(row);
721 } else {
722 vault.candidates[idx] = row;
723 }
724 saveFlowStore(dataDir, store);
725 return row;
726 }
727
728 /**
729 * Get one candidate when readable in caller scope, or null (no existence leak).
730 *
731 * @param {string} dataDir
732 * @param {string} vaultId
733 * @param {string} candidateId
734 * @param {Set<import('./flow-scope.mjs').FlowScope>} visibleScopes
735 * @returns {object|null}
736 */
737 export function getCandidate(dataDir, vaultId, candidateId, visibleScopes) {
738 if (!FLOW_CANDIDATE_ID_RE.test(candidateId)) return null;
739 const store = loadFlowStore(dataDir);
740 const vault = store.vaults[vaultId];
741 if (!vault) return null;
742 const row = vault.candidates.find((c) => c.candidate_id === candidateId);
743 if (!row) return null;
744 if (!visibleScopes.has(row.scope_hint)) return null;
745 return row;
746 }
747
748 /**
749 * List candidates in a vault (content-minimized rows).
750 *
751 * @param {string} dataDir
752 * @param {string} vaultId
753 * @param {{ limit?: number, statusFilter?: string }} [query]
754 * @returns {{ candidates: object[], truncated: boolean }}
755 */
756 export function listCandidatesInVault(dataDir, vaultId, query = {}) {
757 let limit = typeof query.limit === 'number' ? query.limit : 50;
758 if (!Number.isInteger(limit) || limit < 1) limit = 50;
759 if (limit > 50) limit = 50;
760
761 const store = loadFlowStore(dataDir);
762 const vault = store.vaults[vaultId] ?? { candidates: [] };
763 let rows = [...(vault.candidates ?? [])];
764 if (query.statusFilter) {
765 rows = rows.filter((c) => c.status === query.statusFilter);
766 }
767 rows.sort((a, b) => Date.parse(b.updated ?? 0) - Date.parse(a.updated ?? 0));
768 const truncated = rows.length > limit;
769 if (rows.length > limit) rows = rows.slice(0, limit);
770 return { candidates: rows, truncated };
771 }
772
773 /**
774 * Update candidate terminal/non-terminal status.
775 *
776 * @param {string} dataDir
777 * @param {string} vaultId
778 * @param {string} candidateId
779 * @param {string} status
780 * @returns {object|null}
781 */
782 export function updateCandidateStatus(dataDir, vaultId, candidateId, status) {
783 const store = loadFlowStore(dataDir);
784 const vault = store.vaults[vaultId];
785 if (!vault) return null;
786 const idx = vault.candidates.findIndex((c) => c.candidate_id === candidateId);
787 if (idx === -1) return null;
788 const prev = vault.candidates[idx].status;
789 if (prev !== 'pending_review') return null;
790 vault.candidates[idx] = {
791 ...vault.candidates[idx],
792 status,
793 updated: new Date().toISOString(),
794 };
795 saveFlowStore(dataDir, store);
796 return vault.candidates[idx];
797 }
File History 1 commit
sha256:8915fe406161f95c1681f9469375e7bae5b28c884f00bedbdef65e4b0cd0738d docs(flow): commit FLOW-V0-SPEC.md hygiene for 7A-INT merge Human 13 hours ago