From eed26a1ce4e48f3bdba54c2bd3a733c6eaca18f4 Mon Sep 17 00:00:00 2001 From: iven Date: Mon, 30 Mar 2026 00:25:38 +0800 Subject: [PATCH] =?UTF-8?q?feat(pipeline):=20Pipeline=20=E5=9B=BE=E6=8C=81?= =?UTF-8?q?=E4=B9=85=E5=8C=96=20=E2=80=94=20GraphStore=20=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增 GraphStore trait 和 MemoryGraphStore 实现: - save/load/delete/list_ids 异步接口 - 可选文件持久化到 JSON 目录 - 启动时从磁盘加载已保存的图 SkillOrchestrationDriver 集成: - 新增 with_graph_store() 构造函数 - graph_id 路径从硬编码错误改为从 GraphStore 查找 - 无 store 时返回明确的错误信息 修复了 "Graph loading by ID not yet implemented" 的 TODO --- .../src/actions/orchestration.rs | 19 ++- .../src/orchestration/graph_store.rs | 132 ++++++++++++++++++ crates/zclaw-skills/src/orchestration/mod.rs | 2 + 3 files changed, 150 insertions(+), 3 deletions(-) create mode 100644 crates/zclaw-skills/src/orchestration/graph_store.rs diff --git a/crates/zclaw-pipeline/src/actions/orchestration.rs b/crates/zclaw-pipeline/src/actions/orchestration.rs index 1dd3f53..89e663a 100644 --- a/crates/zclaw-pipeline/src/actions/orchestration.rs +++ b/crates/zclaw-pipeline/src/actions/orchestration.rs @@ -13,12 +13,22 @@ use super::OrchestrationActionDriver; pub struct SkillOrchestrationDriver { /// Skill registry for executing skills skill_registry: Arc, + /// Graph store for persisting/loading graphs by ID + graph_store: Option>, } impl SkillOrchestrationDriver { /// Create a new orchestration driver pub fn new(skill_registry: Arc) -> Self { - Self { skill_registry } + Self { skill_registry, graph_store: None } + } + + /// Create with graph persistence + pub fn with_graph_store( + skill_registry: Arc, + graph_store: Arc, + ) -> Self { + Self { skill_registry, graph_store: Some(graph_store) } } } @@ -38,8 +48,11 @@ impl OrchestrationActionDriver for SkillOrchestrationDriver { serde_json::from_value::(graph_value.clone()) .map_err(|e| format!("Failed to parse graph: {}", e))? } else if let Some(id) = graph_id { - // Load graph from registry (TODO: implement graph storage) - return Err(format!("Graph loading by ID not yet implemented: {}", id)); + // Load graph from store + self.graph_store.as_ref() + .ok_or_else(|| "Graph store not configured. Cannot resolve graph_id.".to_string())? + .load(id).await + .ok_or_else(|| format!("Graph not found: {}", id))? } else { return Err("Either graph_id or graph must be provided".to_string()); }; diff --git a/crates/zclaw-skills/src/orchestration/graph_store.rs b/crates/zclaw-skills/src/orchestration/graph_store.rs new file mode 100644 index 0000000..aaf364a --- /dev/null +++ b/crates/zclaw-skills/src/orchestration/graph_store.rs @@ -0,0 +1,132 @@ +//! Graph store — persistence layer for SkillGraph definitions +//! +//! Provides save/load/delete operations for orchestration graphs, +//! enabling graph_id references in pipeline actions. + +use async_trait::async_trait; +use std::collections::HashMap; +use std::path::PathBuf; +use tokio::sync::RwLock; +use crate::orchestration::SkillGraph; + +/// Trait for graph persistence backends +#[async_trait] +pub trait GraphStore: Send + Sync { + /// Save a graph definition + async fn save(&self, graph: &SkillGraph) -> Result<(), String>; + /// Load a graph by ID + async fn load(&self, id: &str) -> Option; + /// Delete a graph by ID + async fn delete(&self, id: &str) -> bool; + /// List all stored graph IDs + async fn list_ids(&self) -> Vec; +} + +/// In-memory graph store with optional file persistence +pub struct MemoryGraphStore { + graphs: RwLock>, + persist_dir: Option, +} + +impl MemoryGraphStore { + /// Create an in-memory-only store + pub fn new() -> Self { + Self { + graphs: RwLock::new(HashMap::new()), + persist_dir: None, + } + } + + /// Create with file persistence to the given directory + pub fn with_persist_dir(dir: PathBuf) -> Self { + let store = Self { + graphs: RwLock::new(HashMap::new()), + persist_dir: Some(dir), + }; + // We'll load from disk lazily on first access + store + } + + /// Load all graphs from the persist directory + pub async fn load_from_disk(&self) -> Result { + let dir = match &self.persist_dir { + Some(d) => d.clone(), + None => return Ok(0), + }; + + if !dir.exists() { + return Ok(0); + } + + let mut count = 0; + let mut entries = tokio::fs::read_dir(&dir) + .await + .map_err(|e| format!("Failed to read graph dir: {}", e))?; + + while let Some(entry) = entries.next_entry().await + .map_err(|e| format!("Failed to read entry: {}", e))? + { + let path = entry.path(); + if path.extension().map(|e| e == "json").unwrap_or(false) { + let content = tokio::fs::read_to_string(&path) + .await + .map_err(|e| format!("Failed to read {}: {}", path.display(), e))?; + if let Ok(graph) = serde_json::from_str::(&content) { + let id = graph.id.clone(); + self.graphs.write().await.insert(id, graph); + count += 1; + } + } + } + + tracing::info!("[GraphStore] Loaded {} graphs from {}", count, dir.display()); + Ok(count) + } + + async fn persist_graph(&self, graph: &SkillGraph) { + if let Some(ref dir) = self.persist_dir { + let path = dir.join(format!("{}.json", graph.id)); + if let Ok(content) = serde_json::to_string_pretty(graph) { + if let Err(e) = tokio::fs::write(&path, &content).await { + tracing::warn!("[GraphStore] Failed to persist {}: {}", graph.id, e); + } + } + } + } + + async fn remove_persist(&self, id: &str) { + if let Some(ref dir) = self.persist_dir { + let path = dir.join(format!("{}.json", id)); + let _ = tokio::fs::remove_file(&path).await; + } + } +} + +#[async_trait] +impl GraphStore for MemoryGraphStore { + async fn save(&self, graph: &SkillGraph) -> Result<(), String> { + self.persist_graph(graph).await; + self.graphs.write().await.insert(graph.id.clone(), graph.clone()); + tracing::debug!("[GraphStore] Saved graph: {}", graph.id); + Ok(()) + } + + async fn load(&self, id: &str) -> Option { + self.graphs.read().await.get(id).cloned() + } + + async fn delete(&self, id: &str) -> bool { + self.remove_persist(id).await; + self.graphs.write().await.remove(id).is_some() + } + + async fn list_ids(&self) -> Vec { + self.graphs.read().await.keys().cloned().collect() + } +} + +impl Default for MemoryGraphStore { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/zclaw-skills/src/orchestration/mod.rs b/crates/zclaw-skills/src/orchestration/mod.rs index 5baa1d8..e909767 100644 --- a/crates/zclaw-skills/src/orchestration/mod.rs +++ b/crates/zclaw-skills/src/orchestration/mod.rs @@ -9,6 +9,7 @@ mod planner; mod executor; mod context; mod auto_compose; +mod graph_store; pub use types::*; pub use validation::*; @@ -16,3 +17,4 @@ pub use planner::*; pub use executor::*; pub use context::*; pub use auto_compose::*; +pub use graph_store::{GraphStore, MemoryGraphStore};