//! Pipeline discovery, listing, running, and monitoring commands. use std::sync::Arc; use tauri::{AppHandle, Emitter, State}; use zclaw_pipeline::{ RunStatus, parse_pipeline_yaml, PipelineExecutor, ActionRegistry, LlmActionDriver, SkillActionDriver, HandActionDriver, }; use super::{PipelineState, PipelineInfo, PipelineRunResponse, RunPipelineResponse, RunPipelineRequest}; use super::adapters::{RuntimeLlmAdapter, PipelineSkillDriver, PipelineHandDriver}; use super::helpers::{get_pipelines_directory, scan_pipelines_with_paths, scan_pipelines_full_sync, pipeline_to_info}; use crate::kernel_commands::KernelState; /// Discover and list all available pipelines // @reserved: pipeline workflow management // @connected #[tauri::command] pub async fn pipeline_list( state: State<'_, Arc>, category: Option, industry: Option, ) -> Result, String> { // Get pipelines directory let pipelines_dir = get_pipelines_directory()?; tracing::debug!("[pipeline_list] Scanning directory: {:?}", pipelines_dir); tracing::debug!("[pipeline_list] Filters - category: {:?}, industry: {:?}", category, industry); // Scan for pipeline files (returns both info and paths) let mut pipelines_with_paths: Vec<(PipelineInfo, std::path::PathBuf)> = Vec::new(); if pipelines_dir.exists() { scan_pipelines_with_paths(&pipelines_dir, category.as_deref(), industry.as_deref(), &mut pipelines_with_paths)?; } else { tracing::warn!("[WARN pipeline_list] Pipelines directory does not exist: {:?}", pipelines_dir); } tracing::debug!("[pipeline_list] Found {} pipelines", pipelines_with_paths.len()); // Debug: log all pipelines with their industry values for (info, _) in &pipelines_with_paths { tracing::debug!("[pipeline_list] Pipeline: {} -> category: {}, industry: '{}'", info.id, info.category, info.industry); } // Update state let mut state_pipelines = state.pipelines.write().await; let mut state_paths = state.pipeline_paths.write().await; let mut result = Vec::new(); for (info, path) in &pipelines_with_paths { // Load full pipeline into state if let Ok(content) = std::fs::read_to_string(path) { if let Ok(pipeline) = parse_pipeline_yaml(&content) { state_pipelines.insert(info.id.clone(), pipeline); state_paths.insert(info.id.clone(), path.clone()); } // v2 pipelines are listed but not stored in v1 state; // they can be discovered and displayed but execution requires v2 engine support. } result.push(info.clone()); } Ok(result) } /// Get pipeline details // @reserved: pipeline workflow management // @connected #[tauri::command] pub async fn pipeline_get( state: State<'_, Arc>, pipeline_id: String, ) -> Result { let pipelines = state.pipelines.read().await; let pipeline = pipelines.get(&pipeline_id) .ok_or_else(|| format!("Pipeline not found: {}", pipeline_id))?; Ok(pipeline_to_info(pipeline)) } /// Run a pipeline // @reserved: pipeline workflow management // @connected #[tauri::command] pub async fn pipeline_run( app: AppHandle, state: State<'_, Arc>, kernel_state: State<'_, KernelState>, request: RunPipelineRequest, ) -> Result { tracing::debug!("[pipeline_run] Received request for pipeline_id: {}", request.pipeline_id); // Get pipeline let pipelines = state.pipelines.read().await; tracing::debug!("[pipeline_run] State has {} pipelines loaded", pipelines.len()); // Debug: list all loaded pipeline IDs for (id, _) in pipelines.iter() { tracing::debug!("[pipeline_run] Loaded pipeline: {}", id); } let pipeline = pipelines.get(&request.pipeline_id) .ok_or_else(|| { println!("[ERROR pipeline_run] Pipeline '{}' not found in state. Available: {:?}", request.pipeline_id, pipelines.keys().collect::>()); format!("Pipeline not found: {}", request.pipeline_id) })? .clone(); drop(pipelines); // Try to get LLM driver from Kernel let (llm_driver, skill_driver, hand_driver) = { let kernel_lock = kernel_state.lock().await; if let Some(kernel) = kernel_lock.as_ref() { tracing::debug!("[pipeline_run] Got LLM driver from Kernel"); let llm = Some(Arc::new(RuntimeLlmAdapter::new( kernel.driver(), Some(kernel.config().llm.model.clone()), )) as Arc); let kernel_arc = (*kernel_state).clone(); let skill = Some(Arc::new(PipelineSkillDriver::new(kernel_arc.clone())) as Arc); let hand = Some(Arc::new(PipelineHandDriver::new(kernel_arc)) as Arc); (llm, skill, hand) } else { tracing::debug!("[pipeline_run] Kernel not initialized, no drivers available"); (None, None, None) } }; // Create executor with all available drivers let executor = if let Some(driver) = llm_driver { let mut registry = ActionRegistry::new().with_llm_driver(driver); if let Some(skill) = skill_driver { registry = registry.with_skill_registry(skill); } if let Some(hand) = hand_driver { registry = registry.with_hand_registry(hand); } Arc::new(PipelineExecutor::new(Arc::new(registry))) } else { state.executor.clone() }; // Generate run ID upfront so we can return it to the caller let run_id = uuid::Uuid::new_v4().to_string(); let pipeline_id = request.pipeline_id.clone(); let inputs = request.inputs.clone(); // Clone for async task let run_id_for_spawn = run_id.clone(); // Run pipeline in background with the known run_id tokio::spawn(async move { tracing::debug!("[pipeline_run] Starting execution with run_id: {}", run_id_for_spawn); let result = executor.execute_with_id(&pipeline, inputs, &run_id_for_spawn).await; tracing::debug!("[pipeline_run] Execution completed for run_id: {}, status: {:?}", run_id_for_spawn, result.as_ref().map(|r| r.status.clone()).unwrap_or(RunStatus::Failed)); // Emit completion event let _ = app.emit("pipeline-complete", &PipelineRunResponse { run_id: run_id_for_spawn.clone(), pipeline_id: pipeline_id.clone(), status: match &result { Ok(r) => r.status.to_string(), Err(_) => "failed".to_string(), }, current_step: None, percentage: 100, message: match &result { Ok(_) => "Pipeline completed".to_string(), Err(e) => e.to_string(), }, outputs: result.as_ref().ok().and_then(|r| r.outputs.clone()), error: result.as_ref().err().map(|e| e.to_string()), started_at: result.as_ref().map(|r| r.started_at.to_rfc3339()).unwrap_or_else(|_| chrono::Utc::now().to_rfc3339()), ended_at: result.as_ref().map(|r| r.ended_at.map(|t| t.to_rfc3339())).unwrap_or_else(|_| Some(chrono::Utc::now().to_rfc3339())), }); }); // Return immediately with the known run ID tracing::debug!("[pipeline_run] Returning run_id: {} to caller", run_id); Ok(RunPipelineResponse { run_id, pipeline_id: request.pipeline_id, status: "running".to_string(), }) } /// Get pipeline run progress // @reserved: pipeline workflow management // @connected #[tauri::command] pub async fn pipeline_progress( state: State<'_, Arc>, run_id: String, ) -> Result { let progress = state.executor.get_progress(&run_id).await .ok_or_else(|| format!("Run not found: {}", run_id))?; let run = state.executor.get_run(&run_id).await; Ok(PipelineRunResponse { run_id: progress.run_id, pipeline_id: run.as_ref().map(|r| r.pipeline_id.clone()).unwrap_or_default(), status: progress.status.to_string(), current_step: Some(progress.current_step), percentage: progress.percentage, message: progress.message, outputs: run.as_ref().and_then(|r| r.outputs.clone()), error: run.as_ref().and_then(|r| r.error.clone()), started_at: run.as_ref().map(|r| r.started_at.to_rfc3339()).unwrap_or_default(), ended_at: run.as_ref().and_then(|r| r.ended_at.map(|t| t.to_rfc3339())), }) } /// Cancel a pipeline run // @connected #[tauri::command] pub async fn pipeline_cancel( state: State<'_, Arc>, run_id: String, ) -> Result<(), String> { state.executor.cancel(&run_id).await; Ok(()) } /// Get pipeline run result // @reserved: pipeline workflow management // @connected #[tauri::command] pub async fn pipeline_result( state: State<'_, Arc>, run_id: String, ) -> Result { let run = state.executor.get_run(&run_id).await .ok_or_else(|| format!("Run not found: {}", run_id))?; let current_step = run.current_step.clone(); let status = run.status.clone(); Ok(PipelineRunResponse { run_id: run.id, pipeline_id: run.pipeline_id, status: status.to_string(), current_step: current_step.clone(), percentage: if status == RunStatus::Completed { 100 } else { 0 }, message: current_step.unwrap_or_default(), outputs: run.outputs, error: run.error, started_at: run.started_at.to_rfc3339(), ended_at: run.ended_at.map(|t| t.to_rfc3339()), }) } /// List all runs // @reserved: pipeline workflow management // @connected #[tauri::command] pub async fn pipeline_runs( state: State<'_, Arc>, ) -> Result, String> { let runs = state.executor.list_runs().await; Ok(runs.into_iter().map(|run| { let current_step = run.current_step.clone(); let status = run.status.clone(); PipelineRunResponse { run_id: run.id, pipeline_id: run.pipeline_id, status: status.to_string(), current_step: current_step.clone(), percentage: if status == RunStatus::Completed { 100 } else if status == RunStatus::Running { 50 } else { 0 }, message: current_step.unwrap_or_default(), outputs: run.outputs, error: run.error, started_at: run.started_at.to_rfc3339(), ended_at: run.ended_at.map(|t| t.to_rfc3339()), } }).collect()) } /// Refresh pipeline discovery // @reserved: pipeline workflow management // @connected #[tauri::command] pub async fn pipeline_refresh( state: State<'_, Arc>, ) -> Result, String> { let pipelines_dir = get_pipelines_directory()?; if !pipelines_dir.exists() { std::fs::create_dir_all(&pipelines_dir) .map_err(|e| format!("Failed to create pipelines directory: {}", e))?; } let mut state_pipelines = state.pipelines.write().await; let mut state_paths = state.pipeline_paths.write().await; // Clear existing state_pipelines.clear(); state_paths.clear(); // Scan and load all pipelines (synchronous) let mut pipelines = Vec::new(); scan_pipelines_full_sync(&pipelines_dir, &mut pipelines)?; for (path, pipeline) in &pipelines { let id = pipeline.metadata.name.clone(); state_pipelines.insert(id.clone(), pipeline.clone()); state_paths.insert(id, path.clone()); } Ok(pipelines.into_iter().map(|(_, p)| pipeline_to_info(&p)).collect()) }