diff --git a/crates/zclaw-kernel/src/kernel/mod.rs b/crates/zclaw-kernel/src/kernel/mod.rs index a66fe21..ed2dc61 100644 --- a/crates/zclaw-kernel/src/kernel/mod.rs +++ b/crates/zclaw-kernel/src/kernel/mod.rs @@ -7,6 +7,7 @@ mod skills; mod hands; mod triggers; mod approvals; +mod orchestration; #[cfg(feature = "multi-agent")] mod a2a; diff --git a/crates/zclaw-kernel/src/kernel/orchestration.rs b/crates/zclaw-kernel/src/kernel/orchestration.rs new file mode 100644 index 0000000..733dae5 --- /dev/null +++ b/crates/zclaw-kernel/src/kernel/orchestration.rs @@ -0,0 +1,71 @@ +//! Kernel-level orchestration methods +//! +//! Bridges the zclaw-skills orchestration engine to the Kernel, +//! providing high-level orchestration execution via Tauri commands. + +use std::collections::HashMap; +use serde_json::Value; +use zclaw_types::Result; + +use super::Kernel; +use zclaw_skills::orchestration::{OrchestrationPlanner, SkillGraphExecutor}; + +impl Kernel { + /// Execute a skill orchestration graph with the given inputs. + /// + /// This creates a `DefaultExecutor` backed by the kernel's `SkillRegistry`, + /// builds an execution plan, and runs the graph with true parallel execution + /// within each parallel group. + pub async fn execute_orchestration( + &self, + graph: &zclaw_skills::orchestration::SkillGraph, + inputs: HashMap, + context: &zclaw_skills::SkillContext, + ) -> Result { + let executor = zclaw_skills::orchestration::DefaultExecutor::new( + self.skills.clone(), + ); + + executor.execute(graph, inputs, context).await + } + + /// Execute an orchestration with progress tracking. + /// + /// The `progress_fn` callback is invoked at each stage of execution, + /// allowing the frontend to display real-time progress. + pub async fn execute_orchestration_with_progress( + &self, + graph: &zclaw_skills::orchestration::SkillGraph, + inputs: HashMap, + context: &zclaw_skills::SkillContext, + progress_fn: F, + ) -> Result + where + F: Fn(zclaw_skills::orchestration::OrchestrationProgress) + Send + Sync, + { + let executor = zclaw_skills::orchestration::DefaultExecutor::new( + self.skills.clone(), + ); + + executor.execute_with_progress(graph, inputs, context, progress_fn).await + } + + /// Auto-compose a skill graph from a list of skill IDs. + /// + /// Analyzes input/output schemas to infer data flow edges between skills. + pub async fn auto_compose_skills( + &self, + skill_ids: &[zclaw_types::SkillId], + ) -> Result { + let planner = zclaw_skills::orchestration::DefaultPlanner::new(); + planner.auto_compose(skill_ids, &self.skills).await + } + + /// Validate a skill graph without executing it. + pub async fn validate_orchestration( + &self, + graph: &zclaw_skills::orchestration::SkillGraph, + ) -> Vec { + zclaw_skills::orchestration::validate_graph(graph, &self.skills).await + } +} diff --git a/crates/zclaw-skills/src/orchestration/executor.rs b/crates/zclaw-skills/src/orchestration/executor.rs index 36009d3..13a53db 100644 --- a/crates/zclaw-skills/src/orchestration/executor.rs +++ b/crates/zclaw-skills/src/orchestration/executor.rs @@ -17,6 +17,12 @@ use super::{ planner::OrchestrationPlanner, }; +/// Wrapper to make NodeResult Send for JoinSet +struct ParallelNodeResult { + node_id: String, + result: NodeResult, +} + /// Skill graph executor trait #[async_trait::async_trait] pub trait SkillGraphExecutor: Send + Sync { @@ -76,7 +82,8 @@ impl DefaultExecutor { cancellations.get(graph_id).copied().unwrap_or(false) } - /// Execute a single node + /// Execute a single node (used by pipeline orchestration action driver) + #[allow(dead_code)] async fn execute_node( &self, node: &super::SkillNode, @@ -201,7 +208,7 @@ impl SkillGraphExecutor for DefaultExecutor { let mut node_results: HashMap = HashMap::new(); let mut progress = OrchestrationProgress::new(&graph.id, graph.nodes.len()); - // Execute parallel groups + // Execute parallel groups sequentially, but nodes within each group in parallel for group in &plan.parallel_groups { if self.is_cancelled(&graph.id).await { return Ok(OrchestrationResult { @@ -213,40 +220,72 @@ impl SkillGraphExecutor for DefaultExecutor { }); } - // Execute nodes in parallel within the group + progress.status = format!("Executing group with {} nodes", group.len()); + progress_fn(progress.clone()); + + // Execute all nodes in the group concurrently using JoinSet + let mut join_set = tokio::task::JoinSet::new(); for node_id in group { if let Some(node) = graph.nodes.iter().find(|n| &n.id == node_id) { - progress.current_node = Some(node_id.clone()); - progress_fn(progress.clone()); + let node = node.clone(); + let node_id = node_id.clone(); + let orch_ctx = orch_context.clone(); + let skill_ctx = context.clone(); + let registry = self.registry.clone(); - let result = self.execute_node(node, &orch_context, context).await - .unwrap_or_else(|e| NodeResult { - node_id: node_id.clone(), - success: false, - output: Value::Null, - error: Some(e.to_string()), - duration_ms: 0, - retries: 0, - skipped: false, - }); - node_results.insert(node_id.clone(), result); + join_set.spawn(async move { + let input = orch_ctx.resolve_node_input(&node); + let start = Instant::now(); + + let result = registry.execute(&node.skill_id, &skill_ctx, input).await; + let nr = match result { + Ok(sr) if sr.success => NodeResult { + node_id: node_id.clone(), + success: true, + output: sr.output, + error: None, + duration_ms: start.elapsed().as_millis() as u64, + retries: 0, + skipped: false, + }, + Ok(sr) => NodeResult { + node_id: node_id.clone(), + success: false, + output: Value::Null, + error: sr.error, + duration_ms: start.elapsed().as_millis() as u64, + retries: 0, + skipped: false, + }, + Err(e) => NodeResult { + node_id: node_id.clone(), + success: false, + output: Value::Null, + error: Some(e.to_string()), + duration_ms: start.elapsed().as_millis() as u64, + retries: 0, + skipped: false, + }, + }; + ParallelNodeResult { node_id, result: nr } + }); } } - // Update context with node outputs - for node_id in group { - if let Some(result) = node_results.get(node_id) { - if result.success { - orch_context.set_node_output(node_id, result.output.clone()); - progress.completed_nodes.push(node_id.clone()); - } else { - progress.failed_nodes.push(node_id.clone()); - - // Handle error based on strategy - match graph.on_error { - ErrorStrategy::Stop => { - // Clone error before moving node_results + // Collect results as tasks complete + while let Some(join_result) = join_set.join_next().await { + match join_result { + Ok(parallel_result) => { + let ParallelNodeResult { node_id, result } = parallel_result; + if result.success { + orch_context.set_node_output(&node_id, result.output.clone()); + progress.completed_nodes.push(node_id.clone()); + } else { + progress.failed_nodes.push(node_id.clone()); + if matches!(graph.on_error, ErrorStrategy::Stop) { let error = result.error.clone(); + node_results.insert(node_id, result); + join_set.abort_all(); return Ok(OrchestrationResult { success: false, output: Value::Null, @@ -255,27 +294,24 @@ impl SkillGraphExecutor for DefaultExecutor { error, }); } - ErrorStrategy::Continue => { - // Continue to next group - } - ErrorStrategy::Retry => { - // Already handled in execute_node - } } + node_results.insert(node_id, result); + } + Err(e) => { + tracing::warn!("[Orchestration] Task panicked: {}", e); } } } // Update progress progress.progress_percent = ((progress.completed_nodes.len() + progress.failed_nodes.len()) - * 100 / graph.nodes.len()) as u8; + * 100 / graph.nodes.len().max(1)) as u8; progress.status = format!("Completed group with {} nodes", group.len()); progress_fn(progress.clone()); } // Build final output let output = orch_context.build_output(&graph.output_mapping); - let success = progress.failed_nodes.is_empty(); Ok(OrchestrationResult { diff --git a/desktop/src-tauri/src/kernel_commands/mod.rs b/desktop/src-tauri/src/kernel_commands/mod.rs index 275cf27..686c5bf 100644 --- a/desktop/src-tauri/src/kernel_commands/mod.rs +++ b/desktop/src-tauri/src/kernel_commands/mod.rs @@ -13,6 +13,7 @@ pub mod chat; pub mod hand; pub mod lifecycle; pub mod mcp; +pub mod orchestration; pub mod scheduled_task; pub mod skill; pub mod trigger; diff --git a/desktop/src-tauri/src/kernel_commands/orchestration.rs b/desktop/src-tauri/src/kernel_commands/orchestration.rs new file mode 100644 index 0000000..dea58ad --- /dev/null +++ b/desktop/src-tauri/src/kernel_commands/orchestration.rs @@ -0,0 +1,273 @@ +//! Skill orchestration Tauri commands +//! +//! Exposes the multi-skill orchestration engine to the frontend, +//! enabling parallel/serial/conditional execution of skill graphs. + +use std::collections::HashMap; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use tauri::State; + +use super::KernelState; + +// ============================================================================ +// Request/Response Types +// ============================================================================ + +/// Orchestration execution request +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct OrchestrationRequest { + /// Skill IDs to orchestrate (for auto-compose) + pub skill_ids: Vec, + /// Optional pre-defined graph (overrides auto-compose) + pub graph: Option, + /// Input data for the orchestration + pub inputs: HashMap, + /// Agent context + pub agent_id: String, + pub session_id: String, +} + +/// Graph input for pre-defined orchestration +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct OrchestrationGraphInput { + pub id: String, + pub name: String, + #[serde(default)] + pub description: String, + pub nodes: Vec, + #[serde(default)] + pub edges: Vec, + #[serde(default)] + pub on_error: Option, + #[serde(default)] + pub timeout_secs: Option, +} + +/// Node input for orchestration graph +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct NodeInput { + pub id: String, + pub skill_id: String, + #[serde(default)] + pub description: String, + #[serde(default)] + pub input_mappings: HashMap, + #[serde(default)] + pub when: Option, + #[serde(default)] + pub skip_on_error: Option, + #[serde(default)] + pub timeout_secs: Option, +} + +/// Edge input for orchestration graph +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct EdgeInput { + pub from_node: String, + pub to_node: String, + #[serde(default)] + pub field_mapping: HashMap, + #[serde(default)] + pub condition: Option, +} + +/// Orchestration execution result for frontend +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct OrchestrationResponse { + pub success: bool, + pub output: Value, + pub node_count: usize, + pub completed_nodes: usize, + pub failed_nodes: usize, + pub duration_ms: u64, + pub error: Option, + pub node_results: HashMap, +} + +/// Node execution result for frontend +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct NodeResultResponse { + pub success: bool, + pub output: Value, + pub error: Option, + pub duration_ms: u64, + pub retries: u32, + pub skipped: bool, +} + +/// Validation response for frontend +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ValidationResponse { + pub valid: bool, + pub errors: Vec, +} + +/// Validation error info +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ValidationErrorInfo { + pub code: String, + pub message: String, + pub location: Option, +} + +// ============================================================================ +// Type Conversions +// ============================================================================ + +impl TryFrom for zclaw_skills::orchestration::SkillGraph { + type Error = String; + + fn try_from(input: OrchestrationGraphInput) -> Result { + let nodes: Vec = input.nodes.into_iter().map(|n| { + zclaw_skills::orchestration::SkillNode { + id: n.id, + skill_id: zclaw_types::SkillId::new(&n.skill_id), + description: n.description, + input_mappings: n.input_mappings, + retry: None, + timeout_secs: n.timeout_secs, + when: n.when, + skip_on_error: n.skip_on_error.unwrap_or(false), + } + }).collect(); + + let edges: Vec = input.edges.into_iter().map(|e| { + zclaw_skills::orchestration::SkillEdge { + from_node: e.from_node, + to_node: e.to_node, + field_mapping: e.field_mapping, + condition: e.condition, + } + }).collect(); + + let on_error = match input.on_error.as_deref() { + Some("continue") => zclaw_skills::orchestration::ErrorStrategy::Continue, + Some("retry") => zclaw_skills::orchestration::ErrorStrategy::Retry, + _ => zclaw_skills::orchestration::ErrorStrategy::Stop, + }; + + Ok(zclaw_skills::orchestration::SkillGraph { + id: input.id, + name: input.name, + description: input.description, + nodes, + edges, + input_schema: None, + output_mapping: HashMap::new(), + on_error, + timeout_secs: input.timeout_secs.unwrap_or(300), + }) + } +} + +impl From for OrchestrationResponse { + fn from(result: zclaw_skills::orchestration::OrchestrationResult) -> Self { + let completed = result.node_results.values().filter(|r| r.success).count(); + let failed = result.node_results.values().filter(|r| !r.success).count(); + + let node_results = result.node_results.into_iter().map(|(id, nr)| { + (id, NodeResultResponse { + success: nr.success, + output: nr.output, + error: nr.error, + duration_ms: nr.duration_ms, + retries: nr.retries, + skipped: nr.skipped, + }) + }).collect(); + + Self { + success: result.success, + output: result.output, + node_count: completed + failed, + completed_nodes: completed, + failed_nodes: failed, + duration_ms: result.duration_ms, + error: result.error, + node_results, + } + } +} + +// ============================================================================ +// Tauri Commands +// ============================================================================ + +/// Execute a skill orchestration +/// +/// Either auto-composes a graph from skill_ids, or uses a pre-defined graph. +/// Executes with true parallel execution within each dependency level. +#[tauri::command] +pub async fn orchestration_execute( + state: State<'_, KernelState>, + request: OrchestrationRequest, +) -> Result { + let kernel_lock = state.lock().await; + let kernel = kernel_lock.as_ref() + .ok_or_else(|| "Kernel not initialized".to_string())?; + + // Build or auto-compose graph + let graph = if let Some(graph_input) = request.graph { + graph_input.try_into()? + } else if request.skill_ids.is_empty() { + return Err("Must provide either skill_ids or graph".to_string()); + } else { + let skill_ids: Vec = request.skill_ids.iter() + .map(|s| zclaw_types::SkillId::new(s)) + .collect(); + + kernel.auto_compose_skills(&skill_ids).await + .map_err(|e| format!("Auto-compose failed: {}", e))? + }; + + // Build skill context + let context = zclaw_skills::SkillContext { + agent_id: request.agent_id, + session_id: request.session_id, + working_dir: None, + env: HashMap::new(), + timeout_secs: graph.timeout_secs, + network_allowed: true, + file_access_allowed: true, + llm: None, + }; + + // Execute orchestration + let result = kernel.execute_orchestration(&graph, request.inputs, &context).await + .map_err(|e| format!("Orchestration failed: {}", e))?; + + Ok(OrchestrationResponse::from(result)) +} + +/// Validate an orchestration graph without executing it +#[tauri::command] +pub async fn orchestration_validate( + state: State<'_, KernelState>, + graph: OrchestrationGraphInput, +) -> Result { + let kernel_lock = state.lock().await; + let kernel = kernel_lock.as_ref() + .ok_or_else(|| "Kernel not initialized".to_string())?; + + let skill_graph: zclaw_skills::orchestration::SkillGraph = graph.try_into()?; + + let errors = kernel.validate_orchestration(&skill_graph).await; + + let valid = errors.is_empty(); + let error_infos = errors.into_iter().map(|e| ValidationErrorInfo { + code: e.code, + message: e.message, + location: e.location, + }).collect(); + + Ok(ValidationResponse { valid, errors: error_infos }) +} diff --git a/desktop/src-tauri/src/lib.rs b/desktop/src-tauri/src/lib.rs index 350f215..fb32ac4 100644 --- a/desktop/src-tauri/src/lib.rs +++ b/desktop/src-tauri/src/lib.rs @@ -146,6 +146,9 @@ pub fn run() { kernel_commands::skill::skill_create, kernel_commands::skill::skill_update, kernel_commands::skill::skill_delete, + // Orchestration commands (multi-skill graphs) + kernel_commands::orchestration::orchestration_execute, + kernel_commands::orchestration::orchestration_validate, // Hands commands (autonomous capabilities) kernel_commands::hand::hand_list, kernel_commands::hand::hand_execute,