//! Pipeline Executor //! //! Executes pipelines step by step, managing state and calling actions. use std::sync::Arc; use std::collections::HashMap; use tokio::sync::RwLock; use serde_json::Value; use uuid::Uuid; use chrono::Utc; use futures::stream::{self, StreamExt}; use futures::future::{BoxFuture, FutureExt}; use crate::types::{Pipeline, PipelineRun, PipelineProgress, RunStatus, PipelineStep, Action, ExportFormat}; use crate::state::{ExecutionContext, StateError}; use crate::actions::ActionRegistry; /// Pipeline execution errors #[derive(Debug, thiserror::Error)] pub enum ExecuteError { #[error("State error: {0}")] State(#[from] StateError), #[error("Action error: {0}")] Action(String), #[error("Step not found: {0}")] StepNotFound(String), #[error("Timeout exceeded")] Timeout, #[error("Cancelled")] Cancelled, #[error("Condition not met: {0}")] ConditionNotMet(String), #[error("IO error: {0}")] Io(#[from] std::io::Error), } /// Pipeline executor pub struct PipelineExecutor { /// Action registry action_registry: Arc, /// Active runs (run_id -> run state) runs: RwLock>, /// Cancellation flags cancellations: RwLock>, } impl PipelineExecutor { /// Create a new executor pub fn new(action_registry: Arc) -> Self { Self { action_registry, runs: RwLock::new(HashMap::new()), cancellations: RwLock::new(HashMap::new()), } } /// Execute a pipeline with auto-generated run ID pub async fn execute( &self, pipeline: &Pipeline, inputs: HashMap, ) -> Result { let run_id = Uuid::new_v4().to_string(); self.execute_with_id(pipeline, inputs, &run_id).await } /// Execute a pipeline with a specific run ID /// /// Use this when you need to know the run_id before execution starts, /// e.g., for async spawning where the caller needs to track progress. pub async fn execute_with_id( &self, pipeline: &Pipeline, inputs: HashMap, run_id: &str, ) -> Result { let pipeline_id = pipeline.metadata.name.clone(); let run_id = run_id.to_string(); // Create run record let run = PipelineRun { id: run_id.clone(), pipeline_id: pipeline_id.clone(), status: RunStatus::Running, inputs: serde_json::to_value(&inputs).unwrap_or(Value::Null), current_step: None, step_results: HashMap::new(), outputs: None, error: None, started_at: Utc::now(), ended_at: None, }; // Store run self.runs.write().await.insert(run_id.clone(), run); // Create execution context let mut context = ExecutionContext::new(inputs); // Execute steps let result = self.execute_steps(pipeline, &mut context, &run_id).await; // Update run state let mut runs = self.runs.write().await; if let Some(run) = runs.get_mut(&run_id) { match result { Ok(outputs) => { run.status = RunStatus::Completed; run.outputs = Some(serde_json::to_value(&outputs).unwrap_or(Value::Null)); } Err(e) => { run.status = RunStatus::Failed; run.error = Some(e.to_string()); } } run.ended_at = Some(Utc::now()); return Ok(run.clone()); } Err(ExecuteError::Action("执行后未找到运行记录".to_string())) } /// Execute pipeline steps async fn execute_steps( &self, pipeline: &Pipeline, context: &mut ExecutionContext, run_id: &str, ) -> Result, ExecuteError> { let total_steps = pipeline.spec.steps.len(); for (idx, step) in pipeline.spec.steps.iter().enumerate() { // Check cancellation if *self.cancellations.read().await.get(run_id).unwrap_or(&false) { return Err(ExecuteError::Cancelled); } // Update current step if let Some(run) = self.runs.write().await.get_mut(run_id) { run.current_step = Some(step.id.clone()); } // Check condition if let Some(condition) = &step.when { let should_execute = self.evaluate_condition(condition, context)?; if !should_execute { tracing::info!("Skipping step {} (condition not met)", step.id); continue; } } tracing::info!("Executing step {} ({}/{})", step.id, idx + 1, total_steps); // Execute action let result = self.execute_action(&step.action, context).await?; // Store result context.set_output(&step.id, result.clone()); // Update step results in run if let Some(run) = self.runs.write().await.get_mut(run_id) { run.step_results.insert(step.id.clone(), result); } } // Extract outputs Ok(context.extract_outputs(&pipeline.spec.outputs) .map_err(ExecuteError::State)?) } /// Execute a single action (returns BoxFuture for recursion support) fn execute_action<'a>( &'a self, action: &'a Action, context: &'a mut ExecutionContext, ) -> BoxFuture<'a, Result> { async move { match action { Action::LlmGenerate { template, input, model, temperature, max_tokens, json_mode } => { tracing::debug!(target: "pipeline_executor", "LlmGenerate action called"); tracing::debug!(target: "pipeline_executor", "input keys: {:?}", input.keys().collect::>()); // First resolve the template itself (handles ${inputs.xxx}, ${item.xxx}, etc.) let resolved_template = context.resolve(template)?; let resolved_template_str = resolved_template.as_str().unwrap_or(template).to_string(); tracing::debug!(target: "pipeline_executor", "Resolved template ({} chars, first 100): {}", resolved_template_str.len(), &resolved_template_str[..resolved_template_str.len().min(100)]); let resolved_input = context.resolve_map(input)?; tracing::debug!(target: "pipeline_executor", "Resolved input keys: {:?}", resolved_input.keys().collect::>()); self.action_registry.execute_llm( &resolved_template_str, resolved_input, model.clone(), *temperature, *max_tokens, *json_mode, ).await.map_err(|e| ExecuteError::Action(e.to_string())) } Action::Parallel { each, step, max_workers } => { let items = context.resolve(each)?; let items_array = items.as_array() .ok_or_else(|| ExecuteError::Action("并行执行 'each' 必须解析为数组".to_string()))?; let workers = max_workers.unwrap_or(4); let results = self.execute_parallel(step, items_array.clone(), workers, context).await?; Ok(Value::Array(results)) } Action::Sequential { steps } => { let mut last_result = Value::Null; for step in steps { last_result = self.execute_action(&step.action, context).await?; context.set_output(&step.id, last_result.clone()); } Ok(last_result) } Action::Condition { branches, default, .. } => { for branch in branches { if self.evaluate_condition(&branch.when, context)? { return self.execute_action(&branch.then.action, context).await; } } if let Some(default_step) = default { return self.execute_action(&default_step.action, context).await; } Ok(Value::Null) } Action::Skill { skill_id, input } => { let resolved_input = context.resolve_map(input)?; self.action_registry.execute_skill(skill_id, resolved_input) .await .map_err(|e| ExecuteError::Action(e.to_string())) } Action::Hand { hand_id, hand_action, params } => { let resolved_params = context.resolve_map(params)?; self.action_registry.execute_hand(hand_id, hand_action, resolved_params) .await .map_err(|e| ExecuteError::Action(e.to_string())) } Action::ClassroomRender { input } => { let data = context.resolve(input)?; self.action_registry.render_classroom(&data) .await .map_err(|e| ExecuteError::Action(e.to_string())) } Action::FileExport { formats, input, output_dir } => { let data = context.resolve(input)?; let dir = match output_dir { Some(s) => { let resolved = context.resolve(s)?; resolved.as_str().map(|s| s.to_string()) } None => None, }; // Resolve formats expression and parse as array let resolved_formats = context.resolve(formats)?; let format_strings: Vec = if resolved_formats.is_array() { resolved_formats.as_array() .ok_or_else(|| ExecuteError::Action("formats must be an array".to_string()))? .iter() .filter_map(|v| v.as_str().map(|s| s.to_string())) .collect() } else if resolved_formats.is_string() { // Try to parse as JSON array string let s = resolved_formats.as_str() .ok_or_else(|| ExecuteError::Action("formats must be a string or array".to_string()))?; serde_json::from_str(s) .unwrap_or_else(|_| vec![s.to_string()]) } else { return Err(ExecuteError::Action("formats must be a string or array".to_string())); }; // Convert strings to ExportFormat let export_formats: Vec = format_strings .iter() .filter_map(|s| match s.to_lowercase().as_str() { "pptx" => Some(ExportFormat::Pptx), "html" => Some(ExportFormat::Html), "pdf" => Some(ExportFormat::Pdf), "markdown" | "md" => Some(ExportFormat::Markdown), "json" => Some(ExportFormat::Json), _ => None, }) .collect(); self.action_registry.export_files(&export_formats, &data, dir.as_deref()) .await .map_err(|e| ExecuteError::Action(e.to_string())) } Action::HttpRequest { url, method, headers, body } => { let resolved_url = context.resolve(url)?; let url_str = resolved_url.as_str() .ok_or_else(|| ExecuteError::Action("URL must be a string".to_string()))?; let resolved_body = match body { Some(b) => Some(context.resolve(b)?), None => None, }; self.action_registry.http_request( url_str, method, headers, resolved_body.as_ref(), ).await .map_err(|e| ExecuteError::Action(e.to_string())) } Action::SetVar { name, value } => { let resolved = context.resolve(value)?; context.set_var(name, resolved.clone()); Ok(resolved) } Action::Delay { ms } => { tokio::time::sleep(tokio::time::Duration::from_millis(*ms)).await; Ok(Value::Null) } Action::SkillOrchestration { graph_id, graph, input } => { let resolved_input = context.resolve_map(input)?; self.action_registry.execute_orchestration( graph_id.as_deref(), graph.as_ref(), resolved_input, ).await .map_err(|e| ExecuteError::Action(e.to_string())) } } }.boxed() } /// Execute parallel steps async fn execute_parallel( &self, step: &PipelineStep, items: Vec, max_workers: usize, parent_context: &ExecutionContext, ) -> Result, ExecuteError> { let action_registry = self.action_registry.clone(); let action = step.action.clone(); // Clone parent context data for child contexts let parent_inputs = parent_context.inputs().clone(); let parent_outputs = parent_context.all_outputs().clone(); let parent_vars = parent_context.all_vars().clone(); let results: Vec> = stream::iter(items.into_iter().enumerate()) .map(|(index, item)| { let action_registry = action_registry.clone(); let action = action.clone(); let parent_inputs = parent_inputs.clone(); let parent_outputs = parent_outputs.clone(); let parent_vars = parent_vars.clone(); async move { // Create child context with parent data and loop variables let mut child_ctx = ExecutionContext::from_parent( parent_inputs, parent_outputs, parent_vars, ); child_ctx.set_loop_context(item, index); // Execute the step's action let executor = PipelineExecutor::new(action_registry); executor.execute_action(&action, &mut child_ctx).await } }) .buffer_unordered(max_workers) .collect() .await; let mut outputs = Vec::new(); for result in results { outputs.push(result?); } Ok(outputs) } /// Evaluate a condition expression fn evaluate_condition(&self, condition: &str, context: &ExecutionContext) -> Result { let resolved = context.resolve(condition)?; // If resolved to a boolean, return it if let Value::Bool(b) = resolved { return Ok(b); } // Handle string "true" / "false" as boolean values if let Value::String(s) = &resolved { match s.as_str() { "true" => return Ok(true), "false" => return Ok(false), _ => {} } } // Check for comparison operators let condition = condition.trim(); // Equality check if let Some(eq_pos) = condition.find("==") { let left = condition[..eq_pos].trim(); let right = condition[eq_pos + 2..].trim(); let left_val = context.resolve(left)?; // Handle quoted string literals for right side let right_val = if right.starts_with('\'') && right.ends_with('\'') { // Remove quotes and return as string value Value::String(right[1..right.len()-1].to_string()) } else if right.starts_with('"') && right.ends_with('"') { // Remove double quotes and return as string value Value::String(right[1..right.len()-1].to_string()) } else { context.resolve(right)? }; return Ok(left_val == right_val); } // Inequality check if let Some(ne_pos) = condition.find("!=") { let left = condition[..ne_pos].trim(); let right = condition[ne_pos + 2..].trim(); let left_val = context.resolve(left)?; let right_val = context.resolve(right)?; return Ok(left_val != right_val); } // Default: treat as truthy check Ok(!resolved.is_null()) } /// Get run status pub async fn get_run(&self, run_id: &str) -> Option { self.runs.read().await.get(run_id).cloned() } /// Get run progress pub async fn get_progress(&self, run_id: &str) -> Option { let run = self.runs.read().await.get(run_id)?.clone(); let (current_step, percentage) = if run.step_results.is_empty() { ("starting".to_string(), 0) } else if let Some(step) = &run.current_step { (step.clone(), 50) } else { ("completed".to_string(), 100) }; Some(PipelineProgress { run_id: run.id, current_step, message: run.current_step.clone().unwrap_or_default(), percentage, status: run.status, }) } /// Cancel a run pub async fn cancel(&self, run_id: &str) { self.cancellations.write().await.insert(run_id.to_string(), true); } /// List all runs pub async fn list_runs(&self) -> Vec { self.runs.read().await.values().cloned().collect() } } #[cfg(test)] mod tests { use super::*; use serde_json::json; #[test] fn test_evaluate_condition_bool() { let registry = Arc::new(ActionRegistry::new()); let executor = PipelineExecutor::new(registry); let ctx = ExecutionContext::new(HashMap::new()); assert!(executor.evaluate_condition("true", &ctx).unwrap()); assert!(!executor.evaluate_condition("false", &ctx).unwrap()); } #[test] fn test_evaluate_condition_equality() { let registry = Arc::new(ActionRegistry::new()); let executor = PipelineExecutor::new(registry); let ctx = ExecutionContext::new( vec![("type".to_string(), json!("video"))] .into_iter() .collect() ); assert!(executor.evaluate_condition("${inputs.type} == 'video'", &ctx).unwrap()); assert!(!executor.evaluate_condition("${inputs.type} == 'text'", &ctx).unwrap()); } }