Files
zclaw_openfang/crates/zclaw-growth/src/storage/sqlite.rs
commit 04c366fe8b feat(runtime): DeerFlow 模式中间件链 Phase 1-4 全部完成
借鉴 DeerFlow 架构,实现完整中间件链系统:

Phase 1 - Agent 中间件链基础设施
- MiddlewareChain Clone 支持
- LoopRunner 双路径集成 (middleware/legacy)
- Kernel create_middleware_chain() 工厂方法

Phase 2 - 技能按需注入
- SkillIndexMiddleware (priority 200)
- SkillLoadTool 工具
- SkillDetail/SkillIndexEntry 结构体
- KernelSkillExecutor trait 扩展

Phase 3 - Guardrail 安全护栏
- GuardrailMiddleware (priority 400, fail_open)
- ShellExecRule / FileWriteRule / WebFetchRule

Phase 4 - 记忆闭环统一
- MemoryMiddleware (priority 150, 30s 防抖)
- after_completion 双路径调用

中间件注册顺序:
100 Compaction | 150 Memory | 200 SkillIndex
400 Guardrail  | 500 LoopGuard | 700 TokenCalibration

向后兼容:Option<MiddlewareChain> 默认 None 走旧路径
2026-03-29 23:19:41 +08:00

736 lines
26 KiB
Rust

//! SQLite Storage Backend
//!
//! Persistent storage backend using SQLite for production use.
//! Provides efficient querying and full-text search capabilities.
use crate::retrieval::semantic::{EmbeddingClient, SemanticScorer};
use crate::types::MemoryEntry;
use crate::viking_adapter::{FindOptions, VikingStorage};
use async_trait::async_trait;
use sqlx::sqlite::{SqlitePool, SqlitePoolOptions, SqliteRow};
use sqlx::Row;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
use zclaw_types::Result;
use zclaw_types::ZclawError;
/// SQLite storage backend with TF-IDF semantic scoring.
///
/// Persists `MemoryEntry` rows in a `memories` table (plus an FTS5 shadow
/// table used for full-text candidate retrieval) and keeps an in-memory
/// `SemanticScorer` that reranks candidates by similarity.
pub struct SqliteStorage {
/// Database connection pool
pool: SqlitePool,
/// Semantic scorer for similarity computation, shared behind an async RwLock
scorer: Arc<RwLock<SemanticScorer>>,
/// Database path (for reference)
#[allow(dead_code)] // retained for debugging/introspection; not read at runtime
path: PathBuf,
}
/// Database row structure for memory entry.
///
/// Mirrors the `memories` table schema: timestamps are stored as RFC 3339
/// TEXT and keywords as a JSON-encoded string array.
pub(crate) struct MemoryRow {
uri: String,
memory_type: String,
content: String,
// JSON-encoded Vec<String>
keywords: String,
importance: i32,
access_count: i32,
// RFC 3339 timestamp text
created_at: String,
// RFC 3339 timestamp text
last_accessed: String,
// L1 summary; column added by migration, absent in old databases
overview: Option<String>,
// L0 keywords; column added by migration, absent in old databases
abstract_summary: Option<String>,
}
impl SqliteStorage {
/// Create a new SQLite storage at the given path.
///
/// Accepts `":memory:"` as a special path for an in-memory database; for
/// file-backed databases the parent directory is created if missing.
///
/// # Errors
/// Returns `ZclawError::StorageError` when the storage directory cannot be
/// created, the connection fails, or schema initialization fails.
pub async fn new(path: impl Into<PathBuf>) -> Result<Self> {
    let path = path.into();
    // Detect the in-memory sentinel on the path itself. The previous code
    // inspected `path.parent()`, which is `Some("")` for ":memory:" and so
    // never matched the sentinel; it only worked because
    // `create_dir_all("")` happens to be a no-op.
    let is_memory = path.to_str() == Some(":memory:");
    // Ensure the parent directory exists for file-backed databases.
    if !is_memory {
        if let Some(parent) = path.parent() {
            // Bare file names in the cwd yield an empty parent — skip those.
            if !parent.as_os_str().is_empty() {
                tokio::fs::create_dir_all(parent).await.map_err(|e| {
                    ZclawError::StorageError(format!("Failed to create storage directory: {}", e))
                })?;
            }
        }
    }
    // Build the connection string (`mode=rwc` = read/write/create).
    let db_url = if is_memory {
        "sqlite::memory:".to_string()
    } else {
        format!("sqlite:{}?mode=rwc", path.to_string_lossy())
    };
    // Small pool — SQLite tolerates only limited write concurrency.
    let pool = SqlitePoolOptions::new()
        .max_connections(5)
        .connect(&db_url)
        .await
        .map_err(|e| ZclawError::StorageError(format!("Failed to connect to database: {}", e)))?;
    let storage = Self {
        pool,
        scorer: Arc::new(RwLock::new(SemanticScorer::new())),
        path,
    };
    // Create tables/indexes, then prime the in-memory scorer from disk.
    storage.initialize_schema().await?;
    storage.warmup_scorer().await?;
    Ok(storage)
}
/// Create an in-memory SQLite database (intended for tests).
///
/// # Panics
/// Panics if the in-memory database cannot be created or initialized.
pub async fn in_memory() -> Self {
    Self::new(":memory:")
        .await
        .expect("Failed to create in-memory database")
}
/// Configure embedding client for semantic search.
///
/// Replaces the current scorer with a new one that has embedding support,
/// then re-indexes all stored entries with embeddings.
///
/// # Errors
/// Propagates storage errors from reloading the persisted entries.
pub async fn configure_embedding(
    &self,
    client: Arc<dyn EmbeddingClient>,
) -> Result<()> {
    let new_scorer = SemanticScorer::with_embedding(client);
    // Swap the scorer inside a narrow scope so the write guard is released
    // before re-indexing: `warmup_scorer_with_embedding` acquires the same
    // write lock, and tokio's RwLock is not reentrant — awaiting it while
    // this task still held the guard (as the previous code did by keeping
    // the guard alive across the tail `.await`) deadlocks forever.
    {
        let mut scorer = self.scorer.write().await;
        *scorer = new_scorer;
    }
    tracing::info!("[SqliteStorage] Embedding client configured, re-indexing with embeddings...");
    self.warmup_scorer_with_embedding().await
}
/// Returns `true` when an embedding client has been configured on the scorer.
pub async fn is_embedding_available(&self) -> bool {
    self.scorer.read().await.is_embedding_available()
}
/// Initialize database schema with FTS5
///
/// Idempotent: tables and indexes use IF NOT EXISTS, and the ALTER TABLE
/// migrations deliberately ignore errors (they fail once the column
/// already exists — SQLite has no `ADD COLUMN IF NOT EXISTS`).
async fn initialize_schema(&self) -> Result<()> {
// Create main memories table (primary store; timestamps/keywords are TEXT)
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS memories (
uri TEXT PRIMARY KEY,
memory_type TEXT NOT NULL,
content TEXT NOT NULL,
keywords TEXT NOT NULL DEFAULT '[]',
importance INTEGER NOT NULL DEFAULT 5,
access_count INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
last_accessed TEXT NOT NULL
)
"#,
)
.execute(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to create memories table: {}", e)))?;
// Create FTS5 virtual table for full-text search (unicode61 tokenizer);
// kept in sync manually by store()/delete(), not via triggers
sqlx::query(
r#"
CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
uri,
content,
keywords,
tokenize='unicode61'
)
"#,
)
.execute(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to create FTS5 table: {}", e)))?;
// Create index on memory_type for filtering
sqlx::query("CREATE INDEX IF NOT EXISTS idx_memory_type ON memories(memory_type)")
.execute(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to create index: {}", e)))?;
// Create index on importance for sorting
sqlx::query("CREATE INDEX IF NOT EXISTS idx_importance ON memories(importance DESC)")
.execute(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to create importance index: {}", e)))?;
// Migration: add overview column (L1 summary); error ignored on purpose —
// it means the column already exists
let _ = sqlx::query("ALTER TABLE memories ADD COLUMN overview TEXT")
.execute(&self.pool)
.await;
// Migration: add abstract_summary column (L0 keywords); error ignored as above
let _ = sqlx::query("ALTER TABLE memories ADD COLUMN abstract_summary TEXT")
.execute(&self.pool)
.await;
// Create metadata table (generic key -> JSON blob store)
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS metadata (
key TEXT PRIMARY KEY,
json TEXT NOT NULL
)
"#,
)
.execute(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to create metadata table: {}", e)))?;
tracing::info!("[SqliteStorage] Database schema initialized");
Ok(())
}
/// Warmup semantic scorer with existing entries
async fn warmup_scorer(&self) -> Result<()> {
let rows = sqlx::query_as::<_, MemoryRow>(
"SELECT uri, memory_type, content, keywords, importance, access_count, created_at, last_accessed, overview, abstract_summary FROM memories"
)
.fetch_all(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to load memories for warmup: {}", e)))?;
let mut scorer = self.scorer.write().await;
for row in rows {
let entry = self.row_to_entry(&row);
scorer.index_entry(&entry);
}
let stats = scorer.stats();
tracing::info!(
"[SqliteStorage] Warmed up scorer with {} entries, {} terms",
stats.indexed_entries,
stats.unique_terms
);
Ok(())
}
/// Rebuild the in-memory index from every persisted entry, computing
/// embeddings as each entry is indexed.
///
/// Used after an embedding client is configured at runtime.
async fn warmup_scorer_with_embedding(&self) -> Result<()> {
    let rows = sqlx::query_as::<_, MemoryRow>(
        "SELECT uri, memory_type, content, keywords, importance, access_count, created_at, last_accessed, overview, abstract_summary FROM memories"
    )
    .fetch_all(&self.pool)
    .await
    .map_err(|e| ZclawError::StorageError(format!("Failed to load memories for warmup: {}", e)))?;
    let mut scorer = self.scorer.write().await;
    for row in rows.iter() {
        let entry = self.row_to_entry(row);
        // Embedding-aware indexing may call out to the embedding client.
        scorer.index_entry_with_embedding(&entry).await;
    }
    let stats = scorer.stats();
    tracing::info!(
        "[SqliteStorage] Warmed up scorer with {} entries ({} with embeddings), {} terms",
        stats.indexed_entries,
        stats.embedding_entries,
        stats.unique_terms
    );
    Ok(())
}
/// Convert a database row into a `MemoryEntry`.
///
/// Malformed persisted values degrade gracefully rather than failing:
/// unparseable RFC 3339 timestamps fall back to `Utc::now()`, invalid
/// keyword JSON becomes an empty list, and out-of-range numeric columns
/// are clamped instead of wrapping.
fn row_to_entry(&self, row: &MemoryRow) -> MemoryEntry {
    let memory_type = crate::types::MemoryType::parse(&row.memory_type);
    let keywords: Vec<String> = serde_json::from_str(&row.keywords).unwrap_or_default();
    let created_at = chrono::DateTime::parse_from_rfc3339(&row.created_at)
        .map(|dt| dt.with_timezone(&chrono::Utc))
        .unwrap_or_else(|_| chrono::Utc::now());
    let last_accessed = chrono::DateTime::parse_from_rfc3339(&row.last_accessed)
        .map(|dt| dt.with_timezone(&chrono::Utc))
        .unwrap_or_else(|_| chrono::Utc::now());
    MemoryEntry {
        uri: row.uri.clone(),
        memory_type,
        content: row.content.clone(),
        keywords,
        // Clamp instead of bare `as` casts: a corrupted or out-of-range
        // column previously wrapped (importance 300 -> 44, negative
        // access_count -> a huge u32).
        importance: row.importance.clamp(0, u8::MAX as i32) as u8,
        access_count: row.access_count.max(0) as u32,
        created_at,
        last_accessed,
        overview: row.overview.clone(),
        abstract_summary: row.abstract_summary.clone(),
    }
}
/// Bump `access_count` and refresh `last_accessed` for a single entry.
async fn touch_entry(&self, uri: &str) -> Result<()> {
    let now = chrono::Utc::now().to_rfc3339();
    let outcome = sqlx::query(
        "UPDATE memories SET access_count = access_count + 1, last_accessed = ? WHERE uri = ?"
    )
    .bind(&now)
    .bind(uri)
    .execute(&self.pool)
    .await;
    outcome
        .map(|_| ())
        .map_err(|e| ZclawError::StorageError(format!("Failed to update access count: {}", e)))
}
}
// Manual FromRow so the migration-added columns can be decoded leniently.
impl sqlx::FromRow<'_, SqliteRow> for MemoryRow {
fn from_row(row: &SqliteRow) -> sqlx::Result<Self> {
Ok(MemoryRow {
uri: row.try_get("uri")?,
memory_type: row.try_get("memory_type")?,
content: row.try_get("content")?,
keywords: row.try_get("keywords")?,
importance: row.try_get("importance")?,
access_count: row.try_get("access_count")?,
created_at: row.try_get("created_at")?,
last_accessed: row.try_get("last_accessed")?,
// `.ok()` tolerates databases created before the overview/abstract_summary
// migrations (missing column -> None). NOTE(review): it also swallows
// genuine type/decode errors — confirm that trade-off is intended.
overview: row.try_get("overview").ok(),
abstract_summary: row.try_get("abstract_summary").ok(),
})
}
}
/// Private helper methods on SqliteStorage (NOT in impl VikingStorage block)
impl SqliteStorage {
    /// Fetch memories by scope with importance-based ordering.
    /// Used internally by find() for scope-based queries.
    ///
    /// `scope` is matched as a URI prefix (`LIKE 'scope%'`); `None` scans the
    /// whole table ordered by importance alone.
    pub(crate) async fn fetch_by_scope_priv(&self, scope: Option<&str>, limit: usize) -> Result<Vec<MemoryRow>> {
        let outcome = match scope {
            Some(prefix) => {
                sqlx::query_as::<_, MemoryRow>(
                    r#"
SELECT uri, memory_type, content, keywords, importance, access_count, created_at, last_accessed, overview, abstract_summary
FROM memories
WHERE uri LIKE ?
ORDER BY importance DESC, access_count DESC
LIMIT ?
"#
                )
                .bind(format!("{}%", prefix))
                .bind(limit as i64)
                .fetch_all(&self.pool)
                .await
            }
            None => {
                sqlx::query_as::<_, MemoryRow>(
                    r#"
SELECT uri, memory_type, content, keywords, importance, access_count, created_at, last_accessed, overview, abstract_summary
FROM memories
ORDER BY importance DESC
LIMIT ?
"#
                )
                .bind(limit as i64)
                .fetch_all(&self.pool)
                .await
            }
        };
        // Both branches map failures to the same storage error.
        outcome.map_err(|e| ZclawError::StorageError(format!("Failed to fetch by scope: {}", e)))
    }
}
#[async_trait]
impl VikingStorage for SqliteStorage {
/// Persist an entry: upsert the durable row, refresh the FTS5 index, and
/// keep the in-memory semantic scorer in sync.
///
/// FTS index maintenance is best-effort — its failures are ignored so a
/// broken FTS table cannot block writes.
async fn store(&self, entry: &MemoryEntry) -> Result<()> {
    let keywords_json = serde_json::to_string(&entry.keywords)
        .map_err(|e| ZclawError::StorageError(format!("Failed to serialize keywords: {}", e)))?;
    // Pre-render the fields SQLite stores as TEXT.
    let memory_type = entry.memory_type.to_string();
    let created_at = entry.created_at.to_rfc3339();
    let last_accessed = entry.last_accessed.to_rfc3339();
    // Upsert into the main table.
    sqlx::query(
        r#"
INSERT OR REPLACE INTO memories
(uri, memory_type, content, keywords, importance, access_count, created_at, last_accessed, overview, abstract_summary)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"#,
    )
    .bind(&entry.uri)
    .bind(&memory_type)
    .bind(&entry.content)
    .bind(&keywords_json)
    .bind(entry.importance as i32)
    .bind(entry.access_count as i32)
    .bind(&created_at)
    .bind(&last_accessed)
    .bind(&entry.overview)
    .bind(&entry.abstract_summary)
    .execute(&self.pool)
    .await
    .map_err(|e| ZclawError::StorageError(format!("Failed to store memory: {}", e)))?;
    // Refresh the FTS index: drop any stale row, then insert the new text.
    let _ = sqlx::query("DELETE FROM memories_fts WHERE uri = ?")
        .bind(&entry.uri)
        .execute(&self.pool)
        .await;
    let keywords_text = entry.keywords.join(" ");
    let _ = sqlx::query(
        r#"
INSERT INTO memories_fts (uri, content, keywords)
VALUES (?, ?, ?)
"#,
    )
    .bind(&entry.uri)
    .bind(&entry.content)
    .bind(&keywords_text)
    .execute(&self.pool)
    .await;
    // Mirror the write into the semantic scorer, preferring the
    // embedding-aware path when a client is configured.
    {
        let mut scorer = self.scorer.write().await;
        if scorer.is_embedding_available() {
            scorer.index_entry_with_embedding(entry).await;
        } else {
            scorer.index_entry(entry);
        }
    }
    tracing::debug!("[SqliteStorage] Stored memory: {}", entry.uri);
    Ok(())
}
/// Look up a single entry by URI.
///
/// A successful hit also bumps the entry's access statistics via
/// `touch_entry`; the returned value reflects the counters as loaded
/// (before the bump).
async fn get(&self, uri: &str) -> Result<Option<MemoryEntry>> {
    let maybe_row = sqlx::query_as::<_, MemoryRow>(
        "SELECT uri, memory_type, content, keywords, importance, access_count, created_at, last_accessed, overview, abstract_summary FROM memories WHERE uri = ?"
    )
    .bind(uri)
    .fetch_optional(&self.pool)
    .await
    .map_err(|e| ZclawError::StorageError(format!("Failed to get memory: {}", e)))?;
    match maybe_row {
        None => Ok(None),
        Some(row) => {
            let entry = self.row_to_entry(&row);
            // Record the read before returning.
            self.touch_entry(&entry.uri).await?;
            Ok(Some(entry))
        }
    }
}
/// Hybrid search: FTS5 candidate retrieval plus semantic reranking.
///
/// 1. Non-empty query: FTS5 `MATCH` (optionally scope-filtered) fetches
///    candidates; empty or failed FTS results fall back to a scope scan.
/// 2. Empty query: scope scan only (e.g., "list all").
/// 3. Candidates are reranked by semantic score — a 70/30 blend of
///    embedding cosine similarity and TF-IDF when both sides have
///    embeddings, otherwise TF-IDF alone — then filtered by
///    `min_similarity` and truncated to `options.limit`.
async fn find(&self, query: &str, options: FindOptions) -> Result<Vec<MemoryEntry>> {
    // Fetch more candidates than requested so reranking has material.
    let limit = options.limit.unwrap_or(50).max(20);
    let rows = if !query.is_empty() {
        // FTS5-powered candidate retrieval (fast, index-based).
        let fts_candidates = if let Some(ref scope) = options.scope {
            sqlx::query_as::<_, MemoryRow>(
                r#"
SELECT m.uri, m.memory_type, m.content, m.keywords, m.importance,
m.access_count, m.created_at, m.last_accessed, m.overview, m.abstract_summary
FROM memories m
INNER JOIN memories_fts f ON m.uri = f.uri
WHERE f.memories_fts MATCH ?
AND m.uri LIKE ?
ORDER BY f.rank
LIMIT ?
"#
            )
            .bind(query)
            .bind(format!("{}%", scope))
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
        } else {
            sqlx::query_as::<_, MemoryRow>(
                r#"
SELECT m.uri, m.memory_type, m.content, m.keywords, m.importance,
m.access_count, m.created_at, m.last_accessed, m.overview, m.abstract_summary
FROM memories m
INNER JOIN memories_fts f ON m.uri = f.uri
WHERE f.memories_fts MATCH ?
ORDER BY f.rank
LIMIT ?
"#
            )
            .bind(query)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await
        };
        match fts_candidates {
            Ok(rows) if !rows.is_empty() => rows,
            Ok(_) | Err(_) => {
                // FTS5 returned nothing or the query syntax was invalid —
                // fall back to a scope-based scan.
                tracing::debug!("[SqliteStorage] FTS5 returned no results, falling back to scope scan");
                self.fetch_by_scope_priv(options.scope.as_deref(), limit).await?
            }
        }
    } else {
        // Empty query: scope-based scan only (no FTS5 needed).
        self.fetch_by_scope_priv(options.scope.as_deref(), limit).await?
    };
    // Embed the query ONCE, before the scoring loop. The previous
    // implementation called the embedding client with the identical query
    // string for every candidate row, multiplying embedding latency by the
    // candidate count.
    let query_embedding = {
        let scorer = self.scorer.read().await;
        if scorer.is_embedding_available() {
            let client = scorer.get_embedding_client();
            // Release the lock before the potentially slow embed call.
            drop(scorer);
            match client.embed(query).await {
                Ok(emb) => Some(emb),
                Err(_) => {
                    tracing::debug!("[SqliteStorage] Query embedding failed, using TF-IDF only");
                    None
                }
            }
        } else {
            None
        }
    };
    // Convert rows to entries and compute semantic scores.
    let mut scored_entries: Vec<(f32, MemoryEntry)> = Vec::new();
    for row in rows {
        let entry = self.row_to_entry(&row);
        let semantic_score = {
            let scorer = self.scorer.read().await;
            let tfidf_score = scorer.score_similarity(query, &entry);
            match (&query_embedding, scorer.get_entry_embedding(&entry.uri)) {
                (Some(query_emb), Some(entry_emb)) => {
                    // Hybrid: 70% embedding + 30% TF-IDF
                    let emb_score = SemanticScorer::cosine_similarity_embedding(query_emb, &entry_emb);
                    emb_score * 0.7 + tfidf_score * 0.3
                }
                // Missing embedding on either side: TF-IDF only.
                _ => tfidf_score,
            }
        };
        // Apply the optional similarity threshold.
        if let Some(min_similarity) = options.min_similarity {
            if semantic_score < min_similarity {
                continue;
            }
        }
        scored_entries.push((semantic_score, entry));
    }
    // Sort by score (descending), breaking ties by importance, then access count.
    scored_entries.sort_by(|a, b| {
        b.0.partial_cmp(&a.0)
            .unwrap_or(std::cmp::Ordering::Equal)
            .then_with(|| b.1.importance.cmp(&a.1.importance))
            .then_with(|| b.1.access_count.cmp(&a.1.access_count))
    });
    // Apply the caller-requested limit (candidate fetch may have exceeded it).
    if let Some(limit) = options.limit {
        scored_entries.truncate(limit);
    }
    Ok(scored_entries.into_iter().map(|(_, entry)| entry).collect())
}
/// List entries whose URI starts with `prefix`, ordered by importance.
///
/// Delegates to the scope scan with a fixed candidate cap of 100.
async fn find_by_prefix(&self, prefix: &str) -> Result<Vec<MemoryEntry>> {
    let rows = self.fetch_by_scope_priv(Some(prefix), 100).await?;
    Ok(rows.iter().map(|row| self.row_to_entry(row)).collect())
}
/// Delete an entry everywhere: durable table, FTS index, in-memory scorer.
///
/// Only the main-table delete can fail the call; FTS cleanup is best-effort.
async fn delete(&self, uri: &str) -> Result<()> {
    // Remove from the durable table first — the failure that matters.
    sqlx::query("DELETE FROM memories WHERE uri = ?")
        .bind(uri)
        .execute(&self.pool)
        .await
        .map_err(|e| ZclawError::StorageError(format!("Failed to delete memory: {}", e)))?;
    // Best-effort cleanup of the FTS index (errors deliberately ignored).
    let _ = sqlx::query("DELETE FROM memories_fts WHERE uri = ?")
        .bind(uri)
        .execute(&self.pool)
        .await;
    // Drop the entry from the in-memory scorer as well.
    self.scorer.write().await.remove_entry(uri);
    tracing::debug!("[SqliteStorage] Deleted memory: {}", uri);
    Ok(())
}
/// Upsert an opaque JSON blob under `key` in the metadata table.
async fn store_metadata_json(&self, key: &str, json: &str) -> Result<()> {
    let outcome = sqlx::query(
        r#"
INSERT OR REPLACE INTO metadata (key, json)
VALUES (?, ?)
"#,
    )
    .bind(key)
    .bind(json)
    .execute(&self.pool)
    .await;
    match outcome {
        Ok(_) => Ok(()),
        Err(e) => Err(ZclawError::StorageError(format!("Failed to store metadata: {}", e))),
    }
}
/// Fetch the JSON blob stored under `key`, or `None` when absent.
async fn get_metadata_json(&self, key: &str) -> Result<Option<String>> {
    sqlx::query_scalar::<_, String>("SELECT json FROM metadata WHERE key = ?")
        .bind(key)
        .fetch_optional(&self.pool)
        .await
        .map_err(|e| ZclawError::StorageError(format!("Failed to get metadata: {}", e)))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::MemoryType;
// Round-trip: a stored entry can be read back by URI with content intact.
#[tokio::test]
async fn test_sqlite_storage_store_and_get() {
let storage = SqliteStorage::in_memory().await;
let entry = MemoryEntry::new(
"test-agent",
MemoryType::Preference,
"style",
"User prefers concise responses".to_string(),
);
storage.store(&entry).await.unwrap();
let retrieved = storage.get(&entry.uri).await.unwrap();
assert!(retrieved.is_some());
assert_eq!(retrieved.unwrap().content, "User prefers concise responses");
}
// Semantic ranking: the entry matching the query terms ranks first.
#[tokio::test]
async fn test_sqlite_storage_semantic_search() {
let storage = SqliteStorage::in_memory().await;
// Store entries with different content
let entry1 = MemoryEntry::new(
"agent-1",
MemoryType::Knowledge,
"rust",
"Rust is a systems programming language focused on safety".to_string(),
).with_keywords(vec!["rust".to_string(), "programming".to_string(), "safety".to_string()]);
let entry2 = MemoryEntry::new(
"agent-1",
MemoryType::Knowledge,
"python",
"Python is a high-level programming language".to_string(),
).with_keywords(vec!["python".to_string(), "programming".to_string()]);
storage.store(&entry1).await.unwrap();
storage.store(&entry2).await.unwrap();
// Search for "rust safety"
let results = storage.find(
"rust safety",
FindOptions {
scope: Some("agent://agent-1".to_string()),
limit: Some(10),
min_similarity: Some(0.1),
},
).await.unwrap();
// Should find the Rust entry with higher score
assert!(!results.is_empty());
assert!(results[0].content.contains("Rust"));
}
// Deleted entries are no longer retrievable by URI.
#[tokio::test]
async fn test_sqlite_storage_delete() {
let storage = SqliteStorage::in_memory().await;
let entry = MemoryEntry::new(
"test-agent",
MemoryType::Preference,
"style",
"test".to_string(),
);
storage.store(&entry).await.unwrap();
storage.delete(&entry.uri).await.unwrap();
let retrieved = storage.get(&entry.uri).await.unwrap();
assert!(retrieved.is_none());
}
// Data written by one SqliteStorage instance survives reopening the file.
// NOTE(review): the fixed temp path could collide if this test suite ever
// runs the same binary concurrently — consider a unique per-run filename.
#[tokio::test]
async fn test_persistence() {
let path = std::env::temp_dir().join("zclaw_test_memories.db");
// Clean up any existing test db
let _ = std::fs::remove_file(&path);
// Create and store
{
let storage = SqliteStorage::new(&path).await.unwrap();
let entry = MemoryEntry::new(
"persist-test",
MemoryType::Knowledge,
"test",
"This should persist".to_string(),
);
storage.store(&entry).await.unwrap();
}
// Reopen and verify
{
let storage = SqliteStorage::new(&path).await.unwrap();
let results = storage.find_by_prefix("agent://persist-test").await.unwrap();
assert!(!results.is_empty());
assert_eq!(results[0].content, "This should persist");
}
// Clean up
let _ = std::fs::remove_file(&path);
}
// Metadata key/value JSON round-trips unchanged.
#[tokio::test]
async fn test_metadata_storage() {
let storage = SqliteStorage::in_memory().await;
let json = r#"{"test": "value"}"#;
storage.store_metadata_json("test-key", json).await.unwrap();
let retrieved = storage.get_metadata_json("test-key").await.unwrap();
assert_eq!(retrieved, Some(json.to_string()));
}
// Each get() bumps access_count via touch_entry.
#[tokio::test]
async fn test_access_count() {
let storage = SqliteStorage::in_memory().await;
let entry = MemoryEntry::new(
"test-agent",
MemoryType::Knowledge,
"test",
"test content".to_string(),
);
storage.store(&entry).await.unwrap();
// Access multiple times
for _ in 0..3 {
let _ = storage.get(&entry.uri).await.unwrap();
}
let retrieved = storage.get(&entry.uri).await.unwrap().unwrap();
assert!(retrieved.access_count >= 3);
}
}