Files
zclaw_openfang/crates/zclaw-growth/src/storage/sqlite.rs
iven e10549a1b9 fix: 发布前审计 Batch 2 — Debug遮蔽 + unwrap + 静默吞错 + MCP锁 + 索引 + Config验证
安全:
- LlmConfig 自定义 Debug impl,api_key 显示为 "***REDACTED***"
- tsconfig.json 移除 ErrorBoundary.tsx 排除项(安全关键组件)
- billing/handlers.rs Response builder unwrap → map_err 错误传播
- classroom_commands/mod.rs db_path.parent().unwrap() → ok_or_else

静默吞错:
- approvals.rs 3处 warn→error(审批状态丢失是严重事件)
- events.rs publish() 添加 Event dropped debug 日志
- mcp_transport.rs eprintln→tracing::warn (僵尸进程风险)
- zclaw-growth sqlite.rs 4处迁移:区分 duplicate column name 与真实错误

MCP Transport:
- 合并 stdin+stdout 为单一 Mutex<TransportHandles>
- send_request write-then-read 原子化,防止并发响应错配

数据库:
- 新迁移 20260418000001: idx_rle_created_at + idx_billing_sub_plan + idx_ki_created_by

配置验证:
- SaaSConfig::load() 添加 jwt_expiration_hours>=1, max_connections>0, min<=max
2026-04-18 14:09:36 +08:00

1116 lines
41 KiB
Rust

//! SQLite Storage Backend
//!
//! Persistent storage backend using SQLite for production use.
//! Provides efficient querying and full-text search capabilities.
use crate::retrieval::semantic::{EmbeddingClient, SemanticScorer};
use crate::types::MemoryEntry;
use crate::viking_adapter::{FindOptions, VikingStorage};
use async_trait::async_trait;
use sqlx::sqlite::{SqlitePool, SqlitePoolOptions, SqliteRow};
use sqlx::Row;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
use zclaw_types::Result;
use zclaw_types::ZclawError;
/// SQLite storage backend with TF-IDF semantic scoring
pub struct SqliteStorage {
/// Database connection pool
pool: SqlitePool,
/// Semantic scorer for similarity computation
scorer: Arc<RwLock<SemanticScorer>>,
/// Database path (for reference)
#[allow(dead_code)]
path: PathBuf,
}
/// Database row structure for memory entry
pub(crate) struct MemoryRow {
uri: String,
memory_type: String,
content: String,
keywords: String,
importance: i32,
access_count: i32,
created_at: String,
last_accessed: String,
overview: Option<String>,
abstract_summary: Option<String>,
}
impl SqliteStorage {
/// Get a reference to the underlying connection pool
pub fn pool(&self) -> &SqlitePool {
&self.pool
}
/// Create a new SQLite storage at the given path
pub async fn new(path: impl Into<PathBuf>) -> Result<Self> {
let path = path.into();
// Ensure parent directory exists
if let Some(parent) = path.parent() {
if parent.to_str() != Some(":memory:") {
tokio::fs::create_dir_all(parent).await.map_err(|e| {
ZclawError::StorageError(format!("Failed to create storage directory: {}", e))
})?;
}
}
// Build connection string
let db_url = if path.to_str() == Some(":memory:") {
"sqlite::memory:".to_string()
} else {
format!("sqlite:{}?mode=rwc", path.to_string_lossy())
};
// Create connection pool
let pool = SqlitePoolOptions::new()
.max_connections(5)
.connect(&db_url)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to connect to database: {}", e)))?;
let storage = Self {
pool,
scorer: Arc::new(RwLock::new(SemanticScorer::new())),
path,
};
storage.initialize_schema().await?;
storage.warmup_scorer().await?;
Ok(storage)
}
/// Create an in-memory SQLite database (for testing)
pub async fn in_memory() -> Self {
Self::new(":memory:").await.expect("Failed to create in-memory database")
}
/// Configure embedding client for semantic search
/// Replaces the current scorer with a new one that has embedding support
pub async fn configure_embedding(
&self,
client: Arc<dyn EmbeddingClient>,
) -> Result<()> {
let new_scorer = SemanticScorer::with_embedding(client);
let mut scorer = self.scorer.write().await;
*scorer = new_scorer;
tracing::info!("[SqliteStorage] Embedding client configured, re-indexing with embeddings...");
self.warmup_scorer_with_embedding().await
}
/// Check if embedding is available
pub async fn is_embedding_available(&self) -> bool {
let scorer = self.scorer.read().await;
scorer.is_embedding_available()
}
/// Initialize database schema with FTS5
async fn initialize_schema(&self) -> Result<()> {
// Create main memories table
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS memories (
uri TEXT PRIMARY KEY,
memory_type TEXT NOT NULL,
content TEXT NOT NULL,
keywords TEXT NOT NULL DEFAULT '[]',
importance INTEGER NOT NULL DEFAULT 5,
access_count INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
last_accessed TEXT NOT NULL
)
"#,
)
.execute(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to create memories table: {}", e)))?;
// Create FTS5 virtual table for full-text search
// Use trigram tokenizer for CJK (Chinese/Japanese/Korean) support.
// unicode61 cannot tokenize CJK characters, causing memory search to fail.
// trigram indexes overlapping 3-character slices, works well for all languages.
sqlx::query(
r#"
CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
uri,
content,
keywords,
tokenize='trigram'
)
"#,
)
.execute(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to create FTS5 table: {}", e)))?;
// Create index on memory_type for filtering
sqlx::query("CREATE INDEX IF NOT EXISTS idx_memory_type ON memories(memory_type)")
.execute(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to create index: {}", e)))?;
// Create index on importance for sorting
sqlx::query("CREATE INDEX IF NOT EXISTS idx_importance ON memories(importance DESC)")
.execute(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to create importance index: {}", e)))?;
// Migration: add overview column (L1 summary)
// SQLite ALTER TABLE ADD COLUMN fails with "duplicate column name" if already applied
if let Err(e) = sqlx::query("ALTER TABLE memories ADD COLUMN overview TEXT")
.execute(&self.pool)
.await
{
let msg = e.to_string();
if !msg.contains("duplicate column name") {
tracing::warn!("[Growth] Migration overview failed: {}", msg);
}
}
// Migration: add abstract_summary column (L0 keywords)
if let Err(e) = sqlx::query("ALTER TABLE memories ADD COLUMN abstract_summary TEXT")
.execute(&self.pool)
.await
{
let msg = e.to_string();
if !msg.contains("duplicate column name") {
tracing::warn!("[Growth] Migration abstract_summary failed: {}", msg);
}
}
// P2-24: Migration — content fingerprint for deduplication
if let Err(e) = sqlx::query("ALTER TABLE memories ADD COLUMN content_hash TEXT")
.execute(&self.pool)
.await
{
let msg = e.to_string();
if !msg.contains("duplicate column name") {
tracing::warn!("[Growth] Migration content_hash failed: {}", msg);
}
}
if let Err(e) = sqlx::query("CREATE INDEX IF NOT EXISTS idx_content_hash ON memories(content_hash)")
.execute(&self.pool)
.await
{
tracing::warn!("[Growth] Migration idx_content_hash failed: {}", e);
}
// Backfill content_hash for existing entries that have NULL content_hash
{
use std::hash::{Hash, Hasher};
let rows: Vec<(String, String)> = sqlx::query_as(
"SELECT uri, content FROM memories WHERE content_hash IS NULL"
)
.fetch_all(&self.pool)
.await
.unwrap_or_default();
if !rows.is_empty() {
for (uri, content) in &rows {
let normalized = content.trim().to_lowercase();
let mut hasher = std::collections::hash_map::DefaultHasher::new();
normalized.hash(&mut hasher);
let hash = format!("{:016x}", hasher.finish());
let _ = sqlx::query("UPDATE memories SET content_hash = ? WHERE uri = ?")
.bind(&hash)
.bind(uri)
.execute(&self.pool)
.await;
}
tracing::info!(
"[SqliteStorage] Backfilled content_hash for {} existing entries",
rows.len()
);
}
}
// Create metadata table
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS metadata (
key TEXT PRIMARY KEY,
json TEXT NOT NULL
)
"#,
)
.execute(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to create metadata table: {}", e)))?;
// Migration: Rebuild FTS5 table if using old unicode61 tokenizer (can't handle CJK)
// Check tokenizer by inspecting the existing FTS5 table definition
let needs_rebuild: bool = sqlx::query_scalar::<_, i64>(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='memories_fts' AND sql LIKE '%unicode61%'"
)
.fetch_one(&self.pool)
.await
.unwrap_or(0) > 0;
if needs_rebuild {
tracing::info!("[SqliteStorage] Rebuilding FTS5 table: unicode61 → trigram for CJK support");
// Drop old FTS5 table
let _ = sqlx::query("DROP TABLE IF EXISTS memories_fts")
.execute(&self.pool)
.await;
// Recreate with trigram tokenizer
sqlx::query(
r#"
CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
uri,
content,
keywords,
tokenize='trigram'
)
"#,
)
.execute(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to recreate FTS5 table: {}", e)))?;
// Reindex all existing memories into FTS5
let reindexed = sqlx::query(
"INSERT INTO memories_fts (uri, content, keywords) SELECT uri, content, keywords FROM memories"
)
.execute(&self.pool)
.await
.map(|r| r.rows_affected())
.unwrap_or(0);
tracing::info!("[SqliteStorage] FTS5 rebuild complete, reindexed {} entries", reindexed);
}
tracing::info!("[SqliteStorage] Database schema initialized");
Ok(())
}
/// Warmup semantic scorer with existing entries
async fn warmup_scorer(&self) -> Result<()> {
let rows = sqlx::query_as::<_, MemoryRow>(
"SELECT uri, memory_type, content, keywords, importance, access_count, created_at, last_accessed, overview, abstract_summary FROM memories"
)
.fetch_all(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to load memories for warmup: {}", e)))?;
let mut scorer = self.scorer.write().await;
for row in rows {
let entry = self.row_to_entry(&row);
scorer.index_entry(&entry);
}
let stats = scorer.stats();
tracing::info!(
"[SqliteStorage] Warmed up scorer with {} entries, {} terms",
stats.indexed_entries,
stats.unique_terms
);
Ok(())
}
/// Warmup semantic scorer with embedding support for existing entries
async fn warmup_scorer_with_embedding(&self) -> Result<()> {
let rows = sqlx::query_as::<_, MemoryRow>(
"SELECT uri, memory_type, content, keywords, importance, access_count, created_at, last_accessed, overview, abstract_summary FROM memories"
)
.fetch_all(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to load memories for warmup: {}", e)))?;
let mut scorer = self.scorer.write().await;
for row in rows {
let entry = self.row_to_entry(&row);
scorer.index_entry_with_embedding(&entry).await;
}
let stats = scorer.stats();
tracing::info!(
"[SqliteStorage] Warmed up scorer with {} entries ({} with embeddings), {} terms",
stats.indexed_entries,
stats.embedding_entries,
stats.unique_terms
);
Ok(())
}
/// Convert database row to MemoryEntry
fn row_to_entry(&self, row: &MemoryRow) -> MemoryEntry {
let memory_type = crate::types::MemoryType::parse(&row.memory_type);
let keywords: Vec<String> = serde_json::from_str(&row.keywords).unwrap_or_default();
let created_at = chrono::DateTime::parse_from_rfc3339(&row.created_at)
.map(|dt| dt.with_timezone(&chrono::Utc))
.unwrap_or_else(|_| chrono::Utc::now());
let last_accessed = chrono::DateTime::parse_from_rfc3339(&row.last_accessed)
.map(|dt| dt.with_timezone(&chrono::Utc))
.unwrap_or_else(|_| chrono::Utc::now());
MemoryEntry {
uri: row.uri.clone(),
memory_type,
content: row.content.clone(),
keywords,
importance: row.importance as u8,
access_count: row.access_count as u32,
created_at,
last_accessed,
overview: row.overview.clone(),
abstract_summary: row.abstract_summary.clone(),
}
}
/// Update access count and last accessed time
async fn touch_entry(&self, uri: &str) -> Result<()> {
let now = chrono::Utc::now().to_rfc3339();
sqlx::query(
"UPDATE memories SET access_count = access_count + 1, last_accessed = ? WHERE uri = ?"
)
.bind(&now)
.bind(uri)
.execute(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to update access count: {}", e)))?;
Ok(())
}
/// Decay stale memories: reduce importance for long-unaccessed entries
/// and archive those below the minimum threshold.
///
/// - For every `decay_interval_days` since last access, importance drops by 1.
/// - Memories with importance ≤ `archive_threshold` are deleted.
pub async fn decay_memories(
&self,
decay_interval_days: u32,
archive_threshold: u8,
) -> crate::types::DecayResult {
// Step 1: Reduce importance of stale memories
let decay_result = sqlx::query(
r#"
UPDATE memories
SET importance = MAX(1, importance - CAST(
(julianday('now') - julianday(last_accessed)) / ? AS INTEGER
))
WHERE last_accessed < datetime('now', '-' || ? || ' days')
AND importance > 1
"#,
)
.bind(decay_interval_days)
.bind(decay_interval_days)
.execute(&self.pool)
.await;
let decayed = decay_result
.map(|r| r.rows_affected())
.unwrap_or(0);
// Step 2: Remove memories that fell below archive threshold
// and haven't been accessed in 90+ days
let archive_result = sqlx::query(
r#"
DELETE FROM memories
WHERE importance <= ?
AND last_accessed < datetime('now', '-90 days')
"#,
)
.bind(archive_threshold as i32)
.execute(&self.pool)
.await;
// Also clean up FTS entries for archived memories
let _ = sqlx::query(
r#"
DELETE FROM memories_fts
WHERE uri NOT IN (SELECT uri FROM memories)
"#,
)
.execute(&self.pool)
.await;
let archived = archive_result
.map(|r| r.rows_affected())
.unwrap_or(0);
if decayed > 0 || archived > 0 {
tracing::info!(
"[SqliteStorage] Memory decay: {} decayed, {} archived",
decayed,
archived
);
}
crate::types::DecayResult { decayed, archived }
}
}
impl sqlx::FromRow<'_, SqliteRow> for MemoryRow {
fn from_row(row: &SqliteRow) -> sqlx::Result<Self> {
Ok(MemoryRow {
uri: row.try_get("uri")?,
memory_type: row.try_get("memory_type")?,
content: row.try_get("content")?,
keywords: row.try_get("keywords")?,
importance: row.try_get("importance")?,
access_count: row.try_get("access_count")?,
created_at: row.try_get("created_at")?,
last_accessed: row.try_get("last_accessed")?,
overview: row.try_get("overview").ok(),
abstract_summary: row.try_get("abstract_summary").ok(),
})
}
}
/// Private helper methods on SqliteStorage (NOT in impl VikingStorage block)
impl SqliteStorage {
/// Sanitize a user query for FTS5 MATCH syntax.
///
/// FTS5 treats several characters as operators (`+`, `-`, `*`, `"`, `(`, `)`, `:`).
/// Strips these and keeps only alphanumeric + CJK tokens with length > 1,
/// then joins them with `OR` for broad matching.
fn sanitize_fts_query(query: &str) -> String {
// trigram tokenizer requires quoted phrases for substring matching
// and needs at least 3 characters per term to produce results.
let lower = query.to_lowercase();
// Check if query contains CJK characters — trigram handles them natively
let has_cjk = lower.chars().any(|c| {
matches!(c, '\u{4E00}'..='\u{9FFF}' | '\u{3400}'..='\u{4DBF}' | '\u{F900}'..='\u{FAFF}')
});
if has_cjk {
// For CJK queries, extract tokens: CJK character sequences and ASCII words.
// Join with OR for broad matching (not exact phrase, which would miss scattered terms).
let mut tokens: Vec<String> = Vec::new();
let mut cjk_buf = String::new();
let mut ascii_buf = String::new();
for ch in lower.chars() {
let is_cjk = matches!(ch, '\u{4E00}'..='\u{9FFF}' | '\u{3400}'..='\u{4DBF}' | '\u{F900}'..='\u{FAFF}');
if is_cjk {
if !ascii_buf.is_empty() {
if ascii_buf.len() >= 2 {
tokens.push(format!("\"{}\"", ascii_buf));
}
ascii_buf.clear();
}
cjk_buf.push(ch);
} else if ch.is_alphanumeric() {
if !cjk_buf.is_empty() {
// Flush CJK buffer — each CJK character is a potential token
// (trigram indexes 3-char sequences, so single CJK chars won't
// match alone, but 2+ char sequences will)
if cjk_buf.len() >= 2 {
tokens.push(format!("\"{}\"", cjk_buf));
}
cjk_buf.clear();
}
ascii_buf.push(ch);
} else {
// Separator — flush both buffers
if cjk_buf.len() >= 2 {
tokens.push(format!("\"{}\"", cjk_buf));
}
cjk_buf.clear();
if ascii_buf.len() >= 2 {
tokens.push(format!("\"{}\"", ascii_buf));
}
ascii_buf.clear();
}
}
// Flush remaining
if cjk_buf.len() >= 2 {
tokens.push(format!("\"{}\"", cjk_buf));
}
if ascii_buf.len() >= 2 {
tokens.push(format!("\"{}\"", ascii_buf));
}
if tokens.is_empty() {
return String::new();
}
tokens.join(" OR ")
} else {
// For non-CJK, split into terms and join with OR
let terms: Vec<String> = lower
.split(|c: char| !c.is_alphanumeric())
.filter(|s| !s.is_empty() && s.len() > 1)
.map(|s| format!("\"{}\"", s))
.collect();
if terms.is_empty() {
return String::new();
}
terms.join(" OR ")
}
}
/// Fetch memories by scope with importance-based ordering.
/// Used internally by find() for scope-based queries.
pub(crate) async fn fetch_by_scope_priv(&self, scope: Option<&str>, limit: usize) -> Result<Vec<MemoryRow>> {
let rows = if let Some(scope) = scope {
sqlx::query_as::<_, MemoryRow>(
r#"
SELECT uri, memory_type, content, keywords, importance, access_count, created_at, last_accessed, overview, abstract_summary
FROM memories
WHERE uri LIKE ?
ORDER BY importance DESC, access_count DESC
LIMIT ?
"#
)
.bind(format!("{}%", scope))
.bind(limit as i64)
.fetch_all(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to fetch by scope: {}", e)))?
} else {
sqlx::query_as::<_, MemoryRow>(
r#"
SELECT uri, memory_type, content, keywords, importance, access_count, created_at, last_accessed, overview, abstract_summary
FROM memories
ORDER BY importance DESC
LIMIT ?
"#
)
.bind(limit as i64)
.fetch_all(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to fetch by scope: {}", e)))?
};
Ok(rows)
}
}
#[async_trait]
impl VikingStorage for SqliteStorage {
async fn store(&self, entry: &MemoryEntry) -> Result<()> {
let keywords_json = serde_json::to_string(&entry.keywords)
.map_err(|e| ZclawError::StorageError(format!("Failed to serialize keywords: {}", e)))?;
let created_at = entry.created_at.to_rfc3339();
let last_accessed = entry.last_accessed.to_rfc3339();
let memory_type = entry.memory_type.to_string();
// P2-24: Content-hash deduplication
let normalized_content = entry.content.trim().to_lowercase();
let content_hash = {
use std::hash::{Hash, Hasher};
let mut hasher = std::collections::hash_map::DefaultHasher::new();
normalized_content.hash(&mut hasher);
format!("{:016x}", hasher.finish())
};
// Check for existing entry with the same content hash (within same agent scope)
let agent_scope = entry.uri.split('/').nth(2).unwrap_or("");
let existing: Option<(String, i32, i32)> = sqlx::query_as::<_, (String, i32, i32)>(
"SELECT uri, importance, access_count FROM memories WHERE content_hash = ? AND uri LIKE ? LIMIT 1"
)
.bind(&content_hash)
.bind(format!("agent://{agent_scope}/%"))
.fetch_optional(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Dedup check failed: {}", e)))?;
if let Some((existing_uri, existing_importance, existing_access)) = existing {
// Merge: keep higher importance, bump access count, update last_accessed
let merged_importance = existing_importance.max(entry.importance as i32);
let merged_access = existing_access + 1;
sqlx::query(
"UPDATE memories SET importance = ?, access_count = ?, last_accessed = ? WHERE uri = ?"
)
.bind(merged_importance)
.bind(merged_access)
.bind(&last_accessed)
.bind(&existing_uri)
.execute(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Dedup merge failed: {}", e)))?;
tracing::debug!(
"[SqliteStorage] Dedup: merged '{}' into existing '{}' (importance={}, access_count={})",
entry.uri, existing_uri, merged_importance, merged_access
);
return Ok(());
}
// Insert into main table
sqlx::query(
r#"
INSERT OR REPLACE INTO memories
(uri, memory_type, content, keywords, importance, access_count, created_at, last_accessed, overview, abstract_summary, content_hash)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"#,
)
.bind(&entry.uri)
.bind(&memory_type)
.bind(&entry.content)
.bind(&keywords_json)
.bind(entry.importance as i32)
.bind(entry.access_count as i32)
.bind(&created_at)
.bind(&last_accessed)
.bind(&entry.overview)
.bind(&entry.abstract_summary)
.bind(&content_hash)
.execute(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to store memory: {}", e)))?;
// Update FTS index - delete old and insert new
let _ = sqlx::query("DELETE FROM memories_fts WHERE uri = ?")
.bind(&entry.uri)
.execute(&self.pool)
.await
.map_err(|e| {
tracing::warn!("[SqliteStorage] Failed to delete old FTS entry: {}", e);
});
let keywords_text = entry.keywords.join(" ");
let _ = sqlx::query(
r#"
INSERT INTO memories_fts (uri, content, keywords)
VALUES (?, ?, ?)
"#,
)
.bind(&entry.uri)
.bind(&entry.content)
.bind(&keywords_text)
.execute(&self.pool)
.await
.map_err(|e| {
tracing::warn!("[SqliteStorage] Failed to insert FTS entry: {}", e);
});
// Update semantic scorer (use embedding when available)
let mut scorer = self.scorer.write().await;
if scorer.is_embedding_available() {
scorer.index_entry_with_embedding(entry).await;
} else {
scorer.index_entry(entry);
}
tracing::debug!("[SqliteStorage] Stored memory: {}", entry.uri);
Ok(())
}
async fn get(&self, uri: &str) -> Result<Option<MemoryEntry>> {
let row = sqlx::query_as::<_, MemoryRow>(
"SELECT uri, memory_type, content, keywords, importance, access_count, created_at, last_accessed, overview, abstract_summary FROM memories WHERE uri = ?"
)
.bind(uri)
.fetch_optional(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to get memory: {}", e)))?;
if let Some(row) = row {
let entry = self.row_to_entry(&row);
// Update access count
self.touch_entry(&entry.uri).await?;
Ok(Some(entry))
} else {
Ok(None)
}
}
async fn find(&self, query: &str, options: FindOptions) -> Result<Vec<MemoryEntry>> {
let limit = options.limit.unwrap_or(50).max(20); // Fetch more candidates for reranking
// Strategy: use FTS5 for initial filtering when query is non-empty,
// then score candidates with TF-IDF / embedding for precise ranking.
// When FTS5 returns nothing, we return empty — do NOT fall back to
// scope scan (that returns irrelevant high-importance memories).
let rows = if !query.is_empty() {
// Sanitize query for FTS5: strip operators that cause syntax errors
let sanitized = Self::sanitize_fts_query(query);
if sanitized.is_empty() {
// Query had no meaningful terms after sanitization (e.g., "1+2")
tracing::debug!(
"[SqliteStorage] Query '{}' produced no FTS5-searchable terms, skipping",
query.chars().take(50).collect::<String>()
);
return Ok(Vec::new());
}
// FTS5-powered candidate retrieval (fast, index-based)
let fts_candidates = if let Some(ref scope) = options.scope {
sqlx::query_as::<_, MemoryRow>(
r#"
SELECT m.uri, m.memory_type, m.content, m.keywords, m.importance,
m.access_count, m.created_at, m.last_accessed, m.overview, m.abstract_summary
FROM memories m
INNER JOIN memories_fts f ON m.uri = f.uri
WHERE f.memories_fts MATCH ?
AND m.uri LIKE ?
ORDER BY f.rank
LIMIT ?
"#
)
.bind(&sanitized)
.bind(format!("{}%", scope))
.bind(limit as i64)
.fetch_all(&self.pool)
.await
} else {
sqlx::query_as::<_, MemoryRow>(
r#"
SELECT m.uri, m.memory_type, m.content, m.keywords, m.importance,
m.access_count, m.created_at, m.last_accessed, m.overview, m.abstract_summary
FROM memories m
INNER JOIN memories_fts f ON m.uri = f.uri
WHERE f.memories_fts MATCH ?
ORDER BY f.rank
LIMIT ?
"#
)
.bind(&sanitized)
.bind(limit as i64)
.fetch_all(&self.pool)
.await
};
match fts_candidates {
Ok(rows) if !rows.is_empty() => rows,
Ok(_) | Err(_) => {
// FTS5 returned no results or failed — check if query contains CJK
// characters. unicode61 tokenizer doesn't index CJK, so fall back
// to LIKE-based search for CJK queries.
let has_cjk = query.chars().any(|c| {
matches!(c, '\u{4E00}'..='\u{9FFF}' | '\u{3400}'..='\u{4DBF}' | '\u{F900}'..='\u{FAFF}')
});
if !has_cjk {
tracing::debug!(
"[SqliteStorage] FTS5 returned no results for query: '{}'",
query.chars().take(50).collect::<String>()
);
return Ok(Vec::new());
}
tracing::debug!(
"[SqliteStorage] FTS5 miss for CJK query, falling back to LIKE: '{}'",
query.chars().take(50).collect::<String>()
);
let pattern = format!("%{}%", query);
if let Some(ref scope) = options.scope {
sqlx::query_as::<_, MemoryRow>(
r#"
SELECT uri, memory_type, content, keywords, importance,
access_count, created_at, last_accessed, overview, abstract_summary
FROM memories
WHERE content LIKE ?
AND uri LIKE ?
ORDER BY importance DESC, access_count DESC
LIMIT ?
"#
)
.bind(&pattern)
.bind(format!("{}%", scope))
.bind(limit as i64)
.fetch_all(&self.pool)
.await
.unwrap_or_default()
} else {
sqlx::query_as::<_, MemoryRow>(
r#"
SELECT uri, memory_type, content, keywords, importance,
access_count, created_at, last_accessed, overview, abstract_summary
FROM memories
WHERE content LIKE ?
ORDER BY importance DESC, access_count DESC
LIMIT ?
"#
)
.bind(&pattern)
.bind(limit as i64)
.fetch_all(&self.pool)
.await
.unwrap_or_default()
}
}
}
} else {
// Empty query: scope-based scan only (no FTS5 needed)
self.fetch_by_scope_priv(options.scope.as_deref(), limit).await?
};
// Convert to entries and compute semantic scores
let use_embedding = {
let scorer = self.scorer.read().await;
scorer.is_embedding_available()
};
let mut scored_entries: Vec<(f32, f32, MemoryEntry)> = Vec::new();
for row in rows {
let entry = self.row_to_entry(&row);
// Compute semantic score: use embedding when available, fallback to TF-IDF
let semantic_score = if use_embedding {
let scorer = self.scorer.read().await;
let tfidf_score = scorer.score_similarity(query, &entry);
let entry_embedding = scorer.get_entry_embedding(&entry.uri);
drop(scorer);
match entry_embedding {
Some(entry_emb) => {
// Try embedding the query for hybrid scoring
let embedding_client = {
let scorer2 = self.scorer.read().await;
scorer2.get_embedding_client()
};
match embedding_client.embed(query).await {
Ok(query_emb) => {
let emb_score = SemanticScorer::cosine_similarity_embedding(&query_emb, &entry_emb);
// Hybrid: 70% embedding + 30% TF-IDF
emb_score * 0.7 + tfidf_score * 0.3
}
Err(_) => {
tracing::debug!("[SqliteStorage] Query embedding failed, using TF-IDF only");
tfidf_score
}
}
}
None => tfidf_score,
}
} else {
let scorer = self.scorer.read().await;
scorer.score_similarity(query, &entry)
};
// Apply similarity threshold
if let Some(min_similarity) = options.min_similarity {
if semantic_score < min_similarity {
continue;
}
}
// Apply time decay to importance before final scoring
let time_decayed_importance = crate::types::effective_importance(&entry);
scored_entries.push((semantic_score, time_decayed_importance, entry));
}
// Sort by: semantic score → time-decayed importance → access count (all descending)
scored_entries.sort_by(|a, b| {
b.0.partial_cmp(&a.0)
.unwrap_or(std::cmp::Ordering::Equal)
.then_with(|| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal))
.then_with(|| b.2.access_count.cmp(&a.2.access_count))
});
// Apply limit
if let Some(limit) = options.limit {
scored_entries.truncate(limit);
}
Ok(scored_entries.into_iter().map(|(_, _, entry)| entry).collect())
}
async fn find_by_prefix(&self, prefix: &str) -> Result<Vec<MemoryEntry>> {
let rows = self.fetch_by_scope_priv(Some(prefix), 100).await?;
let entries = rows.iter().map(|row| self.row_to_entry(row)).collect();
Ok(entries)
}
async fn delete(&self, uri: &str) -> Result<()> {
sqlx::query("DELETE FROM memories WHERE uri = ?")
.bind(uri)
.execute(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to delete memory: {}", e)))?;
// Remove from FTS index
let _ = sqlx::query("DELETE FROM memories_fts WHERE uri = ?")
.bind(uri)
.execute(&self.pool)
.await
.map_err(|e| {
tracing::warn!("[SqliteStorage] Failed to delete FTS entry: {}", e);
});
// Remove from in-memory scorer
let mut scorer = self.scorer.write().await;
scorer.remove_entry(uri);
tracing::debug!("[SqliteStorage] Deleted memory: {}", uri);
Ok(())
}
async fn store_metadata_json(&self, key: &str, json: &str) -> Result<()> {
sqlx::query(
r#"
INSERT OR REPLACE INTO metadata (key, json)
VALUES (?, ?)
"#,
)
.bind(key)
.bind(json)
.execute(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to store metadata: {}", e)))?;
Ok(())
}
async fn get_metadata_json(&self, key: &str) -> Result<Option<String>> {
let result = sqlx::query_scalar::<_, String>("SELECT json FROM metadata WHERE key = ?")
.bind(key)
.fetch_optional(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(format!("Failed to get metadata: {}", e)))?;
Ok(result)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::MemoryType;
#[tokio::test]
async fn test_sqlite_storage_store_and_get() {
let storage = SqliteStorage::in_memory().await;
let entry = MemoryEntry::new(
"test-agent",
MemoryType::Preference,
"style",
"User prefers concise responses".to_string(),
);
storage.store(&entry).await.unwrap();
let retrieved = storage.get(&entry.uri).await.unwrap();
assert!(retrieved.is_some());
assert_eq!(retrieved.unwrap().content, "User prefers concise responses");
}
#[tokio::test]
async fn test_sqlite_storage_semantic_search() {
let storage = SqliteStorage::in_memory().await;
// Store entries with different content
let entry1 = MemoryEntry::new(
"agent-1",
MemoryType::Knowledge,
"rust",
"Rust is a systems programming language focused on safety".to_string(),
).with_keywords(vec!["rust".to_string(), "programming".to_string(), "safety".to_string()]);
let entry2 = MemoryEntry::new(
"agent-1",
MemoryType::Knowledge,
"python",
"Python is a high-level programming language".to_string(),
).with_keywords(vec!["python".to_string(), "programming".to_string()]);
storage.store(&entry1).await.unwrap();
storage.store(&entry2).await.unwrap();
// Search for "rust safety"
let results = storage.find(
"rust safety",
FindOptions {
scope: Some("agent://agent-1".to_string()),
limit: Some(10),
min_similarity: Some(0.1),
},
).await.unwrap();
// Should find the Rust entry with higher score
assert!(!results.is_empty());
assert!(results[0].content.contains("Rust"));
}
#[tokio::test]
async fn test_sqlite_storage_delete() {
let storage = SqliteStorage::in_memory().await;
let entry = MemoryEntry::new(
"test-agent",
MemoryType::Preference,
"style",
"test".to_string(),
);
storage.store(&entry).await.unwrap();
storage.delete(&entry.uri).await.unwrap();
let retrieved = storage.get(&entry.uri).await.unwrap();
assert!(retrieved.is_none());
}
#[tokio::test]
async fn test_persistence() {
let path = std::env::temp_dir().join("zclaw_test_memories.db");
// Clean up any existing test db
let _ = std::fs::remove_file(&path);
// Create and store
{
let storage = SqliteStorage::new(&path).await.unwrap();
let entry = MemoryEntry::new(
"persist-test",
MemoryType::Knowledge,
"test",
"This should persist".to_string(),
);
storage.store(&entry).await.unwrap();
}
// Reopen and verify
{
let storage = SqliteStorage::new(&path).await.unwrap();
let results = storage.find_by_prefix("agent://persist-test").await.unwrap();
assert!(!results.is_empty());
assert_eq!(results[0].content, "This should persist");
}
// Clean up
let _ = std::fs::remove_file(&path);
}
#[tokio::test]
async fn test_metadata_storage() {
let storage = SqliteStorage::in_memory().await;
let json = r#"{"test": "value"}"#;
storage.store_metadata_json("test-key", json).await.unwrap();
let retrieved = storage.get_metadata_json("test-key").await.unwrap();
assert_eq!(retrieved, Some(json.to_string()));
}
#[tokio::test]
async fn test_access_count() {
let storage = SqliteStorage::in_memory().await;
let entry = MemoryEntry::new(
"test-agent",
MemoryType::Knowledge,
"test",
"test content".to_string(),
);
storage.store(&entry).await.unwrap();
// Access multiple times
for _ in 0..3 {
let _ = storage.get(&entry.uri).await.unwrap();
}
let retrieved = storage.get(&entry.uri).await.unwrap().unwrap();
assert!(retrieved.access_count >= 3);
}
}