feat(pipeline): implement Pipeline DSL system for automated workflows
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
Add complete Pipeline DSL system including:
- Rust backend (zclaw-pipeline crate) with parser, executor, and state management
- Frontend components: PipelinesPanel, PipelineResultPreview, ClassroomPreviewer
- Pipeline recommender for Agent conversation integration
- 5 pipeline templates: education, marketing, legal, research, productivity
- Documentation for Pipeline DSL architecture
Pipeline DSL enables declarative workflow definitions with:
- YAML-based configuration
- Expression resolution (${inputs.topic}, ${steps.step1.output})
- LLM integration, parallel execution, file export
- Agent smart recommendations in conversations
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
428
crates/zclaw-pipeline/src/executor.rs
Normal file
428
crates/zclaw-pipeline/src/executor.rs
Normal file
@@ -0,0 +1,428 @@
|
||||
//! Pipeline Executor
|
||||
//!
|
||||
//! Executes pipelines step by step, managing state and calling actions.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::collections::HashMap;
|
||||
use tokio::sync::RwLock;
|
||||
use serde_json::Value;
|
||||
use uuid::Uuid;
|
||||
use chrono::Utc;
|
||||
use futures::stream::{self, StreamExt};
|
||||
use futures::future::{BoxFuture, FutureExt};
|
||||
|
||||
use crate::types::{Pipeline, PipelineRun, PipelineProgress, RunStatus, PipelineStep, Action};
|
||||
use crate::state::{ExecutionContext, StateError};
|
||||
use crate::actions::ActionRegistry;
|
||||
|
||||
/// Errors that can occur while executing a pipeline.
///
/// `State` and `Io` convert automatically via `#[from]`; action-layer
/// failures are flattened to strings in `Action` since the registry's
/// error types are not known here.
#[derive(Debug, thiserror::Error)]
pub enum ExecuteError {
    /// Failure in the execution context / state layer (expression
    /// resolution, output extraction).
    #[error("State error: {0}")]
    State(#[from] StateError),

    /// An action implementation reported a failure; the message carries
    /// the stringified underlying error.
    #[error("Action error: {0}")]
    Action(String),

    /// A referenced step id does not exist in the pipeline spec.
    #[error("Step not found: {0}")]
    StepNotFound(String),

    /// Execution exceeded its allotted time budget.
    #[error("Timeout exceeded")]
    Timeout,

    /// The run was cancelled via `PipelineExecutor::cancel`.
    #[error("Cancelled")]
    Cancelled,

    /// A required condition evaluated to false.
    #[error("Condition not met: {0}")]
    ConditionNotMet(String),

    /// Underlying I/O failure (file export, etc.).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
}
|
||||
|
||||
/// Pipeline executor.
///
/// Owns the run bookkeeping for all pipelines executed through it.
/// All shared state is behind `tokio::sync::RwLock`, so a single
/// executor can be driven from many tasks via `&self`.
pub struct PipelineExecutor {
    /// Registry that knows how to perform each `Action` variant.
    action_registry: Arc<ActionRegistry>,

    /// Run records, keyed by run id. Entries are inserted when a run
    /// starts and kept after it finishes for later inspection.
    runs: RwLock<HashMap<String, PipelineRun>>,

    /// Cancellation flags, keyed by run id; checked between steps.
    cancellations: RwLock<HashMap<String, bool>>,
}
|
||||
|
||||
impl PipelineExecutor {
|
||||
/// Create a new executor
|
||||
pub fn new(action_registry: Arc<ActionRegistry>) -> Self {
|
||||
Self {
|
||||
action_registry,
|
||||
runs: RwLock::new(HashMap::new()),
|
||||
cancellations: RwLock::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute a pipeline
|
||||
pub async fn execute(
|
||||
&self,
|
||||
pipeline: &Pipeline,
|
||||
inputs: HashMap<String, Value>,
|
||||
) -> Result<PipelineRun, ExecuteError> {
|
||||
let run_id = Uuid::new_v4().to_string();
|
||||
let pipeline_id = pipeline.metadata.name.clone();
|
||||
|
||||
// Create run record
|
||||
let run = PipelineRun {
|
||||
id: run_id.clone(),
|
||||
pipeline_id: pipeline_id.clone(),
|
||||
status: RunStatus::Running,
|
||||
inputs: serde_json::to_value(&inputs).unwrap_or(Value::Null),
|
||||
current_step: None,
|
||||
step_results: HashMap::new(),
|
||||
outputs: None,
|
||||
error: None,
|
||||
started_at: Utc::now(),
|
||||
ended_at: None,
|
||||
};
|
||||
|
||||
// Store run
|
||||
self.runs.write().await.insert(run_id.clone(), run);
|
||||
|
||||
// Create execution context
|
||||
let mut context = ExecutionContext::new(inputs);
|
||||
|
||||
// Execute steps
|
||||
let result = self.execute_steps(pipeline, &mut context, &run_id).await;
|
||||
|
||||
// Update run state
|
||||
let mut runs = self.runs.write().await;
|
||||
if let Some(run) = runs.get_mut(&run_id) {
|
||||
match result {
|
||||
Ok(outputs) => {
|
||||
run.status = RunStatus::Completed;
|
||||
run.outputs = Some(serde_json::to_value(&outputs).unwrap_or(Value::Null));
|
||||
}
|
||||
Err(e) => {
|
||||
run.status = RunStatus::Failed;
|
||||
run.error = Some(e.to_string());
|
||||
}
|
||||
}
|
||||
run.ended_at = Some(Utc::now());
|
||||
return Ok(run.clone());
|
||||
}
|
||||
|
||||
Err(ExecuteError::Action("Run not found after execution".to_string()))
|
||||
}
|
||||
|
||||
/// Execute pipeline steps
|
||||
async fn execute_steps(
|
||||
&self,
|
||||
pipeline: &Pipeline,
|
||||
context: &mut ExecutionContext,
|
||||
run_id: &str,
|
||||
) -> Result<HashMap<String, Value>, ExecuteError> {
|
||||
let total_steps = pipeline.spec.steps.len();
|
||||
|
||||
for (idx, step) in pipeline.spec.steps.iter().enumerate() {
|
||||
// Check cancellation
|
||||
if *self.cancellations.read().await.get(run_id).unwrap_or(&false) {
|
||||
return Err(ExecuteError::Cancelled);
|
||||
}
|
||||
|
||||
// Update current step
|
||||
if let Some(run) = self.runs.write().await.get_mut(run_id) {
|
||||
run.current_step = Some(step.id.clone());
|
||||
}
|
||||
|
||||
// Check condition
|
||||
if let Some(condition) = &step.when {
|
||||
let should_execute = self.evaluate_condition(condition, context)?;
|
||||
if !should_execute {
|
||||
tracing::info!("Skipping step {} (condition not met)", step.id);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!("Executing step {} ({}/{})", step.id, idx + 1, total_steps);
|
||||
|
||||
// Execute action
|
||||
let result = self.execute_action(&step.action, context).await?;
|
||||
|
||||
// Store result
|
||||
context.set_output(&step.id, result.clone());
|
||||
|
||||
// Update step results in run
|
||||
if let Some(run) = self.runs.write().await.get_mut(run_id) {
|
||||
run.step_results.insert(step.id.clone(), result);
|
||||
}
|
||||
}
|
||||
|
||||
// Extract outputs
|
||||
Ok(context.extract_outputs(&pipeline.spec.outputs)
|
||||
.map_err(ExecuteError::State)?)
|
||||
}
|
||||
|
||||
/// Execute a single action (returns BoxFuture for recursion support).
///
/// `Sequential` and `Condition` arms re-enter this function, so the
/// async body is boxed to give the recursive future a nameable type.
/// Each arm resolves any `${...}` expressions in its inputs via the
/// context before delegating to the action registry; registry errors
/// are flattened into `ExecuteError::Action` strings.
fn execute_action<'a>(
    &'a self,
    action: &'a Action,
    context: &'a mut ExecutionContext,
) -> BoxFuture<'a, Result<Value, ExecuteError>> {
    async move {
        match action {
            // LLM call: resolve the input map, pass tuning knobs through.
            Action::LlmGenerate { template, input, model, temperature, max_tokens, json_mode } => {
                let resolved_input = context.resolve_map(input)?;
                self.action_registry.execute_llm(
                    template,
                    resolved_input,
                    model.clone(),
                    *temperature,
                    *max_tokens,
                    *json_mode,
                ).await.map_err(|e| ExecuteError::Action(e.to_string()))
            }

            // Fan out one sub-step over an array, bounded by max_workers
            // (default 4). `each` must resolve to a JSON array.
            Action::Parallel { each, step, max_workers } => {
                let items = context.resolve(each)?;
                let items_array = items.as_array()
                    .ok_or_else(|| ExecuteError::Action("Parallel 'each' must resolve to an array".to_string()))?;

                let workers = max_workers.unwrap_or(4);
                let results = self.execute_parallel(step, items_array.clone(), workers).await?;

                Ok(Value::Array(results))
            }

            // Run sub-steps in order; each result is published under the
            // sub-step's id, and the last result is the action's value.
            Action::Sequential { steps } => {
                let mut last_result = Value::Null;
                for step in steps {
                    last_result = self.execute_action(&step.action, context).await?;
                    context.set_output(&step.id, last_result.clone());
                }
                Ok(last_result)
            }

            // First branch whose `when` holds wins; fall back to the
            // `default` step if present, else yield Null.
            Action::Condition { branches, default, .. } => {
                for branch in branches {
                    if self.evaluate_condition(&branch.when, context)? {
                        return self.execute_action(&branch.then.action, context).await;
                    }
                }

                if let Some(default_step) = default {
                    return self.execute_action(&default_step.action, context).await;
                }

                Ok(Value::Null)
            }

            // Delegate to a registered skill with a resolved input map.
            Action::Skill { skill_id, input } => {
                let resolved_input = context.resolve_map(input)?;
                self.action_registry.execute_skill(skill_id, resolved_input)
                    .await
                    .map_err(|e| ExecuteError::Action(e.to_string()))
            }

            // Delegate to a "hand" (external tool) with resolved params.
            Action::Hand { hand_id, hand_action, params } => {
                let resolved_params = context.resolve_map(params)?;
                self.action_registry.execute_hand(hand_id, hand_action, resolved_params)
                    .await
                    .map_err(|e| ExecuteError::Action(e.to_string()))
            }

            // Render resolved data through the classroom renderer.
            Action::ClassroomRender { input } => {
                let data = context.resolve(input)?;
                self.action_registry.render_classroom(&data)
                    .await
                    .map_err(|e| ExecuteError::Action(e.to_string()))
            }

            // Export resolved data to one or more formats. A non-string
            // resolution of `output_dir` silently becomes None here —
            // NOTE(review): presumably the registry then picks a default
            // directory; confirm against ActionRegistry::export_files.
            Action::FileExport { formats, input, output_dir } => {
                let data = context.resolve(input)?;
                let dir = match output_dir {
                    Some(s) => {
                        let resolved = context.resolve(s)?;
                        resolved.as_str().map(|s| s.to_string())
                    }
                    None => None,
                };

                self.action_registry.export_files(formats, &data, dir.as_deref())
                    .await
                    .map_err(|e| ExecuteError::Action(e.to_string()))
            }

            // HTTP call: URL and body go through expression resolution;
            // headers are passed through verbatim (no `${...}` support).
            Action::HttpRequest { url, method, headers, body } => {
                let resolved_url = context.resolve(url)?;
                let url_str = resolved_url.as_str()
                    .ok_or_else(|| ExecuteError::Action("URL must be a string".to_string()))?;

                let resolved_body = match body {
                    Some(b) => Some(context.resolve(b)?),
                    None => None,
                };

                self.action_registry.http_request(
                    url_str,
                    method,
                    headers,
                    resolved_body.as_ref(),
                ).await
                .map_err(|e| ExecuteError::Action(e.to_string()))
            }

            // Store a resolved value as a context variable; the value is
            // also the action's result.
            Action::SetVar { name, value } => {
                let resolved = context.resolve(value)?;
                context.set_var(name, resolved.clone());
                Ok(resolved)
            }

            // Non-blocking pause for `ms` milliseconds.
            Action::Delay { ms } => {
                tokio::time::sleep(tokio::time::Duration::from_millis(*ms)).await;
                Ok(Value::Null)
            }
        }
    }.boxed()
}
|
||||
|
||||
/// Execute parallel steps
|
||||
async fn execute_parallel(
|
||||
&self,
|
||||
step: &PipelineStep,
|
||||
items: Vec<Value>,
|
||||
max_workers: usize,
|
||||
) -> Result<Vec<Value>, ExecuteError> {
|
||||
let action_registry = self.action_registry.clone();
|
||||
let action = step.action.clone();
|
||||
|
||||
let results: Vec<Result<Value, ExecuteError>> = stream::iter(items.into_iter().enumerate())
|
||||
.map(|(index, item)| {
|
||||
let action_registry = action_registry.clone();
|
||||
let action = action.clone();
|
||||
|
||||
async move {
|
||||
// Create child context with loop variables
|
||||
let mut child_ctx = ExecutionContext::new(HashMap::new());
|
||||
child_ctx.set_loop_context(item, index);
|
||||
|
||||
// Execute the step's action
|
||||
let executor = PipelineExecutor::new(action_registry);
|
||||
executor.execute_action(&action, &mut child_ctx).await
|
||||
}
|
||||
})
|
||||
.buffer_unordered(max_workers)
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
let mut outputs = Vec::new();
|
||||
for result in results {
|
||||
outputs.push(result?);
|
||||
}
|
||||
|
||||
Ok(outputs)
|
||||
}
|
||||
|
||||
/// Evaluate a condition expression
|
||||
fn evaluate_condition(&self, condition: &str, context: &ExecutionContext) -> Result<bool, ExecuteError> {
|
||||
let resolved = context.resolve(condition)?;
|
||||
|
||||
// If resolved to a boolean, return it
|
||||
if let Value::Bool(b) = resolved {
|
||||
return Ok(b);
|
||||
}
|
||||
|
||||
// Check for comparison operators
|
||||
let condition = condition.trim();
|
||||
|
||||
// Equality check
|
||||
if let Some(eq_pos) = condition.find("==") {
|
||||
let left = condition[..eq_pos].trim();
|
||||
let right = condition[eq_pos + 2..].trim();
|
||||
|
||||
let left_val = context.resolve(left)?;
|
||||
let right_val = context.resolve(right)?;
|
||||
|
||||
return Ok(left_val == right_val);
|
||||
}
|
||||
|
||||
// Inequality check
|
||||
if let Some(ne_pos) = condition.find("!=") {
|
||||
let left = condition[..ne_pos].trim();
|
||||
let right = condition[ne_pos + 2..].trim();
|
||||
|
||||
let left_val = context.resolve(left)?;
|
||||
let right_val = context.resolve(right)?;
|
||||
|
||||
return Ok(left_val != right_val);
|
||||
}
|
||||
|
||||
// Default: treat as truthy check
|
||||
Ok(!resolved.is_null())
|
||||
}
|
||||
|
||||
/// Get run status
|
||||
pub async fn get_run(&self, run_id: &str) -> Option<PipelineRun> {
|
||||
self.runs.read().await.get(run_id).cloned()
|
||||
}
|
||||
|
||||
/// Get run progress
|
||||
pub async fn get_progress(&self, run_id: &str) -> Option<PipelineProgress> {
|
||||
let run = self.runs.read().await.get(run_id)?.clone();
|
||||
|
||||
let (current_step, percentage) = if run.step_results.is_empty() {
|
||||
("starting".to_string(), 0)
|
||||
} else if let Some(step) = &run.current_step {
|
||||
(step.clone(), 50)
|
||||
} else {
|
||||
("completed".to_string(), 100)
|
||||
};
|
||||
|
||||
Some(PipelineProgress {
|
||||
run_id: run.id,
|
||||
current_step,
|
||||
message: run.current_step.clone().unwrap_or_default(),
|
||||
percentage,
|
||||
status: run.status,
|
||||
})
|
||||
}
|
||||
|
||||
/// Cancel a run
|
||||
pub async fn cancel(&self, run_id: &str) {
|
||||
self.cancellations.write().await.insert(run_id.to_string(), true);
|
||||
}
|
||||
|
||||
/// List all runs
|
||||
pub async fn list_runs(&self) -> Vec<PipelineRun> {
|
||||
self.runs.read().await.values().cloned().collect()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Build an executor with an empty action registry for condition tests.
    fn executor() -> PipelineExecutor {
        PipelineExecutor::new(Arc::new(ActionRegistry::new()))
    }

    #[test]
    fn test_evaluate_condition_bool() {
        let exec = executor();
        let ctx = ExecutionContext::new(HashMap::new());

        assert!(exec.evaluate_condition("true", &ctx).unwrap());
        assert!(!exec.evaluate_condition("false", &ctx).unwrap());
    }

    #[test]
    fn test_evaluate_condition_equality() {
        let exec = executor();
        let inputs: HashMap<String, Value> =
            [("type".to_string(), json!("video"))].into_iter().collect();
        let ctx = ExecutionContext::new(inputs);

        assert!(exec.evaluate_condition("${inputs.type} == 'video'", &ctx).unwrap());
        assert!(!exec.evaluate_condition("${inputs.type} == 'text'", &ctx).unwrap());
    }
}
|
||||
Reference in New Issue
Block a user