Files
zclaw_openfang/crates/zclaw-pipeline/src/executor.rs
iven f3fb5340b5 fix: 发布前审计 Batch 1 — Pipeline 内存泄漏/超时 + Director 死锁 + Rate Limit Worker
Pipeline executor:
- 添加 cleanup() 方法,MAX_COMPLETED_RUNS=100 上限淘汰旧记录
- 每步执行添加 tokio::time::timeout(使用 PipelineSpec.timeout_secs,默认 300s)
- Delay ms 上限 60000,超出 warn 并截断

Director send_to_agent:
- 重构为 oneshot::channel 响应模式,避免 inbox + pending_requests 锁竞争
- 添加 ensure_inbox_reader() 独立任务分发响应到对应 oneshot sender

cleanup_rate_limit Worker:
- 实现 Worker body: DELETE FROM rate_limit_events WHERE created_at < NOW() - INTERVAL '1 hour'

651 tests passed, 0 failed
2026-04-18 14:09:16 +08:00

609 lines
23 KiB
Rust

//! Pipeline Executor
//!
//! Executes pipelines step by step, managing state and calling actions.
use std::sync::Arc;
use std::collections::HashMap;
use tokio::sync::RwLock;
use serde_json::Value;
use uuid::Uuid;
use chrono::Utc;
use futures::stream::{self, StreamExt};
use futures::future::{BoxFuture, FutureExt};
use crate::types::{Pipeline, PipelineRun, PipelineProgress, RunStatus, PipelineStep, Action, ExportFormat};
use crate::state::{ExecutionContext, StateError};
use crate::actions::ActionRegistry;
/// Pipeline execution errors.
#[derive(Debug, thiserror::Error)]
pub enum ExecuteError {
    /// Propagated from the execution context / state layer (expression
    /// resolution, output extraction).
    #[error("State error: {0}")]
    State(#[from] StateError),
    /// An action failed; carries the stringified underlying error.
    #[error("Action error: {0}")]
    Action(String),
    /// A referenced step ID was not found.
    #[error("Step not found: {0}")]
    StepNotFound(String),
    /// A step exceeded its per-step timeout.
    #[error("Timeout exceeded")]
    Timeout,
    /// The run was cancelled (flag set via `PipelineExecutor::cancel`).
    #[error("Cancelled")]
    Cancelled,
    /// A condition evaluated to false. NOTE(review): not produced by the
    /// code visible here (failed `when` guards skip the step instead) —
    /// presumably raised elsewhere; confirm before removing.
    #[error("Condition not met: {0}")]
    ConditionNotMet(String),
    /// Underlying I/O failure.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
}
/// Maximum completed/failed/cancelled runs to keep in memory before
/// `cleanup()` evicts the oldest ones (memory-leak guard).
const MAX_COMPLETED_RUNS: usize = 100;
/// Maximum allowed `Action::Delay` duration in milliseconds (60 seconds);
/// larger requested delays are logged and capped to this value.
const MAX_DELAY_MS: u64 = 60_000;
/// Default per-step timeout (5 minutes), used when the pipeline spec's
/// `timeout_secs` is 0.
const DEFAULT_STEP_TIMEOUT_SECS: u64 = 300;
/// Pipeline executor: runs pipelines step by step, tracking per-run state
/// and cooperative cancellation.
pub struct PipelineExecutor {
    /// Action registry used to dispatch each step's action.
    action_registry: Arc<ActionRegistry>,
    /// All known runs (run_id -> run state); finished runs stay here until
    /// `cleanup()` evicts them.
    runs: RwLock<HashMap<String, PipelineRun>>,
    /// Cancellation flags (run_id -> requested); checked before each step.
    cancellations: RwLock<HashMap<String, bool>>,
}
impl PipelineExecutor {
/// Create a new executor
pub fn new(action_registry: Arc<ActionRegistry>) -> Self {
Self {
action_registry,
runs: RwLock::new(HashMap::new()),
cancellations: RwLock::new(HashMap::new()),
}
}
/// Execute a pipeline under a freshly generated UUID v4 run ID.
///
/// Convenience wrapper over [`Self::execute_with_id`] for callers that do
/// not need to know the run ID before execution starts.
pub async fn execute(
    &self,
    pipeline: &Pipeline,
    inputs: HashMap<String, Value>,
) -> Result<PipelineRun, ExecuteError> {
    self.execute_with_id(pipeline, inputs, &Uuid::new_v4().to_string())
        .await
}
/// Execute a pipeline with a specific run ID.
///
/// Use this when you need to know the run_id before execution starts,
/// e.g., for async spawning where the caller needs to track progress.
///
/// Lifecycle: registers a `PipelineRun` record (status `Running`), executes
/// all steps, then updates the record to `Completed`/`Failed`, and finally
/// runs `cleanup()` to bound the number of finished runs kept in memory.
pub async fn execute_with_id(
    &self,
    pipeline: &Pipeline,
    inputs: HashMap<String, Value>,
    run_id: &str,
) -> Result<PipelineRun, ExecuteError> {
    let pipeline_id = pipeline.metadata.name.clone();
    let run_id = run_id.to_string();
    // Create run record
    let total_steps = pipeline.spec.steps.len();
    let run = PipelineRun {
        id: run_id.clone(),
        pipeline_id: pipeline_id.clone(),
        status: RunStatus::Running,
        // Best-effort serialization: falls back to Null rather than failing the run.
        inputs: serde_json::to_value(&inputs).unwrap_or(Value::Null),
        current_step: None,
        step_results: HashMap::new(),
        outputs: None,
        error: None,
        total_steps,
        started_at: Utc::now(),
        ended_at: None,
    };
    // Store run
    self.runs.write().await.insert(run_id.clone(), run);
    // Create execution context
    let mut context = ExecutionContext::new(inputs);
    // Determine per-step timeout from pipeline spec (0 means use default)
    let step_timeout = if pipeline.spec.timeout_secs > 0 {
        pipeline.spec.timeout_secs
    } else {
        DEFAULT_STEP_TIMEOUT_SECS
    };
    // Execute steps
    let result = self.execute_steps(pipeline, &mut context, &run_id, step_timeout).await;
    // Update run state. The block scope bounds the write-lock guard so it is
    // released before cleanup() below re-acquires the same lock.
    let return_value = {
        let mut runs = self.runs.write().await;
        if let Some(run) = runs.get_mut(&run_id) {
            match result {
                Ok(outputs) => {
                    run.status = RunStatus::Completed;
                    run.outputs = Some(serde_json::to_value(&outputs).unwrap_or(Value::Null));
                }
                Err(e) => {
                    run.status = RunStatus::Failed;
                    run.error = Some(e.to_string());
                }
            }
            run.ended_at = Some(Utc::now());
            Ok(run.clone())
        } else {
            // The run record vanished mid-execution (e.g. externally removed);
            // message reads "run record not found after execution".
            Err(ExecuteError::Action("执行后未找到运行记录".to_string()))
        }
    };
    // Auto-cleanup old completed runs (after releasing the write lock)
    self.cleanup().await;
    return_value
}
/// Execute pipeline steps sequentially with a per-step timeout.
///
/// Before each step this: checks the run's cancellation flag, records the
/// current step on the run record (for progress reporting), and evaluates
/// the step's optional `when` guard — a false guard skips the step rather
/// than failing the run. Each action is wrapped in `tokio::time::timeout`;
/// expiry drops the action future and maps to `ExecuteError::Timeout`.
///
/// Step results are stored both in the context (so later steps can resolve
/// them) and on the run record. On success, returns the pipeline's declared
/// outputs extracted from the context.
async fn execute_steps(
    &self,
    pipeline: &Pipeline,
    context: &mut ExecutionContext,
    run_id: &str,
    step_timeout_secs: u64,
) -> Result<HashMap<String, Value>, ExecuteError> {
    let total_steps = pipeline.spec.steps.len();
    for (idx, step) in pipeline.spec.steps.iter().enumerate() {
        // Cooperative cancellation: flag is set by `cancel()`; missing entry
        // means "not cancelled".
        if *self.cancellations.read().await.get(run_id).unwrap_or(&false) {
            return Err(ExecuteError::Cancelled);
        }
        // Update current step (short-lived write lock).
        if let Some(run) = self.runs.write().await.get_mut(run_id) {
            run.current_step = Some(step.id.clone());
        }
        // Check condition: skip (not fail) when the `when` guard is false.
        if let Some(condition) = &step.when {
            let should_execute = self.evaluate_condition(condition, context)?;
            if !should_execute {
                tracing::info!("Skipping step {} (condition not met)", step.id);
                continue;
            }
        }
        tracing::info!("Executing step {} ({}/{})", step.id, idx + 1, total_steps);
        // Execute action with per-step timeout
        let timeout_duration = std::time::Duration::from_secs(step_timeout_secs);
        let result = tokio::time::timeout(
            timeout_duration,
            self.execute_action(&step.action, context),
        ).await.map_err(|_| {
            tracing::error!("Step {} timed out after {}s", step.id, step_timeout_secs);
            ExecuteError::Timeout
        })??;
        // Store result in the context so later steps can reference it...
        context.set_output(&step.id, result.clone());
        // ...and persist it on the run record for progress/inspection.
        if let Some(run) = self.runs.write().await.get_mut(run_id) {
            run.step_results.insert(step.id.clone(), result);
        }
    }
    // Extract the pipeline's declared outputs from the final context.
    Ok(context.extract_outputs(&pipeline.spec.outputs)
        .map_err(ExecuteError::State)?)
}
/// Execute a single action, dispatching on the action variant.
///
/// Returns a `BoxFuture` (instead of being an `async fn`) so the method can
/// recurse for the composite variants (`Parallel`, `Sequential`,
/// `Condition`).
///
/// Fix: the template-preview debug log previously byte-sliced the resolved
/// template at index 100 (`&s[..s.len().min(100)]`), which panics whenever
/// byte 100 falls inside a multi-byte UTF-8 sequence — e.g. Chinese
/// template text. The preview is now truncated on a char boundary, and the
/// log label correctly says "bytes" for `len()`.
fn execute_action<'a>(
    &'a self,
    action: &'a Action,
    context: &'a mut ExecutionContext,
) -> BoxFuture<'a, Result<Value, ExecuteError>> {
    async move {
        match action {
            Action::LlmGenerate { template, input, model, temperature, max_tokens, json_mode } => {
                tracing::debug!(target: "pipeline_executor", "LlmGenerate action called");
                tracing::debug!(target: "pipeline_executor", "input keys: {:?}", input.keys().collect::<Vec<_>>());
                // First resolve the template itself (handles ${inputs.xxx}, ${item.xxx}, etc.)
                let resolved_template = context.resolve(template)?;
                // Non-string resolutions fall back to the raw template text.
                let resolved_template_str = resolved_template.as_str().unwrap_or(template).to_string();
                // Char-boundary-safe preview: `chars().take(100)` cannot split
                // a multi-byte character, unlike byte slicing at a fixed index.
                let preview: String = resolved_template_str.chars().take(100).collect();
                tracing::debug!(target: "pipeline_executor", "Resolved template ({} bytes, first 100 chars): {}",
                    resolved_template_str.len(), preview);
                let resolved_input = context.resolve_map(input)?;
                tracing::debug!(target: "pipeline_executor", "Resolved input keys: {:?}", resolved_input.keys().collect::<Vec<_>>());
                self.action_registry.execute_llm(
                    &resolved_template_str,
                    resolved_input,
                    model.clone(),
                    *temperature,
                    *max_tokens,
                    *json_mode,
                ).await.map_err(|e| ExecuteError::Action(e.to_string()))
            }
            Action::Parallel { each, step, max_workers } => {
                // `each` must resolve to an array; one sub-execution per item.
                let items = context.resolve(each)?;
                let items_array = items.as_array()
                    .ok_or_else(|| ExecuteError::Action("并行执行 'each' 必须解析为数组".to_string()))?;
                let workers = max_workers.unwrap_or(4);
                let results = self.execute_parallel(step, items_array.clone(), workers, context).await?;
                Ok(Value::Array(results))
            }
            Action::Sequential { steps } => {
                // Run sub-steps in order; each result is published to the
                // context under the sub-step's id, and the last result wins.
                let mut last_result = Value::Null;
                for step in steps {
                    last_result = self.execute_action(&step.action, context).await?;
                    context.set_output(&step.id, last_result.clone());
                }
                Ok(last_result)
            }
            Action::Condition { branches, default, .. } => {
                // First branch whose `when` holds wins; otherwise the default
                // branch (if any); otherwise Null.
                for branch in branches {
                    if self.evaluate_condition(&branch.when, context)? {
                        return self.execute_action(&branch.then.action, context).await;
                    }
                }
                if let Some(default_step) = default {
                    return self.execute_action(&default_step.action, context).await;
                }
                Ok(Value::Null)
            }
            Action::Skill { skill_id, input } => {
                let resolved_input = context.resolve_map(input)?;
                self.action_registry.execute_skill(skill_id, resolved_input)
                    .await
                    .map_err(|e| ExecuteError::Action(e.to_string()))
            }
            Action::Hand { hand_id, hand_action, params } => {
                let resolved_params = context.resolve_map(params)?;
                self.action_registry.execute_hand(hand_id, hand_action, resolved_params)
                    .await
                    .map_err(|e| ExecuteError::Action(e.to_string()))
            }
            Action::ClassroomRender { input } => {
                let data = context.resolve(input)?;
                self.action_registry.render_classroom(&data)
                    .await
                    .map_err(|e| ExecuteError::Action(e.to_string()))
            }
            Action::FileExport { formats, input, output_dir } => {
                let data = context.resolve(input)?;
                // Optional output directory; resolved only when present.
                let dir = match output_dir {
                    Some(s) => {
                        let resolved = context.resolve(s)?;
                        resolved.as_str().map(|s| s.to_string())
                    }
                    None => None,
                };
                // Resolve the formats expression and normalize to a string list:
                // an array is used as-is, a string is tried as a JSON array and
                // falls back to a single-element list.
                let resolved_formats = context.resolve(formats)?;
                let format_strings: Vec<String> = if resolved_formats.is_array() {
                    resolved_formats.as_array()
                        .ok_or_else(|| ExecuteError::Action("formats must be an array".to_string()))?
                        .iter()
                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
                        .collect()
                } else if resolved_formats.is_string() {
                    let s = resolved_formats.as_str()
                        .ok_or_else(|| ExecuteError::Action("formats must be a string or array".to_string()))?;
                    serde_json::from_str(s)
                        .unwrap_or_else(|_| vec![s.to_string()])
                } else {
                    return Err(ExecuteError::Action("formats must be a string or array".to_string()));
                };
                // Map case-insensitive format names to ExportFormat; unknown
                // names are silently dropped.
                let export_formats: Vec<ExportFormat> = format_strings
                    .iter()
                    .filter_map(|s| match s.to_lowercase().as_str() {
                        "pptx" => Some(ExportFormat::Pptx),
                        "html" => Some(ExportFormat::Html),
                        "pdf" => Some(ExportFormat::Pdf),
                        "markdown" | "md" => Some(ExportFormat::Markdown),
                        "json" => Some(ExportFormat::Json),
                        _ => None,
                    })
                    .collect();
                self.action_registry.export_files(&export_formats, &data, dir.as_deref())
                    .await
                    .map_err(|e| ExecuteError::Action(e.to_string()))
            }
            Action::HttpRequest { url, method, headers, body } => {
                let resolved_url = context.resolve(url)?;
                let url_str = resolved_url.as_str()
                    .ok_or_else(|| ExecuteError::Action("URL must be a string".to_string()))?;
                let resolved_body = match body {
                    Some(b) => Some(context.resolve(b)?),
                    None => None,
                };
                self.action_registry.http_request(
                    url_str,
                    method,
                    headers,
                    resolved_body.as_ref(),
                ).await
                    .map_err(|e| ExecuteError::Action(e.to_string()))
            }
            Action::SetVar { name, value } => {
                // Store the resolved value in the context and also return it.
                let resolved = context.resolve(value)?;
                context.set_var(name, resolved.clone());
                Ok(resolved)
            }
            Action::Delay { ms } => {
                // Cap the delay at MAX_DELAY_MS so a bad spec cannot stall a
                // run for longer than 60 s per delay action.
                let capped_ms = if *ms > MAX_DELAY_MS {
                    tracing::warn!(
                        "Delay ms {} exceeds max {}, capping to {}",
                        ms, MAX_DELAY_MS, MAX_DELAY_MS
                    );
                    MAX_DELAY_MS
                } else {
                    *ms
                };
                tokio::time::sleep(tokio::time::Duration::from_millis(capped_ms)).await;
                Ok(Value::Null)
            }
            Action::SkillOrchestration { graph_id, graph, input } => {
                let resolved_input = context.resolve_map(input)?;
                self.action_registry.execute_orchestration(
                    graph_id.as_deref(),
                    graph.as_ref(),
                    resolved_input,
                ).await
                    .map_err(|e| ExecuteError::Action(e.to_string()))
            }
        }
    }.boxed()
}
/// Execute one step's action once per item, running at most `max_workers`
/// futures concurrently.
///
/// Each item gets an isolated child context seeded with a snapshot of the
/// parent's inputs/outputs/vars plus the loop variables (`item`, `index`),
/// and is executed by a fresh executor sharing the same action registry.
///
/// Fix: results are now yielded in input order via `buffered`. The previous
/// implementation used `buffer_unordered` and collected results in
/// completion order, making the returned array non-deterministic.
async fn execute_parallel(
    &self,
    step: &PipelineStep,
    items: Vec<Value>,
    max_workers: usize,
    parent_context: &ExecutionContext,
) -> Result<Vec<Value>, ExecuteError> {
    let action_registry = self.action_registry.clone();
    let action = step.action.clone();
    // Clone parent context data once; each worker clones its own copy below.
    let parent_inputs = parent_context.inputs().clone();
    let parent_outputs = parent_context.all_outputs().clone();
    let parent_vars = parent_context.all_vars().clone();
    let results: Vec<Result<Value, ExecuteError>> = stream::iter(items.into_iter().enumerate())
        .map(|(index, item)| {
            let action_registry = action_registry.clone();
            let action = action.clone();
            let parent_inputs = parent_inputs.clone();
            let parent_outputs = parent_outputs.clone();
            let parent_vars = parent_vars.clone();
            async move {
                // Create child context with parent data and loop variables.
                let mut child_ctx = ExecutionContext::from_parent(
                    parent_inputs,
                    parent_outputs,
                    parent_vars,
                );
                child_ctx.set_loop_context(item, index);
                // Execute the step's action in a throwaway executor.
                let executor = PipelineExecutor::new(action_registry);
                executor.execute_action(&action, &mut child_ctx).await
            }
        })
        // `buffered` keeps up to `max_workers` futures in flight but yields
        // results in input order (unlike `buffer_unordered`).
        .buffered(max_workers)
        .collect()
        .await;
    // Propagate the first error (now deterministically the lowest-index
    // failure); otherwise return all outputs in item order.
    let mut outputs = Vec::with_capacity(results.len());
    for result in results {
        outputs.push(result?);
    }
    Ok(outputs)
}
/// Evaluate a condition expression
fn evaluate_condition(&self, condition: &str, context: &ExecutionContext) -> Result<bool, ExecuteError> {
let resolved = context.resolve(condition)?;
// If resolved to a boolean, return it
if let Value::Bool(b) = resolved {
return Ok(b);
}
// Handle string "true" / "false" as boolean values
if let Value::String(s) = &resolved {
match s.as_str() {
"true" => return Ok(true),
"false" => return Ok(false),
_ => {}
}
}
// Check for comparison operators
let condition = condition.trim();
// Equality check
if let Some(eq_pos) = condition.find("==") {
let left = condition[..eq_pos].trim();
let right = condition[eq_pos + 2..].trim();
let left_val = context.resolve(left)?;
// Handle quoted string literals for right side
let right_val = if right.starts_with('\'') && right.ends_with('\'') {
// Remove quotes and return as string value
Value::String(right[1..right.len()-1].to_string())
} else if right.starts_with('"') && right.ends_with('"') {
// Remove double quotes and return as string value
Value::String(right[1..right.len()-1].to_string())
} else {
context.resolve(right)?
};
return Ok(left_val == right_val);
}
// Inequality check
if let Some(ne_pos) = condition.find("!=") {
let left = condition[..ne_pos].trim();
let right = condition[ne_pos + 2..].trim();
let left_val = context.resolve(left)?;
let right_val = context.resolve(right)?;
return Ok(left_val != right_val);
}
// Default: treat as truthy check
Ok(!resolved.is_null())
}
/// Get run status
pub async fn get_run(&self, run_id: &str) -> Option<PipelineRun> {
self.runs.read().await.get(run_id).cloned()
}
/// Report progress for a run: the current step label, a completion
/// percentage derived from finished step results, and the run status.
/// Returns `None` for an unknown run ID.
pub async fn get_progress(&self, run_id: &str) -> Option<PipelineProgress> {
    let run = {
        let runs = self.runs.read().await;
        runs.get(run_id)?.clone()
    };
    let done = run.step_results.len();
    // Percentage of completed steps, clamped to `cap`; only called when
    // total_steps > 0.
    let pct_of = |cap: f64| ((done as f64 / run.total_steps as f64) * 100.0).min(cap) as u8;
    let (current_step, percentage) = if run.status == RunStatus::Completed {
        ("completed".to_string(), 100)
    } else if run.total_steps == 0 {
        // Empty pipeline or unknown total, and not yet completed.
        ("starting".to_string(), 0)
    } else if let Some(step) = run.current_step.clone() {
        // In flight: cap at 99 until the run actually completes.
        (step, pct_of(99.0))
    } else if done == 0 {
        ("starting".to_string(), 0)
    } else {
        // Not running, not completed (failed/cancelled).
        ("stopped".to_string(), pct_of(100.0))
    };
    Some(PipelineProgress {
        run_id: run.id,
        current_step,
        message: run.current_step.clone().unwrap_or_default(),
        percentage,
        status: run.status,
    })
}
/// Request cancellation of a run; the executor checks this flag before
/// starting each step, so cancellation takes effect at the next step
/// boundary.
pub async fn cancel(&self, run_id: &str) {
    let mut flags = self.cancellations.write().await;
    flags.insert(run_id.to_owned(), true);
}
/// List all runs
pub async fn list_runs(&self) -> Vec<PipelineRun> {
self.runs.read().await.values().cloned().collect()
}
/// Clean up old completed/failed/cancelled runs to prevent memory leaks.
/// Keeps at most MAX_COMPLETED_RUNS finished runs, evicting the oldest
/// first (by end time, falling back to start time).
///
/// Fix: the previous implementation dropped and re-acquired the `runs`
/// write lock once per evicted ID inside the eviction loop, letting other
/// tasks observe a partially cleaned state and thrashing both locks. The
/// rewrite evicts all runs under a single `runs` lock, then clears the
/// matching cancellation flags under a single `cancellations` lock.
pub async fn cleanup(&self) {
    // Phase 1: evict old finished runs under one `runs` write lock.
    let evicted: Vec<String> = {
        let mut runs = self.runs.write().await;
        // Collect (id, end-time) of finished runs (completed/failed/cancelled).
        let mut finished: Vec<(String, chrono::DateTime<Utc>)> = runs
            .iter()
            .filter(|(_, r)| matches!(r.status, RunStatus::Completed | RunStatus::Failed | RunStatus::Cancelled))
            .map(|(id, r)| (id.clone(), r.ended_at.unwrap_or(r.started_at)))
            .collect();
        let to_remove = finished.len().saturating_sub(MAX_COMPLETED_RUNS);
        if to_remove == 0 {
            return;
        }
        // Sort by end time ascending (oldest first) and keep only the victims.
        finished.sort_by_key(|(_, t)| *t);
        finished.truncate(to_remove);
        let ids: Vec<String> = finished.into_iter().map(|(id, _)| id).collect();
        for id in &ids {
            runs.remove(id);
        }
        tracing::debug!("Cleaned up {} old pipeline runs", ids.len());
        ids
    };
    // Phase 2: drop the evicted runs' cancellation flags. Taken after the
    // `runs` lock is released, so the two locks are never held together.
    let mut cancellations = self.cancellations.write().await;
    for id in &evicted {
        cancellations.remove(id);
    }
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Bare "true"/"false" literals resolve straight to booleans.
    #[test]
    fn test_evaluate_condition_bool() {
        let registry = Arc::new(ActionRegistry::new());
        let executor = PipelineExecutor::new(registry);
        let ctx = ExecutionContext::new(HashMap::new());
        assert!(executor.evaluate_condition("true", &ctx).unwrap());
        assert!(!executor.evaluate_condition("false", &ctx).unwrap());
    }

    // `${inputs.x} == 'literal'` compares the resolved input value against
    // the single-quoted string literal on the right-hand side.
    #[test]
    fn test_evaluate_condition_equality() {
        let registry = Arc::new(ActionRegistry::new());
        let executor = PipelineExecutor::new(registry);
        let ctx = ExecutionContext::new(
            vec![("type".to_string(), json!("video"))]
                .into_iter()
                .collect()
        );
        assert!(executor.evaluate_condition("${inputs.type} == 'video'", &ctx).unwrap());
        assert!(!executor.evaluate_condition("${inputs.type} == 'text'", &ctx).unwrap());
    }
}