feat(pipeline): Pipeline 图持久化 — GraphStore 实现

新增 GraphStore trait 和 MemoryGraphStore 实现:
- save/load/delete/list_ids 异步接口
- 可选文件持久化到 JSON 目录
- 启动时从磁盘加载已保存的图

SkillOrchestrationDriver 集成:
- 新增 with_graph_store() 构造函数
- graph_id 路径从硬编码错误改为从 GraphStore 查找
- 无 store 时返回明确的错误信息

修复了 "Graph loading by ID not yet implemented" 的 TODO
This commit is contained in:
iven
2026-03-30 00:25:38 +08:00
parent f3f586efef
commit eed26a1ce4
3 changed files with 150 additions and 3 deletions

View File

@@ -13,12 +13,22 @@ use super::OrchestrationActionDriver;
pub struct SkillOrchestrationDriver {
/// Skill registry for executing skills
skill_registry: Arc<zclaw_skills::SkillRegistry>,
/// Graph store for persisting/loading graphs by ID
graph_store: Option<Arc<dyn zclaw_skills::orchestration::GraphStore>>,
}
impl SkillOrchestrationDriver {
/// Create a new orchestration driver
pub fn new(skill_registry: Arc<zclaw_skills::SkillRegistry>) -> Self {
Self { skill_registry }
Self { skill_registry, graph_store: None }
}
/// Create with graph persistence
pub fn with_graph_store(
skill_registry: Arc<zclaw_skills::SkillRegistry>,
graph_store: Arc<dyn zclaw_skills::orchestration::GraphStore>,
) -> Self {
Self { skill_registry, graph_store: Some(graph_store) }
}
}
@@ -38,8 +48,11 @@ impl OrchestrationActionDriver for SkillOrchestrationDriver {
serde_json::from_value::<SkillGraph>(graph_value.clone())
.map_err(|e| format!("Failed to parse graph: {}", e))?
} else if let Some(id) = graph_id {
// Load graph from registry (TODO: implement graph storage)
return Err(format!("Graph loading by ID not yet implemented: {}", id));
// Load graph from store
self.graph_store.as_ref()
.ok_or_else(|| "Graph store not configured. Cannot resolve graph_id.".to_string())?
.load(id).await
.ok_or_else(|| format!("Graph not found: {}", id))?
} else {
return Err("Either graph_id or graph must be provided".to_string());
};

View File

@@ -0,0 +1,132 @@
//! Graph store — persistence layer for SkillGraph definitions
//!
//! Provides save/load/delete operations for orchestration graphs,
//! enabling graph_id references in pipeline actions.
use async_trait::async_trait;
use std::collections::HashMap;
use std::path::PathBuf;
use tokio::sync::RwLock;
use crate::orchestration::SkillGraph;
/// Trait for graph persistence backends
#[async_trait]
pub trait GraphStore: Send + Sync {
/// Save a graph definition
async fn save(&self, graph: &SkillGraph) -> Result<(), String>;
/// Load a graph by ID
async fn load(&self, id: &str) -> Option<SkillGraph>;
/// Delete a graph by ID
async fn delete(&self, id: &str) -> bool;
/// List all stored graph IDs
async fn list_ids(&self) -> Vec<String>;
}
/// In-memory graph store with optional file persistence
pub struct MemoryGraphStore {
graphs: RwLock<HashMap<String, SkillGraph>>,
persist_dir: Option<PathBuf>,
}
impl MemoryGraphStore {
/// Create an in-memory-only store
pub fn new() -> Self {
Self {
graphs: RwLock::new(HashMap::new()),
persist_dir: None,
}
}
/// Create with file persistence to the given directory
pub fn with_persist_dir(dir: PathBuf) -> Self {
let store = Self {
graphs: RwLock::new(HashMap::new()),
persist_dir: Some(dir),
};
// We'll load from disk lazily on first access
store
}
/// Load all graphs from the persist directory
pub async fn load_from_disk(&self) -> Result<usize, String> {
let dir = match &self.persist_dir {
Some(d) => d.clone(),
None => return Ok(0),
};
if !dir.exists() {
return Ok(0);
}
let mut count = 0;
let mut entries = tokio::fs::read_dir(&dir)
.await
.map_err(|e| format!("Failed to read graph dir: {}", e))?;
while let Some(entry) = entries.next_entry().await
.map_err(|e| format!("Failed to read entry: {}", e))?
{
let path = entry.path();
if path.extension().map(|e| e == "json").unwrap_or(false) {
let content = tokio::fs::read_to_string(&path)
.await
.map_err(|e| format!("Failed to read {}: {}", path.display(), e))?;
if let Ok(graph) = serde_json::from_str::<SkillGraph>(&content) {
let id = graph.id.clone();
self.graphs.write().await.insert(id, graph);
count += 1;
}
}
}
tracing::info!("[GraphStore] Loaded {} graphs from {}", count, dir.display());
Ok(count)
}
async fn persist_graph(&self, graph: &SkillGraph) {
if let Some(ref dir) = self.persist_dir {
let path = dir.join(format!("{}.json", graph.id));
if let Ok(content) = serde_json::to_string_pretty(graph) {
if let Err(e) = tokio::fs::write(&path, &content).await {
tracing::warn!("[GraphStore] Failed to persist {}: {}", graph.id, e);
}
}
}
}
async fn remove_persist(&self, id: &str) {
if let Some(ref dir) = self.persist_dir {
let path = dir.join(format!("{}.json", id));
let _ = tokio::fs::remove_file(&path).await;
}
}
}
#[async_trait]
impl GraphStore for MemoryGraphStore {
async fn save(&self, graph: &SkillGraph) -> Result<(), String> {
self.persist_graph(graph).await;
self.graphs.write().await.insert(graph.id.clone(), graph.clone());
tracing::debug!("[GraphStore] Saved graph: {}", graph.id);
Ok(())
}
async fn load(&self, id: &str) -> Option<SkillGraph> {
self.graphs.read().await.get(id).cloned()
}
async fn delete(&self, id: &str) -> bool {
self.remove_persist(id).await;
self.graphs.write().await.remove(id).is_some()
}
async fn list_ids(&self) -> Vec<String> {
self.graphs.read().await.keys().cloned().collect()
}
}
impl Default for MemoryGraphStore {
fn default() -> Self {
Self::new()
}
}

View File

@@ -9,6 +9,7 @@ mod planner;
mod executor;
mod context;
mod auto_compose;
mod graph_store;
pub use types::*;
pub use validation::*;
@@ -16,3 +17,4 @@ pub use planner::*;
pub use executor::*;
pub use context::*;
pub use auto_compose::*;
pub use graph_store::{GraphStore, MemoryGraphStore};