//! Workflow engine — multi-step agent pipeline execution. //! //! A workflow defines a sequence of steps where each step routes //! a task to a specific agent. Steps can: //! - Pass their output as input to the next step //! - Run in sequence (pipeline) or in parallel (fan-out) //! - Conditionally skip based on previous output //! - Loop until a condition is met //! - Store outputs in named variables for later reference //! //! Workflows are defined as Rust structs or loaded from JSON. use chrono::{DateTime, Utc}; use openfang_types::agent::AgentId; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; use tracing::{debug, info, warn}; use uuid::Uuid; /// Unique identifier for a workflow definition. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct WorkflowId(pub Uuid); impl WorkflowId { pub fn new() -> Self { Self(Uuid::new_v4()) } } impl Default for WorkflowId { fn default() -> Self { Self::new() } } impl std::fmt::Display for WorkflowId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.0) } } /// Unique identifier for a running workflow instance. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct WorkflowRunId(pub Uuid); impl WorkflowRunId { pub fn new() -> Self { Self(Uuid::new_v4()) } } impl Default for WorkflowRunId { fn default() -> Self { Self::new() } } impl std::fmt::Display for WorkflowRunId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.0) } } /// A workflow definition — a named sequence of steps. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Workflow { /// Unique identifier. pub id: WorkflowId, /// Human-readable name. pub name: String, /// Description of what this workflow does. pub description: String, /// The steps in execution order. pub steps: Vec, /// Created at. pub created_at: DateTime, } /// A single step in a workflow. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorkflowStep { /// Step name for logging/display. pub name: String, /// Which agent to route this step to. pub agent: StepAgent, /// The prompt template. Use `{{input}}` for previous output, `{{var_name}}` for variables. pub prompt_template: String, /// Execution mode for this step. pub mode: StepMode, /// Maximum time for this step in seconds (default: 120). #[serde(default = "default_timeout")] pub timeout_secs: u64, /// Error handling mode for this step (default: Fail). #[serde(default)] pub error_mode: ErrorMode, /// Optional variable name to store this step's output in. #[serde(default)] pub output_var: Option, } fn default_timeout() -> u64 { 120 } /// How to identify the agent for a step. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(untagged)] pub enum StepAgent { /// Reference an agent by UUID. ById { id: String }, /// Reference an agent by name (first match). ByName { name: String }, } /// Execution mode for a workflow step. #[derive(Debug, Clone, Default, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum StepMode { /// Execute sequentially — this step runs after the previous completes. #[default] Sequential, /// Fan-out — this step runs in parallel with subsequent FanOut steps until Collect. FanOut, /// Collect results from all preceding fan-out steps. Collect, /// Conditional — skip this step if previous output doesn't contain `condition` (case-insensitive). Conditional { condition: String }, /// Loop — repeat this step until output contains `until` or `max_iterations` reached. Loop { max_iterations: u32, until: String }, } /// Error handling mode for a workflow step. #[derive(Debug, Clone, Default, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum ErrorMode { /// Abort the workflow on error (default). #[default] Fail, /// Skip this step on error and continue. Skip, /// Retry the step up to N times before failing. Retry { max_retries: u32 }, } /// The current state of a workflow run. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum WorkflowRunState { Pending, Running, Completed, Failed, } /// A running workflow instance. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorkflowRun { /// Run instance ID. pub id: WorkflowRunId, /// The workflow being run. pub workflow_id: WorkflowId, /// Workflow name (copied for quick access). pub workflow_name: String, /// Initial input to the workflow. pub input: String, /// Current state. pub state: WorkflowRunState, /// Results from each completed step. pub step_results: Vec, /// Final output (set when workflow completes). pub output: Option, /// Error message if failed. pub error: Option, /// Started at. pub started_at: DateTime, /// Completed at. pub completed_at: Option>, } /// Result from a single workflow step. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StepResult { /// Step name. pub step_name: String, /// Agent that executed this step. pub agent_id: String, /// Agent name. pub agent_name: String, /// Output from this step. pub output: String, /// Token usage. pub input_tokens: u64, pub output_tokens: u64, /// Duration in milliseconds. pub duration_ms: u64, } /// The workflow engine — manages definitions and executes pipeline runs. pub struct WorkflowEngine { /// Registered workflow definitions. workflows: Arc>>, /// Active and completed workflow runs. runs: Arc>>, } impl WorkflowEngine { /// Create a new workflow engine. pub fn new() -> Self { Self { workflows: Arc::new(RwLock::new(HashMap::new())), runs: Arc::new(RwLock::new(HashMap::new())), } } /// Register a new workflow definition. pub async fn register(&self, workflow: Workflow) -> WorkflowId { let id = workflow.id; self.workflows.write().await.insert(id, workflow); info!(workflow_id = %id, "Workflow registered"); id } /// List all registered workflows. pub async fn list_workflows(&self) -> Vec { self.workflows.read().await.values().cloned().collect() } /// Get a specific workflow by ID. pub async fn get_workflow(&self, id: WorkflowId) -> Option { self.workflows.read().await.get(&id).cloned() } /// Remove a workflow definition. pub async fn remove_workflow(&self, id: WorkflowId) -> bool { self.workflows.write().await.remove(&id).is_some() } /// Maximum number of retained workflow runs. Oldest completed/failed /// runs are evicted when this limit is exceeded. const MAX_RETAINED_RUNS: usize = 200; /// Start a workflow run. Returns the run ID and a handle to check progress. /// /// The actual execution is driven externally by calling `execute_run()` /// with the kernel handle, since the workflow engine doesn't own the kernel. pub async fn create_run( &self, workflow_id: WorkflowId, input: String, ) -> Option { let workflow = self.workflows.read().await.get(&workflow_id)?.clone(); let run_id = WorkflowRunId::new(); let run = WorkflowRun { id: run_id, workflow_id, workflow_name: workflow.name, input, state: WorkflowRunState::Pending, step_results: Vec::new(), output: None, error: None, started_at: Utc::now(), completed_at: None, }; let mut runs = self.runs.write().await; runs.insert(run_id, run); // Evict oldest completed/failed runs when we exceed the cap if runs.len() > Self::MAX_RETAINED_RUNS { let mut evictable: Vec<(WorkflowRunId, DateTime)> = runs .iter() .filter(|(_, r)| { matches!( r.state, WorkflowRunState::Completed | WorkflowRunState::Failed ) }) .map(|(id, r)| (*id, r.started_at)) .collect(); // Sort oldest first evictable.sort_by_key(|(_, t)| *t); let to_remove = runs.len() - Self::MAX_RETAINED_RUNS; for (id, _) in evictable.into_iter().take(to_remove) { runs.remove(&id); debug!(run_id = %id, "Evicted old workflow run"); } } Some(run_id) } /// Get the current state of a workflow run. pub async fn get_run(&self, run_id: WorkflowRunId) -> Option { self.runs.read().await.get(&run_id).cloned() } /// List all workflow runs (optionally filtered by state). pub async fn list_runs(&self, state_filter: Option<&str>) -> Vec { self.runs .read() .await .values() .filter(|r| { state_filter .map(|f| match f { "pending" => matches!(r.state, WorkflowRunState::Pending), "running" => matches!(r.state, WorkflowRunState::Running), "completed" => matches!(r.state, WorkflowRunState::Completed), "failed" => matches!(r.state, WorkflowRunState::Failed), _ => true, }) .unwrap_or(true) }) .cloned() .collect() } /// Replace `{{var_name}}` references in a template with stored variable values. fn expand_variables(template: &str, input: &str, vars: &HashMap) -> String { let mut result = template.replace("{{input}}", input); for (key, value) in vars { result = result.replace(&format!("{{{{{key}}}}}"), value); } result } /// Execute a single step with error mode handling. Returns (output, input_tokens, output_tokens). async fn execute_step_with_error_mode( step: &WorkflowStep, agent_id: AgentId, prompt: String, send_message: &F, ) -> Result, String> where F: Fn(AgentId, String) -> Fut, Fut: std::future::Future>, { let timeout_dur = std::time::Duration::from_secs(step.timeout_secs); match &step.error_mode { ErrorMode::Fail => { let result = tokio::time::timeout(timeout_dur, send_message(agent_id, prompt)) .await .map_err(|_| { format!( "Step '{}' timed out after {}s", step.name, step.timeout_secs ) })? .map_err(|e| format!("Step '{}' failed: {}", step.name, e))?; Ok(Some(result)) } ErrorMode::Skip => { match tokio::time::timeout(timeout_dur, send_message(agent_id, prompt)).await { Ok(Ok(result)) => Ok(Some(result)), Ok(Err(e)) => { warn!("Step '{}' failed (skipping): {e}", step.name); Ok(None) } Err(_) => { warn!( "Step '{}' timed out (skipping) after {}s", step.name, step.timeout_secs ); Ok(None) } } } ErrorMode::Retry { max_retries } => { let mut last_err = String::new(); for attempt in 0..=*max_retries { match tokio::time::timeout(timeout_dur, send_message(agent_id, prompt.clone())) .await { Ok(Ok(result)) => return Ok(Some(result)), Ok(Err(e)) => { last_err = e.to_string(); if attempt < *max_retries { warn!( "Step '{}' attempt {} failed: {e}, retrying", step.name, attempt + 1 ); } } Err(_) => { last_err = format!("timed out after {}s", step.timeout_secs); if attempt < *max_retries { warn!( "Step '{}' attempt {} timed out, retrying", step.name, attempt + 1 ); } } } } Err(format!( "Step '{}' failed after {} retries: {last_err}", step.name, max_retries )) } } } /// Execute a workflow run step-by-step. /// /// This method takes a closure that sends messages to agents, /// so the workflow engine remains decoupled from the kernel. pub async fn execute_run( &self, run_id: WorkflowRunId, agent_resolver: impl Fn(&StepAgent) -> Option<(AgentId, String)>, send_message: F, ) -> Result where F: Fn(AgentId, String) -> Fut, Fut: std::future::Future>, { // Get the run and workflow let (workflow, input) = { let mut runs = self.runs.write().await; let run = runs.get_mut(&run_id).ok_or("Workflow run not found")?; run.state = WorkflowRunState::Running; let workflow = self .workflows .read() .await .get(&run.workflow_id) .ok_or("Workflow definition not found")? .clone(); (workflow, run.input.clone()) }; info!( run_id = %run_id, workflow = %workflow.name, steps = workflow.steps.len(), "Starting workflow execution" ); let mut current_input = input; let mut all_outputs: Vec = Vec::new(); let mut variables: HashMap = HashMap::new(); let mut i = 0; while i < workflow.steps.len() { let step = &workflow.steps[i]; debug!( step = i + 1, name = %step.name, "Executing workflow step" ); match &step.mode { StepMode::Sequential => { let (agent_id, agent_name) = agent_resolver(&step.agent) .ok_or_else(|| format!("Agent not found for step '{}'", step.name))?; let prompt = Self::expand_variables(&step.prompt_template, ¤t_input, &variables); let start = std::time::Instant::now(); let result = Self::execute_step_with_error_mode(step, agent_id, prompt, &send_message) .await; let duration_ms = start.elapsed().as_millis() as u64; match result { Ok(Some((output, input_tokens, output_tokens))) => { let step_result = StepResult { step_name: step.name.clone(), agent_id: agent_id.to_string(), agent_name, output: output.clone(), input_tokens, output_tokens, duration_ms, }; if let Some(r) = self.runs.write().await.get_mut(&run_id) { r.step_results.push(step_result); } if let Some(ref var) = step.output_var { variables.insert(var.clone(), output.clone()); } all_outputs.push(output.clone()); current_input = output; info!(step = i + 1, name = %step.name, duration_ms, "Step completed"); } Ok(None) => { // Step was skipped (ErrorMode::Skip) info!(step = i + 1, name = %step.name, "Step skipped"); } Err(e) => { if let Some(r) = self.runs.write().await.get_mut(&run_id) { r.state = WorkflowRunState::Failed; r.error = Some(e.clone()); r.completed_at = Some(Utc::now()); } return Err(e); } } } StepMode::FanOut => { // Collect consecutive FanOut steps and run them in parallel let mut fan_out_steps = vec![(i, step)]; let mut j = i + 1; while j < workflow.steps.len() { if matches!(workflow.steps[j].mode, StepMode::FanOut) { fan_out_steps.push((j, &workflow.steps[j])); j += 1; } else { break; } } // Build all futures let mut futures = Vec::new(); let mut step_infos = Vec::new(); for (idx, fan_step) in &fan_out_steps { let (agent_id, agent_name) = agent_resolver(&fan_step.agent).ok_or_else(|| { format!("Agent not found for step '{}'", fan_step.name) })?; let prompt = Self::expand_variables( &fan_step.prompt_template, ¤t_input, &variables, ); let timeout_dur = std::time::Duration::from_secs(fan_step.timeout_secs); step_infos.push((*idx, fan_step.name.clone(), agent_id, agent_name)); futures.push(tokio::time::timeout( timeout_dur, send_message(agent_id, prompt), )); } let start = std::time::Instant::now(); let results = futures::future::join_all(futures).await; let duration_ms = start.elapsed().as_millis() as u64; for (k, result) in results.into_iter().enumerate() { let (_, ref step_name, agent_id, ref agent_name) = step_infos[k]; let fan_step = fan_out_steps[k].1; match result { Ok(Ok((output, input_tokens, output_tokens))) => { let step_result = StepResult { step_name: step_name.clone(), agent_id: agent_id.to_string(), agent_name: agent_name.clone(), output: output.clone(), input_tokens, output_tokens, duration_ms, }; if let Some(r) = self.runs.write().await.get_mut(&run_id) { r.step_results.push(step_result); } if let Some(ref var) = fan_step.output_var { variables.insert(var.clone(), output.clone()); } all_outputs.push(output.clone()); current_input = output; } Ok(Err(e)) => { let error_msg = format!("FanOut step '{}' failed: {}", step_name, e); warn!(%error_msg); if let Some(r) = self.runs.write().await.get_mut(&run_id) { r.state = WorkflowRunState::Failed; r.error = Some(error_msg.clone()); r.completed_at = Some(Utc::now()); } return Err(error_msg); } Err(_) => { let error_msg = format!( "FanOut step '{}' timed out after {}s", step_name, fan_step.timeout_secs ); warn!(%error_msg); if let Some(r) = self.runs.write().await.get_mut(&run_id) { r.state = WorkflowRunState::Failed; r.error = Some(error_msg.clone()); r.completed_at = Some(Utc::now()); } return Err(error_msg); } } } info!( count = fan_out_steps.len(), duration_ms, "FanOut steps completed" ); // Skip past the fan-out steps we just processed i = j; continue; } StepMode::Collect => { current_input = all_outputs.join("\n\n---\n\n"); all_outputs.clear(); all_outputs.push(current_input.clone()); if let Some(ref var) = step.output_var { variables.insert(var.clone(), current_input.clone()); } } StepMode::Conditional { condition } => { let prev_lower = current_input.to_lowercase(); let cond_lower = condition.to_lowercase(); if !prev_lower.contains(&cond_lower) { info!( step = i + 1, name = %step.name, condition, "Conditional step skipped (condition not met)" ); i += 1; continue; } // Condition met — execute like sequential let (agent_id, agent_name) = agent_resolver(&step.agent) .ok_or_else(|| format!("Agent not found for step '{}'", step.name))?; let prompt = Self::expand_variables(&step.prompt_template, ¤t_input, &variables); let start = std::time::Instant::now(); let result = Self::execute_step_with_error_mode(step, agent_id, prompt, &send_message) .await; let duration_ms = start.elapsed().as_millis() as u64; match result { Ok(Some((output, input_tokens, output_tokens))) => { let step_result = StepResult { step_name: step.name.clone(), agent_id: agent_id.to_string(), agent_name, output: output.clone(), input_tokens, output_tokens, duration_ms, }; if let Some(r) = self.runs.write().await.get_mut(&run_id) { r.step_results.push(step_result); } if let Some(ref var) = step.output_var { variables.insert(var.clone(), output.clone()); } all_outputs.push(output.clone()); current_input = output; } Ok(None) => {} Err(e) => { if let Some(r) = self.runs.write().await.get_mut(&run_id) { r.state = WorkflowRunState::Failed; r.error = Some(e.clone()); r.completed_at = Some(Utc::now()); } return Err(e); } } } StepMode::Loop { max_iterations, until, } => { let (agent_id, agent_name) = agent_resolver(&step.agent) .ok_or_else(|| format!("Agent not found for step '{}'", step.name))?; let until_lower = until.to_lowercase(); for loop_iter in 0..*max_iterations { let prompt = Self::expand_variables( &step.prompt_template, ¤t_input, &variables, ); let start = std::time::Instant::now(); let result = Self::execute_step_with_error_mode( step, agent_id, prompt, &send_message, ) .await; let duration_ms = start.elapsed().as_millis() as u64; match result { Ok(Some((output, input_tokens, output_tokens))) => { let step_result = StepResult { step_name: format!("{} (iter {})", step.name, loop_iter + 1), agent_id: agent_id.to_string(), agent_name: agent_name.clone(), output: output.clone(), input_tokens, output_tokens, duration_ms, }; if let Some(r) = self.runs.write().await.get_mut(&run_id) { r.step_results.push(step_result); } current_input = output.clone(); if output.to_lowercase().contains(&until_lower) { info!( step = i + 1, name = %step.name, iterations = loop_iter + 1, "Loop terminated (until condition met)" ); break; } if loop_iter + 1 == *max_iterations { info!( step = i + 1, name = %step.name, "Loop terminated (max iterations reached)" ); } } Ok(None) => break, Err(e) => { if let Some(r) = self.runs.write().await.get_mut(&run_id) { r.state = WorkflowRunState::Failed; r.error = Some(e.clone()); r.completed_at = Some(Utc::now()); } return Err(e); } } } if let Some(ref var) = step.output_var { variables.insert(var.clone(), current_input.clone()); } all_outputs.push(current_input.clone()); } } i += 1; } // Mark workflow as completed let final_output = current_input.clone(); if let Some(r) = self.runs.write().await.get_mut(&run_id) { r.state = WorkflowRunState::Completed; r.output = Some(final_output.clone()); r.completed_at = Some(Utc::now()); } info!(run_id = %run_id, "Workflow completed successfully"); Ok(final_output) } } impl Default for WorkflowEngine { fn default() -> Self { Self::new() } } #[cfg(test)] mod tests { use super::*; fn test_workflow() -> Workflow { Workflow { id: WorkflowId::new(), name: "test-pipeline".to_string(), description: "A test pipeline".to_string(), steps: vec![ WorkflowStep { name: "analyze".to_string(), agent: StepAgent::ByName { name: "analyst".to_string(), }, prompt_template: "Analyze this: {{input}}".to_string(), mode: StepMode::Sequential, timeout_secs: 30, error_mode: ErrorMode::Fail, output_var: None, }, WorkflowStep { name: "summarize".to_string(), agent: StepAgent::ByName { name: "writer".to_string(), }, prompt_template: "Summarize this analysis: {{input}}".to_string(), mode: StepMode::Sequential, timeout_secs: 30, error_mode: ErrorMode::Fail, output_var: None, }, ], created_at: Utc::now(), } } fn mock_resolver(agent: &StepAgent) -> Option<(AgentId, String)> { let _ = agent; Some((AgentId::new(), "mock-agent".to_string())) } #[tokio::test] async fn test_register_workflow() { let engine = WorkflowEngine::new(); let wf = test_workflow(); let id = engine.register(wf.clone()).await; assert_eq!(id, wf.id); let retrieved = engine.get_workflow(id).await; assert!(retrieved.is_some()); assert_eq!(retrieved.unwrap().name, "test-pipeline"); } #[tokio::test] async fn test_create_run() { let engine = WorkflowEngine::new(); let wf = test_workflow(); let wf_id = engine.register(wf).await; let run_id = engine.create_run(wf_id, "test input".to_string()).await; assert!(run_id.is_some()); let run = engine.get_run(run_id.unwrap()).await.unwrap(); assert_eq!(run.input, "test input"); assert!(matches!(run.state, WorkflowRunState::Pending)); } #[tokio::test] async fn test_list_workflows() { let engine = WorkflowEngine::new(); let wf = test_workflow(); engine.register(wf).await; let list = engine.list_workflows().await; assert_eq!(list.len(), 1); } #[tokio::test] async fn test_remove_workflow() { let engine = WorkflowEngine::new(); let wf = test_workflow(); let id = engine.register(wf).await; assert!(engine.remove_workflow(id).await); assert!(engine.get_workflow(id).await.is_none()); } #[tokio::test] async fn test_execute_pipeline() { let engine = WorkflowEngine::new(); let wf = test_workflow(); let wf_id = engine.register(wf).await; let run_id = engine .create_run(wf_id, "raw data".to_string()) .await .unwrap(); let sender = |_id: AgentId, msg: String| async move { Ok((format!("Processed: {msg}"), 100u64, 50u64)) }; let result = engine.execute_run(run_id, mock_resolver, sender).await; assert!(result.is_ok()); let output = result.unwrap(); assert!(output.contains("Processed:")); let run = engine.get_run(run_id).await.unwrap(); assert!(matches!(run.state, WorkflowRunState::Completed)); assert_eq!(run.step_results.len(), 2); assert!(run.output.is_some()); } #[tokio::test] async fn test_conditional_skip() { let engine = WorkflowEngine::new(); let wf = Workflow { id: WorkflowId::new(), name: "conditional-test".to_string(), description: "".to_string(), steps: vec![ WorkflowStep { name: "first".to_string(), agent: StepAgent::ByName { name: "a".to_string(), }, prompt_template: "{{input}}".to_string(), mode: StepMode::Sequential, timeout_secs: 10, error_mode: ErrorMode::Fail, output_var: None, }, WorkflowStep { name: "only-if-error".to_string(), agent: StepAgent::ByName { name: "a".to_string(), }, prompt_template: "Fix: {{input}}".to_string(), mode: StepMode::Conditional { condition: "ERROR".to_string(), }, timeout_secs: 10, error_mode: ErrorMode::Fail, output_var: None, }, ], created_at: Utc::now(), }; let wf_id = engine.register(wf).await; let run_id = engine .create_run(wf_id, "all good".to_string()) .await .unwrap(); let sender = |_id: AgentId, msg: String| async move { Ok((format!("OK: {msg}"), 10u64, 5u64)) }; let result = engine.execute_run(run_id, mock_resolver, sender).await; assert!(result.is_ok()); let run = engine.get_run(run_id).await.unwrap(); // Only 1 step executed (conditional was skipped) assert_eq!(run.step_results.len(), 1); } #[tokio::test] async fn test_conditional_executes() { let engine = WorkflowEngine::new(); let wf = Workflow { id: WorkflowId::new(), name: "conditional-test".to_string(), description: "".to_string(), steps: vec![ WorkflowStep { name: "first".to_string(), agent: StepAgent::ByName { name: "a".to_string(), }, prompt_template: "{{input}}".to_string(), mode: StepMode::Sequential, timeout_secs: 10, error_mode: ErrorMode::Fail, output_var: None, }, WorkflowStep { name: "only-if-error".to_string(), agent: StepAgent::ByName { name: "a".to_string(), }, prompt_template: "Fix: {{input}}".to_string(), mode: StepMode::Conditional { condition: "ERROR".to_string(), }, timeout_secs: 10, error_mode: ErrorMode::Fail, output_var: None, }, ], created_at: Utc::now(), }; let wf_id = engine.register(wf).await; let run_id = engine.create_run(wf_id, "data".to_string()).await.unwrap(); // This sender returns output containing "ERROR" let sender = |_id: AgentId, _msg: String| async move { Ok(("Found an ERROR in the data".to_string(), 10u64, 5u64)) }; let result = engine.execute_run(run_id, mock_resolver, sender).await; assert!(result.is_ok()); let run = engine.get_run(run_id).await.unwrap(); // Both steps executed assert_eq!(run.step_results.len(), 2); } #[tokio::test] async fn test_loop_until_condition() { let engine = WorkflowEngine::new(); let wf = Workflow { id: WorkflowId::new(), name: "loop-test".to_string(), description: "".to_string(), steps: vec![WorkflowStep { name: "refine".to_string(), agent: StepAgent::ByName { name: "a".to_string(), }, prompt_template: "Refine: {{input}}".to_string(), mode: StepMode::Loop { max_iterations: 5, until: "DONE".to_string(), }, timeout_secs: 10, error_mode: ErrorMode::Fail, output_var: None, }], created_at: Utc::now(), }; let wf_id = engine.register(wf).await; let run_id = engine.create_run(wf_id, "draft".to_string()).await.unwrap(); let call_count = Arc::new(std::sync::atomic::AtomicU32::new(0)); let cc = call_count.clone(); let sender = move |_id: AgentId, _msg: String| { let cc = cc.clone(); async move { let n = cc.fetch_add(1, std::sync::atomic::Ordering::SeqCst); if n >= 2 { Ok(("Result: DONE".to_string(), 10u64, 5u64)) } else { Ok(("Still working...".to_string(), 10u64, 5u64)) } } }; let result = engine.execute_run(run_id, mock_resolver, sender).await; assert!(result.is_ok()); assert!(result.unwrap().contains("DONE")); assert_eq!(call_count.load(std::sync::atomic::Ordering::SeqCst), 3); } #[tokio::test] async fn test_loop_max_iterations() { let engine = WorkflowEngine::new(); let wf = Workflow { id: WorkflowId::new(), name: "loop-max-test".to_string(), description: "".to_string(), steps: vec![WorkflowStep { name: "refine".to_string(), agent: StepAgent::ByName { name: "a".to_string(), }, prompt_template: "{{input}}".to_string(), mode: StepMode::Loop { max_iterations: 3, until: "NEVER_MATCH".to_string(), }, timeout_secs: 10, error_mode: ErrorMode::Fail, output_var: None, }], created_at: Utc::now(), }; let wf_id = engine.register(wf).await; let run_id = engine.create_run(wf_id, "data".to_string()).await.unwrap(); let sender = |_id: AgentId, _msg: String| async move { Ok(("iteration output".to_string(), 10u64, 5u64)) }; let result = engine.execute_run(run_id, mock_resolver, sender).await; assert!(result.is_ok()); let run = engine.get_run(run_id).await.unwrap(); assert_eq!(run.step_results.len(), 3); // max_iterations } #[tokio::test] async fn test_error_mode_skip() { let engine = WorkflowEngine::new(); let wf = Workflow { id: WorkflowId::new(), name: "skip-test".to_string(), description: "".to_string(), steps: vec![ WorkflowStep { name: "will-fail".to_string(), agent: StepAgent::ByName { name: "a".to_string(), }, prompt_template: "{{input}}".to_string(), mode: StepMode::Sequential, timeout_secs: 10, error_mode: ErrorMode::Skip, output_var: None, }, WorkflowStep { name: "succeeds".to_string(), agent: StepAgent::ByName { name: "a".to_string(), }, prompt_template: "{{input}}".to_string(), mode: StepMode::Sequential, timeout_secs: 10, error_mode: ErrorMode::Fail, output_var: None, }, ], created_at: Utc::now(), }; let wf_id = engine.register(wf).await; let run_id = engine.create_run(wf_id, "data".to_string()).await.unwrap(); let call_count = Arc::new(std::sync::atomic::AtomicU32::new(0)); let cc = call_count.clone(); let sender = move |_id: AgentId, _msg: String| { let cc = cc.clone(); async move { let n = cc.fetch_add(1, std::sync::atomic::Ordering::SeqCst); if n == 0 { Err("simulated error".to_string()) } else { Ok(("success".to_string(), 10u64, 5u64)) } } }; let result = engine.execute_run(run_id, mock_resolver, sender).await; assert!(result.is_ok()); let run = engine.get_run(run_id).await.unwrap(); // Only 1 step result (the first was skipped due to error) assert_eq!(run.step_results.len(), 1); assert!(matches!(run.state, WorkflowRunState::Completed)); } #[tokio::test] async fn test_error_mode_retry() { let engine = WorkflowEngine::new(); let wf = Workflow { id: WorkflowId::new(), name: "retry-test".to_string(), description: "".to_string(), steps: vec![WorkflowStep { name: "flaky".to_string(), agent: StepAgent::ByName { name: "a".to_string(), }, prompt_template: "{{input}}".to_string(), mode: StepMode::Sequential, timeout_secs: 10, error_mode: ErrorMode::Retry { max_retries: 2 }, output_var: None, }], created_at: Utc::now(), }; let wf_id = engine.register(wf).await; let run_id = engine.create_run(wf_id, "data".to_string()).await.unwrap(); let call_count = Arc::new(std::sync::atomic::AtomicU32::new(0)); let cc = call_count.clone(); let sender = move |_id: AgentId, _msg: String| { let cc = cc.clone(); async move { let n = cc.fetch_add(1, std::sync::atomic::Ordering::SeqCst); if n < 2 { Err("transient error".to_string()) } else { Ok(("finally worked".to_string(), 10u64, 5u64)) } } }; let result = engine.execute_run(run_id, mock_resolver, sender).await; assert!(result.is_ok()); assert_eq!(result.unwrap(), "finally worked"); assert_eq!(call_count.load(std::sync::atomic::Ordering::SeqCst), 3); } #[tokio::test] async fn test_output_variables() { let engine = WorkflowEngine::new(); let wf = Workflow { id: WorkflowId::new(), name: "vars-test".to_string(), description: "".to_string(), steps: vec![ WorkflowStep { name: "produce".to_string(), agent: StepAgent::ByName { name: "a".to_string(), }, prompt_template: "{{input}}".to_string(), mode: StepMode::Sequential, timeout_secs: 10, error_mode: ErrorMode::Fail, output_var: Some("first_result".to_string()), }, WorkflowStep { name: "transform".to_string(), agent: StepAgent::ByName { name: "a".to_string(), }, prompt_template: "{{input}}".to_string(), mode: StepMode::Sequential, timeout_secs: 10, error_mode: ErrorMode::Fail, output_var: Some("second_result".to_string()), }, WorkflowStep { name: "combine".to_string(), agent: StepAgent::ByName { name: "a".to_string(), }, prompt_template: "First: {{first_result}} | Second: {{second_result}}" .to_string(), mode: StepMode::Sequential, timeout_secs: 10, error_mode: ErrorMode::Fail, output_var: None, }, ], created_at: Utc::now(), }; let wf_id = engine.register(wf).await; let run_id = engine.create_run(wf_id, "start".to_string()).await.unwrap(); let call_count = Arc::new(std::sync::atomic::AtomicU32::new(0)); let cc = call_count.clone(); let sender = move |_id: AgentId, msg: String| { let cc = cc.clone(); async move { let n = cc.fetch_add(1, std::sync::atomic::Ordering::SeqCst); match n { 0 => Ok(("alpha".to_string(), 10u64, 5u64)), 1 => Ok(("beta".to_string(), 10u64, 5u64)), _ => Ok((format!("Combined: {msg}"), 10u64, 5u64)), } } }; let result = engine.execute_run(run_id, mock_resolver, sender).await; assert!(result.is_ok()); let output = result.unwrap(); // The third step receives "First: alpha | Second: beta" as its prompt assert!(output.contains("First: alpha")); assert!(output.contains("Second: beta")); } #[tokio::test] async fn test_fan_out_parallel() { let engine = WorkflowEngine::new(); let wf = Workflow { id: WorkflowId::new(), name: "fanout-test".to_string(), description: "".to_string(), steps: vec![ WorkflowStep { name: "task-a".to_string(), agent: StepAgent::ByName { name: "a".to_string(), }, prompt_template: "Task A: {{input}}".to_string(), mode: StepMode::FanOut, timeout_secs: 10, error_mode: ErrorMode::Fail, output_var: None, }, WorkflowStep { name: "task-b".to_string(), agent: StepAgent::ByName { name: "b".to_string(), }, prompt_template: "Task B: {{input}}".to_string(), mode: StepMode::FanOut, timeout_secs: 10, error_mode: ErrorMode::Fail, output_var: None, }, WorkflowStep { name: "collect".to_string(), agent: StepAgent::ByName { name: "c".to_string(), }, prompt_template: "unused".to_string(), mode: StepMode::Collect, timeout_secs: 10, error_mode: ErrorMode::Fail, output_var: None, }, ], created_at: Utc::now(), }; let wf_id = engine.register(wf).await; let run_id = engine.create_run(wf_id, "data".to_string()).await.unwrap(); let sender = |_id: AgentId, msg: String| async move { Ok((format!("Done: {msg}"), 10u64, 5u64)) }; let result = engine.execute_run(run_id, mock_resolver, sender).await; assert!(result.is_ok()); let output = result.unwrap(); // Collect step joins all outputs assert!(output.contains("Done: Task A")); assert!(output.contains("Done: Task B")); assert!(output.contains("---")); } #[tokio::test] async fn test_expand_variables() { let mut vars = HashMap::new(); vars.insert("name".to_string(), "Alice".to_string()); vars.insert("task".to_string(), "code review".to_string()); let result = WorkflowEngine::expand_variables( "Hello {{name}}, please do {{task}} on {{input}}", "main.rs", &vars, ); assert_eq!(result, "Hello Alice, please do code review on main.rs"); } #[tokio::test] async fn test_error_mode_serialization() { let fail_json = serde_json::to_string(&ErrorMode::Fail).unwrap(); assert_eq!(fail_json, "\"fail\""); let skip_json = serde_json::to_string(&ErrorMode::Skip).unwrap(); assert_eq!(skip_json, "\"skip\""); let retry_json = serde_json::to_string(&ErrorMode::Retry { max_retries: 3 }).unwrap(); let retry: ErrorMode = serde_json::from_str(&retry_json).unwrap(); assert!(matches!(retry, ErrorMode::Retry { max_retries: 3 })); } #[tokio::test] async fn test_step_mode_conditional_serialization() { let mode = StepMode::Conditional { condition: "error".to_string(), }; let json = serde_json::to_string(&mode).unwrap(); let parsed: StepMode = serde_json::from_str(&json).unwrap(); assert!(matches!(parsed, StepMode::Conditional { condition } if condition == "error")); } #[tokio::test] async fn test_step_mode_loop_serialization() { let mode = StepMode::Loop { max_iterations: 5, until: "done".to_string(), }; let json = serde_json::to_string(&mode).unwrap(); let parsed: StepMode = serde_json::from_str(&json).unwrap(); assert!(matches!(parsed, StepMode::Loop { max_iterations: 5, until } if until == "done")); } }