flow-store.mjs
870 lines 27.3 KB
Raw
sha256:cfe8c8cf68336f6d46318bd40610c18d9ff7df231df2fb190af1f5a9c4f4f93b fix(flow-store): versioned step keying for multi-version fl… Human minor ⚠ breaking 7 hours ago
1 /**
2 * Local file-backed Flow store (Flow v0 — Phase 7A-10b, Option A calendar parity).
3 *
4 * Persists flow definitions, steps, runs, candidates, and projections per vault under
5 * data_dir. Read-only list/get in v0; idempotent starter seed on first read.
6 *
7 * @see docs/FLOW-STORE-CONTRACT-7A-10.md
8 * @see docs/FLOW-V0-SPEC.md
9 */
10
11 import fs from 'fs';
12 import path from 'path';
13 import { randomUUID } from 'crypto';
14 import { getRepoRoot } from '../repo-root.mjs';
15
/** File name of the JSON store; `getFlowStorePath` joins it onto a data directory. */
export const FLOW_STORE_FILENAME = 'hub_flow_store.json';
/** Repo-relative directory of starter bundles; the default seed source in `seedStarterFlows`. */
export const STARTER_FLOWS_DIRNAME = 'flows/starter';
/** Largest number of flow summaries `listFlows` returns; also its default limit. */
export const MAX_FLOW_SUMMARIES = 200;
/** Largest number of steps `getFlow` returns before it marks the flow as truncated. */
export const MAX_STEPS_PER_FLOW = 100;

/** `flow_` followed by 1-64 characters from `[a-z0-9_]`. */
export const FLOW_ID_RE = /^flow_[a-z0-9_]{1,64}$/;
/** A flow id, then `#`, then a positive decimal ordinal with no leading zero. */
export const FLOW_STEP_ID_RE = /^flow_[a-z0-9_]{1,64}#[1-9][0-9]*$/;
/** `run_` followed by 1-48 characters from `[a-z0-9_]`. */
export const FLOW_RUN_ID_RE = /^run_[a-z0-9_]{1,48}$/;
/** `cand_` followed by 4-32 characters from `[a-z0-9]`. */
export const FLOW_CANDIDATE_ID_RE = /^cand_[a-z0-9]{4,32}$/;
/** Plain `MAJOR.MINOR.PATCH`: no leading zeros, no pre-release or build suffix. */
export const SEMVER_RE = /^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)$/;
26
27 /** @typedef {'personal'|'project'|'org'} FlowScope */
28
29 /**
30 * @typedef {Object} StoredFlow
31 * @property {'knowtation.flow/v0'} schema
32 * @property {string} flow_id
33 * @property {string} title
34 * @property {string} version
35 * @property {FlowScope} scope
36 * @property {string} summary
37 * @property {string[]} [tags]
38 * @property {string[]} steps
39 * @property {{ name: string, type: string, required: boolean }[]} [inputs]
40 * @property {string|null} [vault_mirror_path]
41 * @property {string} updated
42 * @property {boolean} truncated
43 */
44
45 /**
46 * @typedef {Object} StoredFlowStep
47 * @property {'knowtation.flow_step/v0'} schema
48 * @property {string} step_id
49 * @property {string} flow_id
50 * @property {string} [flow_version] - store-internal parent semver (7A-10c); omitted on wire
51 * @property {number} ordinal
52 * @property {string} owned_job
53 * @property {string} instruction
54 * @property {string} trigger
55 * @property {string} when_not_to_run
56 * @property {{ kind: string, id: string }[]} [requires]
57 * @property {string[]} boundaries
58 * @property {{ kind: string, id: string }[]} [skill_refs]
59 * @property {{ name: string, from: string }[]} [inputs]
60 * @property {{ name: string, type: string }[]} [outputs]
61 * @property {string} output_shape
62 * @property {{ kind: string, evidence_required: boolean, description: string }} verification
63 * @property {'manual'|'agent_assisted'|'automatable'} automatable
64 */
65
66 /**
67 * @typedef {Object} VaultFlowStore
68 * @property {StoredFlow[]} flows
69 * @property {StoredFlowStep[]} steps
70 * @property {object[]} runs
71 * @property {object[]} candidates
72 * @property {object[]} projections
73 */
74
75 /**
76 * @typedef {Object} FlowStoreFile
77 * @property {Record<string, VaultFlowStore>} vaults
78 */
79
/**
 * Resolve the on-disk location of the flow store for a data directory.
 *
 * @param {string} dataDir - directory that holds the store file
 * @returns {string} `dataDir` joined with `FLOW_STORE_FILENAME`
 */
export function getFlowStorePath(dataDir) {
  const storePath = path.join(dataDir, FLOW_STORE_FILENAME);
  return storePath;
}
87
/**
 * Read and parse the flow store for a data directory.
 *
 * Best-effort by design: every failure mode yields an empty store rather than
 * an exception. A missing file is the normal first-run case and stays silent;
 * an unreadable, unparseable, or wrongly shaped file is reported with
 * `console.warn` so the problem is not hidden from operators.
 *
 * @param {string} dataDir
 * @returns {FlowStoreFile} the parsed store, or `{ vaults: {} }` when no usable store exists
 */
export function loadFlowStore(dataDir) {
  const filePath = getFlowStorePath(dataDir);
  const emptyStore = () => ({ vaults: {} });
  // Arrays are objects too, but keyed writes on an array vanish in JSON.stringify.
  const isRecord = (value) => value !== null && typeof value === 'object' && !Array.isArray(value);

  let raw;
  try {
    // Read directly instead of existsSync + read: no window between check and use.
    raw = fs.readFileSync(filePath, 'utf8');
  } catch (err) {
    if (err?.code !== 'ENOENT') {
      console.warn(`[flow-store] could not read ${filePath}: ${err?.message ?? err}`);
    }
    return emptyStore();
  }

  let parsed;
  try {
    parsed = JSON.parse(raw);
  } catch (err) {
    console.warn(`[flow-store] ignoring unparseable store ${filePath}: ${err?.message ?? err}`);
    return emptyStore();
  }

  if (!isRecord(parsed) || !isRecord(parsed.vaults)) {
    console.warn(`[flow-store] ignoring store ${filePath}: expected an object with a "vaults" object`);
    return emptyStore();
  }
  return /** @type {FlowStoreFile} */ (parsed);
}
108
/**
 * Persist the whole flow store for a data directory.
 *
 * The JSON is written to a uniquely named temp file next to the target and
 * then renamed over it, so readers never observe a half-written store. If the
 * write or the rename fails, the temp file is removed before the error is
 * rethrown, so failed saves do not accumulate `*.tmp` files.
 *
 * @param {string} dataDir
 * @param {FlowStoreFile} store
 * @throws whatever `fs` raised while creating the directory, writing, or renaming
 */
export function saveFlowStore(dataDir, store) {
  const filePath = getFlowStorePath(dataDir);
  // Recursive mkdir succeeds when the directory already exists; no pre-check needed.
  fs.mkdirSync(path.dirname(filePath), { recursive: true });

  const tmp = `${filePath}.${process.pid}.${randomUUID()}.tmp`;
  try {
    fs.writeFileSync(tmp, JSON.stringify(store, null, 2), 'utf8');
    fs.renameSync(tmp, filePath);
  } catch (err) {
    try {
      fs.rmSync(tmp, { force: true });
    } catch {
      // Cleanup is best-effort; the original failure is the one worth reporting.
    }
    throw err;
  }
}
123
/**
 * Load the store and return the section belonging to one vault.
 *
 * When the vault has no section yet, an empty one is attached to the freshly
 * loaded (in-memory) store and returned; nothing is written to disk here.
 *
 * @param {string} dataDir
 * @param {string} vaultId
 * @returns {VaultFlowStore}
 */
export function getVaultFlowStore(dataDir, vaultId) {
  const { vaults } = loadFlowStore(dataDir);
  const existing = vaults[vaultId];
  if (existing) {
    return existing;
  }
  const fresh = {
    flows: [],
    steps: [],
    runs: [],
    candidates: [],
    projections: [],
  };
  vaults[vaultId] = fresh;
  return fresh;
}
142
/**
 * Compose a step id from its parent flow id and its ordinal.
 *
 * @param {string} flowId
 * @param {number} ordinal
 * @returns {string} `flowId`, a `#`, then `ordinal`
 */
export function buildFlowStepId(flowId, ordinal) {
  return ''.concat(flowId, '#', ordinal);
}
151
/**
 * Split a version string into numeric components.
 *
 * @param {string} version
 * @returns {[number, number, number]|null} `[major, minor, patch]`, or null
 *   when `version` does not match `SEMVER_RE`
 */
export function parseSemver(version) {
  const match = SEMVER_RE.exec(version);
  if (match === null) {
    return null;
  }
  const [, major, minor, patch] = match;
  return [major, minor, patch].map((part) => parseInt(part, 10));
}
161
/**
 * Order two parsed versions, most significant component first.
 *
 * @param {[number, number, number]} a
 * @param {[number, number, number]} b
 * @returns {number} the difference of the first differing component
 *   (negative when `a` is older, positive when newer), or 0 when all match
 */
export function compareSemver(a, b) {
  const firstDiff = [0, 1, 2].find((i) => a[i] !== b[i]);
  return firstDiff === undefined ? 0 : a[firstDiff] - b[firstDiff];
}
173
/**
 * Produce copies of validated steps tagged with their parent flow version,
 * ready to be persisted. The input steps are left untouched; an existing
 * `flow_version` on a step is overwritten in the copy.
 *
 * @param {StoredFlowStep[]} steps
 * @param {string} version
 * @returns {StoredFlowStep[]}
 */
export function stampStepsForStore(steps, version) {
  const stamped = [];
  for (const original of steps) {
    stamped.push({ ...original, flow_version: version });
  }
  return stamped;
}
184
/**
 * Rewrite `vault.steps` in place so every row carries a `flow_version`.
 *
 * Rows that already have one are kept as they are. Each row without one is
 * copied once per flow row of the same `flow_id` whose `steps` list names the
 * step; when no flow row lists it, the row is attached to the first flow row
 * with that `flow_id`, and dropped when there is none. The result is
 * de-duplicated on `(flow_id, flow_version, step_id)`, already-versioned rows
 * taking precedence. Nothing happens when the vault is missing, has no
 * `steps` array, or has no unversioned rows.
 *
 * @param {VaultFlowStore} vault
 */
export function normalizeVaultSteps(vault) {
  if (!vault || !Array.isArray(vault.steps)) return;

  /** @type {StoredFlowStep[]} */
  const versioned = [];
  /** @type {StoredFlowStep[]} */
  const unversioned = [];
  for (const row of vault.steps) {
    (row.flow_version ? versioned : unversioned).push(row);
  }
  if (unversioned.length === 0) return;

  const migrated = unversioned.flatMap((row) => {
    const owners = vault.flows.filter(
      (f) => f.flow_id === row.flow_id && (f.steps ?? []).includes(row.step_id),
    );
    if (owners.length > 0) {
      return owners.map((owner) => ({ ...row, flow_version: owner.version }));
    }
    const fallback = vault.flows.find((f) => f.flow_id === row.flow_id);
    return fallback ? [{ ...row, flow_version: fallback.version }] : [];
  });

  // A Map keeps first-insertion order, so the earliest row per key wins.
  /** @type {Map<string, StoredFlowStep>} */
  const byKey = new Map();
  for (const row of [...versioned, ...migrated]) {
    const key = `${row.flow_id}\0${row.flow_version ?? ''}\0${row.step_id}`;
    if (!byKey.has(key)) {
      byKey.set(key, row);
    }
  }
  vault.steps = [...byKey.values()];
}
225
/**
 * Return the steps of one `(flow_id, version)` pair, ordered by ordinal (7A-10c).
 *
 * The vault is first passed through `normalizeVaultSteps`, which may rewrite
 * `vault.steps` in place. A missing vault, or one without `flows` / `steps`
 * arrays, yields an empty list instead of throwing — the same shapes
 * `normalizeVaultSteps` already tolerates.
 *
 * @param {VaultFlowStore} vault
 * @param {string} flowId
 * @param {string} version
 * @returns {StoredFlowStep[]} a new array; the stored rows themselves are not copied
 */
export function stepsForFlowVersion(vault, flowId, version) {
  normalizeVaultSteps(vault);
  const flows = Array.isArray(vault?.flows) ? vault.flows : [];
  const steps = Array.isArray(vault?.steps) ? vault.steps : [];

  const versionsForFlow = flows.filter((f) => f.flow_id === flowId).map((f) => f.version);
  // A row still lacking flow_version can only be attributed when the flow has exactly this one version.
  const legacySingleVersion = versionsForFlow.length === 1 && versionsForFlow[0] === version;

  return steps
    .filter((s) => {
      if (s.flow_id !== flowId) return false;
      if (s.flow_version === version) return true;
      return !s.flow_version && legacySingleVersion;
    })
    .sort((a, b) => a.ordinal - b.ordinal);
}
246
/**
 * Type guard for the three scope literals a flow may carry.
 *
 * @param {unknown} scope
 * @returns {scope is FlowScope} true only for `'personal'`, `'project'`, or `'org'`
 */
function isFlowScope(scope) {
  return ['personal', 'project', 'org'].includes(scope);
}
254
/**
 * Check a `{ flow, steps }` bundle against the FLOW-V0-SPEC §1 anatomy rules.
 *
 * Checks run in a fixed order and stop at the first failure: bundle shape,
 * flow fields, every step in turn, then the cross-references between
 * `flow.steps` and the step records. On success the flow is re-assembled from
 * its known fields (defaults filled in for `tags`, `inputs`,
 * `vault_mirror_path`) and the step objects are returned as given.
 *
 * @param {{ flow?: unknown, steps?: unknown }} bundle
 * @returns {{ ok: true, flow: StoredFlow, steps: StoredFlowStep[] } | { ok: false, reason: string }}
 */
export function validateFlowBundle(bundle) {
  const reject = (reason) => ({ ok: false, reason });
  const hasText = (value) => typeof value === 'string' && value.trim() !== '';
  // Rules are [predicate, reason] pairs; predicates are thunks so later ones
  // only run once every earlier rule has passed.
  const firstFailure = (rules) => {
    const failed = rules.find(([passes]) => !passes());
    return failed === undefined ? null : failed[1];
  };

  if (!bundle || typeof bundle !== 'object') {
    return reject('bundle must be an object');
  }
  const flow = /** @type {Record<string, unknown>} */ (bundle.flow);
  const stepsRaw = bundle.steps;
  if (!flow || typeof flow !== 'object') {
    return reject('bundle.flow is required');
  }
  if (!Array.isArray(stepsRaw) || stepsRaw.length === 0) {
    return reject('bundle.steps must be a non-empty array');
  }

  const flowId = flow.flow_id;
  const flowProblem = firstFailure([
    [() => typeof flowId === 'string' && FLOW_ID_RE.test(flowId), 'invalid flow_id'],
    [() => flow.schema === 'knowtation.flow/v0', 'flow.schema must be knowtation.flow/v0'],
    [() => hasText(flow.title), 'flow.title is required'],
    [() => typeof flow.version === 'string' && SEMVER_RE.test(flow.version), 'flow.version must be semver'],
    [() => isFlowScope(flow.scope), 'flow.scope must be personal|project|org'],
    [() => typeof flow.summary === 'string', 'flow.summary is required'],
    [() => Array.isArray(flow.steps) && flow.steps.length !== 0, 'flow.steps must be a non-empty array'],
    [() => hasText(flow.updated), 'flow.updated is required'],
    [() => typeof flow.truncated === 'boolean', 'flow.truncated must be boolean'],
  ]);
  if (flowProblem !== null) {
    return reject(flowProblem);
  }

  /** @type {StoredFlowStep[]} */
  const steps = [];
  const seenStepIds = new Set();
  const automatableValues = ['manual', 'agent_assisted', 'automatable'];

  for (const raw of stepsRaw) {
    if (!raw || typeof raw !== 'object') {
      return reject('each step must be an object');
    }
    const step = /** @type {Record<string, unknown>} */ (raw);
    const stepProblem = firstFailure([
      [() => step.schema === 'knowtation.flow_step/v0', 'step.schema must be knowtation.flow_step/v0'],
      [() => typeof step.step_id === 'string' && FLOW_STEP_ID_RE.test(step.step_id), 'invalid step_id'],
      [() => step.flow_id === flowId, 'step.flow_id must match flow.flow_id'],
      [
        () => typeof step.ordinal === 'number' && Number.isInteger(step.ordinal) && step.ordinal >= 1,
        'step.ordinal must be a 1-based integer',
      ],
      [() => buildFlowStepId(flowId, step.ordinal) === step.step_id, 'step_id must equal flow_id#ordinal'],
      [() => hasText(step.owned_job), 'step.owned_job is required'],
      [() => hasText(step.instruction), 'step.instruction is required'],
      [() => hasText(step.trigger), 'step.trigger is required (anatomy completeness)'],
      [() => hasText(step.when_not_to_run), 'step.when_not_to_run is required (anatomy completeness)'],
      [() => Array.isArray(step.boundaries), 'step.boundaries must be an array'],
      [() => hasText(step.output_shape), 'step.output_shape is required (anatomy completeness)'],
      [
        () => Boolean(step.verification) && typeof step.verification === 'object',
        'step.verification is required (anatomy completeness)',
      ],
      [() => hasText(step.verification.kind), 'step.verification.kind is required'],
      [
        () => typeof step.verification.evidence_required === 'boolean',
        'step.verification.evidence_required must be boolean',
      ],
      [() => hasText(step.verification.description), 'step.verification.description is required'],
      [
        () => automatableValues.includes(step.automatable),
        'step.automatable must be manual|agent_assisted|automatable',
      ],
      [() => !seenStepIds.has(step.step_id), 'duplicate step_id'],
    ]);
    if (stepProblem !== null) {
      return reject(stepProblem);
    }
    seenStepIds.add(step.step_id);
    steps.push(/** @type {StoredFlowStep} */ (step));
  }

  const danglingRef = flow.steps.findIndex((ref) => typeof ref !== 'string' || !seenStepIds.has(ref));
  if (danglingRef !== -1) {
    return reject('flow.steps references missing step_id');
  }

  const idsByOrdinal = steps
    .slice()
    .sort((left, right) => left.ordinal - right.ordinal)
    .map((s) => s.step_id);
  if (JSON.stringify(flow.steps) !== JSON.stringify(idsByOrdinal)) {
    return reject('flow.steps must list step ids in ascending ordinal order');
  }

  /** @type {StoredFlow} */
  const storedFlow = {
    schema: 'knowtation.flow/v0',
    flow_id: flowId,
    title: flow.title,
    version: flow.version,
    scope: flow.scope,
    summary: flow.summary,
    tags: Array.isArray(flow.tags) ? flow.tags.filter((t) => typeof t === 'string') : [],
    steps: flow.steps,
    inputs: Array.isArray(flow.inputs) ? flow.inputs : [],
    vault_mirror_path: typeof flow.vault_mirror_path === 'string' ? flow.vault_mirror_path : null,
    updated: flow.updated,
    truncated: flow.truncated,
  };

  return { ok: true, flow: storedFlow, steps };
}
398
/**
 * Copy the starter bundles found in a directory into one vault's section of
 * the store, skipping any `(flow_id, version)` the vault already holds.
 *
 * Only files named `flow_*.json` are considered, in sorted name order. A file
 * that is not valid JSON or fails `validateFlowBundle` is reported through
 * `onReject` and ignored. The store is saved only when at least one flow was
 * added, so repeated calls with the same bundles write nothing.
 *
 * @param {string} dataDir
 * @param {string} vaultId
 * @param {{ starterDir?: string, onReject?: (name: string, reason: string) => void }} [options]
 *   `starterDir` defaults to `STARTER_FLOWS_DIRNAME` under the repo root;
 *   `onReject` defaults to a `console.warn` line.
 * @returns {{ seeded: number, skipped: number }} flows added, and valid bundles already present
 */
export function seedStarterFlows(dataDir, vaultId, options = {}) {
  const starterDir = options.starterDir ?? path.join(getRepoRoot(), STARTER_FLOWS_DIRNAME);
  const warnOnReject = (name, reason) => {
    console.warn(`[flow-store] rejected starter bundle ${name}: ${reason}`);
  };
  const onReject = options.onReject ?? warnOnReject;

  const tally = { seeded: 0, skipped: 0 };
  if (!fs.existsSync(starterDir)) {
    return tally;
  }

  const store = loadFlowStore(dataDir);
  if (!store.vaults[vaultId]) {
    store.vaults[vaultId] = { flows: [], steps: [], runs: [], candidates: [], projections: [] };
  }
  const vault = store.vaults[vaultId];

  const bundleFiles = fs
    .readdirSync(starterDir)
    .filter((name) => name.startsWith('flow_') && name.endsWith('.json'))
    .sort();

  for (const file of bundleFiles) {
    let bundle;
    try {
      const text = fs.readFileSync(path.join(starterDir, file), 'utf8');
      bundle = JSON.parse(text);
    } catch {
      onReject(file, 'invalid JSON');
      continue;
    }

    const verdict = validateFlowBundle(bundle);
    if (!verdict.ok) {
      onReject(file, verdict.reason);
      continue;
    }

    const { flow, steps } = verdict;
    const storedAt = vault.flows.findIndex((f) => f.flow_id === flow.flow_id && f.version === flow.version);
    if (storedAt !== -1) {
      tally.skipped += 1;
      continue;
    }

    vault.flows.push(flow);
    for (const stamped of stampStepsForStore(steps, flow.version)) {
      const alreadyThere = vault.steps.some(
        (s) => s.flow_id === flow.flow_id && s.flow_version === flow.version && s.step_id === stamped.step_id,
      );
      if (alreadyThere) continue;
      vault.steps.push(stamped);
    }
    tally.seeded += 1;
  }

  if (tally.seeded > 0) {
    saveFlowStore(dataDir, store);
  }
  return tally;
}
473
/**
 * Seed the starter flows for a vault, but only when the stored vault section
 * is absent or holds no flows yet.
 *
 * @param {string} dataDir
 * @param {string} vaultId
 * @param {{ starterDir?: string }} [options] - passed through to `seedStarterFlows`
 */
function ensureStarterSeed(dataDir, vaultId, options = {}) {
  const vault = loadFlowStore(dataDir).vaults[vaultId];
  const alreadyPopulated = Boolean(vault) && vault.flows.length > 0;
  if (!alreadyPopulated) {
    seedStarterFlows(dataDir, vaultId, options);
  }
}
487
/**
 * Build the list-view summary of a flow: identifying fields plus a step
 * count, without the step list, inputs, or mirror path.
 *
 * @param {StoredFlow} flow
 * @param {number} stepCount - reported as `step_count`
 * @returns {object} summary whose `schema` is always `'knowtation.flow/v0'`
 */
export function flowSummaryForClient(flow, stepCount) {
  const { flow_id, title, version, scope, summary, tags, updated, truncated } = flow;
  return {
    schema: 'knowtation.flow/v0',
    flow_id,
    title,
    version,
    scope,
    summary,
    tags: tags ?? [],
    step_count: stepCount,
    updated,
    truncated,
  };
}
507
/**
 * Build the wire form of a flow and its steps.
 *
 * Only the listed fields are copied, so store-internal extras such as a
 * step's `flow_version` never reach the client. Optional collections that are
 * missing come back as empty arrays, and a missing `vault_mirror_path` as null.
 *
 * @param {StoredFlow} flow
 * @param {StoredFlowStep[]} steps
 * @returns {{ flow: object, steps: object[] }}
 */
export function flowDefinitionForClient(flow, steps) {
  const wireFlow = {
    schema: flow.schema,
    flow_id: flow.flow_id,
    title: flow.title,
    version: flow.version,
    scope: flow.scope,
    summary: flow.summary,
    tags: flow.tags ?? [],
    steps: flow.steps,
    inputs: flow.inputs ?? [],
    vault_mirror_path: flow.vault_mirror_path ?? null,
    updated: flow.updated,
    truncated: flow.truncated,
  };

  const toWireStep = (stored) => {
    const { schema, step_id, flow_id, ordinal, owned_job, instruction, trigger, when_not_to_run } = stored;
    const { boundaries, output_shape, verification, automatable } = stored;
    return {
      schema,
      step_id,
      flow_id,
      ordinal,
      owned_job,
      instruction,
      trigger,
      when_not_to_run,
      requires: stored.requires ?? [],
      boundaries,
      skill_refs: stored.skill_refs ?? [],
      inputs: stored.inputs ?? [],
      outputs: stored.outputs ?? [],
      output_shape,
      verification,
      automatable,
    };
  };

  return { flow: wireFlow, steps: steps.map((stored) => toWireStep(stored)) };
}
549
/**
 * Count the steps stored for one `(flow_id, version)` pair.
 *
 * @param {VaultFlowStore} vault
 * @param {string} flowId
 * @param {string} version
 * @returns {number} length of the list `stepsForFlowVersion` returns
 */
function countStepsForFlow(vault, flowId, version) {
  const { length } = stepsForFlowVersion(vault, flowId, version);
  return length;
}
559
/**
 * List flow summaries for a vault: the highest parseable version of each flow
 * that passes the scope and tag filters, newest `updated` first.
 *
 * Starter flows are seeded first when the vault has none. Rows whose version
 * does not parse as semver are skipped. Ordering is total and deterministic:
 * rows are compared by parsed `updated` time (an unparseable `updated` sorts
 * after every valid one) and ties fall back to `flow_id`.
 *
 * NOTE(review): when both `filterScopes` and `visibleScopes` are supplied only
 * `filterScopes` is consulted — presumably the caller has already intersected
 * it with what the reader may see; confirm against the caller.
 *
 * @param {string} dataDir
 * @param {string} vaultId
 * @param {{
 *   visibleScopes?: Set<FlowScope>,
 *   filterScopes?: Set<FlowScope>,
 *   effectiveScope: FlowScope,
 *   tag?: string,
 *   limit?: number,
 *   starterDir?: string,
 * }} query - `limit` falls back to `MAX_FLOW_SUMMARIES` when it is not a
 *   positive integer and is capped at that value; the scope filter defaults
 *   to `personal` when neither scope set is given.
 * @returns {{ schema: 'knowtation.flow_list/v0', vault_id: string, effective_scope: FlowScope, flows: object[], truncated: boolean }}
 *   `truncated` is true when more flows matched than `limit` allowed.
 */
export function listFlows(dataDir, vaultId, query) {
  ensureStarterSeed(dataDir, vaultId, { starterDir: query.starterDir });

  const filterScopes = query.filterScopes ?? query.visibleScopes ?? new Set(['personal']);
  const tag = typeof query.tag === 'string' ? query.tag.trim() : '';
  let limit = typeof query.limit === 'number' ? query.limit : MAX_FLOW_SUMMARIES;
  if (!Number.isInteger(limit) || limit < 1) limit = MAX_FLOW_SUMMARIES;
  if (limit > MAX_FLOW_SUMMARIES) limit = MAX_FLOW_SUMMARIES;

  const store = loadFlowStore(dataDir);
  const vault = store.vaults[vaultId] ?? {
    flows: [],
    steps: [],
    runs: [],
    candidates: [],
    projections: [],
  };

  // Keep the parsed version next to each winner so it is not re-parsed per comparison.
  /** @type {Map<string, { flow: StoredFlow, parsed: [number, number, number] }>} */
  const latestById = new Map();
  for (const flow of vault.flows) {
    if (!filterScopes.has(flow.scope)) continue;
    if (tag && !(flow.tags ?? []).includes(tag)) continue;
    const parsed = parseSemver(flow.version);
    if (!parsed) continue;
    const current = latestById.get(flow.flow_id);
    if (!current || compareSemver(parsed, current.parsed) > 0) {
      latestById.set(flow.flow_id, { flow, parsed });
    }
  }

  // Date.parse yields NaN for unparseable input; subtracting NaNs would hand
  // sort() a NaN and skip the tie-break, so map those rows to "oldest" instead.
  const updatedTime = (flow) => {
    const time = Date.parse(flow.updated);
    return Number.isNaN(time) ? -Infinity : time;
  };

  const ordered = [...latestById.values()]
    .map((entry) => entry.flow)
    .sort((a, b) => {
      const timeA = updatedTime(a);
      const timeB = updatedTime(b);
      if (timeA !== timeB) return timeB > timeA ? 1 : -1;
      return a.flow_id.localeCompare(b.flow_id);
    });

  const truncated = ordered.length > limit;
  const page = truncated ? ordered.slice(0, limit) : ordered;
  const flows = page.map((flow) => flowSummaryForClient(flow, countStepsForFlow(vault, flow.flow_id, flow.version)));

  return {
    schema: 'knowtation.flow_list/v0',
    vault_id: vaultId,
    effective_scope: query.effectiveScope,
    flows,
    truncated,
  };
}
634
/**
 * Find the newest stored version of a flow, ignoring reader scope entirely.
 *
 * Intended for the authoring write-back path (approve→apply reconcile and the
 * propose-time concurrency precheck), which must compare against the
 * canonical stored state rather than a scope-filtered view.
 *
 * Starting from the first row with this `flow_id`, a later row replaces the
 * current pick only when both versions parse and the later one is higher.
 *
 * @param {VaultFlowStore} vault
 * @param {string} flowId
 * @returns {{ flow: StoredFlow, steps: StoredFlowStep[] } | null} null when
 *   the vault is missing or holds no row for `flowId`
 */
export function latestStoredFlow(vault, flowId) {
  if (!vault) return null;

  const rows = vault.flows.filter((f) => f.flow_id === flowId);
  if (rows.length === 0) return null;

  const flow = rows.reduce((best, row) => {
    const rowVersion = parseSemver(row.version);
    const bestVersion = parseSemver(best.version);
    const isNewer = rowVersion && bestVersion && compareSemver(rowVersion, bestVersion) > 0;
    return isNewer ? row : best;
  }, rows[0]);

  return { flow, steps: stepsForFlowVersion(vault, flowId, flow.version) };
}
659
/**
 * Reconcile a validated bundle into the Flow index as a `(flow_id, version)`
 * row (Phase 7A-L1b; the only index write besides seed).
 *
 * Carry-forward constraint (FLOW-AUTHORING-WRITEBACK-CONTRACT-7A-L1 §4): an
 * edit is reconciled as a new version record, so other versions stay
 * pinnable. The flow row is upserted by `(flow_id, version)`: a new pair is
 * appended, an existing pair is replaced. Step bodies are keyed by
 * `(flow_id, flow_version, step_id)` (7A-10c); all step rows of this pair are
 * removed and the given steps written in their place.
 *
 * Legacy step rows without `flow_version` are migrated by
 * `normalizeVaultSteps` *before* anything is changed. Otherwise, replacing an
 * existing version would leave its old unversioned rows behind, and a later
 * normalisation could attach steps the new definition removed back onto this
 * version. As a consequence the saved store no longer contains unversioned
 * rows, including ones whose `flow_id` has no flow row at all.
 *
 * The store is written with `saveFlowStore` after all in-memory changes.
 *
 * @param {string} dataDir
 * @param {string} vaultId
 * @param {StoredFlow} flow - validated flow record (from `validateFlowBundle`).
 * @param {StoredFlowStep[]} steps - validated ordered steps.
 * @returns {{ created: boolean, version: string }} `created` is false when an
 *   existing `(flow_id, version)` row was replaced
 */
export function upsertFlowVersion(dataDir, vaultId, flow, steps) {
  const store = loadFlowStore(dataDir);
  if (!store.vaults[vaultId]) {
    store.vaults[vaultId] = { flows: [], steps: [], runs: [], candidates: [], projections: [] };
  }
  const vault = store.vaults[vaultId];

  // Attribute legacy rows while the flow rows they were written for are still in place.
  normalizeVaultSteps(vault);

  const idx = vault.flows.findIndex((f) => f.flow_id === flow.flow_id && f.version === flow.version);
  const created = idx === -1;
  if (created) {
    vault.flows.push(flow);
  } else {
    vault.flows[idx] = flow;
  }

  vault.steps = vault.steps.filter(
    (s) => !(s.flow_id === flow.flow_id && s.flow_version === flow.version),
  );
  vault.steps.push(...stampStepsForStore(steps, flow.version));

  saveFlowStore(dataDir, store);
  return { created, version: flow.version };
}
702
/**
 * Fetch one flow definition with its ordered steps.
 *
 * Starter flows are seeded first when the vault has none. Without
 * `query.version` the pick starts at the first scope-visible row and moves to
 * a later row only when both versions parse and the later one is higher; with
 * it, the first row of exactly that version is used. At most
 * `MAX_STEPS_PER_FLOW` steps are returned, and when steps are cut the
 * returned flow has `truncated: true`.
 *
 * @param {string} dataDir
 * @param {string} vaultId
 * @param {string} flowId
 * @param {{
 *   visibleScopes?: Set<FlowScope>,
 *   filterScopes?: Set<FlowScope>,
 *   version?: string,
 *   starterDir?: string,
 * }} query - `filterScopes` wins over `visibleScopes`; with neither, only
 *   `personal` flows match.
 * @returns {{ schema: 'knowtation.flow_get/v0', vault_id: string, flow: object, steps: object[] } | null}
 *   null for a malformed `flowId` or `version`, an unknown vault, or when no
 *   row matches the id, scope, and version filters
 */
export function getFlow(dataDir, vaultId, flowId, query) {
  if (!FLOW_ID_RE.test(flowId)) return null;

  ensureStarterSeed(dataDir, vaultId, { starterDir: query.starterDir });

  const filterScopes = query.filterScopes ?? query.visibleScopes ?? new Set(['personal']);
  const pinnedVersion = typeof query.version === 'string' ? query.version.trim() : '';
  if (pinnedVersion !== '' && !SEMVER_RE.test(pinnedVersion)) return null;

  const vault = loadFlowStore(dataDir).vaults[vaultId];
  if (!vault) return null;

  const matching = vault.flows.filter(
    (f) =>
      f.flow_id === flowId &&
      filterScopes.has(f.scope) &&
      (pinnedVersion === '' || f.version === pinnedVersion),
  );
  if (matching.length === 0) return null;

  const pickNewer = (best, row) => {
    const rowVersion = parseSemver(row.version);
    const bestVersion = parseSemver(best.version);
    return rowVersion && bestVersion && compareSemver(rowVersion, bestVersion) > 0 ? row : best;
  };
  const flow = pinnedVersion === '' ? matching.reduce(pickNewer, matching[0]) : matching[0];

  const allSteps = stepsForFlowVersion(vault, flowId, flow.version);
  const truncated = allSteps.length > MAX_STEPS_PER_FLOW;
  const steps = truncated ? allSteps.slice(0, MAX_STEPS_PER_FLOW) : allSteps;

  const { flow: clientFlow, steps: clientSteps } = flowDefinitionForClient(
    truncated ? { ...flow, truncated: true } : flow,
    steps,
  );

  return {
    schema: 'knowtation.flow_get/v0',
    vault_id: vaultId,
    flow: clientFlow,
    steps: clientSteps,
  };
}
775
/**
 * Store a `knowtation.flow_candidate/v0` record, replacing any existing row
 * with the same `candidate_id`, and save the store.
 *
 * The stored row is a shallow copy of `candidate`; when the candidate carries
 * no `updated` value, the current time (ISO-8601) is filled in.
 *
 * @param {string} dataDir
 * @param {string} vaultId
 * @param {object} candidate
 * @returns {object} the row as stored
 */
export function upsertCandidate(dataDir, vaultId, candidate) {
  const store = loadFlowStore(dataDir);
  store.vaults[vaultId] = store.vaults[vaultId] || {
    flows: [],
    steps: [],
    runs: [],
    candidates: [],
    projections: [],
  };
  const { candidates } = store.vaults[vaultId];

  const position = candidates.findIndex((c) => c.candidate_id === candidate.candidate_id);
  const row = { ...candidate, updated: candidate.updated ?? new Date().toISOString() };
  if (position >= 0) {
    candidates[position] = row;
  } else {
    candidates.push(row);
  }

  saveFlowStore(dataDir, store);
  return row;
}
800
/**
 * Look up one candidate, returning it only when its `scope_hint` is among the
 * caller's visible scopes.
 *
 * Every miss — malformed id, unknown vault, unknown candidate, or a scope the
 * caller cannot see — produces the same null, so the result does not reveal
 * whether the candidate exists.
 *
 * @param {string} dataDir
 * @param {string} vaultId
 * @param {string} candidateId
 * @param {Set<import('./flow-scope.mjs').FlowScope>} visibleScopes
 * @returns {object|null}
 */
export function getCandidate(dataDir, vaultId, candidateId, visibleScopes) {
  if (!FLOW_CANDIDATE_ID_RE.test(candidateId)) return null;

  const vault = loadFlowStore(dataDir).vaults[vaultId];
  if (!vault) return null;

  const match = vault.candidates.find((c) => c.candidate_id === candidateId);
  const readable = Boolean(match) && visibleScopes.has(match.scope_hint);
  return readable ? match : null;
}
820
/**
 * List the candidates stored in a vault, most recently updated first.
 *
 * Rows are returned as stored. Ordering uses the parsed `updated` string; a
 * row whose `updated` is missing, not a string, or unparseable sorts after
 * every row with a valid timestamp, and rows that compare equal keep their
 * stored order. (Previously a missing value was parsed as the string "0",
 * whose meaning depends on the JavaScript engine.)
 *
 * @param {string} dataDir
 * @param {string} vaultId
 * @param {{ limit?: number, statusFilter?: string }} [query] - `limit` falls
 *   back to 50 when it is not a positive integer and is capped at 50;
 *   `statusFilter`, when truthy, keeps only rows with that exact `status`.
 * @returns {{ candidates: object[], truncated: boolean }} `truncated` is true
 *   when more rows matched than `limit` allowed
 */
export function listCandidatesInVault(dataDir, vaultId, query = {}) {
  const maxRows = 50;
  let limit = typeof query.limit === 'number' ? query.limit : maxRows;
  if (!Number.isInteger(limit) || limit < 1) limit = maxRows;
  if (limit > maxRows) limit = maxRows;

  const store = loadFlowStore(dataDir);
  const vault = store.vaults[vaultId] ?? { candidates: [] };
  let rows = [...(vault.candidates ?? [])];
  if (query.statusFilter) {
    rows = rows.filter((c) => c.status === query.statusFilter);
  }

  // -Infinity stands for "no usable timestamp"; compare explicitly because
  // subtracting two of them would be NaN.
  const updatedTime = (row) => {
    const time = typeof row.updated === 'string' ? Date.parse(row.updated) : NaN;
    return Number.isNaN(time) ? -Infinity : time;
  };
  rows.sort((a, b) => {
    const timeA = updatedTime(a);
    const timeB = updatedTime(b);
    if (timeA === timeB) return 0;
    return timeB > timeA ? 1 : -1;
  });

  const truncated = rows.length > limit;
  if (truncated) rows = rows.slice(0, limit);
  return { candidates: rows, truncated };
}
845
/**
 * Move a candidate out of `pending_review` into the given status.
 *
 * The change is applied only when the candidate exists and its current status
 * is exactly `'pending_review'`; the row is then replaced by a copy carrying
 * the new status and a fresh ISO-8601 `updated` timestamp, and the store is
 * saved.
 *
 * @param {string} dataDir
 * @param {string} vaultId
 * @param {string} candidateId
 * @param {string} status - stored as given; not checked against any list here
 * @returns {object|null} the updated row, or null when the vault or candidate
 *   is unknown or the candidate is not pending review
 */
export function updateCandidateStatus(dataDir, vaultId, candidateId, status) {
  const store = loadFlowStore(dataDir);
  const vault = store.vaults[vaultId];
  if (!vault) return null;

  const at = vault.candidates.findIndex((c) => c.candidate_id === candidateId);
  if (at === -1) return null;

  const current = vault.candidates[at];
  if (current.status !== 'pending_review') return null;

  const next = { ...current, status, updated: new Date().toISOString() };
  vault.candidates[at] = next;
  saveFlowStore(dataDir, store);
  return next;
}
File History 1 commit
sha256:cfe8c8cf68336f6d46318bd40610c18d9ff7df231df2fb190af1f5a9c4f4f93b fix(flow-store): versioned step keying for multi-version fl… Human minor 7 hours ago