Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
- Add effective_importance() with exponential time decay (30-day half-life) and an access-count boost, for fair scoring of stale vs. fresh memories
- Add SqliteStorage::decay_memories() for periodic maintenance: reduces stored importance per interval and archives (deletes) memories below the threshold
- Update find() scoring to use time-decayed importance in the sort
- Add the DecayResult type and the effective_importance re-export in lib.rs
- Remove the dead frontend active-learning.ts (370 lines, zero imports)
902 lines
32 KiB
Rust
902 lines
32 KiB
Rust
//! SQLite Storage Backend
|
|
//!
|
|
//! Persistent storage backend using SQLite for production use.
|
|
//! Provides efficient querying and full-text search capabilities.
|
|
|
|
use crate::retrieval::semantic::{EmbeddingClient, SemanticScorer};
|
|
use crate::types::MemoryEntry;
|
|
use crate::viking_adapter::{FindOptions, VikingStorage};
|
|
use async_trait::async_trait;
|
|
use sqlx::sqlite::{SqlitePool, SqlitePoolOptions, SqliteRow};
|
|
use sqlx::Row;
|
|
use std::path::PathBuf;
|
|
use std::sync::Arc;
|
|
use tokio::sync::RwLock;
|
|
use zclaw_types::Result;
|
|
use zclaw_types::ZclawError;
|
|
|
|
/// SQLite storage backend with TF-IDF semantic scoring
|
|
pub struct SqliteStorage {
|
|
/// Database connection pool
|
|
pool: SqlitePool,
|
|
/// Semantic scorer for similarity computation
|
|
scorer: Arc<RwLock<SemanticScorer>>,
|
|
/// Database path (for reference)
|
|
#[allow(dead_code)]
|
|
path: PathBuf,
|
|
}
|
|
|
|
/// Raw, untyped image of one row of the `memories` table.
///
/// Values are kept exactly as stored (timestamps and keywords are still text);
/// `SqliteStorage::row_to_entry` turns a row into a typed `MemoryEntry`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct MemoryRow {
    /// Primary key of the memory.
    uri: String,
    /// Memory type in its textual form.
    memory_type: String,
    /// Full memory content.
    content: String,
    /// Keywords serialized as a JSON array string.
    keywords: String,
    /// Stored importance value.
    importance: i32,
    /// Number of recorded accesses.
    access_count: i32,
    /// Creation timestamp (written as RFC 3339 text by `store`).
    created_at: String,
    /// Last-access timestamp (written as RFC 3339 text).
    last_accessed: String,
    /// Optional overview text (the schema migration labels it "L1 summary").
    overview: Option<String>,
    /// Optional abstract text (the schema migration labels it "L0 keywords").
    abstract_summary: Option<String>,
}
|
|
|
|
impl SqliteStorage {
|
|
/// Create a new SQLite storage at the given path
|
|
pub async fn new(path: impl Into<PathBuf>) -> Result<Self> {
|
|
let path = path.into();
|
|
|
|
// Ensure parent directory exists
|
|
if let Some(parent) = path.parent() {
|
|
if parent.to_str() != Some(":memory:") {
|
|
tokio::fs::create_dir_all(parent).await.map_err(|e| {
|
|
ZclawError::StorageError(format!("Failed to create storage directory: {}", e))
|
|
})?;
|
|
}
|
|
}
|
|
|
|
// Build connection string
|
|
let db_url = if path.to_str() == Some(":memory:") {
|
|
"sqlite::memory:".to_string()
|
|
} else {
|
|
format!("sqlite:{}?mode=rwc", path.to_string_lossy())
|
|
};
|
|
|
|
// Create connection pool
|
|
let pool = SqlitePoolOptions::new()
|
|
.max_connections(5)
|
|
.connect(&db_url)
|
|
.await
|
|
.map_err(|e| ZclawError::StorageError(format!("Failed to connect to database: {}", e)))?;
|
|
|
|
let storage = Self {
|
|
pool,
|
|
scorer: Arc::new(RwLock::new(SemanticScorer::new())),
|
|
path,
|
|
};
|
|
|
|
storage.initialize_schema().await?;
|
|
storage.warmup_scorer().await?;
|
|
|
|
Ok(storage)
|
|
}
|
|
|
|
/// Create an in-memory SQLite database (for testing)
|
|
pub async fn in_memory() -> Self {
|
|
Self::new(":memory:").await.expect("Failed to create in-memory database")
|
|
}
|
|
|
|
/// Configure embedding client for semantic search
|
|
/// Replaces the current scorer with a new one that has embedding support
|
|
pub async fn configure_embedding(
|
|
&self,
|
|
client: Arc<dyn EmbeddingClient>,
|
|
) -> Result<()> {
|
|
let new_scorer = SemanticScorer::with_embedding(client);
|
|
let mut scorer = self.scorer.write().await;
|
|
*scorer = new_scorer;
|
|
|
|
tracing::info!("[SqliteStorage] Embedding client configured, re-indexing with embeddings...");
|
|
self.warmup_scorer_with_embedding().await
|
|
}
|
|
|
|
/// Check if embedding is available
|
|
pub async fn is_embedding_available(&self) -> bool {
|
|
let scorer = self.scorer.read().await;
|
|
scorer.is_embedding_available()
|
|
}
|
|
|
|
/// Initialize database schema with FTS5
|
|
async fn initialize_schema(&self) -> Result<()> {
|
|
// Create main memories table
|
|
sqlx::query(
|
|
r#"
|
|
CREATE TABLE IF NOT EXISTS memories (
|
|
uri TEXT PRIMARY KEY,
|
|
memory_type TEXT NOT NULL,
|
|
content TEXT NOT NULL,
|
|
keywords TEXT NOT NULL DEFAULT '[]',
|
|
importance INTEGER NOT NULL DEFAULT 5,
|
|
access_count INTEGER NOT NULL DEFAULT 0,
|
|
created_at TEXT NOT NULL,
|
|
last_accessed TEXT NOT NULL
|
|
)
|
|
"#,
|
|
)
|
|
.execute(&self.pool)
|
|
.await
|
|
.map_err(|e| ZclawError::StorageError(format!("Failed to create memories table: {}", e)))?;
|
|
|
|
// Create FTS5 virtual table for full-text search
|
|
sqlx::query(
|
|
r#"
|
|
CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
|
|
uri,
|
|
content,
|
|
keywords,
|
|
tokenize='unicode61'
|
|
)
|
|
"#,
|
|
)
|
|
.execute(&self.pool)
|
|
.await
|
|
.map_err(|e| ZclawError::StorageError(format!("Failed to create FTS5 table: {}", e)))?;
|
|
|
|
// Create index on memory_type for filtering
|
|
sqlx::query("CREATE INDEX IF NOT EXISTS idx_memory_type ON memories(memory_type)")
|
|
.execute(&self.pool)
|
|
.await
|
|
.map_err(|e| ZclawError::StorageError(format!("Failed to create index: {}", e)))?;
|
|
|
|
// Create index on importance for sorting
|
|
sqlx::query("CREATE INDEX IF NOT EXISTS idx_importance ON memories(importance DESC)")
|
|
.execute(&self.pool)
|
|
.await
|
|
.map_err(|e| ZclawError::StorageError(format!("Failed to create importance index: {}", e)))?;
|
|
|
|
// Migration: add overview column (L1 summary)
|
|
let _ = sqlx::query("ALTER TABLE memories ADD COLUMN overview TEXT")
|
|
.execute(&self.pool)
|
|
.await;
|
|
|
|
// Migration: add abstract_summary column (L0 keywords)
|
|
let _ = sqlx::query("ALTER TABLE memories ADD COLUMN abstract_summary TEXT")
|
|
.execute(&self.pool)
|
|
.await;
|
|
|
|
// Create metadata table
|
|
sqlx::query(
|
|
r#"
|
|
CREATE TABLE IF NOT EXISTS metadata (
|
|
key TEXT PRIMARY KEY,
|
|
json TEXT NOT NULL
|
|
)
|
|
"#,
|
|
)
|
|
.execute(&self.pool)
|
|
.await
|
|
.map_err(|e| ZclawError::StorageError(format!("Failed to create metadata table: {}", e)))?;
|
|
|
|
tracing::info!("[SqliteStorage] Database schema initialized");
|
|
Ok(())
|
|
}
|
|
|
|
/// Warmup semantic scorer with existing entries
|
|
async fn warmup_scorer(&self) -> Result<()> {
|
|
let rows = sqlx::query_as::<_, MemoryRow>(
|
|
"SELECT uri, memory_type, content, keywords, importance, access_count, created_at, last_accessed, overview, abstract_summary FROM memories"
|
|
)
|
|
.fetch_all(&self.pool)
|
|
.await
|
|
.map_err(|e| ZclawError::StorageError(format!("Failed to load memories for warmup: {}", e)))?;
|
|
|
|
let mut scorer = self.scorer.write().await;
|
|
for row in rows {
|
|
let entry = self.row_to_entry(&row);
|
|
scorer.index_entry(&entry);
|
|
}
|
|
|
|
let stats = scorer.stats();
|
|
tracing::info!(
|
|
"[SqliteStorage] Warmed up scorer with {} entries, {} terms",
|
|
stats.indexed_entries,
|
|
stats.unique_terms
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Warmup semantic scorer with embedding support for existing entries
|
|
async fn warmup_scorer_with_embedding(&self) -> Result<()> {
|
|
let rows = sqlx::query_as::<_, MemoryRow>(
|
|
"SELECT uri, memory_type, content, keywords, importance, access_count, created_at, last_accessed, overview, abstract_summary FROM memories"
|
|
)
|
|
.fetch_all(&self.pool)
|
|
.await
|
|
.map_err(|e| ZclawError::StorageError(format!("Failed to load memories for warmup: {}", e)))?;
|
|
|
|
let mut scorer = self.scorer.write().await;
|
|
for row in rows {
|
|
let entry = self.row_to_entry(&row);
|
|
scorer.index_entry_with_embedding(&entry).await;
|
|
}
|
|
|
|
let stats = scorer.stats();
|
|
tracing::info!(
|
|
"[SqliteStorage] Warmed up scorer with {} entries ({} with embeddings), {} terms",
|
|
stats.indexed_entries,
|
|
stats.embedding_entries,
|
|
stats.unique_terms
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Convert database row to MemoryEntry
|
|
fn row_to_entry(&self, row: &MemoryRow) -> MemoryEntry {
|
|
let memory_type = crate::types::MemoryType::parse(&row.memory_type);
|
|
let keywords: Vec<String> = serde_json::from_str(&row.keywords).unwrap_or_default();
|
|
let created_at = chrono::DateTime::parse_from_rfc3339(&row.created_at)
|
|
.map(|dt| dt.with_timezone(&chrono::Utc))
|
|
.unwrap_or_else(|_| chrono::Utc::now());
|
|
let last_accessed = chrono::DateTime::parse_from_rfc3339(&row.last_accessed)
|
|
.map(|dt| dt.with_timezone(&chrono::Utc))
|
|
.unwrap_or_else(|_| chrono::Utc::now());
|
|
|
|
MemoryEntry {
|
|
uri: row.uri.clone(),
|
|
memory_type,
|
|
content: row.content.clone(),
|
|
keywords,
|
|
importance: row.importance as u8,
|
|
access_count: row.access_count as u32,
|
|
created_at,
|
|
last_accessed,
|
|
overview: row.overview.clone(),
|
|
abstract_summary: row.abstract_summary.clone(),
|
|
}
|
|
}
|
|
|
|
/// Update access count and last accessed time
|
|
async fn touch_entry(&self, uri: &str) -> Result<()> {
|
|
let now = chrono::Utc::now().to_rfc3339();
|
|
sqlx::query(
|
|
"UPDATE memories SET access_count = access_count + 1, last_accessed = ? WHERE uri = ?"
|
|
)
|
|
.bind(&now)
|
|
.bind(uri)
|
|
.execute(&self.pool)
|
|
.await
|
|
.map_err(|e| ZclawError::StorageError(format!("Failed to update access count: {}", e)))?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Decay stale memories: reduce importance for long-unaccessed entries
|
|
/// and archive those below the minimum threshold.
|
|
///
|
|
/// - For every `decay_interval_days` since last access, importance drops by 1.
|
|
/// - Memories with importance ≤ `archive_threshold` are deleted.
|
|
pub async fn decay_memories(
|
|
&self,
|
|
decay_interval_days: u32,
|
|
archive_threshold: u8,
|
|
) -> crate::types::DecayResult {
|
|
// Step 1: Reduce importance of stale memories
|
|
let decay_result = sqlx::query(
|
|
r#"
|
|
UPDATE memories
|
|
SET importance = MAX(1, importance - CAST(
|
|
(julianday('now') - julianday(last_accessed)) / ? AS INTEGER
|
|
))
|
|
WHERE last_accessed < datetime('now', '-' || ? || ' days')
|
|
AND importance > 1
|
|
"#,
|
|
)
|
|
.bind(decay_interval_days)
|
|
.bind(decay_interval_days)
|
|
.execute(&self.pool)
|
|
.await;
|
|
|
|
let decayed = decay_result
|
|
.map(|r| r.rows_affected())
|
|
.unwrap_or(0);
|
|
|
|
// Step 2: Remove memories that fell below archive threshold
|
|
// and haven't been accessed in 90+ days
|
|
let archive_result = sqlx::query(
|
|
r#"
|
|
DELETE FROM memories
|
|
WHERE importance <= ?
|
|
AND last_accessed < datetime('now', '-90 days')
|
|
"#,
|
|
)
|
|
.bind(archive_threshold as i32)
|
|
.execute(&self.pool)
|
|
.await;
|
|
|
|
// Also clean up FTS entries for archived memories
|
|
let _ = sqlx::query(
|
|
r#"
|
|
DELETE FROM memories_fts
|
|
WHERE uri NOT IN (SELECT uri FROM memories)
|
|
"#,
|
|
)
|
|
.execute(&self.pool)
|
|
.await;
|
|
|
|
let archived = archive_result
|
|
.map(|r| r.rows_affected())
|
|
.unwrap_or(0);
|
|
|
|
if decayed > 0 || archived > 0 {
|
|
tracing::info!(
|
|
"[SqliteStorage] Memory decay: {} decayed, {} archived",
|
|
decayed,
|
|
archived
|
|
);
|
|
}
|
|
|
|
crate::types::DecayResult { decayed, archived }
|
|
}
|
|
}
|
|
|
|
/// Column-name based mapping from a SQLite result row to [`MemoryRow`].
impl sqlx::FromRow<'_, SqliteRow> for MemoryRow {
    /// Read the eight mandatory columns (any failure aborts the mapping) and
    /// the two optional ones (any failure to read them simply yields `None`).
    fn from_row(row: &SqliteRow) -> sqlx::Result<Self> {
        // Mandatory columns, read in declaration order.
        let uri: String = row.try_get("uri")?;
        let memory_type: String = row.try_get("memory_type")?;
        let content: String = row.try_get("content")?;
        let keywords: String = row.try_get("keywords")?;
        let importance: i32 = row.try_get("importance")?;
        let access_count: i32 = row.try_get("access_count")?;
        let created_at: String = row.try_get("created_at")?;
        let last_accessed: String = row.try_get("last_accessed")?;

        // Optional columns: errors are converted to `None`.
        let overview: Option<String> = row.try_get("overview").ok();
        let abstract_summary: Option<String> = row.try_get("abstract_summary").ok();

        Ok(Self {
            uri,
            memory_type,
            content,
            keywords,
            importance,
            access_count,
            created_at,
            last_accessed,
            overview,
            abstract_summary,
        })
    }
}
|
|
|
|
/// Private helper methods on SqliteStorage (NOT in impl VikingStorage block)
|
|
impl SqliteStorage {
|
|
/// Turn free-form user input into a safe FTS5 `MATCH` expression.
///
/// FTS5 gives special meaning to characters such as `+`, `-`, `*`, `"`, `(`,
/// `)` and `:`. The query is lowercased and split on every non-alphanumeric
/// character, which removes all of them. Pieces whose UTF-8 byte length is 1
/// or less are dropped (so a single multi-byte character such as one CJK
/// ideograph is kept). The surviving terms are joined with ` OR ` for broad
/// recall; an empty string is returned when nothing survives.
fn sanitize_fts_query(query: &str) -> String {
    let lowered = query.to_lowercase();

    let mut tokens: Vec<&str> = Vec::new();
    for piece in lowered.split(|ch: char| !ch.is_alphanumeric()) {
        // `len()` counts bytes; requiring more than one also rejects "".
        if piece.len() > 1 {
            tokens.push(piece);
        }
    }

    // Any term may match; candidates are reranked by similarity afterwards.
    // Joining an empty list yields the empty string.
    tokens.join(" OR ")
}
|
|
|
|
/// Fetch memories by scope with importance-based ordering.
|
|
/// Used internally by find() for scope-based queries.
|
|
pub(crate) async fn fetch_by_scope_priv(&self, scope: Option<&str>, limit: usize) -> Result<Vec<MemoryRow>> {
|
|
let rows = if let Some(scope) = scope {
|
|
sqlx::query_as::<_, MemoryRow>(
|
|
r#"
|
|
SELECT uri, memory_type, content, keywords, importance, access_count, created_at, last_accessed, overview, abstract_summary
|
|
FROM memories
|
|
WHERE uri LIKE ?
|
|
ORDER BY importance DESC, access_count DESC
|
|
LIMIT ?
|
|
"#
|
|
)
|
|
.bind(format!("{}%", scope))
|
|
.bind(limit as i64)
|
|
.fetch_all(&self.pool)
|
|
.await
|
|
.map_err(|e| ZclawError::StorageError(format!("Failed to fetch by scope: {}", e)))?
|
|
} else {
|
|
sqlx::query_as::<_, MemoryRow>(
|
|
r#"
|
|
SELECT uri, memory_type, content, keywords, importance, access_count, created_at, last_accessed, overview, abstract_summary
|
|
FROM memories
|
|
ORDER BY importance DESC
|
|
LIMIT ?
|
|
"#
|
|
)
|
|
.bind(limit as i64)
|
|
.fetch_all(&self.pool)
|
|
.await
|
|
.map_err(|e| ZclawError::StorageError(format!("Failed to fetch by scope: {}", e)))?
|
|
};
|
|
Ok(rows)
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl VikingStorage for SqliteStorage {
|
|
async fn store(&self, entry: &MemoryEntry) -> Result<()> {
|
|
let keywords_json = serde_json::to_string(&entry.keywords)
|
|
.map_err(|e| ZclawError::StorageError(format!("Failed to serialize keywords: {}", e)))?;
|
|
|
|
let created_at = entry.created_at.to_rfc3339();
|
|
let last_accessed = entry.last_accessed.to_rfc3339();
|
|
let memory_type = entry.memory_type.to_string();
|
|
|
|
// Insert into main table
|
|
sqlx::query(
|
|
r#"
|
|
INSERT OR REPLACE INTO memories
|
|
(uri, memory_type, content, keywords, importance, access_count, created_at, last_accessed, overview, abstract_summary)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
"#,
|
|
)
|
|
.bind(&entry.uri)
|
|
.bind(&memory_type)
|
|
.bind(&entry.content)
|
|
.bind(&keywords_json)
|
|
.bind(entry.importance as i32)
|
|
.bind(entry.access_count as i32)
|
|
.bind(&created_at)
|
|
.bind(&last_accessed)
|
|
.bind(&entry.overview)
|
|
.bind(&entry.abstract_summary)
|
|
.execute(&self.pool)
|
|
.await
|
|
.map_err(|e| ZclawError::StorageError(format!("Failed to store memory: {}", e)))?;
|
|
|
|
// Update FTS index - delete old and insert new
|
|
let _ = sqlx::query("DELETE FROM memories_fts WHERE uri = ?")
|
|
.bind(&entry.uri)
|
|
.execute(&self.pool)
|
|
.await
|
|
.map_err(|e| {
|
|
tracing::warn!("[SqliteStorage] Failed to delete old FTS entry: {}", e);
|
|
});
|
|
|
|
let keywords_text = entry.keywords.join(" ");
|
|
let _ = sqlx::query(
|
|
r#"
|
|
INSERT INTO memories_fts (uri, content, keywords)
|
|
VALUES (?, ?, ?)
|
|
"#,
|
|
)
|
|
.bind(&entry.uri)
|
|
.bind(&entry.content)
|
|
.bind(&keywords_text)
|
|
.execute(&self.pool)
|
|
.await
|
|
.map_err(|e| {
|
|
tracing::warn!("[SqliteStorage] Failed to insert FTS entry: {}", e);
|
|
});
|
|
|
|
// Update semantic scorer (use embedding when available)
|
|
let mut scorer = self.scorer.write().await;
|
|
if scorer.is_embedding_available() {
|
|
scorer.index_entry_with_embedding(entry).await;
|
|
} else {
|
|
scorer.index_entry(entry);
|
|
}
|
|
|
|
tracing::debug!("[SqliteStorage] Stored memory: {}", entry.uri);
|
|
Ok(())
|
|
}
|
|
|
|
async fn get(&self, uri: &str) -> Result<Option<MemoryEntry>> {
|
|
let row = sqlx::query_as::<_, MemoryRow>(
|
|
"SELECT uri, memory_type, content, keywords, importance, access_count, created_at, last_accessed, overview, abstract_summary FROM memories WHERE uri = ?"
|
|
)
|
|
.bind(uri)
|
|
.fetch_optional(&self.pool)
|
|
.await
|
|
.map_err(|e| ZclawError::StorageError(format!("Failed to get memory: {}", e)))?;
|
|
|
|
if let Some(row) = row {
|
|
let entry = self.row_to_entry(&row);
|
|
|
|
// Update access count
|
|
self.touch_entry(&entry.uri).await?;
|
|
|
|
Ok(Some(entry))
|
|
} else {
|
|
Ok(None)
|
|
}
|
|
}
|
|
|
|
async fn find(&self, query: &str, options: FindOptions) -> Result<Vec<MemoryEntry>> {
|
|
let limit = options.limit.unwrap_or(50).max(20); // Fetch more candidates for reranking
|
|
|
|
// Strategy: use FTS5 for initial filtering when query is non-empty,
|
|
// then score candidates with TF-IDF / embedding for precise ranking.
|
|
// When FTS5 returns nothing, we return empty — do NOT fall back to
|
|
// scope scan (that returns irrelevant high-importance memories).
|
|
let rows = if !query.is_empty() {
|
|
// Sanitize query for FTS5: strip operators that cause syntax errors
|
|
let sanitized = Self::sanitize_fts_query(query);
|
|
|
|
if sanitized.is_empty() {
|
|
// Query had no meaningful terms after sanitization (e.g., "1+2")
|
|
tracing::debug!(
|
|
"[SqliteStorage] Query '{}' produced no FTS5-searchable terms, skipping",
|
|
query.chars().take(50).collect::<String>()
|
|
);
|
|
return Ok(Vec::new());
|
|
}
|
|
|
|
// FTS5-powered candidate retrieval (fast, index-based)
|
|
let fts_candidates = if let Some(ref scope) = options.scope {
|
|
sqlx::query_as::<_, MemoryRow>(
|
|
r#"
|
|
SELECT m.uri, m.memory_type, m.content, m.keywords, m.importance,
|
|
m.access_count, m.created_at, m.last_accessed, m.overview, m.abstract_summary
|
|
FROM memories m
|
|
INNER JOIN memories_fts f ON m.uri = f.uri
|
|
WHERE f.memories_fts MATCH ?
|
|
AND m.uri LIKE ?
|
|
ORDER BY f.rank
|
|
LIMIT ?
|
|
"#
|
|
)
|
|
.bind(&sanitized)
|
|
.bind(format!("{}%", scope))
|
|
.bind(limit as i64)
|
|
.fetch_all(&self.pool)
|
|
.await
|
|
} else {
|
|
sqlx::query_as::<_, MemoryRow>(
|
|
r#"
|
|
SELECT m.uri, m.memory_type, m.content, m.keywords, m.importance,
|
|
m.access_count, m.created_at, m.last_accessed, m.overview, m.abstract_summary
|
|
FROM memories m
|
|
INNER JOIN memories_fts f ON m.uri = f.uri
|
|
WHERE f.memories_fts MATCH ?
|
|
ORDER BY f.rank
|
|
LIMIT ?
|
|
"#
|
|
)
|
|
.bind(&sanitized)
|
|
.bind(limit as i64)
|
|
.fetch_all(&self.pool)
|
|
.await
|
|
};
|
|
|
|
match fts_candidates {
|
|
Ok(rows) if !rows.is_empty() => rows,
|
|
Ok(_) | Err(_) => {
|
|
// FTS5 returned no results or failed — check if query contains CJK
|
|
// characters. unicode61 tokenizer doesn't index CJK, so fall back
|
|
// to LIKE-based search for CJK queries.
|
|
let has_cjk = query.chars().any(|c| {
|
|
matches!(c, '\u{4E00}'..='\u{9FFF}' | '\u{3400}'..='\u{4DBF}' | '\u{F900}'..='\u{FAFF}')
|
|
});
|
|
|
|
if !has_cjk {
|
|
tracing::debug!(
|
|
"[SqliteStorage] FTS5 returned no results for query: '{}'",
|
|
query.chars().take(50).collect::<String>()
|
|
);
|
|
return Ok(Vec::new());
|
|
}
|
|
|
|
tracing::debug!(
|
|
"[SqliteStorage] FTS5 miss for CJK query, falling back to LIKE: '{}'",
|
|
query.chars().take(50).collect::<String>()
|
|
);
|
|
|
|
let pattern = format!("%{}%", query);
|
|
if let Some(ref scope) = options.scope {
|
|
sqlx::query_as::<_, MemoryRow>(
|
|
r#"
|
|
SELECT uri, memory_type, content, keywords, importance,
|
|
access_count, created_at, last_accessed, overview, abstract_summary
|
|
FROM memories
|
|
WHERE content LIKE ?
|
|
AND uri LIKE ?
|
|
ORDER BY importance DESC, access_count DESC
|
|
LIMIT ?
|
|
"#
|
|
)
|
|
.bind(&pattern)
|
|
.bind(format!("{}%", scope))
|
|
.bind(limit as i64)
|
|
.fetch_all(&self.pool)
|
|
.await
|
|
.unwrap_or_default()
|
|
} else {
|
|
sqlx::query_as::<_, MemoryRow>(
|
|
r#"
|
|
SELECT uri, memory_type, content, keywords, importance,
|
|
access_count, created_at, last_accessed, overview, abstract_summary
|
|
FROM memories
|
|
WHERE content LIKE ?
|
|
ORDER BY importance DESC, access_count DESC
|
|
LIMIT ?
|
|
"#
|
|
)
|
|
.bind(&pattern)
|
|
.bind(limit as i64)
|
|
.fetch_all(&self.pool)
|
|
.await
|
|
.unwrap_or_default()
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
// Empty query: scope-based scan only (no FTS5 needed)
|
|
self.fetch_by_scope_priv(options.scope.as_deref(), limit).await?
|
|
};
|
|
|
|
// Convert to entries and compute semantic scores
|
|
let use_embedding = {
|
|
let scorer = self.scorer.read().await;
|
|
scorer.is_embedding_available()
|
|
};
|
|
|
|
let mut scored_entries: Vec<(f32, f32, MemoryEntry)> = Vec::new();
|
|
|
|
for row in rows {
|
|
let entry = self.row_to_entry(&row);
|
|
|
|
// Compute semantic score: use embedding when available, fallback to TF-IDF
|
|
let semantic_score = if use_embedding {
|
|
let scorer = self.scorer.read().await;
|
|
let tfidf_score = scorer.score_similarity(query, &entry);
|
|
let entry_embedding = scorer.get_entry_embedding(&entry.uri);
|
|
drop(scorer);
|
|
|
|
match entry_embedding {
|
|
Some(entry_emb) => {
|
|
// Try embedding the query for hybrid scoring
|
|
let embedding_client = {
|
|
let scorer2 = self.scorer.read().await;
|
|
scorer2.get_embedding_client()
|
|
};
|
|
|
|
match embedding_client.embed(query).await {
|
|
Ok(query_emb) => {
|
|
let emb_score = SemanticScorer::cosine_similarity_embedding(&query_emb, &entry_emb);
|
|
// Hybrid: 70% embedding + 30% TF-IDF
|
|
emb_score * 0.7 + tfidf_score * 0.3
|
|
}
|
|
Err(_) => {
|
|
tracing::debug!("[SqliteStorage] Query embedding failed, using TF-IDF only");
|
|
tfidf_score
|
|
}
|
|
}
|
|
}
|
|
None => tfidf_score,
|
|
}
|
|
} else {
|
|
let scorer = self.scorer.read().await;
|
|
scorer.score_similarity(query, &entry)
|
|
};
|
|
|
|
// Apply similarity threshold
|
|
if let Some(min_similarity) = options.min_similarity {
|
|
if semantic_score < min_similarity {
|
|
continue;
|
|
}
|
|
}
|
|
|
|
// Apply time decay to importance before final scoring
|
|
let time_decayed_importance = crate::types::effective_importance(&entry);
|
|
|
|
scored_entries.push((semantic_score, time_decayed_importance, entry));
|
|
}
|
|
|
|
// Sort by: semantic score → time-decayed importance → access count (all descending)
|
|
scored_entries.sort_by(|a, b| {
|
|
b.0.partial_cmp(&a.0)
|
|
.unwrap_or(std::cmp::Ordering::Equal)
|
|
.then_with(|| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal))
|
|
.then_with(|| b.2.access_count.cmp(&a.2.access_count))
|
|
});
|
|
|
|
// Apply limit
|
|
if let Some(limit) = options.limit {
|
|
scored_entries.truncate(limit);
|
|
}
|
|
|
|
Ok(scored_entries.into_iter().map(|(_, _, entry)| entry).collect())
|
|
}
|
|
|
|
/// Return up to 100 memories whose URI starts with `prefix`, in the order
/// produced by the scope scan (importance, then access count, descending).
///
/// # Errors
///
/// Propagates the error of the underlying scope scan.
async fn find_by_prefix(&self, prefix: &str) -> Result<Vec<MemoryEntry>> {
    let raw_rows = self.fetch_by_scope_priv(Some(prefix), 100).await?;

    let mut memories = Vec::with_capacity(raw_rows.len());
    for raw in &raw_rows {
        memories.push(self.row_to_entry(raw));
    }

    Ok(memories)
}
|
|
|
|
async fn delete(&self, uri: &str) -> Result<()> {
|
|
sqlx::query("DELETE FROM memories WHERE uri = ?")
|
|
.bind(uri)
|
|
.execute(&self.pool)
|
|
.await
|
|
.map_err(|e| ZclawError::StorageError(format!("Failed to delete memory: {}", e)))?;
|
|
|
|
// Remove from FTS index
|
|
let _ = sqlx::query("DELETE FROM memories_fts WHERE uri = ?")
|
|
.bind(uri)
|
|
.execute(&self.pool)
|
|
.await
|
|
.map_err(|e| {
|
|
tracing::warn!("[SqliteStorage] Failed to delete FTS entry: {}", e);
|
|
});
|
|
|
|
// Remove from in-memory scorer
|
|
let mut scorer = self.scorer.write().await;
|
|
scorer.remove_entry(uri);
|
|
|
|
tracing::debug!("[SqliteStorage] Deleted memory: {}", uri);
|
|
Ok(())
|
|
}
|
|
|
|
async fn store_metadata_json(&self, key: &str, json: &str) -> Result<()> {
|
|
sqlx::query(
|
|
r#"
|
|
INSERT OR REPLACE INTO metadata (key, json)
|
|
VALUES (?, ?)
|
|
"#,
|
|
)
|
|
.bind(key)
|
|
.bind(json)
|
|
.execute(&self.pool)
|
|
.await
|
|
.map_err(|e| ZclawError::StorageError(format!("Failed to store metadata: {}", e)))?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn get_metadata_json(&self, key: &str) -> Result<Option<String>> {
|
|
let result = sqlx::query_scalar::<_, String>("SELECT json FROM metadata WHERE key = ?")
|
|
.bind(key)
|
|
.fetch_optional(&self.pool)
|
|
.await
|
|
.map_err(|e| ZclawError::StorageError(format!("Failed to get metadata: {}", e)))?;
|
|
|
|
Ok(result)
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::MemoryType;

    /// A stored entry can be read back by its URI with identical content.
    #[tokio::test]
    async fn test_sqlite_storage_store_and_get() {
        let storage = SqliteStorage::in_memory().await;
        let entry = MemoryEntry::new(
            "test-agent",
            MemoryType::Preference,
            "style",
            "User prefers concise responses".to_string(),
        );

        storage.store(&entry).await.unwrap();
        let retrieved = storage.get(&entry.uri).await.unwrap();

        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().content, "User prefers concise responses");
    }

    /// A query naming one entry's distinctive terms ranks that entry first.
    #[tokio::test]
    async fn test_sqlite_storage_semantic_search() {
        let storage = SqliteStorage::in_memory().await;

        // Store entries with different content
        let entry1 = MemoryEntry::new(
            "agent-1",
            MemoryType::Knowledge,
            "rust",
            "Rust is a systems programming language focused on safety".to_string(),
        ).with_keywords(vec!["rust".to_string(), "programming".to_string(), "safety".to_string()]);

        let entry2 = MemoryEntry::new(
            "agent-1",
            MemoryType::Knowledge,
            "python",
            "Python is a high-level programming language".to_string(),
        ).with_keywords(vec!["python".to_string(), "programming".to_string()]);

        storage.store(&entry1).await.unwrap();
        storage.store(&entry2).await.unwrap();

        // Search for "rust safety"
        let results = storage.find(
            "rust safety",
            FindOptions {
                scope: Some("agent://agent-1".to_string()),
                limit: Some(10),
                min_similarity: Some(0.1),
            },
        ).await.unwrap();

        // Should find the Rust entry with higher score
        assert!(!results.is_empty());
        assert!(results[0].content.contains("Rust"));
    }

    /// A deleted entry can no longer be retrieved.
    #[tokio::test]
    async fn test_sqlite_storage_delete() {
        let storage = SqliteStorage::in_memory().await;
        let entry = MemoryEntry::new(
            "test-agent",
            MemoryType::Preference,
            "style",
            "test".to_string(),
        );

        storage.store(&entry).await.unwrap();
        storage.delete(&entry.uri).await.unwrap();

        let retrieved = storage.get(&entry.uri).await.unwrap();
        assert!(retrieved.is_none());
    }

    /// Data written through one storage instance is visible after reopening
    /// the same database file.
    #[tokio::test]
    async fn test_persistence() {
        // Per-process file name so concurrent test runs on the same machine
        // cannot clobber each other's database.
        let path = std::env::temp_dir()
            .join(format!("zclaw_test_memories_{}.db", std::process::id()));

        // Removes the database plus any journal sidecar files SQLite may have
        // created next to it; all removals are best-effort.
        let cleanup = |db: &std::path::Path| {
            let _ = std::fs::remove_file(db);
            for suffix in ["-wal", "-shm", "-journal"].iter() {
                let mut sidecar = db.as_os_str().to_os_string();
                sidecar.push(suffix);
                let _ = std::fs::remove_file(std::path::PathBuf::from(sidecar));
            }
        };

        // Clean up any existing test db
        cleanup(&path);

        // Create and store
        {
            let storage = SqliteStorage::new(&path).await.unwrap();
            let entry = MemoryEntry::new(
                "persist-test",
                MemoryType::Knowledge,
                "test",
                "This should persist".to_string(),
            );
            storage.store(&entry).await.unwrap();
        }

        // Reopen and verify
        {
            let storage = SqliteStorage::new(&path).await.unwrap();
            let results = storage.find_by_prefix("agent://persist-test").await.unwrap();
            assert!(!results.is_empty());
            assert_eq!(results[0].content, "This should persist");
        }

        // Clean up
        cleanup(&path);
    }

    /// Metadata JSON round-trips unchanged through the metadata table.
    #[tokio::test]
    async fn test_metadata_storage() {
        let storage = SqliteStorage::in_memory().await;

        let json = r#"{"test": "value"}"#;
        storage.store_metadata_json("test-key", json).await.unwrap();

        let retrieved = storage.get_metadata_json("test-key").await.unwrap();
        assert_eq!(retrieved, Some(json.to_string()));
    }

    /// Every `get` records an access, so the counter reflects prior reads.
    #[tokio::test]
    async fn test_access_count() {
        let storage = SqliteStorage::in_memory().await;
        let entry = MemoryEntry::new(
            "test-agent",
            MemoryType::Knowledge,
            "test",
            "test content".to_string(),
        );

        storage.store(&entry).await.unwrap();

        // Access multiple times
        for _ in 0..3 {
            let _ = storage.get(&entry.uri).await.unwrap();
        }

        // `get` returns the counter as read before its own increment, so the
        // fourth read observes the three earlier accesses.
        let retrieved = storage.get(&entry.uri).await.unwrap().unwrap();
        assert!(retrieved.access_count >= 3);
    }
}
|