/** * YAML Converter for Workflow Builder * * Bidirectional conversion between WorkflowCanvas (visual representation) * and Pipeline YAML (execution format). */ import * as yaml from 'js-yaml'; import type { Edge } from '@xyflow/react'; import dagre from '@dagrejs/dagre'; import type { WorkflowCanvas, WorkflowNode, WorkflowNodeData, InputNodeData, LlmNodeData, SkillNodeData, HandNodeData, ConditionNodeData, ParallelNodeData, ExportNodeData, PipelineYaml, PipelineStepYaml, ValidationError, ValidationResult, } from './types'; // ============================================================================= // Canvas to YAML Conversion // ============================================================================= /** * Convert a WorkflowCanvas to Pipeline YAML string */ export function canvasToYaml(canvas: WorkflowCanvas): string { const pipeline: PipelineYaml = { apiVersion: 'zclaw/v1', kind: 'Pipeline', metadata: { name: canvas.name, description: canvas.description, tags: canvas.metadata.tags, }, spec: { input: extractInputs(canvas.nodes), steps: nodesToSteps(canvas.nodes, canvas.edges), output: extractOutputs(canvas.nodes), }, }; return yaml.dump(pipeline, { indent: 2, lineWidth: -1, noRefs: true, sortKeys: false, }); } /** * Extract input definitions from input nodes */ function extractInputs(nodes: WorkflowNode[]): Record | undefined { const inputs: Record = {}; for (const node of nodes) { if (node.data.type === 'input') { const data = node.data as InputNodeData; inputs[data.variableName] = data.defaultValue ?? null; } } return Object.keys(inputs).length > 0 ? inputs : undefined; } /** * Extract output mappings from the last nodes or explicit output nodes */ function extractOutputs(nodes: WorkflowNode[]): Record | undefined { const outputs: Record = {}; for (const node of nodes) { if (node.data.type === 'export') { // Export nodes define outputs outputs[`${node.id}_export`] = `\${steps.${node.id}.output}`; } } return Object.keys(outputs).length > 0 ? outputs : undefined; } /** * Convert nodes and edges to pipeline steps */ function nodesToSteps(nodes: WorkflowNode[], edges: Edge[]): PipelineStepYaml[] { // Topological sort to get execution order const sortedNodes = topologicalSort(nodes, edges); return sortedNodes .filter(node => node.data.type !== 'input') // Skip input nodes .map(node => nodeToStep(node)) .filter((step): step is PipelineStepYaml => step !== null); } /** * Convert a single node to a pipeline step */ function nodeToStep(node: WorkflowNode): PipelineStepYaml | null { const data = node.data; const label = data.label as string | undefined; const base: PipelineStepYaml = { id: node.id, name: label, action: {}, }; const nodeType = data.type as string; switch (nodeType) { case 'llm': { const llmData = data as LlmNodeData; base.action = { llm_generate: { template: llmData.template, input: mapExpressionsToObject(llmData.template), model: llmData.model, temperature: llmData.temperature, max_tokens: llmData.maxTokens, json_mode: llmData.jsonMode, }, }; break; } case 'skill': { const skillData = data as SkillNodeData; base.action = { skill: { skill_id: skillData.skillId, input: skillData.inputMappings, }, }; break; } case 'hand': { const handData = data as HandNodeData; base.action = { hand: { hand_id: handData.handId, hand_action: handData.action, params: handData.params, }, }; break; } case 'orchestration': { const orchData = data as { graphId?: string; graph?: Record; inputMappings?: Record }; base.action = { skill_orchestration: { graph_id: orchData.graphId, graph: orchData.graph, input: orchData.inputMappings, }, }; break; } case 'condition': { const condData = data as ConditionNodeData; base.action = { condition: { condition: condData.condition, branches: condData.branches.map((b: { when: string }) => ({ when: b.when, then: { /* Will be filled by connected nodes */ }, })), }, }; break; } case 'parallel': { const parData = data as ParallelNodeData; base.action = { parallel: { each: parData.each, step: { /* Will be filled by child nodes */ }, max_workers: parData.maxWorkers, }, }; break; } case 'loop': { const loopData = data as { each: string; itemVar: string; indexVar: string }; base.action = { loop: { each: loopData.each, item_var: loopData.itemVar, index_var: loopData.indexVar, step: { /* Will be filled by child nodes */ }, }, }; break; } case 'export': { const exportData = data as ExportNodeData; base.action = { file_export: { formats: exportData.formats, input: `\${steps.${node.id}.input}`, output_dir: exportData.outputDir, }, }; break; } case 'http': { const httpData = data as { url: string; method: string; headers: Record; body?: string }; base.action = { http_request: { url: httpData.url, method: httpData.method, headers: httpData.headers, body: httpData.body, }, }; break; } case 'setVar': { const varData = data as { variableName: string; value: string }; base.action = { set_var: { name: varData.variableName, value: varData.value, }, }; break; } case 'delay': { const delayData = data as { ms: number }; base.action = { delay: { ms: delayData.ms, }, }; break; } case 'input': // Input nodes don't become steps return null; default: console.warn(`Unknown node type: ${nodeType}`); return null; } return base; } /** * Topological sort of nodes based on edges */ function topologicalSort(nodes: WorkflowNode[], edges: Edge[]): WorkflowNode[] { const nodeMap = new Map(nodes.map(n => [n.id, n])); const inDegree = new Map(); const adjacency = new Map(); // Initialize for (const node of nodes) { inDegree.set(node.id, 0); adjacency.set(node.id, []); } // Build graph for (const edge of edges) { const current = adjacency.get(edge.source) || []; current.push(edge.target); adjacency.set(edge.source, current); inDegree.set(edge.target, (inDegree.get(edge.target) || 0) + 1); } // Kahn's algorithm const queue: string[] = []; const result: WorkflowNode[] = []; for (const [nodeId, degree] of inDegree) { if (degree === 0) { queue.push(nodeId); } } while (queue.length > 0) { const nodeId = queue.shift()!; const node = nodeMap.get(nodeId); if (node) { result.push(node); } const neighbors = adjacency.get(nodeId) || []; for (const neighbor of neighbors) { const newDegree = (inDegree.get(neighbor) || 0) - 1; inDegree.set(neighbor, newDegree); if (newDegree === 0) { queue.push(neighbor); } } } return result; } /** * Extract variable references from a template string */ function mapExpressionsToObject(template: string): Record { const regex = /\$\{([^}]+)\}/g; const matches = template.match(regex) || []; const result: Record = {}; for (const match of matches) { const expr = match.slice(2, -1); // Remove ${ and } const parts = expr.split('.'); if (parts.length >= 2) { result[parts[parts.length - 1]] = match; } } return result; } // ============================================================================= // YAML to Canvas Conversion // ============================================================================= /** * Parse Pipeline YAML string to WorkflowCanvas */ export function yamlToCanvas(yamlString: string): WorkflowCanvas { const pipeline = yaml.load(yamlString) as PipelineYaml; const nodes: WorkflowNode[] = []; const edges: Edge[] = []; // Create input nodes from spec.input if (pipeline.spec.input) { let y = 50; for (const [varName, defaultValue] of Object.entries(pipeline.spec.input)) { nodes.push({ id: `input_${varName}`, type: 'input', position: { x: 50, y }, data: { type: 'input', label: varName, variableName: varName, defaultValue, }, }); y += 100; } } // Convert steps to nodes if (pipeline.spec.steps) { const x = 300; let y = 50; for (const step of pipeline.spec.steps) { const node = stepToNode(step, x, y); if (node) { nodes.push(node); y += 150; } } } // Auto-layout with dagre const layoutedNodes = applyDagreLayout(nodes, edges); return { id: `workflow_${Date.now()}`, name: pipeline.metadata?.name || 'Imported Workflow', description: pipeline.metadata?.description, category: 'imported', nodes: layoutedNodes, edges, viewport: { x: 0, y: 0, zoom: 1 }, metadata: { createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), tags: pipeline.metadata?.tags || [], version: '1.0.0', }, }; } /** * Convert a pipeline step to a workflow node */ function stepToNode(step: PipelineStepYaml, x: number, y: number): WorkflowNode | null { const action = step.action; const actionType = Object.keys(action)[0]; const actionData = action[actionType]; const baseData = { label: step.name || step.id, }; switch (actionType) { case 'llm_generate': return { id: step.id, type: 'llm', position: { x, y }, data: { type: 'llm', ...baseData, template: (actionData as { template?: string }).template || '', isTemplateFile: false, model: (actionData as { model?: string }).model, temperature: (actionData as { temperature?: number }).temperature, maxTokens: (actionData as { max_tokens?: number }).max_tokens, jsonMode: (actionData as { json_mode?: boolean }).json_mode || false, } as WorkflowNodeData, }; case 'skill': return { id: step.id, type: 'skill', position: { x, y }, data: { type: 'skill', ...baseData, skillId: (actionData as { skill_id?: string }).skill_id || '', inputMappings: (actionData as { input?: Record }).input || {}, } as WorkflowNodeData, }; case 'hand': return { id: step.id, type: 'hand', position: { x, y }, data: { type: 'hand', ...baseData, handId: (actionData as { hand_id?: string }).hand_id || '', action: (actionData as { hand_action?: string }).hand_action || '', params: (actionData as { params?: Record }).params || {}, } as WorkflowNodeData, }; case 'skill_orchestration': return { id: step.id, type: 'orchestration', position: { x, y }, data: { type: 'orchestration', ...baseData, graphId: (actionData as { graph_id?: string }).graph_id, graph: (actionData as { graph?: Record }).graph, inputMappings: (actionData as { input?: Record }).input || {}, } as WorkflowNodeData, }; case 'condition': return { id: step.id, type: 'condition', position: { x, y }, data: { type: 'condition', ...baseData, condition: (actionData as { condition?: string }).condition || '', branches: ((actionData as { branches?: Array<{ when: string }> }).branches || []).map(b => ({ when: b.when, label: b.when.slice(0, 20), })), hasDefault: true, } as WorkflowNodeData, }; case 'parallel': return { id: step.id, type: 'parallel', position: { x, y }, data: { type: 'parallel', ...baseData, each: (actionData as { each?: string }).each || '', maxWorkers: (actionData as { max_workers?: number }).max_workers || 4, } as WorkflowNodeData, }; case 'file_export': return { id: step.id, type: 'export', position: { x, y }, data: { type: 'export', ...baseData, formats: (actionData as { formats?: string[] }).formats || [], outputDir: (actionData as { output_dir?: string }).output_dir, } as WorkflowNodeData, }; case 'http_request': return { id: step.id, type: 'http', position: { x, y }, data: { type: 'http', ...baseData, url: (actionData as { url?: string }).url || '', method: ((actionData as { method?: string }).method || 'GET') as 'GET' | 'POST' | 'PUT' | 'DELETE' | 'PATCH', headers: (actionData as { headers?: Record }).headers || {}, body: (actionData as { body?: string }).body, } as WorkflowNodeData, }; case 'set_var': return { id: step.id, type: 'setVar', position: { x, y }, data: { type: 'setVar', ...baseData, variableName: (actionData as { name?: string }).name || '', value: (actionData as { value?: string }).value || '', } as WorkflowNodeData, }; case 'delay': return { id: step.id, type: 'delay', position: { x, y }, data: { type: 'delay', ...baseData, ms: (actionData as { ms?: number }).ms || 0, } as WorkflowNodeData, }; default: console.warn(`Unknown action type: ${actionType}`); return null; } } // ============================================================================= // Layout Utilities // ============================================================================= /** * Apply dagre layout to nodes */ export function applyDagreLayout(nodes: WorkflowNode[], edges: Edge[]): WorkflowNode[] { const dagreGraph = new dagre.graphlib.Graph(); dagreGraph.setDefaultEdgeLabel(() => ({})); dagreGraph.setGraph({ rankdir: 'LR', nodesep: 100, ranksep: 150, marginx: 50, marginy: 50, }); // Add nodes to dagre for (const node of nodes) { dagreGraph.setNode(node.id, { width: 250, height: 100, }); } // Add edges to dagre for (const edge of edges) { dagreGraph.setEdge(edge.source, edge.target); } // Apply layout dagre.layout(dagreGraph); // Update node positions return nodes.map(node => { const dagreNode = dagreGraph.node(node.id); if (dagreNode) { return { ...node, position: { x: dagreNode.x - dagreNode.width / 2, y: dagreNode.y - dagreNode.height / 2, }, }; } return node; }); } // ============================================================================= // Validation // ============================================================================= /** * Validate a workflow canvas */ export function validateCanvas(canvas: WorkflowCanvas): ValidationResult { const errors: ValidationError[] = []; const warnings: ValidationError[] = []; // Check for empty canvas if (canvas.nodes.length === 0) { errors.push({ nodeId: 'canvas', message: 'Workflow is empty', severity: 'error', }); return { valid: false, errors, warnings }; } // Check for input nodes const hasInput = canvas.nodes.some(n => n.data.type === 'input'); if (!hasInput) { warnings.push({ nodeId: 'canvas', message: 'No input nodes defined', severity: 'warning', }); } // Check for disconnected nodes const connectedNodeIds = new Set(); for (const edge of canvas.edges) { connectedNodeIds.add(edge.source); connectedNodeIds.add(edge.target); } for (const node of canvas.nodes) { if (canvas.nodes.length > 1 && !connectedNodeIds.has(node.id) && node.data.type !== 'input') { warnings.push({ nodeId: node.id, message: `Node "${node.data.label}" is not connected`, severity: 'warning', }); } } // Validate individual nodes for (const node of canvas.nodes) { const nodeErrors = validateNode(node); errors.push(...nodeErrors); } // Check for cycles (basic check) if (hasCycle(canvas.nodes, canvas.edges)) { errors.push({ nodeId: 'canvas', message: 'Workflow contains a cycle', severity: 'error', }); } return { valid: errors.length === 0, errors, warnings, }; } /** * Validate a single node */ function validateNode(node: WorkflowNode): ValidationError[] { const errors: ValidationError[] = []; const data = node.data; switch (data.type) { case 'llm': if (!data.template) { errors.push({ nodeId: node.id, field: 'template', message: 'Template is required', severity: 'error', }); } break; case 'skill': if (!data.skillId) { errors.push({ nodeId: node.id, field: 'skillId', message: 'Skill ID is required', severity: 'error', }); } break; case 'hand': if (!data.handId) { errors.push({ nodeId: node.id, field: 'handId', message: 'Hand ID is required', severity: 'error', }); } if (!data.action) { errors.push({ nodeId: node.id, field: 'action', message: 'Action is required', severity: 'error', }); } break; case 'http': if (!data.url) { errors.push({ nodeId: node.id, field: 'url', message: 'URL is required', severity: 'error', }); } break; case 'input': if (!data.variableName) { errors.push({ nodeId: node.id, field: 'variableName', message: 'Variable name is required', severity: 'error', }); } break; } return errors; } /** * Check if the graph has a cycle */ function hasCycle(nodes: WorkflowNode[], edges: Edge[]): boolean { const adjacency = new Map(); const visited = new Set(); const recStack = new Set(); // Build adjacency list for (const node of nodes) { adjacency.set(node.id, []); } for (const edge of edges) { const neighbors = adjacency.get(edge.source) || []; neighbors.push(edge.target); adjacency.set(edge.source, neighbors); } // DFS cycle detection function dfs(nodeId: string): boolean { visited.add(nodeId); recStack.add(nodeId); const neighbors = adjacency.get(nodeId) || []; for (const neighbor of neighbors) { if (!visited.has(neighbor)) { if (dfs(neighbor)) return true; } else if (recStack.has(neighbor)) { return true; } } recStack.delete(nodeId); return false; } for (const node of nodes) { if (!visited.has(node.id)) { if (dfs(node.id)) return true; } } return false; }