//! Pipeline execution state management //! //! Manages state during pipeline execution, including: //! - Input parameters //! - Step outputs //! - Loop variables (item, index) //! - Custom variables use std::collections::HashMap; use serde_json::Value; use regex::Regex; /// Execution context for a running pipeline #[derive(Debug, Clone)] pub struct ExecutionContext { /// Pipeline input values inputs: HashMap, /// Step outputs (step_id -> output) steps_output: HashMap, /// Custom variables (set by set_var action) variables: HashMap, /// Loop context (item, index for parallel/each) loop_context: Option, /// Expression parser expr_regex: Regex, } /// Loop context for parallel/each iterations #[derive(Debug, Clone)] pub struct LoopContext { /// Current item pub item: Value, /// Current index pub index: usize, /// Parent loop context (for nested loops) pub parent: Option>, } impl ExecutionContext { /// Create a new execution context with inputs pub fn new(inputs: HashMap) -> Self { Self { inputs, steps_output: HashMap::new(), variables: HashMap::new(), loop_context: None, expr_regex: Regex::new(r"\$\{([^}]+)\}").unwrap(), } } /// Create from JSON value pub fn from_value(inputs: Value) -> Self { let inputs_map = if let Value::Object(obj) = inputs { obj.into_iter().collect() } else { HashMap::new() }; Self::new(inputs_map) } /// Create from parent context data (for parallel execution) pub fn from_parent( inputs: HashMap, steps_output: HashMap, variables: HashMap, ) -> Self { Self { inputs, steps_output, variables, loop_context: None, expr_regex: Regex::new(r"\$\{([^}]+)\}").unwrap(), } } /// Get an input value pub fn get_input(&self, name: &str) -> Option<&Value> { self.inputs.get(name) } /// Set a step output pub fn set_output(&mut self, step_id: &str, value: Value) { self.steps_output.insert(step_id.to_string(), value); } /// Get a step output pub fn get_output(&self, step_id: &str) -> Option<&Value> { self.steps_output.get(step_id) } /// Set a variable pub fn set_var(&mut self, name: &str, value: Value) { self.variables.insert(name.to_string(), value); } /// Get a variable pub fn get_var(&self, name: &str) -> Option<&Value> { self.variables.get(name) } /// Set loop context pub fn set_loop_context(&mut self, item: Value, index: usize) { self.loop_context = Some(LoopContext { item, index, parent: self.loop_context.take().map(Box::new), }); } /// Clear loop context pub fn clear_loop_context(&mut self) { if let Some(ctx) = self.loop_context.take() { self.loop_context = ctx.parent.map(|b| *b); } } /// Resolve an expression to a value /// /// Supported expressions: /// - `${inputs.topic}` - Input parameter /// - `${steps.step_id.output}` - Step output /// - `${steps.step_id.output.field}` - Nested field access /// - `${item}` - Current loop item /// - `${index}` - Current loop index /// - `${var.name}` - Custom variable pub fn resolve(&self, expr: &str) -> Result { // If not an expression, return as-is if !expr.contains("${") { return Ok(Value::String(expr.to_string())); } // Replace all expressions let result = self.expr_regex.replace_all(expr, |caps: ®ex::Captures| { let path = &caps[1]; match self.resolve_path(path) { Ok(value) => value_to_string(&value), Err(_) => caps[0].to_string(), // Keep original if not found } }); // If the result is a valid JSON value, parse it if result.starts_with('{') || result.starts_with('[') || result.starts_with('"') { if let Ok(value) = serde_json::from_str(&result) { return Ok(value); } } // If the entire string was an expression, try to return the actual value if expr.starts_with("${") && expr.ends_with("}") { let path = &expr[2..expr.len()-1]; return self.resolve_path(path); } Ok(Value::String(result.to_string())) } /// Resolve a path like "inputs.topic" or "steps.step1.output.field" fn resolve_path(&self, path: &str) -> Result { let parts: Vec<&str> = path.split('.').collect(); if parts.is_empty() { return Err(StateError::InvalidPath(path.to_string())); } let first = parts[0]; let rest = &parts[1..]; match first { "inputs" => self.resolve_from_map(&self.inputs, rest, path), "steps" => { // Handle "output" as a special key for step outputs // steps.step_id.output.field -> steps_output["step_id"].field // steps.step_id.field -> steps_output["step_id"].field (also supported) if rest.len() >= 2 && rest[1] == "output" { // Skip "output" in the path: [step_id, "output", ...rest] -> [step_id, ...rest] let step_id = rest[0]; let actual_rest = &rest[2..]; let step_value = self.steps_output.get(step_id) .ok_or_else(|| StateError::VariableNotFound(step_id.to_string()))?; if actual_rest.is_empty() { Ok(step_value.clone()) } else { self.resolve_from_value(step_value, actual_rest, path) } } else { self.resolve_from_map(&self.steps_output, rest, path) } } "vars" | "var" => self.resolve_from_map(&self.variables, rest, path), "item" => { if let Some(ctx) = &self.loop_context { if rest.is_empty() { Ok(ctx.item.clone()) } else { self.resolve_from_value(&ctx.item, rest, path) } } else { Err(StateError::VariableNotFound("item".to_string())) } } "index" => { if let Some(ctx) = &self.loop_context { Ok(Value::Number(ctx.index.into())) } else { Err(StateError::VariableNotFound("index".to_string())) } } _ => Err(StateError::InvalidPath(path.to_string())), } } /// Resolve a path from a map fn resolve_from_map( &self, map: &HashMap, path_parts: &[&str], full_path: &str, ) -> Result { if path_parts.is_empty() { return Err(StateError::InvalidPath(full_path.to_string())); } let key = path_parts[0]; let value = map.get(key) .ok_or_else(|| StateError::VariableNotFound(key.to_string()))?; if path_parts.len() == 1 { Ok(value.clone()) } else { self.resolve_from_value(value, &path_parts[1..], full_path) } } /// Resolve a path from a value (nested access) fn resolve_from_value( &self, value: &Value, path_parts: &[&str], full_path: &str, ) -> Result { let mut current = value; for part in path_parts { current = match current { Value::Object(map) => map.get(*part) .ok_or_else(|| StateError::FieldNotFound(part.to_string()))?, Value::Array(arr) => { // Try to parse as index if let Ok(idx) = part.parse::() { arr.get(idx) .ok_or_else(|| StateError::IndexOutOfBounds(idx))? } else { return Err(StateError::InvalidPath(full_path.to_string())); } } _ => return Err(StateError::InvalidPath(full_path.to_string())), }; } Ok(current.clone()) } /// Resolve multiple expressions in a map pub fn resolve_map(&self, input: &HashMap) -> Result, StateError> { let mut result = HashMap::new(); for (key, expr) in input { let value = self.resolve(expr)?; result.insert(key.clone(), value); } Ok(result) } /// Get all step outputs pub fn all_outputs(&self) -> &HashMap { &self.steps_output } /// Get all inputs pub fn inputs(&self) -> &HashMap { &self.inputs } /// Get all variables pub fn all_vars(&self) -> &HashMap { &self.variables } /// Extract final outputs from the context pub fn extract_outputs(&self, output_defs: &HashMap) -> Result, StateError> { let mut outputs = HashMap::new(); for (name, expr) in output_defs { let value = self.resolve(expr)?; outputs.insert(name.clone(), value); } Ok(outputs) } } /// Convert a value to string for template replacement fn value_to_string(value: &Value) -> String { match value { Value::String(s) => s.clone(), Value::Number(n) => n.to_string(), Value::Bool(b) => b.to_string(), Value::Null => String::new(), Value::Array(arr) => { serde_json::to_string(arr).unwrap_or_default() } Value::Object(obj) => { serde_json::to_string(obj).unwrap_or_default() } } } /// State errors #[derive(Debug, thiserror::Error)] pub enum StateError { #[error("Invalid path: {0}")] InvalidPath(String), #[error("Variable not found: {0}")] VariableNotFound(String), #[error("Field not found: {0}")] FieldNotFound(String), #[error("Index out of bounds: {0}")] IndexOutOfBounds(usize), #[error("Type error: {0}")] TypeError(String), } #[cfg(test)] mod tests { use super::*; use serde_json::json; #[test] fn test_resolve_input() { let ctx = ExecutionContext::new( vec![("topic".to_string(), json!("physics"))] .into_iter() .collect() ); let result = ctx.resolve("${inputs.topic}").unwrap(); assert_eq!(result, json!("physics")); } #[test] fn test_resolve_step_output() { let mut ctx = ExecutionContext::new(HashMap::new()); ctx.set_output("step1", json!({"result": "hello", "count": 42})); let result = ctx.resolve("${steps.step1.output.result}").unwrap(); assert_eq!(result, json!("hello")); let count = ctx.resolve("${steps.step1.output.count}").unwrap(); assert_eq!(count, json!(42)); } #[test] fn test_resolve_loop_context() { let mut ctx = ExecutionContext::new(HashMap::new()); ctx.set_loop_context(json!({"name": "item1"}), 2); let item = ctx.resolve("${item}").unwrap(); assert_eq!(item, json!({"name": "item1"})); let index = ctx.resolve("${index}").unwrap(); assert_eq!(index, json!(2)); let name = ctx.resolve("${item.name}").unwrap(); assert_eq!(name, json!("item1")); } #[test] fn test_resolve_array_access() { let mut ctx = ExecutionContext::new(HashMap::new()); ctx.set_output("step1", json!({"items": ["a", "b", "c"]})); let result = ctx.resolve("${steps.step1.output.items.0}").unwrap(); assert_eq!(result, json!("a")); let result = ctx.resolve("${steps.step1.output.items.2}").unwrap(); assert_eq!(result, json!("c")); } #[test] fn test_resolve_mixed_string() { let ctx = ExecutionContext::new( vec![("name".to_string(), json!("World"))] .into_iter() .collect() ); let result = ctx.resolve("Hello, ${inputs.name}!").unwrap(); assert_eq!(result, json!("Hello, World!")); } #[test] fn test_extract_outputs() { let mut ctx = ExecutionContext::new(HashMap::new()); ctx.set_output("render", json!({"id": "classroom-123", "url": "/preview"})); let outputs = vec![ ("classroom_id".to_string(), "${steps.render.output.id}".to_string()), ("preview_url".to_string(), "${steps.render.output.url}".to_string()), ].into_iter().collect(); let result = ctx.extract_outputs(&outputs).unwrap(); assert_eq!(result.get("classroom_id").unwrap(), &json!("classroom-123")); assert_eq!(result.get("preview_url").unwrap(), &json!("/preview")); } }