flow-authoring.mjs
529 lines 18.7 KB
Raw
sha256:8915fe406161f95c1681f9469375e7bae5b28c884f00bedbdef65e4b0cd0738d docs(flow): commit FLOW-V0-SPEC.md hygiene for 7A-INT merge Human 13 hours ago
1 /**
2 * Flow authoring write-back facade (Phase 7A-L1b).
3 *
4 * A typed facade over the existing `/proposals` lifecycle (SD-4): drafting,
5 * editing, or importing a Flow becomes a standard proposal targeting the Flow's
6 * mirror note. There is **no second write path** — review/evaluation/approve/
7 * apply and the optimistic-concurrency check are the same machinery notes use.
8 * The Flow index changes **only** at approve→apply, by reconciling the approved
9 * mirror back into the store as a new `(flow_id, version)` row.
10 *
11 * `FLOW_AUTHORING_WRITES` defaults **off**; when off every propose/import returns
12 * `403 FLOW_AUTHORING_DISABLED` and no write path is reachable.
13 *
14 * @see docs/FLOW-AUTHORING-WRITEBACK-CONTRACT-7A-L1.md
15 * @see docs/FLOW-STORE-CONTRACT-7A-10.md
16 */
17
18 import fs from 'fs';
19 import path from 'path';
20
21 import { fnv1a64Hex, stableStringify } from '../note-state-id.mjs';
22 import {
23 validateFlowBundle,
24 flowDefinitionForClient,
25 latestStoredFlow,
26 upsertFlowVersion,
27 loadFlowStore,
28 parseSemver,
29 compareSemver,
30 FLOW_ID_RE,
31 SEMVER_RE,
32 } from './flow-store.mjs';
33 import {
34 resolveFlowVisibleScopes,
35 resolveFlowWriteAuthority,
36 } from './flow-scope.mjs';
37 import {
38 readVaultExternalAgentPolicy,
39 validateImportExternalTools,
40 } from './external-agent.mjs';
41 import { validateImportAutomatableSteps } from './flow-execution.mjs';
42
/** Prefix of every state token built by `flowStateId` / `absentFlowStateId`; an edit's `baseStateId` must start with it. */
export const FLOW_STATE_ID_PREFIX = 'flowst1_';
/** File name, joined onto `dataDir`, that `readFlowAuthoringPolicyFile` reads. */
export const FLOW_AUTHORING_POLICY_FILE = 'hub_flow_authoring_policy.json';
/** Value of the `schema` field on a successful `handleFlowProposeRequest` payload. */
export const FLOW_PROPOSAL_SCHEMA = 'knowtation.flow_proposal/v0';
/** Value passed as `source` when a Flow proposal is created. */
export const FLOW_PROPOSAL_SOURCE = 'flow';
/** Value passed as `review_queue` on the created proposal and echoed in the success payload. */
export const FLOW_REVIEW_QUEUE = 'flow-authoring';
48
49 /** @typedef {import('./flow-scope.mjs').FlowScope} FlowScope */
50 /** @typedef {'new'|'edit'|'import'} FlowProposeKind */
51
/**
 * Project a flow record onto the fixed field set that feeds `flowStateId`.
 *
 * Only the listed fields are kept; anything else on the record is dropped.
 * `tags`, `steps` and `inputs` fall back to an empty array when they are not
 * arrays, `vault_mirror_path` becomes `null` unless it is a string, and
 * `truncated` is coerced to a strict boolean. The author's note says this is
 * meant to line up with the `flowDefinitionForClient` payload — that helper
 * is not visible here, so confirm against it before changing the field set.
 *
 * @param {Record<string, unknown>} flow
 * @returns {Record<string, unknown>}
 */
function canonicalFlowForState(flow) {
  const listOrEmpty = (value) => (Array.isArray(value) ? value : []);
  const {
    flow_id,
    title,
    version,
    scope,
    summary,
    tags,
    steps,
    inputs,
    vault_mirror_path: mirrorPath,
    updated,
    truncated,
  } = flow;

  return {
    schema: 'knowtation.flow/v0',
    flow_id,
    title,
    version,
    scope,
    summary,
    tags: listOrEmpty(tags),
    steps: listOrEmpty(steps),
    inputs: listOrEmpty(inputs),
    vault_mirror_path: typeof mirrorPath === 'string' ? mirrorPath : null,
    updated,
    truncated: truncated === true,
  };
}
76
/**
 * Project a step record onto the fixed field set that feeds `flowStateId`.
 *
 * The five list-valued fields (`requires`, `boundaries`, `skill_refs`,
 * `inputs`, `outputs`) fall back to an empty array when they are not arrays;
 * every other listed field is copied through untouched, and any field not
 * listed is dropped.
 *
 * @param {Record<string, unknown>} step
 * @returns {Record<string, unknown>}
 */
function canonicalStepForState(step) {
  const listOrEmpty = (value) => (Array.isArray(value) ? value : []);
  const {
    step_id,
    flow_id,
    ordinal,
    owned_job,
    instruction,
    trigger,
    when_not_to_run,
    requires,
    boundaries,
    skill_refs,
    inputs,
    outputs,
    output_shape,
    verification,
    automatable,
  } = step;

  return {
    schema: 'knowtation.flow_step/v0',
    step_id,
    flow_id,
    ordinal,
    owned_job,
    instruction,
    trigger,
    when_not_to_run,
    requires: listOrEmpty(requires),
    boundaries: listOrEmpty(boundaries),
    skill_refs: listOrEmpty(skill_refs),
    inputs: listOrEmpty(inputs),
    outputs: listOrEmpty(outputs),
    output_shape,
    verification,
    automatable,
  };
}
101
/**
 * Build the optimistic-concurrency token for a flow definition and its steps.
 *
 * Each step is reduced with `canonicalStepForState`, the reduced steps are
 * sorted by numeric `ordinal`, and the flow is reduced with
 * `canonicalFlowForState`. The pair is serialized with `stableStringify`,
 * hashed with `fnv1a64Hex` (both imported from `../note-state-id.mjs`), and
 * the digest is appended to `FLOW_STATE_ID_PREFIX`.
 *
 * A non-array `steps` is treated as no steps; a falsy `flow` is treated as `{}`.
 *
 * @param {Record<string, unknown>} flow
 * @param {Record<string, unknown>[]} steps
 * @returns {string}
 */
export function flowStateId(flow, steps) {
  const stepList = Array.isArray(steps) ? [...steps] : [];
  const canonicalSteps = stepList.map((step) => canonicalStepForState(step));
  canonicalSteps.sort((left, right) => Number(left.ordinal) - Number(right.ordinal));

  const serialized = stableStringify({
    flow: canonicalFlowForState(flow || {}),
    steps: canonicalSteps,
  });
  const digest = fnv1a64Hex(Buffer.from(serialized, 'utf8'));
  return FLOW_STATE_ID_PREFIX + digest;
}
121
/**
 * State token standing for "this flow does not exist yet".
 *
 * It is `FLOW_STATE_ID_PREFIX` followed by the `fnv1a64Hex` digest of a single
 * zero byte. `handleFlowProposeRequest` uses it as the base state of every
 * non-edit proposal. The author's note says it parallels a note-side
 * `absentNoteStateId` sentinel (not visible in this file).
 *
 * @returns {string}
 */
export function absentFlowStateId() {
  const sentinelBytes = Buffer.from([0x00]);
  const digest = fnv1a64Hex(sentinelBytes);
  return FLOW_STATE_ID_PREFIX + digest;
}
131
/**
 * Interpret an environment value as on / off / unset.
 *
 * Matching is exact: only the strings `'1'` / `'true'` mean on and `'0'` /
 * `'false'` mean off. Anything else (including other casings, padded values
 * and non-strings) yields `null`, i.e. "no override".
 *
 * @param {unknown} v
 * @returns {boolean | null}
 */
function envTriState(v) {
  switch (v) {
    case '1':
    case 'true':
      return true;
    case '0':
    case 'false':
      return false;
    default:
      return null;
  }
}
138
/**
 * Read the two authoring flags from the policy file under `dataDir`.
 *
 * Best-effort by design: a falsy `dataDir`, a missing file, unreadable or
 * invalid JSON, or a non-object document all produce `{}`. A flag is copied
 * to the result only when its value in the file is a real boolean.
 *
 * @param {string} dataDir
 * @returns {{ flow_authoring_writes_enabled?: boolean, flow_authoring_forbidden?: boolean }}
 */
export function readFlowAuthoringPolicyFile(dataDir) {
  if (!dataDir) return {};
  const policyPath = path.join(dataDir, FLOW_AUTHORING_POLICY_FILE);
  try {
    if (!fs.existsSync(policyPath)) return {};
    const parsed = JSON.parse(fs.readFileSync(policyPath, 'utf8'));
    if (!parsed || typeof parsed !== 'object') return {};

    const policy = {};
    for (const flag of ['flow_authoring_writes_enabled', 'flow_authoring_forbidden']) {
      if (typeof parsed[flag] === 'boolean') {
        policy[flag] = parsed[flag];
      }
    }
    return policy;
  } catch {
    // Any read/parse failure is treated the same as "no policy file".
    return {};
  }
}
162
/**
 * Decide whether Flow authoring writes are switched on.
 *
 * The `FLOW_AUTHORING_WRITES` environment variable wins whenever
 * `envTriState` recognises it. Otherwise the answer is `true` only if the
 * policy file under `dataDir` sets `flow_authoring_writes_enabled` to `true`;
 * in every other case the result is `false`.
 *
 * @param {string} dataDir
 * @returns {boolean}
 */
export function getFlowAuthoringWritesEnabled(dataDir) {
  const envOverride = envTriState(process.env.FLOW_AUTHORING_WRITES);
  if (envOverride === null) {
    const { flow_authoring_writes_enabled: fileValue } = readFlowAuthoringPolicyFile(dataDir);
    return fileValue === true;
  }
  return envOverride;
}
176
/**
 * Decide whether policy forbids Flow authoring altogether.
 *
 * The `FLOW_AUTHORING_FORBIDDEN` environment variable wins whenever
 * `envTriState` recognises it (so an explicit `0`/`false` overrides the
 * file). Otherwise the answer is `true` only if the policy file under
 * `dataDir` sets `flow_authoring_forbidden` to `true`; default is `false`.
 *
 * @param {string} dataDir
 * @returns {boolean}
 */
export function getFlowAuthoringForbidden(dataDir) {
  const envOverride = envTriState(process.env.FLOW_AUTHORING_FORBIDDEN);
  if (envOverride === null) {
    const { flow_authoring_forbidden: fileValue } = readFlowAuthoringPolicyFile(dataDir);
    return fileValue === true;
  }
  return envOverride;
}
188
/**
 * Compute `auto_approvable` from the steps' verification kinds.
 *
 * The result is `true` only when `steps` is a non-empty array and every step
 * has a truthy `verification.kind` other than `'human_review'`. A missing
 * kind, a `human_review` step, an empty list or a non-array all give `false`.
 *
 * @param {{ verification?: { kind?: string } }[]} steps
 * @returns {boolean}
 */
export function deriveAutoApprovable(steps) {
  if (!Array.isArray(steps)) return false;
  if (steps.length === 0) return false;

  const hasBlockingStep = steps.some((step) => {
    const kind = step?.verification?.kind;
    return !kind || kind === 'human_review';
  });
  return !hasBlockingStep;
}
201
/**
 * Derive the fallback mirror-note path for a flow id.
 *
 * A leading `flow_` is removed, every remaining underscore becomes a hyphen,
 * and the slug is placed under `meta/flows/` with a `.md` extension.
 *
 * @param {string} flowId
 * @returns {string}
 */
function defaultMirrorPath(flowId) {
  const withoutPrefix = flowId.replace(/^flow_/, '');
  const slug = withoutPrefix.split('_').join('-');
  return ['meta', 'flows', `${slug}.md`].join('/');
}
210
/**
 * Merge two optional scope sets into a fresh set.
 *
 * Falsy arguments are ignored. If the merged result would be empty, it
 * contains the single scope `'personal'` instead. The inputs are not mutated.
 *
 * @param {Set<FlowScope>} [a]
 * @param {Set<FlowScope>} [b]
 * @returns {Set<FlowScope>}
 */
function unionScopes(a, b) {
  const merged = new Set([...(a || []), ...(b || [])]);
  return merged.size > 0 ? merged : new Set(['personal']);
}
223
/**
 * Work out which scopes the caller can see for a write request.
 *
 * Checked in this order:
 *  1. `input.ambiguous === true` — returns `{ 'personal' }` flagged ambiguous.
 *  2. `input.visibleScopes` is already a `Set` — returned as-is (same
 *     instance), not ambiguous.
 *  3. Otherwise the decision is delegated to `resolveFlowVisibleScopes` with
 *     `dataDir`, `userId`, `vaultId`, `role` and `cliScopes`.
 *
 * @param {object} input
 * @returns {{ visibleScopes: Set<FlowScope>, ambiguous: boolean }}
 */
function resolveWriteScopes(input) {
  const { ambiguous, visibleScopes } = input;

  if (ambiguous === true) {
    return { visibleScopes: new Set(['personal']), ambiguous: true };
  }
  if (visibleScopes instanceof Set) {
    return { visibleScopes, ambiguous: false };
  }

  const { dataDir, userId, vaultId, role, cliScopes } = input;
  return resolveFlowVisibleScopes({ dataDir, userId, vaultId, role, cliScopes });
}
243
/**
 * Build the failure result object shared by the handlers in this module.
 *
 * When `error` is `null` or `undefined`, the `code` doubles as the message.
 *
 * @param {number} status
 * @param {string} code
 * @param {string} [error]
 * @returns {{ ok: false, status: number, error: string, code: string }}
 */
function refuse(status, code, error) {
  const message = error === undefined || error === null ? code : error;
  return { ok: false, status, error: message, code };
}
252
/**
 * Single entry point for proposing a new, edited, or imported Flow.
 *
 * The checks run in a fixed order and the first failure is returned as a
 * refusal object:
 *  1. policy gates (`getFlowAuthoringForbidden`, then
 *     `getFlowAuthoringWritesEnabled`);
 *  2. `createProposal` must be a function;
 *  3. a non-blank string `intent`;
 *  4. bundle validation via `validateFlowBundle`;
 *  5. for edits, the request `flowId` (when given) must equal the bundle's;
 *  6. scope resolution, then write authority for the bundle's scope;
 *  7. for imports, the external-tool and automatable-step policy checks;
 *  8. a lineage precheck against the latest stored flow the caller can see.
 *
 * On success it calls `input.createProposal` once with a mirror-note proposal
 * and returns a summary payload. This function only reads the flow store
 * (`loadFlowStore`); it never calls `upsertFlowVersion`.
 *
 * Any `kind` other than `'edit'` goes through the "flow must be absent" path.
 *
 * @param {{
 *   dataDir: string,
 *   vaultId: string,
 *   userId?: string,
 *   role?: string,
 *   cliScopes?: FlowScope[],
 *   visibleScopes?: Set<FlowScope>,
 *   ambiguous?: boolean,
 *   kind: FlowProposeKind,
 *   flow?: unknown,
 *   steps?: unknown,
 *   bundle?: { flow?: unknown, steps?: unknown },
 *   intent?: unknown,
 *   flowId?: string,
 *   baseVersion?: string,
 *   baseStateId?: string,
 *   externalRef?: string,
 *   sourceVaultHint?: string,
 *   createProposal: (dataDir: string, input: object) => { proposal_id: string },
 *   starterDir?: string,
 * }} input
 * @returns {{ ok: true, payload: object } | { ok: false, status: number, error: string, code: string }}
 */
export function handleFlowProposeRequest(input) {
  // Trimmed string, or '' for anything that is not a string.
  const trimmed = (value) => (typeof value === 'string' ? value.trim() : '');

  const { kind, dataDir, vaultId } = input;
  const isImport = kind === 'import';
  const isEdit = kind === 'edit';
  const malformedCode = isImport ? 'FLOW_IMPORT_BUNDLE_MALFORMED' : 'FLOW_DRAFT_INVALID';

  // Policy gates come first so nothing else runs when authoring is off.
  if (getFlowAuthoringForbidden(dataDir)) {
    return refuse(403, 'FLOW_AUTHORING_POLICY_FORBIDDEN', 'Flow authoring forbidden by policy');
  }
  if (!getFlowAuthoringWritesEnabled(dataDir)) {
    return refuse(403, 'FLOW_AUTHORING_DISABLED', 'Flow authoring writes are disabled');
  }

  if (typeof input.createProposal !== 'function') {
    return refuse(500, 'RUNTIME_ERROR', 'createProposal is required');
  }

  // Intent is mandatory; it is forwarded to the proposal after trimming only.
  const intent = trimmed(input.intent);
  if (!intent) {
    return refuse(400, malformedCode, 'intent is required');
  }

  // Imports carry a ready-made bundle; new/edit pass flow and steps separately.
  let rawBundle;
  if (!isImport) {
    rawBundle = { flow: input.flow, steps: input.steps };
  } else if (input.bundle && typeof input.bundle === 'object') {
    rawBundle = input.bundle;
  } else {
    rawBundle = {};
  }
  const validated = validateFlowBundle(rawBundle);
  if (!validated.ok) {
    return refuse(400, malformedCode, validated.reason);
  }
  const { flow, steps } = validated;

  // An edit addressed to one flow id must not carry a bundle for another.
  if (isEdit) {
    const requestedId = trimmed(input.flowId);
    if (requestedId && requestedId !== flow.flow_id) {
      return refuse(400, 'FLOW_DRAFT_INVALID', 'flow_id mismatch between path and bundle');
    }
  }

  // Scope resolution; an ambiguous result is refused outright.
  const { visibleScopes, ambiguous } = resolveWriteScopes(input);
  if (ambiguous) {
    return refuse(400, 'FLOW_SCOPE_AMBIGUOUS', 'Ambiguous flow scope');
  }

  // Write authority for the scope the bundle declares.
  const authority = resolveFlowWriteAuthority(visibleScopes, flow.scope);
  if (!authority.ok) {
    const importScopeDenied = isImport && authority.code === 'FLOW_SCOPE_DENIED';
    const refusalCode = importScopeDenied ? 'FLOW_IMPORT_SCOPE_DENIED' : authority.code;
    return refuse(authority.status, refusalCode, authority.error);
  }

  // Import-only policy checks.
  if (isImport) {
    const vaultPolicy = readVaultExternalAgentPolicy(dataDir);
    const externalCheck = validateImportExternalTools(steps, vaultPolicy.allowedTools);
    // A failed tool check only blocks the import when the vault policy is 'reject'.
    if (!externalCheck.ok && vaultPolicy.importPolicy === 'reject') {
      return refuse(403, 'FLOW_IMPORT_EXTERNAL_TOOL_DENIED', 'Import declares tools outside allowlist');
    }
    const automatableCheck = validateImportAutomatableSteps(steps, dataDir);
    if (!automatableCheck.ok) {
      return refuse(403, 'FLOW_IMPORT_AUTOMATABLE_DENIED', 'Import declares automatable steps where policy forbids');
    }
  }

  // Lineage precheck against the latest stored flow with this id.
  const store = loadFlowStore(dataDir);
  const vault = store.vaults[vaultId];
  const current = vault ? latestStoredFlow(vault, flow.flow_id) : null;
  // A stored flow outside the caller's visible scopes is treated as absent.
  const currentVisible = current && visibleScopes.has(current.flow.scope) ? current : null;

  let proposalBaseStateId;
  let proposalBaseVersion = null;

  if (isEdit) {
    const baseVersion = trimmed(input.baseVersion);
    const baseStateId = trimmed(input.baseStateId);
    const baseLooksValid =
      baseVersion !== '' && SEMVER_RE.test(baseVersion) && baseStateId.startsWith(FLOW_STATE_ID_PREFIX);
    if (!baseLooksValid) {
      return refuse(400, 'FLOW_DRAFT_INVALID', 'edit requires base_version + base_state_id');
    }
    // Missing and not-visible flows get the same answer.
    if (!currentVisible) {
      return refuse(404, 'unknown_flow', 'unknown_flow');
    }
    const canonical = flowDefinitionForClient(currentVisible.flow, currentVisible.steps);
    const serverStateId = flowStateId(canonical.flow, canonical.steps);
    const versionMatches = currentVisible.flow.version === baseVersion;
    if (!versionMatches || serverStateId !== baseStateId) {
      return refuse(409, 'FLOW_LINEAGE_CONFLICT', 'flow changed since edit was based');
    }
    const nextSemver = parseSemver(flow.version);
    const baseSemver = parseSemver(baseVersion);
    if (!nextSemver || !baseSemver || compareSemver(nextSemver, baseSemver) <= 0) {
      return refuse(400, 'FLOW_DRAFT_INVALID', 'flow.version must be greater than base_version');
    }
    proposalBaseStateId = baseStateId;
    proposalBaseVersion = baseVersion;
  } else {
    // Every non-edit kind requires the flow id to be unused from the caller's view.
    if (currentVisible) {
      return refuse(409, 'FLOW_LINEAGE_CONFLICT', 'flow_id already exists in scope');
    }
    proposalBaseStateId = absentFlowStateId();
  }

  const autoApprovable = deriveAutoApprovable(steps);

  // Assemble the mirror-note proposal: the body is the pretty-printed bundle.
  const mirrorPath = flow.vault_mirror_path || defaultMirrorPath(flow.flow_id);
  const body = JSON.stringify({ flow, steps }, null, 2);
  const frontmatter = {
    type: 'flow',
    flow_id: flow.flow_id,
    flow_version: flow.version,
    scope: flow.scope,
  };
  const externalRef = buildExternalRef(input);
  const proposedBy = trimmed(input.userId);

  const proposal = input.createProposal(dataDir, {
    path: mirrorPath,
    body,
    frontmatter,
    intent,
    base_state_id: proposalBaseStateId,
    external_ref: externalRef || undefined,
    source: FLOW_PROPOSAL_SOURCE,
    vault_id: vaultId,
    proposed_by: proposedBy || undefined,
    review_queue: FLOW_REVIEW_QUEUE,
    flow_meta: {
      kind: isImport ? 'import' : kind,
      base_version: proposalBaseVersion,
      base_state_id: proposalBaseStateId,
    },
  });

  return {
    ok: true,
    payload: {
      schema: FLOW_PROPOSAL_SCHEMA,
      proposal_id: proposal.proposal_id,
      flow_id: flow.flow_id,
      base_version: proposalBaseVersion,
      // The base state token is only reported back for edits.
      base_state_id: isEdit ? proposalBaseStateId : null,
      scope: flow.scope,
      auto_approvable: autoApprovable,
      status: 'proposed',
      review_queue: FLOW_REVIEW_QUEUE,
    },
  };
}
440
/**
 * Compose the `external_ref` lineage string for an import request.
 *
 * Returns `''` for any kind other than `'import'`. For imports, the trimmed
 * `externalRef` (capped at 256 characters) and, if present, the trimmed
 * `sourceVaultHint` (capped at 128 characters, prefixed with
 * `source_vault_hint=`) are joined with a single space; the whole string is
 * capped at 512 characters. Non-string or blank fields are skipped.
 *
 * @param {{ kind: FlowProposeKind, externalRef?: string, sourceVaultHint?: string }} input
 * @returns {string}
 */
function buildExternalRef(input) {
  if (input.kind !== 'import') return '';

  // Trim and cap a field; anything that is not a string counts as blank.
  const clip = (value, maxLength) => (typeof value === 'string' ? value.trim().slice(0, maxLength) : '');

  const segments = [];
  const pointer = clip(input.externalRef, 256);
  if (pointer) {
    segments.push(pointer);
  }
  const vaultHint = clip(input.sourceVaultHint, 128);
  if (vaultHint) {
    segments.push(`source_vault_hint=${vaultHint}`);
  }
  return segments.join(' ').slice(0, 512);
}
459
/**
 * Approve-time check of a stored Flow proposal against the current store.
 *
 * The proposal body is parsed as JSON and validated with `validateFlowBundle`;
 * either failing yields a 400 refusal. The vault id comes from
 * `proposal.vault_id` (trimmed), falling back to `'default'`. Then:
 *  - when `flow_meta.kind` is `'edit'`, the flow must still exist, its
 *    version and recomputed state token must equal the recorded base values,
 *    and the proposed version must be greater than the base version;
 *  - for any other kind, the flow id must not exist in the vault.
 *
 * Nothing is written here. Per the author's note, callers are expected to
 * run this before writing the mirror note.
 *
 * @param {string} dataDir
 * @param {object} proposal - the stored proposal (source === 'flow').
 * @returns {{ ok: true, vaultId: string, flow: object, steps: object[] } | { ok: false, status: number, error: string, code: string }}
 */
export function precheckApprovedFlowProposal(dataDir, proposal) {
  let bundle;
  try {
    // The body read stays inside the try so a missing proposal is also reported as 400.
    const rawBody = typeof proposal.body === 'string' ? proposal.body : '';
    bundle = JSON.parse(rawBody);
  } catch {
    return refuse(400, 'FLOW_DRAFT_INVALID', 'flow proposal body is not valid JSON');
  }

  const validated = validateFlowBundle(bundle);
  if (!validated.ok) {
    return refuse(400, 'FLOW_DRAFT_INVALID', validated.reason);
  }
  const { flow, steps } = validated;

  const trimmedVaultId = typeof proposal.vault_id === 'string' ? proposal.vault_id.trim() : '';
  const vaultId = trimmedVaultId || 'default';

  const hasMeta = proposal.flow_meta && typeof proposal.flow_meta === 'object';
  const meta = hasMeta ? proposal.flow_meta : {};
  // Anything that is not explicitly an edit is held to the "must be absent" rule.
  const isEdit = meta.kind === 'edit';

  const store = loadFlowStore(dataDir);
  const vault = store.vaults[vaultId];
  const current = vault ? latestStoredFlow(vault, flow.flow_id) : null;

  if (!isEdit) {
    if (current) {
      return refuse(409, 'FLOW_LINEAGE_CONFLICT', 'flow_id already exists');
    }
    return { ok: true, vaultId, flow, steps };
  }

  if (!current) {
    return refuse(409, 'FLOW_LINEAGE_CONFLICT', 'flow disappeared before approve');
  }

  const baseVersion = typeof meta.base_version === 'string' ? meta.base_version : '';
  const baseStateId = typeof meta.base_state_id === 'string' ? meta.base_state_id : '';
  const canonical = flowDefinitionForClient(current.flow, current.steps);
  const serverStateId = flowStateId(canonical.flow, canonical.steps);
  const versionMatches = current.flow.version === baseVersion;
  if (!versionMatches || serverStateId !== baseStateId) {
    return refuse(409, 'FLOW_LINEAGE_CONFLICT', 'flow changed since edit was based');
  }

  const nextSemver = parseSemver(flow.version);
  const baseSemver = parseSemver(baseVersion);
  if (!nextSemver || !baseSemver || compareSemver(nextSemver, baseSemver) <= 0) {
    return refuse(400, 'FLOW_DRAFT_INVALID', 'flow.version must be greater than base_version');
  }

  return { ok: true, vaultId, flow, steps };
}
513
/**
 * Record an approved Flow bundle in the flow index.
 *
 * Thin pass-through to `upsertFlowVersion` from `./flow-store.mjs`; the
 * arguments are forwarded unchanged and its result is returned as-is. Per the
 * author's note, callers should invoke this only after the mirror note write
 * has succeeded and after {@link precheckApprovedFlowProposal} has accepted
 * the bundle.
 *
 * @param {string} dataDir
 * @param {string} vaultId
 * @param {object} flow
 * @param {object[]} steps
 * @returns {{ created: boolean, version: string }}
 */
export function applyFlowProposalToIndex(dataDir, vaultId, flow, steps) {
  const outcome = upsertFlowVersion(dataDir, vaultId, flow, steps);
  return outcome;
}
528
// Re-export of the `FLOW_ID_RE` binding imported from './flow-store.mjs'.
export { FLOW_ID_RE };
File History 1 commit
sha256:8915fe406161f95c1681f9469375e7bae5b28c884f00bedbdef65e4b0cd0738d docs(flow): commit FLOW-V0-SPEC.md hygiene for 7A-INT merge Human 13 hours ago