// Change notes (from commit message):
// - Clarified dead_code comments in the intelligence/llm/memory/browser modules to
//   explicitly describe the Tauri invoke_handler runtime registration mechanism
//   instead of a vague "reserved for future" note.
// - Added #[allow(dead_code)] to three genuinely unused methods in identity.rs.
// - Implemented compactor `use_llm: true`: new compact_with_llm method and
//   compactor_compact_llm Tauri command for LLM-driven conversation summaries.
// - Replaced 40+ println!/eprintln! debug outputs in pipeline_commands.rs with
//   structured tracing::debug!/warn!/error! logging.
// - Removed an unnecessary #[allow(unused_imports)] from intelligence/mod.rs.
//! Pipeline commands for Tauri
//!
//! Commands for discovering, running, and monitoring Pipelines.
|
use std::collections::HashMap;
|
|
use std::path::PathBuf;
|
|
use std::sync::Arc;
|
|
use tauri::{AppHandle, Emitter, State};
|
|
use serde::{Deserialize, Serialize};
|
|
use tokio::sync::RwLock;
|
|
use serde_json::Value;
|
|
use async_trait::async_trait;
|
|
use secrecy::SecretString;
|
|
|
|
use zclaw_pipeline::{
|
|
Pipeline, RunStatus,
|
|
parse_pipeline_yaml,
|
|
PipelineExecutor,
|
|
ActionRegistry,
|
|
LlmActionDriver,
|
|
};
|
|
use zclaw_runtime::{LlmDriver, CompletionRequest};
|
|
|
|
use crate::kernel_commands::KernelState;
|
|
|
|
/// Adapter to connect zclaw-runtime LlmDriver to zclaw-pipeline LlmActionDriver
pub struct RuntimeLlmAdapter {
    // Underlying runtime driver that performs the actual completion calls.
    driver: Arc<dyn LlmDriver>,
    // Model id used when a generate() call does not specify one.
    default_model: String,
}
|
|
|
|
impl RuntimeLlmAdapter {
|
|
pub fn new(driver: Arc<dyn LlmDriver>, default_model: Option<String>) -> Self {
|
|
Self {
|
|
driver,
|
|
default_model: default_model.unwrap_or_else(|| "claude-3-sonnet-20240229".to_string()),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl LlmActionDriver for RuntimeLlmAdapter {
|
|
async fn generate(
|
|
&self,
|
|
prompt: String,
|
|
input: HashMap<String, Value>,
|
|
model: Option<String>,
|
|
temperature: Option<f32>,
|
|
max_tokens: Option<u32>,
|
|
json_mode: bool,
|
|
) -> Result<Value, String> {
|
|
tracing::debug!("[RuntimeLlmAdapter] generate called with prompt length: {}", prompt.len());
|
|
tracing::debug!("[RuntimeLlmAdapter] input HashMap contents:");
|
|
for (k, v) in &input {
|
|
println!(" {} => {}", k, v);
|
|
}
|
|
|
|
// Build user content from prompt and input
|
|
let user_content = if input.is_empty() {
|
|
tracing::debug!("[RuntimeLlmAdapter] WARNING: input is empty, using raw prompt");
|
|
prompt.clone()
|
|
} else {
|
|
// Inject input values into prompt
|
|
// Support multiple placeholder formats: {{key}}, {{ key }}, ${key}, ${inputs.key}
|
|
let mut rendered = prompt.clone();
|
|
tracing::debug!("[RuntimeLlmAdapter] Original prompt (first 500 chars): {}", &prompt[..prompt.len().min(500)]);
|
|
for (key, value) in &input {
|
|
let str_value = if let Some(s) = value.as_str() {
|
|
s.to_string()
|
|
} else {
|
|
value.to_string()
|
|
};
|
|
|
|
tracing::debug!("[RuntimeLlmAdapter] Replacing '{}' with '{}'", key, str_value);
|
|
|
|
// Replace all common placeholder formats
|
|
rendered = rendered.replace(&format!("{{{{{key}}}}}"), &str_value); // {{key}}
|
|
rendered = rendered.replace(&format!("{{{{ {key} }}}}"), &str_value); // {{ key }}
|
|
rendered = rendered.replace(&format!("${{{key}}}"), &str_value); // ${key}
|
|
rendered = rendered.replace(&format!("${{inputs.{key}}}"), &str_value); // ${inputs.key}
|
|
}
|
|
tracing::debug!("[RuntimeLlmAdapter] Rendered prompt (first 500 chars): {}", &rendered[..rendered.len().min(500)]);
|
|
rendered
|
|
};
|
|
|
|
// Create message using zclaw_types::Message enum
|
|
let messages = vec![zclaw_types::Message::user(user_content)];
|
|
|
|
let request = CompletionRequest {
|
|
model: model.unwrap_or_else(|| self.default_model.clone()),
|
|
system: None,
|
|
messages,
|
|
tools: Vec::new(),
|
|
max_tokens,
|
|
temperature,
|
|
stop: Vec::new(),
|
|
stream: false,
|
|
};
|
|
|
|
let response = self.driver.complete(request)
|
|
.await
|
|
.map_err(|e| format!("LLM completion failed: {}", e))?;
|
|
|
|
// Extract text from response
|
|
let text = response.content.iter()
|
|
.find_map(|block| match block {
|
|
zclaw_runtime::ContentBlock::Text { text } => Some(text.clone()),
|
|
_ => None,
|
|
})
|
|
.unwrap_or_default();
|
|
|
|
// Safe truncation for UTF-8 strings
|
|
let truncated: String = text.chars().take(1000).collect();
|
|
tracing::debug!("[RuntimeLlmAdapter] LLM response text (first 1000 chars): {}", truncated);
|
|
|
|
// Parse as JSON if json_mode, otherwise return as string
|
|
if json_mode {
|
|
// Try to extract JSON from the response (LLM might wrap it in markdown code blocks)
|
|
let json_text = if text.contains("```json") {
|
|
// Extract JSON from markdown code block
|
|
let start = text.find("```json").map(|i| i + 7).unwrap_or(0);
|
|
let end = text.rfind("```").unwrap_or(text.len());
|
|
text[start..end].trim().to_string()
|
|
} else if text.contains("```") {
|
|
// Extract from generic code block
|
|
let start = text.find("```").map(|i| i + 3).unwrap_or(0);
|
|
let end = text.rfind("```").unwrap_or(text.len());
|
|
text[start..end].trim().to_string()
|
|
} else {
|
|
text.clone()
|
|
};
|
|
|
|
// Safe truncation for UTF-8 strings
|
|
let truncated_json: String = json_text.chars().take(500).collect();
|
|
tracing::debug!("[RuntimeLlmAdapter] JSON text to parse (first 500 chars): {}", truncated_json);
|
|
|
|
serde_json::from_str(&json_text)
|
|
.map_err(|e| {
|
|
tracing::debug!("[RuntimeLlmAdapter] JSON parse error: {}", e);
|
|
format!("Failed to parse LLM response as JSON: {}\nResponse: {}", e, json_text)
|
|
})
|
|
} else {
|
|
Ok(Value::String(text))
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Pipeline state wrapper for Tauri
///
/// Shared across commands via `tauri::State<Arc<PipelineState>>`.
pub struct PipelineState {
    /// Pipeline executor
    pub executor: Arc<PipelineExecutor>,
    /// Discovered pipelines (id -> Pipeline)
    pub pipelines: RwLock<HashMap<String, Pipeline>>,
    /// Pipeline file paths (id -> path)
    pub pipeline_paths: RwLock<HashMap<String, PathBuf>>,
}
|
|
|
|
impl PipelineState {
|
|
pub fn new(action_registry: Arc<ActionRegistry>) -> Self {
|
|
Self {
|
|
executor: Arc::new(PipelineExecutor::new(action_registry)),
|
|
pipelines: RwLock::new(HashMap::new()),
|
|
pipeline_paths: RwLock::new(HashMap::new()),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Pipeline info for list display
///
/// Serialized to the frontend with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PipelineInfo {
    /// Pipeline ID (name)
    pub id: String,
    /// Display name
    pub display_name: String,
    /// Description
    pub description: String,
    /// Category (functional classification)
    pub category: String,
    /// Industry classification (e.g., "internet", "finance", "healthcare")
    pub industry: String,
    /// Tags
    pub tags: Vec<String>,
    /// Icon (emoji)
    pub icon: String,
    /// Version
    pub version: String,
    /// Author
    pub author: String,
    /// Input parameters
    pub inputs: Vec<PipelineInputInfo>,
}
|
|
|
|
/// Pipeline input parameter info
///
/// Describes a single user-supplied parameter for rendering input forms.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PipelineInputInfo {
    /// Parameter name
    pub name: String,
    /// Input type (stringified, e.g. "string", "select", "multi-select")
    pub input_type: String,
    /// Is required
    pub required: bool,
    /// Label (falls back to the parameter name when unset upstream)
    pub label: String,
    /// Placeholder
    pub placeholder: Option<String>,
    /// Default value
    pub default: Option<Value>,
    /// Options (for select/multi-select)
    pub options: Vec<String>,
}
|
|
|
|
/// Run pipeline request
///
/// Sent by the frontend when starting a pipeline run.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RunPipelineRequest {
    /// Pipeline ID
    pub pipeline_id: String,
    /// Input values
    pub inputs: HashMap<String, Value>,
}
|
|
|
|
/// Run pipeline response
///
/// Returned immediately by `pipeline_run`; progress is polled separately.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RunPipelineResponse {
    /// Run ID
    pub run_id: String,
    /// Pipeline ID
    pub pipeline_id: String,
    /// Status (e.g. "running")
    pub status: String,
}
|
|
|
|
/// Pipeline run status response
///
/// Used both for progress polling and for the `pipeline-complete` event payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PipelineRunResponse {
    /// Run ID
    pub run_id: String,
    /// Pipeline ID
    pub pipeline_id: String,
    /// Status
    pub status: String,
    /// Current step
    pub current_step: Option<String>,
    /// Progress percentage (0-100)
    pub percentage: u8,
    /// Message
    pub message: String,
    /// Outputs (if completed)
    pub outputs: Option<Value>,
    /// Error (if failed)
    pub error: Option<String>,
    /// Started at (RFC 3339)
    pub started_at: String,
    /// Ended at (RFC 3339, None while still running)
    pub ended_at: Option<String>,
}
|
|
|
|
/// Discover and list all available pipelines
|
|
#[tauri::command]
|
|
pub async fn pipeline_list(
|
|
state: State<'_, Arc<PipelineState>>,
|
|
category: Option<String>,
|
|
industry: Option<String>,
|
|
) -> Result<Vec<PipelineInfo>, String> {
|
|
// Get pipelines directory
|
|
let pipelines_dir = get_pipelines_directory()?;
|
|
|
|
tracing::debug!("[pipeline_list] Scanning directory: {:?}", pipelines_dir);
|
|
tracing::debug!("[pipeline_list] Filters - category: {:?}, industry: {:?}", category, industry);
|
|
|
|
// Scan for pipeline files (returns both info and paths)
|
|
let mut pipelines_with_paths: Vec<(PipelineInfo, PathBuf)> = Vec::new();
|
|
if pipelines_dir.exists() {
|
|
scan_pipelines_with_paths(&pipelines_dir, category.as_deref(), industry.as_deref(), &mut pipelines_with_paths)?;
|
|
} else {
|
|
tracing::warn!("[WARN pipeline_list] Pipelines directory does not exist: {:?}", pipelines_dir);
|
|
}
|
|
|
|
tracing::debug!("[pipeline_list] Found {} pipelines", pipelines_with_paths.len());
|
|
|
|
// Debug: log all pipelines with their industry values
|
|
for (info, _) in &pipelines_with_paths {
|
|
tracing::debug!("[pipeline_list] Pipeline: {} -> category: {}, industry: '{}'", info.id, info.category, info.industry);
|
|
}
|
|
|
|
// Update state
|
|
let mut state_pipelines = state.pipelines.write().await;
|
|
let mut state_paths = state.pipeline_paths.write().await;
|
|
|
|
let mut result = Vec::new();
|
|
for (info, path) in &pipelines_with_paths {
|
|
// Load full pipeline into state
|
|
if let Ok(content) = std::fs::read_to_string(path) {
|
|
if let Ok(pipeline) = parse_pipeline_yaml(&content) {
|
|
state_pipelines.insert(info.id.clone(), pipeline);
|
|
state_paths.insert(info.id.clone(), path.clone());
|
|
}
|
|
}
|
|
result.push(info.clone());
|
|
}
|
|
|
|
Ok(result)
|
|
}
|
|
|
|
/// Get pipeline details
|
|
#[tauri::command]
|
|
pub async fn pipeline_get(
|
|
state: State<'_, Arc<PipelineState>>,
|
|
pipeline_id: String,
|
|
) -> Result<PipelineInfo, String> {
|
|
let pipelines = state.pipelines.read().await;
|
|
|
|
let pipeline = pipelines.get(&pipeline_id)
|
|
.ok_or_else(|| format!("Pipeline not found: {}", pipeline_id))?;
|
|
|
|
Ok(pipeline_to_info(pipeline))
|
|
}
|
|
|
|
/// Run a pipeline
|
|
#[tauri::command]
|
|
pub async fn pipeline_run(
|
|
app: AppHandle,
|
|
state: State<'_, Arc<PipelineState>>,
|
|
kernel_state: State<'_, KernelState>,
|
|
request: RunPipelineRequest,
|
|
) -> Result<RunPipelineResponse, String> {
|
|
tracing::debug!("[pipeline_run] Received request for pipeline_id: {}", request.pipeline_id);
|
|
|
|
// Get pipeline
|
|
let pipelines = state.pipelines.read().await;
|
|
tracing::debug!("[pipeline_run] State has {} pipelines loaded", pipelines.len());
|
|
|
|
// Debug: list all loaded pipeline IDs
|
|
for (id, _) in pipelines.iter() {
|
|
tracing::debug!("[pipeline_run] Loaded pipeline: {}", id);
|
|
}
|
|
|
|
let pipeline = pipelines.get(&request.pipeline_id)
|
|
.ok_or_else(|| {
|
|
println!("[ERROR pipeline_run] Pipeline '{}' not found in state. Available: {:?}",
|
|
request.pipeline_id,
|
|
pipelines.keys().collect::<Vec<_>>());
|
|
format!("Pipeline not found: {}", request.pipeline_id)
|
|
})?
|
|
.clone();
|
|
drop(pipelines);
|
|
|
|
// Try to get LLM driver from Kernel
|
|
let llm_driver = {
|
|
let kernel_lock = kernel_state.lock().await;
|
|
if let Some(kernel) = kernel_lock.as_ref() {
|
|
tracing::debug!("[pipeline_run] Got LLM driver from Kernel");
|
|
Some(Arc::new(RuntimeLlmAdapter::new(
|
|
kernel.driver(),
|
|
Some(kernel.config().llm.model.clone()),
|
|
)) as Arc<dyn LlmActionDriver>)
|
|
} else {
|
|
tracing::debug!("[pipeline_run] Kernel not initialized, no LLM driver available");
|
|
None
|
|
}
|
|
};
|
|
|
|
// Create executor with or without LLM driver
|
|
let executor = if let Some(driver) = llm_driver {
|
|
let registry = Arc::new(ActionRegistry::new().with_llm_driver(driver));
|
|
Arc::new(PipelineExecutor::new(registry))
|
|
} else {
|
|
state.executor.clone()
|
|
};
|
|
|
|
// Generate run ID upfront so we can return it to the caller
|
|
let run_id = uuid::Uuid::new_v4().to_string();
|
|
let pipeline_id = request.pipeline_id.clone();
|
|
let inputs = request.inputs.clone();
|
|
|
|
// Clone for async task
|
|
let run_id_for_spawn = run_id.clone();
|
|
|
|
// Run pipeline in background with the known run_id
|
|
tokio::spawn(async move {
|
|
tracing::debug!("[pipeline_run] Starting execution with run_id: {}", run_id_for_spawn);
|
|
let result = executor.execute_with_id(&pipeline, inputs, &run_id_for_spawn).await;
|
|
|
|
tracing::debug!("[pipeline_run] Execution completed for run_id: {}, status: {:?}",
|
|
run_id_for_spawn,
|
|
result.as_ref().map(|r| r.status.clone()).unwrap_or(RunStatus::Failed));
|
|
|
|
// Emit completion event
|
|
let _ = app.emit("pipeline-complete", &PipelineRunResponse {
|
|
run_id: run_id_for_spawn.clone(),
|
|
pipeline_id: pipeline_id.clone(),
|
|
status: match &result {
|
|
Ok(r) => r.status.to_string(),
|
|
Err(_) => "failed".to_string(),
|
|
},
|
|
current_step: None,
|
|
percentage: 100,
|
|
message: match &result {
|
|
Ok(_) => "Pipeline completed".to_string(),
|
|
Err(e) => e.to_string(),
|
|
},
|
|
outputs: result.as_ref().ok().and_then(|r| r.outputs.clone()),
|
|
error: result.as_ref().err().map(|e| e.to_string()),
|
|
started_at: chrono::Utc::now().to_rfc3339(),
|
|
ended_at: Some(chrono::Utc::now().to_rfc3339()),
|
|
});
|
|
});
|
|
|
|
// Return immediately with the known run ID
|
|
tracing::debug!("[pipeline_run] Returning run_id: {} to caller", run_id);
|
|
Ok(RunPipelineResponse {
|
|
run_id,
|
|
pipeline_id: request.pipeline_id,
|
|
status: "running".to_string(),
|
|
})
|
|
}
|
|
|
|
/// Get pipeline run progress
|
|
#[tauri::command]
|
|
pub async fn pipeline_progress(
|
|
state: State<'_, Arc<PipelineState>>,
|
|
run_id: String,
|
|
) -> Result<PipelineRunResponse, String> {
|
|
let progress = state.executor.get_progress(&run_id).await
|
|
.ok_or_else(|| format!("Run not found: {}", run_id))?;
|
|
|
|
let run = state.executor.get_run(&run_id).await;
|
|
|
|
Ok(PipelineRunResponse {
|
|
run_id: progress.run_id,
|
|
pipeline_id: run.as_ref().map(|r| r.pipeline_id.clone()).unwrap_or_default(),
|
|
status: progress.status.to_string(),
|
|
current_step: Some(progress.current_step),
|
|
percentage: progress.percentage,
|
|
message: progress.message,
|
|
outputs: run.as_ref().and_then(|r| r.outputs.clone()),
|
|
error: run.and_then(|r| r.error),
|
|
started_at: chrono::Utc::now().to_rfc3339(), // TODO: use actual time
|
|
ended_at: None,
|
|
})
|
|
}
|
|
|
|
/// Cancel a pipeline run
|
|
#[tauri::command]
|
|
pub async fn pipeline_cancel(
|
|
state: State<'_, Arc<PipelineState>>,
|
|
run_id: String,
|
|
) -> Result<(), String> {
|
|
state.executor.cancel(&run_id).await;
|
|
Ok(())
|
|
}
|
|
|
|
/// Get pipeline run result
|
|
#[tauri::command]
|
|
pub async fn pipeline_result(
|
|
state: State<'_, Arc<PipelineState>>,
|
|
run_id: String,
|
|
) -> Result<PipelineRunResponse, String> {
|
|
let run = state.executor.get_run(&run_id).await
|
|
.ok_or_else(|| format!("Run not found: {}", run_id))?;
|
|
|
|
let current_step = run.current_step.clone();
|
|
let status = run.status.clone();
|
|
|
|
Ok(PipelineRunResponse {
|
|
run_id: run.id,
|
|
pipeline_id: run.pipeline_id,
|
|
status: status.to_string(),
|
|
current_step: current_step.clone(),
|
|
percentage: if status == RunStatus::Completed { 100 } else { 0 },
|
|
message: current_step.unwrap_or_default(),
|
|
outputs: run.outputs,
|
|
error: run.error,
|
|
started_at: run.started_at.to_rfc3339(),
|
|
ended_at: run.ended_at.map(|t| t.to_rfc3339()),
|
|
})
|
|
}
|
|
|
|
/// List all runs
|
|
#[tauri::command]
|
|
pub async fn pipeline_runs(
|
|
state: State<'_, Arc<PipelineState>>,
|
|
) -> Result<Vec<PipelineRunResponse>, String> {
|
|
let runs = state.executor.list_runs().await;
|
|
|
|
Ok(runs.into_iter().map(|run| {
|
|
let current_step = run.current_step.clone();
|
|
let status = run.status.clone();
|
|
PipelineRunResponse {
|
|
run_id: run.id,
|
|
pipeline_id: run.pipeline_id,
|
|
status: status.to_string(),
|
|
current_step: current_step.clone(),
|
|
percentage: if status == RunStatus::Completed { 100 } else if status == RunStatus::Running { 50 } else { 0 },
|
|
message: current_step.unwrap_or_default(),
|
|
outputs: run.outputs,
|
|
error: run.error,
|
|
started_at: run.started_at.to_rfc3339(),
|
|
ended_at: run.ended_at.map(|t| t.to_rfc3339()),
|
|
}
|
|
}).collect())
|
|
}
|
|
|
|
/// Refresh pipeline discovery
|
|
#[tauri::command]
|
|
pub async fn pipeline_refresh(
|
|
state: State<'_, Arc<PipelineState>>,
|
|
) -> Result<Vec<PipelineInfo>, String> {
|
|
let pipelines_dir = get_pipelines_directory()?;
|
|
|
|
if !pipelines_dir.exists() {
|
|
std::fs::create_dir_all(&pipelines_dir)
|
|
.map_err(|e| format!("Failed to create pipelines directory: {}", e))?;
|
|
}
|
|
|
|
let mut state_pipelines = state.pipelines.write().await;
|
|
let mut state_paths = state.pipeline_paths.write().await;
|
|
|
|
// Clear existing
|
|
state_pipelines.clear();
|
|
state_paths.clear();
|
|
|
|
// Scan and load all pipelines (synchronous)
|
|
let mut pipelines = Vec::new();
|
|
scan_pipelines_full_sync(&pipelines_dir, &mut pipelines)?;
|
|
|
|
for (path, pipeline) in &pipelines {
|
|
let id = pipeline.metadata.name.clone();
|
|
state_pipelines.insert(id.clone(), pipeline.clone());
|
|
state_paths.insert(id, path.clone());
|
|
}
|
|
|
|
Ok(pipelines.into_iter().map(|(_, p)| pipeline_to_info(&p)).collect())
|
|
}
|
|
|
|
// Helper functions
|
|
|
|
fn get_pipelines_directory() -> Result<PathBuf, String> {
|
|
// Try to find pipelines directory
|
|
// Priority: ZCLAW_PIPELINES_DIR env > workspace pipelines/ > ~/.zclaw/pipelines/
|
|
|
|
if let Ok(dir) = std::env::var("ZCLAW_PIPELINES_DIR") {
|
|
return Ok(PathBuf::from(dir));
|
|
}
|
|
|
|
// Try workspace directory
|
|
let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
|
|
let workspace_pipelines = manifest_dir
|
|
.parent()
|
|
.and_then(|p| p.parent())
|
|
.map(|p| p.join("pipelines"));
|
|
|
|
if let Some(ref dir) = workspace_pipelines {
|
|
if dir.exists() {
|
|
return Ok(dir.clone());
|
|
}
|
|
}
|
|
|
|
// Fallback to user home directory
|
|
if let Some(home) = dirs::home_dir() {
|
|
let dir = home.join(".zclaw").join("pipelines");
|
|
return Ok(dir);
|
|
}
|
|
|
|
Err("Could not determine pipelines directory".to_string())
|
|
}
|
|
|
|
/// Scan pipelines with paths (returns both info and file paths)
|
|
fn scan_pipelines_with_paths(
|
|
dir: &PathBuf,
|
|
category_filter: Option<&str>,
|
|
industry_filter: Option<&str>,
|
|
pipelines: &mut Vec<(PipelineInfo, PathBuf)>,
|
|
) -> Result<(), String> {
|
|
tracing::debug!("[scan] Entering directory: {:?}", dir);
|
|
let entries = std::fs::read_dir(dir)
|
|
.map_err(|e| format!("Failed to read pipelines directory: {}", e))?;
|
|
|
|
for entry in entries {
|
|
let entry = entry.map_err(|e| format!("Failed to read entry: {}", e))?;
|
|
let path = entry.path();
|
|
|
|
if path.is_dir() {
|
|
// Recursively scan subdirectory
|
|
scan_pipelines_with_paths(&path, category_filter, industry_filter, pipelines)?;
|
|
} else if path.extension().map(|e| e == "yaml" || e == "yml").unwrap_or(false) {
|
|
// Try to parse pipeline file
|
|
tracing::debug!("[scan] Found YAML file: {:?}", path);
|
|
if let Ok(content) = std::fs::read_to_string(&path) {
|
|
tracing::debug!("[scan] File content length: {} bytes", content.len());
|
|
match parse_pipeline_yaml(&content) {
|
|
Ok(pipeline) => {
|
|
// Debug: log parsed pipeline metadata
|
|
println!(
|
|
"[DEBUG scan] Parsed YAML: {} -> category: {:?}, industry: {:?}",
|
|
pipeline.metadata.name,
|
|
pipeline.metadata.category,
|
|
pipeline.metadata.industry
|
|
);
|
|
|
|
// Apply category filter
|
|
if let Some(filter) = category_filter {
|
|
if pipeline.metadata.category.as_deref() != Some(filter) {
|
|
continue;
|
|
}
|
|
}
|
|
|
|
// Apply industry filter
|
|
if let Some(filter) = industry_filter {
|
|
if pipeline.metadata.industry.as_deref() != Some(filter) {
|
|
continue;
|
|
}
|
|
}
|
|
|
|
tracing::debug!("[scan] Found pipeline: {} at {:?}", pipeline.metadata.name, path);
|
|
pipelines.push((pipeline_to_info(&pipeline), path));
|
|
}
|
|
Err(e) => {
|
|
tracing::error!("[scan] Failed to parse pipeline at {:?}: {}", path, e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn scan_pipelines_full_sync(
|
|
dir: &PathBuf,
|
|
pipelines: &mut Vec<(PathBuf, Pipeline)>,
|
|
) -> Result<(), String> {
|
|
let entries = std::fs::read_dir(dir)
|
|
.map_err(|e| format!("Failed to read pipelines directory: {}", e))?;
|
|
|
|
for entry in entries {
|
|
let entry = entry.map_err(|e| format!("Failed to read entry: {}", e))?;
|
|
let path = entry.path();
|
|
|
|
if path.is_dir() {
|
|
scan_pipelines_full_sync(&path, pipelines)?;
|
|
} else if path.extension().map(|e| e == "yaml" || e == "yml").unwrap_or(false) {
|
|
if let Ok(content) = std::fs::read_to_string(&path) {
|
|
if let Ok(pipeline) = parse_pipeline_yaml(&content) {
|
|
pipelines.push((path, pipeline));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn pipeline_to_info(pipeline: &Pipeline) -> PipelineInfo {
|
|
let industry = pipeline.metadata.industry.clone().unwrap_or_default();
|
|
println!(
|
|
"[DEBUG pipeline_to_info] Pipeline: {}, category: {:?}, industry: {:?}",
|
|
pipeline.metadata.name,
|
|
pipeline.metadata.category,
|
|
pipeline.metadata.industry
|
|
);
|
|
|
|
PipelineInfo {
|
|
id: pipeline.metadata.name.clone(),
|
|
display_name: pipeline.metadata.display_name.clone()
|
|
.unwrap_or_else(|| pipeline.metadata.name.clone()),
|
|
description: pipeline.metadata.description.clone().unwrap_or_default(),
|
|
category: pipeline.metadata.category.clone().unwrap_or_default(),
|
|
industry,
|
|
tags: pipeline.metadata.tags.clone(),
|
|
icon: pipeline.metadata.icon.clone().unwrap_or_else(|| "📦".to_string()),
|
|
version: pipeline.metadata.version.clone(),
|
|
author: pipeline.metadata.author.clone().unwrap_or_default(),
|
|
inputs: pipeline.spec.inputs.iter().map(|input| {
|
|
PipelineInputInfo {
|
|
name: input.name.clone(),
|
|
input_type: match input.input_type {
|
|
zclaw_pipeline::InputType::String => "string".to_string(),
|
|
zclaw_pipeline::InputType::Number => "number".to_string(),
|
|
zclaw_pipeline::InputType::Boolean => "boolean".to_string(),
|
|
zclaw_pipeline::InputType::Select => "select".to_string(),
|
|
zclaw_pipeline::InputType::MultiSelect => "multi-select".to_string(),
|
|
zclaw_pipeline::InputType::File => "file".to_string(),
|
|
zclaw_pipeline::InputType::Text => "text".to_string(),
|
|
},
|
|
required: input.required,
|
|
label: input.label.clone().unwrap_or_else(|| input.name.clone()),
|
|
placeholder: input.placeholder.clone(),
|
|
default: input.default.clone(),
|
|
options: input.options.clone(),
|
|
}
|
|
}).collect(),
|
|
}
|
|
}
|
|
|
|
/// Create pipeline state with default action registry
|
|
pub fn create_pipeline_state() -> Arc<PipelineState> {
|
|
// Try to create an LLM driver from environment/config
|
|
let action_registry = if let Some(driver) = create_llm_driver_from_config() {
|
|
tracing::debug!("[create_pipeline_state] LLM driver configured successfully");
|
|
Arc::new(ActionRegistry::new().with_llm_driver(driver))
|
|
} else {
|
|
tracing::debug!("[create_pipeline_state] No LLM driver configured - pipelines requiring LLM will fail");
|
|
Arc::new(ActionRegistry::new())
|
|
};
|
|
Arc::new(PipelineState::new(action_registry))
|
|
}
|
|
|
|
// === Intent Router Commands ===

/// Route result for frontend
///
/// Serialized as a tagged union: `{"type": "matched" | "ambiguous" |
/// "no_match" | "need_more_info", ...}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RouteResultResponse {
    /// A single pipeline matched the user input.
    Matched {
        pipeline_id: String,
        display_name: Option<String>,
        // Suggested input mode for the frontend ("form" / "conversation" / "auto").
        mode: String,
        // Parameters already extracted from the user input.
        params: HashMap<String, Value>,
        confidence: f32,
        // Required parameters still missing (no value, no default).
        missing_params: Vec<MissingParamInfo>,
    },
    /// Several pipelines could match; the user must choose.
    Ambiguous {
        candidates: Vec<PipelineCandidateInfo>,
    },
    /// Nothing matched; offer suggestions instead.
    NoMatch {
        suggestions: Vec<PipelineCandidateInfo>,
    },
    /// Input was too vague; prompt the user for more detail.
    NeedMoreInfo {
        prompt: String,
        related_pipeline: Option<String>,
    },
}
|
|
|
|
/// Missing parameter info
///
/// Describes a required pipeline input that routing could not fill.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MissingParamInfo {
    // Parameter name as declared by the pipeline.
    pub name: String,
    // Optional human-readable label.
    pub label: Option<String>,
    // Stringified type, e.g. "string" or "select".
    pub param_type: String,
    // Always true for entries produced by route_intent (only required params are reported).
    pub required: bool,
    // Declared default value, if any.
    pub default: Option<Value>,
}
|
|
|
|
/// Pipeline candidate info
///
/// A pipeline offered to the user as a suggestion or disambiguation choice.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PipelineCandidateInfo {
    pub id: String,
    pub display_name: Option<String>,
    pub description: Option<String>,
    pub icon: Option<String>,
    pub category: Option<String>,
    // Short human-readable explanation of why this candidate is shown.
    pub match_reason: Option<String>,
}
|
|
|
|
/// Route user input to matching pipeline
|
|
#[tauri::command]
|
|
pub async fn route_intent(
|
|
state: State<'_, Arc<PipelineState>>,
|
|
user_input: String,
|
|
) -> Result<RouteResultResponse, String> {
|
|
use zclaw_pipeline::{TriggerParser, Trigger, TriggerParam, compile_trigger};
|
|
|
|
tracing::debug!("[route_intent] Routing user input: {}", user_input);
|
|
|
|
// Build trigger parser from loaded pipelines
|
|
let pipelines = state.pipelines.read().await;
|
|
let mut parser = TriggerParser::new();
|
|
|
|
for (id, pipeline) in pipelines.iter() {
|
|
// Extract trigger info from pipeline metadata
|
|
// For now, use tags as keywords and description as trigger description
|
|
let trigger = Trigger {
|
|
keywords: pipeline.metadata.tags.clone(),
|
|
patterns: vec![], // TODO: add pattern support in pipeline definition
|
|
description: pipeline.metadata.description.clone(),
|
|
examples: vec![],
|
|
};
|
|
|
|
// Convert pipeline inputs to trigger params
|
|
let param_defs: Vec<TriggerParam> = pipeline.spec.inputs.iter().map(|input| {
|
|
TriggerParam {
|
|
name: input.name.clone(),
|
|
param_type: match input.input_type {
|
|
zclaw_pipeline::InputType::String => "string".to_string(),
|
|
zclaw_pipeline::InputType::Number => "number".to_string(),
|
|
zclaw_pipeline::InputType::Boolean => "boolean".to_string(),
|
|
zclaw_pipeline::InputType::Select => "select".to_string(),
|
|
zclaw_pipeline::InputType::MultiSelect => "multi-select".to_string(),
|
|
zclaw_pipeline::InputType::File => "file".to_string(),
|
|
zclaw_pipeline::InputType::Text => "text".to_string(),
|
|
},
|
|
required: input.required,
|
|
label: input.label.clone(),
|
|
default: input.default.clone(),
|
|
}
|
|
}).collect();
|
|
|
|
match compile_trigger(
|
|
id.clone(),
|
|
pipeline.metadata.display_name.clone(),
|
|
&trigger,
|
|
param_defs,
|
|
) {
|
|
Ok(compiled) => parser.register(compiled),
|
|
Err(e) => {
|
|
tracing::warn!("[WARN route_intent] Failed to compile trigger for {}: {}", id, e);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Quick match
|
|
if let Some(match_result) = parser.quick_match(&user_input) {
|
|
let trigger = parser.get_trigger(&match_result.pipeline_id);
|
|
|
|
// Determine input mode
|
|
let mode = if let Some(t) = &trigger {
|
|
let required_count = t.param_defs.iter().filter(|p| p.required).count();
|
|
if required_count > 3 || t.param_defs.len() > 5 {
|
|
"form"
|
|
} else if t.param_defs.is_empty() {
|
|
"conversation"
|
|
} else {
|
|
"conversation"
|
|
}
|
|
} else {
|
|
"auto"
|
|
};
|
|
|
|
// Find missing params
|
|
let missing_params: Vec<MissingParamInfo> = trigger
|
|
.map(|t| {
|
|
t.param_defs.iter()
|
|
.filter(|p| p.required && !match_result.params.contains_key(&p.name) && p.default.is_none())
|
|
.map(|p| MissingParamInfo {
|
|
name: p.name.clone(),
|
|
label: p.label.clone(),
|
|
param_type: p.param_type.clone(),
|
|
required: p.required,
|
|
default: p.default.clone(),
|
|
})
|
|
.collect()
|
|
})
|
|
.unwrap_or_default();
|
|
|
|
return Ok(RouteResultResponse::Matched {
|
|
pipeline_id: match_result.pipeline_id,
|
|
display_name: trigger.and_then(|t| t.display_name.clone()),
|
|
mode: mode.to_string(),
|
|
params: match_result.params,
|
|
confidence: match_result.confidence,
|
|
missing_params,
|
|
});
|
|
}
|
|
|
|
// No match - return suggestions
|
|
let suggestions: Vec<PipelineCandidateInfo> = parser.triggers()
|
|
.iter()
|
|
.take(3)
|
|
.map(|t| PipelineCandidateInfo {
|
|
id: t.pipeline_id.clone(),
|
|
display_name: t.display_name.clone(),
|
|
description: t.description.clone(),
|
|
icon: None,
|
|
category: None,
|
|
match_reason: Some("推荐".to_string()),
|
|
})
|
|
.collect();
|
|
|
|
Ok(RouteResultResponse::NoMatch { suggestions })
|
|
}
|
|
|
|
/// Create an LLM driver from configuration file or environment variables
|
|
fn create_llm_driver_from_config() -> Option<Arc<dyn LlmActionDriver>> {
|
|
// Try to read config file
|
|
let config_path = dirs::config_dir()
|
|
.map(|p| p.join("zclaw").join("config.toml"))?;
|
|
|
|
if !config_path.exists() {
|
|
tracing::debug!("[create_llm_driver] Config file not found at {:?}", config_path);
|
|
return None;
|
|
}
|
|
|
|
// Read and parse config
|
|
let config_content = std::fs::read_to_string(&config_path).ok()?;
|
|
let config: toml::Value = toml::from_str(&config_content).ok()?;
|
|
|
|
// Extract LLM config
|
|
let llm_config = config.get("llm")?;
|
|
|
|
let provider = llm_config.get("provider")?.as_str()?.to_string();
|
|
let api_key = llm_config.get("api_key")?.as_str()?.to_string();
|
|
let base_url = llm_config.get("base_url").and_then(|v| v.as_str()).map(|s| s.to_string());
|
|
let model = llm_config.get("model").and_then(|v| v.as_str()).map(|s| s.to_string());
|
|
|
|
tracing::debug!("[create_llm_driver] Found LLM config: provider={}, model={:?}", provider, model);
|
|
|
|
// Convert api_key to SecretString
|
|
let secret_key = SecretString::new(api_key);
|
|
|
|
// Create the runtime driver
|
|
let runtime_driver: Arc<dyn zclaw_runtime::LlmDriver> = match provider.as_str() {
|
|
"anthropic" => {
|
|
Arc::new(zclaw_runtime::AnthropicDriver::new(secret_key))
|
|
}
|
|
"openai" | "doubao" | "qwen" | "deepseek" | "kimi" => {
|
|
Arc::new(zclaw_runtime::OpenAiDriver::new(secret_key))
|
|
}
|
|
"gemini" => {
|
|
Arc::new(zclaw_runtime::GeminiDriver::new(secret_key))
|
|
}
|
|
"local" | "ollama" => {
|
|
let url = base_url.unwrap_or_else(|| "http://localhost:11434".to_string());
|
|
Arc::new(zclaw_runtime::LocalDriver::new(&url))
|
|
}
|
|
_ => {
|
|
tracing::warn!("[WARN create_llm_driver] Unknown provider: {}", provider);
|
|
return None;
|
|
}
|
|
};
|
|
|
|
Some(Arc::new(RuntimeLlmAdapter::new(runtime_driver, model)))
|
|
}
|
|
|
|
/// Analyze presentation data
|
|
#[tauri::command]
|
|
pub async fn analyze_presentation(
|
|
data: Value,
|
|
) -> Result<serde_json::Value, String> {
|
|
use zclaw_pipeline::presentation::PresentationAnalyzer;
|
|
|
|
let analyzer = PresentationAnalyzer::new();
|
|
let analysis = analyzer.analyze(&data);
|
|
|
|
// Convert analysis to JSON
|
|
serde_json::to_value(&analysis).map_err(|e| e.to_string())
|
|
}
|