flow-authoring.mjs
sha256:8915fe406161f95c1681f9469375e7bae5b28c884f00bedbdef65e4b0cd0738d
docs(flow): commit FLOW-V0-SPEC.md hygiene for 7A-INT merge
Human
13 hours ago
| 1 | /** |
| 2 | * Flow authoring write-back facade (Phase 7A-L1b). |
| 3 | * |
| 4 | * A typed facade over the existing `/proposals` lifecycle (SD-4): drafting, |
| 5 | * editing, or importing a Flow becomes a standard proposal targeting the Flow's |
| 6 | * mirror note. There is **no second write path** — review/evaluation/approve/ |
| 7 | * apply and the optimistic-concurrency check are the same machinery notes use. |
| 8 | * The Flow index changes **only** at approve→apply, by reconciling the approved |
| 9 | * mirror back into the store as a new `(flow_id, version)` row. |
| 10 | * |
| 11 | * `FLOW_AUTHORING_WRITES` defaults **off**; when off every propose/import returns |
| 12 | * `403 FLOW_AUTHORING_DISABLED` and no write path is reachable. |
| 13 | * |
| 14 | * @see docs/FLOW-AUTHORING-WRITEBACK-CONTRACT-7A-L1.md |
| 15 | * @see docs/FLOW-STORE-CONTRACT-7A-10.md |
| 16 | */ |
| 17 | |
| 18 | import fs from 'fs'; |
| 19 | import path from 'path'; |
| 20 | |
| 21 | import { fnv1a64Hex, stableStringify } from '../note-state-id.mjs'; |
| 22 | import { |
| 23 | validateFlowBundle, |
| 24 | flowDefinitionForClient, |
| 25 | latestStoredFlow, |
| 26 | upsertFlowVersion, |
| 27 | loadFlowStore, |
| 28 | parseSemver, |
| 29 | compareSemver, |
| 30 | FLOW_ID_RE, |
| 31 | SEMVER_RE, |
| 32 | } from './flow-store.mjs'; |
| 33 | import { |
| 34 | resolveFlowVisibleScopes, |
| 35 | resolveFlowWriteAuthority, |
| 36 | } from './flow-scope.mjs'; |
| 37 | import { |
| 38 | readVaultExternalAgentPolicy, |
| 39 | validateImportExternalTools, |
| 40 | } from './external-agent.mjs'; |
| 41 | import { validateImportAutomatableSteps } from './flow-execution.mjs'; |
| 42 | |
/** Prefix placed in front of every hash-based Flow state token built in this module. */
export const FLOW_STATE_ID_PREFIX = 'flowst1_';

/** File name, resolved against the data directory, of the optional authoring policy JSON. */
export const FLOW_AUTHORING_POLICY_FILE = 'hub_flow_authoring_policy.json';

/** `schema` value stamped on the payload returned for a successfully created Flow proposal. */
export const FLOW_PROPOSAL_SCHEMA = 'knowtation.flow_proposal/v0';

/** `source` value passed to the proposal-create callback for Flow proposals. */
export const FLOW_PROPOSAL_SOURCE = 'flow';

/** `review_queue` value attached to Flow proposals and echoed in the success payload. */
export const FLOW_REVIEW_QUEUE = 'flow-authoring';
| 48 | |
| 49 | /** @typedef {import('./flow-scope.mjs').FlowScope} FlowScope */ |
| 50 | /** @typedef {'new'|'edit'|'import'} FlowProposeKind */ |
| 51 | |
/**
 * Project a flow record onto the fixed field set that `flowStateId` hashes.
 *
 * Unknown fields are dropped. `tags`, `steps` and `inputs` fall back to an
 * empty array when they are not arrays, `vault_mirror_path` becomes `null`
 * unless it is a string, and `truncated` is coerced to a strict boolean.
 * The field set is intended to line up with what `flowDefinitionForClient`
 * returns, so a client can recompute the same token from a fetched flow
 * (NOTE(review): confirm against flow-store.mjs).
 *
 * @param {Record<string, unknown>} flow
 * @returns {Record<string, unknown>}
 */
function canonicalFlowForState(flow) {
  const asList = (value) => (Array.isArray(value) ? value : []);
  const { flow_id, title, version, scope, summary, updated } = flow;
  const mirrorPath = typeof flow.vault_mirror_path === 'string' ? flow.vault_mirror_path : null;

  return {
    schema: 'knowtation.flow/v0',
    flow_id,
    title,
    version,
    scope,
    summary,
    tags: asList(flow.tags),
    steps: asList(flow.steps),
    inputs: asList(flow.inputs),
    vault_mirror_path: mirrorPath,
    updated,
    truncated: flow.truncated === true,
  };
}
| 76 | |
/**
 * Project a step record onto the fixed field set that `flowStateId` hashes.
 *
 * Scalar fields are copied as-is (missing ones stay `undefined`); the list
 * fields `requires`, `boundaries`, `skill_refs`, `inputs` and `outputs`
 * become `[]` when the source value is not an array. Extra fields are dropped.
 *
 * @param {Record<string, unknown>} step
 * @returns {Record<string, unknown>}
 */
function canonicalStepForState(step) {
  const asList = (value) => (Array.isArray(value) ? value : []);
  const {
    step_id,
    flow_id,
    ordinal,
    owned_job,
    instruction,
    trigger,
    when_not_to_run,
    output_shape,
    verification,
    automatable,
  } = step;

  return {
    schema: 'knowtation.flow_step/v0',
    step_id,
    flow_id,
    ordinal,
    owned_job,
    instruction,
    trigger,
    when_not_to_run,
    requires: asList(step.requires),
    boundaries: asList(step.boundaries),
    skill_refs: asList(step.skill_refs),
    inputs: asList(step.inputs),
    outputs: asList(step.outputs),
    output_shape,
    verification,
    automatable,
  };
}
| 101 | |
/**
 * Compute the optimistic-concurrency token for a flow definition and its steps.
 *
 * Each step is reduced with `canonicalStepForState`, the reduced steps are
 * ordered by numeric `ordinal`, and the flow is reduced with
 * `canonicalFlowForState`. The pair is serialized with `stableStringify`,
 * hashed with `fnv1a64Hex` over its UTF-8 bytes, and prefixed with
 * `FLOW_STATE_ID_PREFIX`. Both helpers come from `note-state-id.mjs`
 * (presumably key-order-independent JSON and a 64-bit FNV-1a hex digest —
 * confirm there).
 *
 * @param {Record<string, unknown>} flow - flow record; a falsy value is hashed as `{}`.
 * @param {Record<string, unknown>[]} steps - step records; a non-array is hashed as `[]`.
 * @returns {string}
 */
export function flowStateId(flow, steps) {
  const stepList = Array.isArray(steps) ? steps : [];
  // Canonicalize from a copy so the caller's array is never reordered.
  const canonicalSteps = [...stepList].map((step) => canonicalStepForState(step));
  canonicalSteps.sort((left, right) => Number(left.ordinal) - Number(right.ordinal));

  const serialized = stableStringify({
    flow: canonicalFlowForState(flow || {}),
    steps: canonicalSteps,
  });
  const digest = fnv1a64Hex(Buffer.from(serialized, 'utf8'));
  return FLOW_STATE_ID_PREFIX + digest;
}
| 121 | |
/**
 * Sentinel state token used as the base state of a proposal whose flow must
 * not exist yet. It is the prefixed `fnv1a64Hex` hash of a single zero byte,
 * so it is constant across calls.
 *
 * @returns {string}
 */
export function absentFlowStateId() {
  const sentinelBytes = Buffer.from([0x00]);
  const digest = fnv1a64Hex(sentinelBytes);
  return FLOW_STATE_ID_PREFIX + digest;
}
| 131 | |
/**
 * Parse an environment-variable value into a tri-state boolean.
 *
 * Matching ignores surrounding whitespace and letter case, so `TRUE`,
 * ` true ` and `True` are all read as `true`. An exact-match parser would
 * silently treat such spellings as "unset", which is dangerous for a deny
 * flag: the operator's explicit setting would be ignored.
 *
 * @param {unknown} v - raw value, normally a string or `undefined`.
 * @returns {boolean | null} `true` for `1`/`true`, `false` for `0`/`false`,
 *   `null` for anything else (including non-strings), meaning "not set".
 */
function envTriState(v) {
  if (typeof v !== 'string') return null;
  const normalized = v.trim().toLowerCase();
  if (normalized === '1' || normalized === 'true') return true;
  if (normalized === '0' || normalized === 'false') return false;
  return null;
}
| 138 | |
/**
 * Read the optional authoring policy file from the data directory.
 *
 * Best-effort by design: a missing file, unreadable file, invalid JSON, or a
 * non-object JSON value all yield `{}`. Only the two known keys are copied,
 * and only when their value is a real boolean.
 *
 * @param {string} dataDir - directory holding the policy file; falsy yields `{}`.
 * @returns {{ flow_authoring_writes_enabled?: boolean, flow_authoring_forbidden?: boolean }}
 */
export function readFlowAuthoringPolicyFile(dataDir) {
  if (!dataDir) return {};

  const policyPath = path.join(dataDir, FLOW_AUTHORING_POLICY_FILE);
  let parsed;
  try {
    if (!fs.existsSync(policyPath)) return {};
    parsed = JSON.parse(fs.readFileSync(policyPath, 'utf8'));
  } catch {
    // Unreadable or malformed policy is treated the same as no policy.
    return {};
  }
  if (!parsed || typeof parsed !== 'object') return {};

  const policy = {};
  for (const key of ['flow_authoring_writes_enabled', 'flow_authoring_forbidden']) {
    if (typeof parsed[key] === 'boolean') {
      policy[key] = parsed[key];
    }
  }
  return policy;
}
| 162 | |
/**
 * Decide whether Flow authoring writes are switched on.
 *
 * Resolution order: a `FLOW_AUTHORING_WRITES` environment value recognised by
 * `envTriState` wins outright; otherwise the policy file's
 * `flow_authoring_writes_enabled` must be exactly `true`; otherwise the
 * answer is `false`.
 *
 * @param {string} dataDir
 * @returns {boolean}
 */
export function getFlowAuthoringWritesEnabled(dataDir) {
  const envOverride = envTriState(process.env.FLOW_AUTHORING_WRITES);
  if (envOverride === null) {
    const { flow_authoring_writes_enabled: fileValue } = readFlowAuthoringPolicyFile(dataDir);
    return fileValue === true;
  }
  return envOverride;
}
| 176 | |
/**
 * Decide whether policy forbids Flow authoring altogether.
 *
 * Resolution order: a `FLOW_AUTHORING_FORBIDDEN` environment value recognised
 * by `envTriState` wins outright; otherwise the policy file's
 * `flow_authoring_forbidden` must be exactly `true`; otherwise `false`.
 *
 * @param {string} dataDir
 * @returns {boolean}
 */
export function getFlowAuthoringForbidden(dataDir) {
  const envOverride = envTriState(process.env.FLOW_AUTHORING_FORBIDDEN);
  if (envOverride === null) {
    const { flow_authoring_forbidden: fileValue } = readFlowAuthoringPolicyFile(dataDir);
    return fileValue === true;
  }
  return envOverride;
}
| 188 | |
/**
 * Derive the `auto_approvable` flag from the steps' verification kinds.
 *
 * The result is `true` only when there is at least one step and every step
 * declares a truthy `verification.kind` other than `human_review`. A missing
 * kind counts the same as `human_review`, so an incomplete draft can never
 * come out auto-approvable.
 *
 * @param {{ verification?: { kind?: string } }[]} steps
 * @returns {boolean}
 */
export function deriveAutoApprovable(steps) {
  if (!Array.isArray(steps) || steps.length === 0) return false;

  const needsHuman = steps.some((step) => {
    const kind = step?.verification?.kind;
    return !kind || kind === 'human_review';
  });
  return !needsHuman;
}
| 201 | |
/**
 * Build the fallback mirror-note path for a flow that declares none.
 *
 * A leading `flow_` is removed from the id, every remaining underscore becomes
 * a hyphen, and the result is placed under `meta/flows/` with an `.md` suffix.
 *
 * @param {string} flowId
 * @returns {string}
 */
function defaultMirrorPath(flowId) {
  const prefix = 'flow_';
  const bareId = flowId.startsWith(prefix) ? flowId.slice(prefix.length) : flowId;
  const slug = bareId.split('_').join('-');
  return `meta/flows/${slug}.md`;
}
| 210 | |
/**
 * Merge two optional scope sets into a new set, neither input being modified.
 * When the merged set would be empty it contains the single scope `personal`.
 *
 * @param {Set<FlowScope>} [a]
 * @param {Set<FlowScope>} [b]
 * @returns {Set<FlowScope>}
 */
function unionScopes(a, b) {
  const merged = new Set([...(a || []), ...(b || [])]);
  return merged.size > 0 ? merged : new Set(['personal']);
}
| 223 | |
/**
 * Work out which scopes the caller can see for a write request.
 *
 * Caller-supplied answers are honoured first: `ambiguous === true` yields a
 * `personal`-only set flagged ambiguous, and a `visibleScopes` Set is returned
 * as-is with `ambiguous: false`. Otherwise resolution is delegated to
 * `resolveFlowVisibleScopes` with the identity fields taken from `input`.
 *
 * @param {object} input
 * @returns {{ visibleScopes: Set<FlowScope>, ambiguous: boolean }}
 */
function resolveWriteScopes(input) {
  const { ambiguous, visibleScopes } = input;
  if (ambiguous === true) {
    return { visibleScopes: new Set(['personal']), ambiguous: true };
  }
  if (visibleScopes instanceof Set) {
    return { visibleScopes, ambiguous: false };
  }

  const { dataDir, userId, vaultId, role, cliScopes } = input;
  return resolveFlowVisibleScopes({ dataDir, userId, vaultId, role, cliScopes });
}
| 243 | |
/**
 * Build the failure result object returned by the handlers in this module.
 *
 * @param {number} status - HTTP-style status code.
 * @param {string} code - machine-readable error code.
 * @param {string} [error] - message; falls back to `code` when null or undefined.
 * @returns {{ ok: false, status: number, error: string, code: string }}
 */
function refuse(status, code, error) {
  const message = error ?? code;
  return { ok: false, status, error: message, code };
}
| 252 | |
/**
 * Single entry point for proposing a new, edited, or imported Flow.
 *
 * Checks run in a fixed order and the first failure is returned as a refusal
 * object (nothing is thrown by this function itself):
 *   1. policy gates (`getFlowAuthoringForbidden`, `getFlowAuthoringWritesEnabled`);
 *   2. `createProposal` must be a function;
 *   3. `intent` must be a non-blank string;
 *   4. bundle validation through `validateFlowBundle`;
 *   5. for edits, a supplied `flowId` must equal the bundle's `flow_id`;
 *   6. scope resolution, then write authority for the bundle's scope;
 *   7. for imports, the external-tool and automatable-step checks;
 *   8. a concurrency precheck against the latest stored flow.
 * On success the bundle is handed to `input.createProposal` as a mirror-note
 * proposal. This function only reads the flow store (`loadFlowStore`); it
 * never calls `upsertFlowVersion`.
 *
 * @param {{
 *   dataDir: string,
 *   vaultId: string,
 *   userId?: string,
 *   role?: string,
 *   cliScopes?: FlowScope[],
 *   visibleScopes?: Set<FlowScope>,
 *   ambiguous?: boolean,
 *   kind: FlowProposeKind,
 *   flow?: unknown,
 *   steps?: unknown,
 *   bundle?: { flow?: unknown, steps?: unknown },
 *   intent?: unknown,
 *   flowId?: string,
 *   baseVersion?: string,
 *   baseStateId?: string,
 *   externalRef?: string,
 *   sourceVaultHint?: string,
 *   createProposal: (dataDir: string, input: object) => { proposal_id: string },
 *   starterDir?: string,
 * }} input
 * @returns {{ ok: true, payload: object } | { ok: false, status: number, error: string, code: string }}
 */
export function handleFlowProposeRequest(input) {
  const { dataDir, vaultId, kind } = input;
  const isImport = kind === 'import';
  const isEdit = kind === 'edit';
  // Imports and drafts report shape problems under different codes.
  const malformedCode = isImport ? 'FLOW_IMPORT_BUNDLE_MALFORMED' : 'FLOW_DRAFT_INVALID';

  // Policy gates come first so a disabled deployment does no further work.
  if (getFlowAuthoringForbidden(dataDir)) {
    return refuse(403, 'FLOW_AUTHORING_POLICY_FORBIDDEN', 'Flow authoring forbidden by policy');
  }
  if (!getFlowAuthoringWritesEnabled(dataDir)) {
    return refuse(403, 'FLOW_AUTHORING_DISABLED', 'Flow authoring writes are disabled');
  }

  if (typeof input.createProposal !== 'function') {
    return refuse(500, 'RUNTIME_ERROR', 'createProposal is required');
  }

  // Intent is mandatory; it is trimmed and otherwise passed through unchanged.
  const intent = typeof input.intent === 'string' ? input.intent.trim() : '';
  if (intent === '') {
    return refuse(400, malformedCode, 'intent is required');
  }

  // Imports carry a ready-made bundle; drafts carry flow and steps separately.
  let rawBundle;
  if (!isImport) {
    rawBundle = { flow: input.flow, steps: input.steps };
  } else if (input.bundle && typeof input.bundle === 'object') {
    rawBundle = input.bundle;
  } else {
    rawBundle = {};
  }
  const validated = validateFlowBundle(rawBundle);
  if (!validated.ok) {
    return refuse(400, malformedCode, validated.reason);
  }
  const { flow, steps } = validated;

  // An edit addressed to one flow id must not carry a bundle for another.
  if (isEdit) {
    const requestedId = typeof input.flowId === 'string' ? input.flowId.trim() : '';
    if (requestedId !== '' && requestedId !== flow.flow_id) {
      return refuse(400, 'FLOW_DRAFT_INVALID', 'flow_id mismatch between path and bundle');
    }
  }

  // Scope resolution: an ambiguous result is refused rather than guessed.
  const scopeResolution = resolveWriteScopes(input);
  if (scopeResolution.ambiguous) {
    return refuse(400, 'FLOW_SCOPE_AMBIGUOUS', 'Ambiguous flow scope');
  }
  const { visibleScopes } = scopeResolution;

  // Write authority is decided for the scope the bundle asks to live in.
  const authority = resolveFlowWriteAuthority(visibleScopes, flow.scope);
  if (!authority.ok) {
    const useImportCode = isImport && authority.code === 'FLOW_SCOPE_DENIED';
    return refuse(
      authority.status,
      useImportCode ? 'FLOW_IMPORT_SCOPE_DENIED' : authority.code,
      authority.error,
    );
  }

  // Import-only checks (the original note cites FLOW-V0-SPEC §6 item 3 for the tool allowlist).
  if (isImport) {
    const vaultPolicy = readVaultExternalAgentPolicy(dataDir);
    const toolCheck = validateImportExternalTools(steps, vaultPolicy.allowedTools);
    // A failed tool check only blocks the import when the vault policy is 'reject'.
    if (!toolCheck.ok && vaultPolicy.importPolicy === 'reject') {
      return refuse(403, 'FLOW_IMPORT_EXTERNAL_TOOL_DENIED', 'Import declares tools outside allowlist');
    }
    const automatableCheck = validateImportAutomatableSteps(steps, dataDir);
    if (!automatableCheck.ok) {
      return refuse(403, 'FLOW_IMPORT_AUTOMATABLE_DENIED', 'Import declares automatable steps where policy forbids');
    }
  }

  // Concurrency precheck against the latest stored version of this flow id.
  const store = loadFlowStore(dataDir);
  const vault = store.vaults[vaultId];
  const current = vault ? latestStoredFlow(vault, flow.flow_id) : null;
  // A stored flow in a scope the actor cannot see is treated as absent.
  const currentVisible = current && visibleScopes.has(current.flow.scope) ? current : null;

  let proposalBaseStateId;
  let proposalBaseVersion = null;

  if (isEdit) {
    const baseVersion = typeof input.baseVersion === 'string' ? input.baseVersion.trim() : '';
    const baseStateId = typeof input.baseStateId === 'string' ? input.baseStateId.trim() : '';
    const hasValidBase =
      baseVersion && SEMVER_RE.test(baseVersion) && baseStateId.startsWith(FLOW_STATE_ID_PREFIX);
    if (!hasValidBase) {
      return refuse(400, 'FLOW_DRAFT_INVALID', 'edit requires base_version + base_state_id');
    }
    // Missing and unreadable flows get the same answer, so existence is not leaked.
    if (!currentVisible) {
      return refuse(404, 'unknown_flow', 'unknown_flow');
    }
    const canonical = flowDefinitionForClient(currentVisible.flow, currentVisible.steps);
    const serverStateId = flowStateId(canonical.flow, canonical.steps);
    const stillAtBase =
      currentVisible.flow.version === baseVersion && serverStateId === baseStateId;
    if (!stillAtBase) {
      return refuse(409, 'FLOW_LINEAGE_CONFLICT', 'flow changed since edit was based');
    }
    const next = parseSemver(flow.version);
    const base = parseSemver(baseVersion);
    if (!next || !base || compareSemver(next, base) <= 0) {
      return refuse(400, 'FLOW_DRAFT_INVALID', 'flow.version must be greater than base_version');
    }
    proposalBaseStateId = baseStateId;
    proposalBaseVersion = baseVersion;
  } else {
    // New flows and imports require that no visible flow already uses this id.
    if (currentVisible) {
      return refuse(409, 'FLOW_LINEAGE_CONFLICT', 'flow_id already exists in scope');
    }
    proposalBaseStateId = absentFlowStateId();
  }

  const autoApprovable = deriveAutoApprovable(steps);

  // Assemble the mirror-note proposal; the note body is the pretty-printed bundle.
  const mirrorPath = flow.vault_mirror_path || defaultMirrorPath(flow.flow_id);
  const body = JSON.stringify({ flow, steps }, null, 2);
  const frontmatter = {
    type: 'flow',
    flow_id: flow.flow_id,
    flow_version: flow.version,
    scope: flow.scope,
  };
  const externalRef = buildExternalRef(input);
  const proposedBy =
    typeof input.userId === 'string' && input.userId.trim() ? input.userId.trim() : undefined;

  const proposal = input.createProposal(dataDir, {
    path: mirrorPath,
    body,
    frontmatter,
    intent,
    base_state_id: proposalBaseStateId,
    external_ref: externalRef || undefined,
    source: FLOW_PROPOSAL_SOURCE,
    vault_id: vaultId,
    proposed_by: proposedBy,
    review_queue: FLOW_REVIEW_QUEUE,
    flow_meta: {
      kind,
      base_version: proposalBaseVersion,
      base_state_id: proposalBaseStateId,
    },
  });

  return {
    ok: true,
    payload: {
      schema: FLOW_PROPOSAL_SCHEMA,
      proposal_id: proposal.proposal_id,
      flow_id: flow.flow_id,
      base_version: proposalBaseVersion,
      // Only edits echo a base state; new flows and imports report null.
      base_state_id: isEdit ? proposalBaseStateId : null,
      scope: flow.scope,
      auto_approvable: autoApprovable,
      status: 'proposed',
      review_queue: FLOW_REVIEW_QUEUE,
    },
  };
}
| 440 | |
/**
 * Compose the `external_ref` lineage string recorded on an import proposal.
 *
 * Non-import requests always get `''`. For imports, the trimmed `externalRef`
 * (capped at 256 characters) and a `source_vault_hint=<hint>` label (hint
 * capped at 128 characters) are joined with one space, and the result is
 * capped at 512 characters. Blank or non-string fields are skipped.
 *
 * @param {{ kind: FlowProposeKind, externalRef?: string, sourceVaultHint?: string }} input
 * @returns {string}
 */
function buildExternalRef(input) {
  if (input.kind !== 'import') return '';

  const trimmedOrEmpty = (value) => (typeof value === 'string' ? value.trim() : '');
  const ref = trimmedOrEmpty(input.externalRef);
  const hint = trimmedOrEmpty(input.sourceVaultHint);

  const parts = [];
  if (ref) parts.push(ref.slice(0, 256));
  if (hint) parts.push(`source_vault_hint=${hint.slice(0, 128)}`);
  return parts.join(' ').slice(0, 512);
}
| 459 | |
/**
 * Re-validate a stored Flow proposal and re-run the lineage check at approve time.
 *
 * The proposal body is parsed as JSON and validated with `validateFlowBundle`.
 * The latest stored flow for the proposal's vault (`'default'` when the
 * proposal carries no usable `vault_id`) is then compared with the proposal's
 * `flow_meta`:
 *   - edits require that the flow still exists, that its version and state id
 *     equal the recorded base, and that the proposed version is greater than
 *     the base version;
 *   - every other kind requires that the flow id is still unused.
 * Unlike the propose-time check, no scope filtering is applied here. Per the
 * original author's note this is meant to run before the mirror note is
 * written, so a conflict leaves no partial state.
 *
 * @param {string} dataDir
 * @param {object} proposal - the stored proposal (source === 'flow').
 * @returns {{ ok: true, vaultId: string, flow: object, steps: object[] } | { ok: false, status: number, error: string, code: string }}
 */
export function precheckApprovedFlowProposal(dataDir, proposal) {
  let bundle;
  try {
    // A non-string body is parsed as '' and therefore rejected as invalid JSON.
    const rawBody = typeof proposal.body === 'string' ? proposal.body : '';
    bundle = JSON.parse(rawBody);
  } catch {
    return refuse(400, 'FLOW_DRAFT_INVALID', 'flow proposal body is not valid JSON');
  }

  const validated = validateFlowBundle(bundle);
  if (!validated.ok) {
    return refuse(400, 'FLOW_DRAFT_INVALID', validated.reason);
  }
  const { flow, steps } = validated;

  const hasVaultId = typeof proposal.vault_id === 'string' && proposal.vault_id.trim();
  const vaultId = hasVaultId ? proposal.vault_id.trim() : 'default';
  const meta = proposal.flow_meta && typeof proposal.flow_meta === 'object' ? proposal.flow_meta : {};
  // Anything that is not an edit (including imports) is checked as a new flow.
  const isEdit = meta.kind === 'edit';

  const store = loadFlowStore(dataDir);
  const vault = store.vaults[vaultId];
  const current = vault ? latestStoredFlow(vault, flow.flow_id) : null;

  if (!isEdit) {
    if (current) {
      return refuse(409, 'FLOW_LINEAGE_CONFLICT', 'flow_id already exists');
    }
    return { ok: true, vaultId, flow, steps };
  }

  if (!current) {
    return refuse(409, 'FLOW_LINEAGE_CONFLICT', 'flow disappeared before approve');
  }
  const baseVersion = typeof meta.base_version === 'string' ? meta.base_version : '';
  const baseStateId = typeof meta.base_state_id === 'string' ? meta.base_state_id : '';
  const canonical = flowDefinitionForClient(current.flow, current.steps);
  const serverStateId = flowStateId(canonical.flow, canonical.steps);
  const stillAtBase = current.flow.version === baseVersion && serverStateId === baseStateId;
  if (!stillAtBase) {
    return refuse(409, 'FLOW_LINEAGE_CONFLICT', 'flow changed since edit was based');
  }
  const next = parseSemver(flow.version);
  const base = parseSemver(baseVersion);
  if (!next || !base || compareSemver(next, base) <= 0) {
    return refuse(400, 'FLOW_DRAFT_INVALID', 'flow.version must be greater than base_version');
  }

  return { ok: true, vaultId, flow, steps };
}
| 513 | |
/**
 * Record an approved Flow bundle in the flow store by delegating to
 * `upsertFlowVersion` and returning its result unchanged.
 *
 * Intended to be called after the mirror note has been written, with a bundle
 * that already passed {@link precheckApprovedFlowProposal}; this function does
 * no validation of its own.
 *
 * @param {string} dataDir
 * @param {string} vaultId
 * @param {object} flow
 * @param {object[]} steps
 * @returns {{ created: boolean, version: string }}
 */
export function applyFlowProposalToIndex(dataDir, vaultId, flow, steps) {
  const outcome = upsertFlowVersion(dataDir, vaultId, flow, steps);
  return outcome;
}
| 528 | |
| 529 | export { FLOW_ID_RE }; |
File History
1 commit
sha256:8915fe406161f95c1681f9469375e7bae5b28c884f00bedbdef65e4b0cd0738d
docs(flow): commit FLOW-V0-SPEC.md hygiene for 7A-INT merge
Human
13 hours ago