//! Persistent Memory Storage - SQLite-backed memory for ZCLAW //! //! This module provides persistent storage for agent memories, //! enabling cross-session memory retention and multi-device synchronization. //! //! Phase 1 of Intelligence Layer Migration: //! - Replaces localStorage with SQLite //! - Provides memory persistence API //! - Enables data migration from frontend use serde::{Deserialize, Serialize}; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::Mutex; use uuid::Uuid; use tauri::Manager; use sqlx::{SqliteConnection, Connection, Row, sqlite::SqliteRow}; use chrono::{DateTime, Utc}; /// Memory entry stored in SQLite #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PersistentMemory { pub id: String, pub agent_id: String, pub memory_type: String, pub content: String, pub importance: i32, pub source: String, pub tags: String, // JSON array stored as string pub conversation_id: Option, pub created_at: String, pub last_accessed_at: String, pub access_count: i32, pub embedding: Option>, // Vector embedding for semantic search } // Manual implementation of FromRow since sqlx::FromRow derive has issues with Option> impl<'r> sqlx::FromRow<'r, SqliteRow> for PersistentMemory { fn from_row(row: &'r SqliteRow) -> Result { Ok(PersistentMemory { id: row.try_get("id")?, agent_id: row.try_get("agent_id")?, memory_type: row.try_get("memory_type")?, content: row.try_get("content")?, importance: row.try_get("importance")?, source: row.try_get("source")?, tags: row.try_get("tags")?, conversation_id: row.try_get("conversation_id")?, created_at: row.try_get("created_at")?, last_accessed_at: row.try_get("last_accessed_at")?, access_count: row.try_get("access_count")?, embedding: row.try_get("embedding")?, }) } } /// Memory search options #[derive(Debug, Clone)] pub struct MemorySearchQuery { pub agent_id: Option, pub memory_type: Option, pub tags: Option>, pub query: Option, pub min_importance: Option, pub limit: Option, pub offset: Option, } /// Memory statistics #[derive(Debug, Clone, Serialize)] pub struct MemoryStats { pub total_entries: i64, pub by_type: std::collections::HashMap, pub by_agent: std::collections::HashMap, pub oldest_entry: Option, pub newest_entry: Option, pub storage_size_bytes: i64, } /// Persistent memory store backed by SQLite pub struct PersistentMemoryStore { path: PathBuf, conn: Arc>, } impl PersistentMemoryStore { /// Create a new persistent memory store pub async fn new(app_handle: &tauri::AppHandle) -> Result { let app_dir = app_handle .path() .app_data_dir() .map_err(|e| format!("Failed to get app data dir: {}", e))?; let memory_dir = app_dir.join("memory"); std::fs::create_dir_all(&memory_dir) .map_err(|e| format!("Failed to create memory dir: {}", e))?; let db_path = memory_dir.join("memories.db"); Self::open(db_path).await } /// Open an existing memory store pub async fn open(path: PathBuf) -> Result { let db_url = format!("sqlite:{}?mode=rwc", path.display()); let conn = SqliteConnection::connect(&db_url) .await .map_err(|e| format!("Failed to open database: {}", e))?; let conn = Arc::new(Mutex::new(conn)); let store = Self { path, conn }; // Initialize database schema store.init_schema().await?; Ok(store) } /// Initialize the database schema async fn init_schema(&self) -> Result<(), String> { let mut conn = self.conn.lock().await; sqlx::query( r#" CREATE TABLE IF NOT EXISTS memories ( id TEXT PRIMARY KEY, agent_id TEXT NOT NULL, memory_type TEXT NOT NULL, content TEXT NOT NULL, importance INTEGER DEFAULT 5, source TEXT DEFAULT 'auto', tags TEXT DEFAULT '[]', conversation_id TEXT, created_at TEXT NOT NULL, last_accessed_at TEXT NOT NULL, access_count INTEGER DEFAULT 0, embedding BLOB ); CREATE INDEX IF NOT EXISTS idx_agent_id ON memories(agent_id); CREATE INDEX IF NOT EXISTS idx_memory_type ON memories(memory_type); CREATE INDEX IF NOT EXISTS idx_created_at ON memories(created_at); CREATE INDEX IF NOT EXISTS idx_importance ON memories(importance); "#, ) .execute(&mut *conn) .await .map_err(|e| format!("Failed to create schema: {}", e))?; Ok(()) } /// Store a new memory pub async fn store(&self, memory: &PersistentMemory) -> Result<(), String> { let mut conn = self.conn.lock().await; sqlx::query( r#" INSERT INTO memories ( id, agent_id, memory_type, content, importance, source, tags, conversation_id, created_at, last_accessed_at, access_count, embedding ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) "#, ) .bind(&memory.id) .bind(&memory.agent_id) .bind(&memory.memory_type) .bind(&memory.content) .bind(memory.importance) .bind(&memory.source) .bind(&memory.tags) .bind(&memory.conversation_id) .bind(&memory.created_at) .bind(&memory.last_accessed_at) .bind(memory.access_count) .bind(&memory.embedding) .execute(&mut *conn) .await .map_err(|e| format!("Failed to store memory: {}", e))?; Ok(()) } /// Get a memory by ID pub async fn get(&self, id: &str) -> Result, String> { let mut conn = self.conn.lock().await; let result: Option = sqlx::query_as( "SELECT * FROM memories WHERE id = ?", ) .bind(id) .fetch_optional(&mut *conn) .await .map_err(|e| format!("Failed to get memory: {}", e))?; // Update access stats if found if result.is_some() { let now = Utc::now().to_rfc3339(); sqlx::query( "UPDATE memories SET last_accessed_at = ?, access_count = access_count + 1 WHERE id = ?", ) .bind(&now) .bind(id) .execute(&mut *conn) .await .ok(); } Ok(result) } /// Search memories with simple query pub async fn search(&self, query: MemorySearchQuery) -> Result, String> { let mut conn = self.conn.lock().await; let mut sql = String::from("SELECT * FROM memories WHERE 1=1"); let mut params: Vec = Vec::new(); if let Some(agent_id) = &query.agent_id { sql.push_str(" AND agent_id = ?"); params.push(agent_id.clone()); } if let Some(memory_type) = &query.memory_type { sql.push_str(" AND memory_type = ?"); params.push(memory_type.clone()); } if let Some(min_importance) = query.min_importance { sql.push_str(" AND importance >= ?"); params.push(min_importance.to_string()); } if let Some(query_text) = &query.query { sql.push_str(" AND content LIKE ?"); params.push(format!("%{}%", query_text)); } sql.push_str(" ORDER BY created_at DESC"); if let Some(limit) = query.limit { sql.push_str(&format!(" LIMIT {}", limit)); } if let Some(offset) = query.offset { sql.push_str(&format!(" OFFSET {}", offset)); } // Build and execute query dynamically let mut query_builder = sqlx::query_as::<_, PersistentMemory>(&sql); for param in params { query_builder = query_builder.bind(param); } let results = query_builder .fetch_all(&mut *conn) .await .map_err(|e| format!("Failed to search memories: {}", e))?; Ok(results) } /// Delete a memory by ID pub async fn delete(&self, id: &str) -> Result { let mut conn = self.conn.lock().await; let result = sqlx::query("DELETE FROM memories WHERE id = ?") .bind(id) .execute(&mut *conn) .await .map_err(|e| format!("Failed to delete memory: {}", e))?; Ok(result.rows_affected() > 0) } /// Delete all memories for an agent pub async fn delete_by_agent(&self, agent_id: &str) -> Result { let mut conn = self.conn.lock().await; let result = sqlx::query("DELETE FROM memories WHERE agent_id = ?") .bind(agent_id) .execute(&mut *conn) .await .map_err(|e| format!("Failed to delete agent memories: {}", e))?; Ok(result.rows_affected() as usize) } /// Get memory statistics pub async fn stats(&self) -> Result { let mut conn = self.conn.lock().await; let total: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM memories") .fetch_one(&mut *conn) .await .unwrap_or(0); let by_type: std::collections::HashMap = sqlx::query_as( "SELECT memory_type, COUNT(*) as count FROM memories GROUP BY memory_type", ) .fetch_all(&mut *conn) .await .unwrap_or_default() .into_iter() .map(|row: (String, i64)| row) .collect(); let by_agent: std::collections::HashMap = sqlx::query_as( "SELECT agent_id, COUNT(*) as count FROM memories GROUP BY agent_id", ) .fetch_all(&mut *conn) .await .unwrap_or_default() .into_iter() .map(|row: (String, i64)| row) .collect(); let oldest: Option = sqlx::query_scalar( "SELECT MIN(created_at) FROM memories", ) .fetch_optional(&mut *conn) .await .unwrap_or_default(); let newest: Option = sqlx::query_scalar( "SELECT MAX(created_at) FROM memories", ) .fetch_optional(&mut *conn) .await .unwrap_or_default(); let storage_size: i64 = sqlx::query_scalar( "SELECT SUM(LENGTH(content) + LENGTH(tags) + COALESCE(LENGTH(embedding), 0)) FROM memories", ) .fetch_optional(&mut *conn) .await .unwrap_or(Some(0)) .unwrap_or(0); Ok(MemoryStats { total_entries: total, by_type, by_agent, oldest_entry: oldest, newest_entry: newest, storage_size_bytes: storage_size, }) } /// Export memories for backup pub async fn export_all(&self) -> Result, String> { let mut conn = self.conn.lock().await; let memories = sqlx::query_as::<_, PersistentMemory>( "SELECT * FROM memories ORDER BY created_at ASC", ) .fetch_all(&mut *conn) .await .map_err(|e| format!("Failed to export memories: {}", e))?; Ok(memories) } /// Import memories from backup pub async fn import_batch(&self, memories: &[PersistentMemory]) -> Result { let mut imported = 0; for memory in memories { self.store(memory).await?; imported += 1; } Ok(imported) } /// Get the database path pub fn path(&self) -> &PathBuf { &self.path } } /// Generate a unique memory ID pub fn generate_memory_id() -> String { let uuid_str = Uuid::new_v4().to_string().replace("-", ""); let short_uuid = &uuid_str[..8]; format!("mem_{}_{}", Utc::now().timestamp(), short_uuid) } #[cfg(test)] mod tests { use super::*; #[test] fn test_generate_memory_id() { let memory_id = generate_memory_id(); assert!(memory_id.starts_with("mem_")); } }