feat: 新增技能编排引擎和工作流构建器组件
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
refactor: 统一Hands系统常量到单个源文件 refactor: 更新Hands中文名称和描述 fix: 修复技能市场在连接状态变化时重新加载 fix: 修复身份变更提案的错误处理逻辑 docs: 更新多个功能文档的验证状态和实现位置 docs: 更新Hands系统文档 test: 添加测试文件验证工作区路径
This commit is contained in:
319
crates/zclaw-skills/src/orchestration/executor.rs
Normal file
319
crates/zclaw-skills/src/orchestration/executor.rs
Normal file
@@ -0,0 +1,319 @@
|
||||
//! Orchestration executor
|
||||
//!
|
||||
//! Executes skill graphs with parallel execution, data passing,
|
||||
//! error handling, and progress tracking.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::RwLock;
|
||||
use serde_json::Value;
|
||||
use zclaw_types::Result;
|
||||
|
||||
use crate::{SkillRegistry, SkillContext};
|
||||
use super::{
|
||||
SkillGraph, OrchestrationPlan, OrchestrationResult, NodeResult,
|
||||
OrchestrationProgress, ErrorStrategy, OrchestrationContext,
|
||||
planner::OrchestrationPlanner,
|
||||
};
|
||||
|
||||
/// Skill graph executor trait
///
/// Abstraction over engines that run a [`SkillGraph`]: resolving node
/// inputs, invoking the underlying skills, and collecting per-node
/// results into an [`OrchestrationResult`].
///
/// NOTE(review): `execute_with_progress` is generic over `F`, which makes
/// this trait not object-safe — `dyn SkillGraphExecutor` will not compile.
/// Confirm that callers only use it through generics or concrete types.
#[async_trait::async_trait]
pub trait SkillGraphExecutor: Send + Sync {
    /// Execute a skill graph with given inputs
    ///
    /// Implementations are expected to build an execution plan from
    /// `graph` first (see `execute_plan`).
    async fn execute(
        &self,
        graph: &SkillGraph,
        inputs: HashMap<String, Value>,
        context: &SkillContext,
    ) -> Result<OrchestrationResult>;

    /// Execute with progress callback
    ///
    /// `progress_fn` receives [`OrchestrationProgress`] snapshots as
    /// execution advances; how often it fires is implementation-defined.
    async fn execute_with_progress<F>(
        &self,
        graph: &SkillGraph,
        inputs: HashMap<String, Value>,
        context: &SkillContext,
        progress_fn: F,
    ) -> Result<OrchestrationResult>
    where
        F: Fn(OrchestrationProgress) + Send + Sync;

    /// Execute a pre-built plan
    ///
    /// Allows callers to plan once and run the same plan repeatedly.
    async fn execute_plan(
        &self,
        plan: &OrchestrationPlan,
        inputs: HashMap<String, Value>,
        context: &SkillContext,
    ) -> Result<OrchestrationResult>;
}
|
||||
|
||||
/// Default executor implementation
///
/// Drives a skill graph group-by-group against a shared [`SkillRegistry`].
pub struct DefaultExecutor {
    /// Skill registry for executing skills
    registry: Arc<SkillRegistry>,
    /// Cancellation tokens, keyed by graph id; `true` means "cancel".
    ///
    /// NOTE(review): entries are inserted by `cancel` but nothing visible
    /// here removes them, so a cancelled graph id would stay cancelled for
    /// later runs and the map grows unboundedly — verify cleanup happens
    /// at the execution site.
    cancellations: RwLock<HashMap<String, bool>>,
}
|
||||
|
||||
impl DefaultExecutor {
|
||||
pub fn new(registry: Arc<SkillRegistry>) -> Self {
|
||||
Self {
|
||||
registry,
|
||||
cancellations: RwLock::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Cancel an ongoing orchestration
|
||||
pub async fn cancel(&self, graph_id: &str) {
|
||||
let mut cancellations = self.cancellations.write().await;
|
||||
cancellations.insert(graph_id.to_string(), true);
|
||||
}
|
||||
|
||||
/// Check if cancelled
|
||||
async fn is_cancelled(&self, graph_id: &str) -> bool {
|
||||
let cancellations = self.cancellations.read().await;
|
||||
cancellations.get(graph_id).copied().unwrap_or(false)
|
||||
}
|
||||
|
||||
/// Execute a single node
|
||||
async fn execute_node(
|
||||
&self,
|
||||
node: &super::SkillNode,
|
||||
orch_context: &OrchestrationContext,
|
||||
skill_context: &SkillContext,
|
||||
) -> Result<NodeResult> {
|
||||
let start = Instant::now();
|
||||
let node_id = node.id.clone();
|
||||
|
||||
// Check condition
|
||||
if let Some(when) = &node.when {
|
||||
if !orch_context.evaluate_condition(when).unwrap_or(false) {
|
||||
return Ok(NodeResult {
|
||||
node_id,
|
||||
success: true,
|
||||
output: Value::Null,
|
||||
error: None,
|
||||
duration_ms: 0,
|
||||
retries: 0,
|
||||
skipped: true,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Resolve input mappings
|
||||
let input = orch_context.resolve_node_input(node);
|
||||
|
||||
// Execute with retry
|
||||
let max_attempts = node.retry.as_ref()
|
||||
.map(|r| r.max_attempts)
|
||||
.unwrap_or(1);
|
||||
let delay_ms = node.retry.as_ref()
|
||||
.map(|r| r.delay_ms)
|
||||
.unwrap_or(1000);
|
||||
|
||||
let mut last_error = None;
|
||||
let mut attempts = 0;
|
||||
|
||||
for attempt in 0..max_attempts {
|
||||
attempts = attempt + 1;
|
||||
|
||||
// Apply timeout if specified
|
||||
let result = if let Some(timeout_secs) = node.timeout_secs {
|
||||
tokio::time::timeout(
|
||||
Duration::from_secs(timeout_secs),
|
||||
self.registry.execute(&node.skill_id, skill_context, input.clone())
|
||||
).await
|
||||
.map_err(|_| zclaw_types::ZclawError::Timeout(format!(
|
||||
"Node {} timed out after {}s",
|
||||
node.id, timeout_secs
|
||||
)))?
|
||||
} else {
|
||||
self.registry.execute(&node.skill_id, skill_context, input.clone()).await
|
||||
};
|
||||
|
||||
match result {
|
||||
Ok(skill_result) if skill_result.success => {
|
||||
return Ok(NodeResult {
|
||||
node_id,
|
||||
success: true,
|
||||
output: skill_result.output,
|
||||
error: None,
|
||||
duration_ms: start.elapsed().as_millis() as u64,
|
||||
retries: attempt,
|
||||
skipped: false,
|
||||
});
|
||||
}
|
||||
Ok(skill_result) => {
|
||||
last_error = skill_result.error;
|
||||
}
|
||||
Err(e) => {
|
||||
last_error = Some(e.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
// Delay before retry (except last attempt)
|
||||
if attempt < max_attempts - 1 {
|
||||
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
|
||||
}
|
||||
}
|
||||
|
||||
// All retries failed
|
||||
Ok(NodeResult {
|
||||
node_id,
|
||||
success: false,
|
||||
output: Value::Null,
|
||||
error: last_error,
|
||||
duration_ms: start.elapsed().as_millis() as u64,
|
||||
retries: attempts - 1,
|
||||
skipped: false,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl SkillGraphExecutor for DefaultExecutor {
|
||||
async fn execute(
|
||||
&self,
|
||||
graph: &SkillGraph,
|
||||
inputs: HashMap<String, Value>,
|
||||
context: &SkillContext,
|
||||
) -> Result<OrchestrationResult> {
|
||||
// Build plan first
|
||||
let plan = super::DefaultPlanner::new().plan(graph)?;
|
||||
self.execute_plan(&plan, inputs, context).await
|
||||
}
|
||||
|
||||
async fn execute_with_progress<F>(
|
||||
&self,
|
||||
graph: &SkillGraph,
|
||||
inputs: HashMap<String, Value>,
|
||||
context: &SkillContext,
|
||||
progress_fn: F,
|
||||
) -> Result<OrchestrationResult>
|
||||
where
|
||||
F: Fn(OrchestrationProgress) + Send + Sync,
|
||||
{
|
||||
let plan = super::DefaultPlanner::new().plan(graph)?;
|
||||
|
||||
let start = Instant::now();
|
||||
let mut orch_context = OrchestrationContext::new(graph, inputs);
|
||||
let mut node_results: HashMap<String, NodeResult> = HashMap::new();
|
||||
let mut progress = OrchestrationProgress::new(&graph.id, graph.nodes.len());
|
||||
|
||||
// Execute parallel groups
|
||||
for group in &plan.parallel_groups {
|
||||
if self.is_cancelled(&graph.id).await {
|
||||
return Ok(OrchestrationResult {
|
||||
success: false,
|
||||
output: Value::Null,
|
||||
node_results,
|
||||
duration_ms: start.elapsed().as_millis() as u64,
|
||||
error: Some("Cancelled".to_string()),
|
||||
});
|
||||
}
|
||||
|
||||
// Execute nodes in parallel within the group
|
||||
for node_id in group {
|
||||
if let Some(node) = graph.nodes.iter().find(|n| &n.id == node_id) {
|
||||
progress.current_node = Some(node_id.clone());
|
||||
progress_fn(progress.clone());
|
||||
|
||||
let result = self.execute_node(node, &orch_context, context).await
|
||||
.unwrap_or_else(|e| NodeResult {
|
||||
node_id: node_id.clone(),
|
||||
success: false,
|
||||
output: Value::Null,
|
||||
error: Some(e.to_string()),
|
||||
duration_ms: 0,
|
||||
retries: 0,
|
||||
skipped: false,
|
||||
});
|
||||
node_results.insert(node_id.clone(), result);
|
||||
}
|
||||
}
|
||||
|
||||
// Update context with node outputs
|
||||
for node_id in group {
|
||||
if let Some(result) = node_results.get(node_id) {
|
||||
if result.success {
|
||||
orch_context.set_node_output(node_id, result.output.clone());
|
||||
progress.completed_nodes.push(node_id.clone());
|
||||
} else {
|
||||
progress.failed_nodes.push(node_id.clone());
|
||||
|
||||
// Handle error based on strategy
|
||||
match graph.on_error {
|
||||
ErrorStrategy::Stop => {
|
||||
// Clone error before moving node_results
|
||||
let error = result.error.clone();
|
||||
return Ok(OrchestrationResult {
|
||||
success: false,
|
||||
output: Value::Null,
|
||||
node_results,
|
||||
duration_ms: start.elapsed().as_millis() as u64,
|
||||
error,
|
||||
});
|
||||
}
|
||||
ErrorStrategy::Continue => {
|
||||
// Continue to next group
|
||||
}
|
||||
ErrorStrategy::Retry => {
|
||||
// Already handled in execute_node
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update progress
|
||||
progress.progress_percent = ((progress.completed_nodes.len() + progress.failed_nodes.len())
|
||||
* 100 / graph.nodes.len()) as u8;
|
||||
progress.status = format!("Completed group with {} nodes", group.len());
|
||||
progress_fn(progress.clone());
|
||||
}
|
||||
|
||||
// Build final output
|
||||
let output = orch_context.build_output(&graph.output_mapping);
|
||||
|
||||
let success = progress.failed_nodes.is_empty();
|
||||
|
||||
Ok(OrchestrationResult {
|
||||
success,
|
||||
output,
|
||||
node_results,
|
||||
duration_ms: start.elapsed().as_millis() as u64,
|
||||
error: if success { None } else { Some("Some nodes failed".to_string()) },
|
||||
})
|
||||
}
|
||||
|
||||
async fn execute_plan(
|
||||
&self,
|
||||
plan: &OrchestrationPlan,
|
||||
inputs: HashMap<String, Value>,
|
||||
context: &SkillContext,
|
||||
) -> Result<OrchestrationResult> {
|
||||
self.execute_with_progress(&plan.graph, inputs, context, |_| {}).await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// A successful `NodeResult` retains exactly the fields it was
    /// constructed with.
    #[test]
    fn test_node_result_success() {
        let payload = serde_json::json!({"data": "value"});

        let result = NodeResult {
            node_id: String::from("test"),
            success: true,
            output: payload,
            error: None,
            duration_ms: 100,
            retries: 0,
            skipped: false,
        };

        assert!(result.success);
        assert_eq!(result.node_id, "test");
    }
}
|
||||
Reference in New Issue
Block a user