//! Pain point and proposal persistence layer. //! //! Provides SQLite-backed storage for pain points, evidence, proposals, and //! proposal steps. This module replaces the in-memory `Vec` in //! `PainAggregator` and the in-memory `Vec` in `SolutionGenerator` //! with durable storage that survives app restarts. //! //! ## Schema //! //! Four tables with foreign-key relationships: //! //! ```text //! pain_points ← (1:N) pain_evidence //! pain_points ← (1:N) proposals ← (1:N) proposal_steps //! ``` use chrono::{DateTime, TimeZone, Utc}; use sqlx::SqlitePool; use zclaw_types::Result; use super::pain_aggregator::{PainEvidence, PainPoint, PainSeverity, PainStatus}; use super::solution_generator::{Proposal, ProposalStep, ProposalStatus}; // --------------------------------------------------------------------------- // SQL constants // --------------------------------------------------------------------------- const SCHEMA_SQL: &str = r#" CREATE TABLE IF NOT EXISTS pain_points ( id TEXT PRIMARY KEY, agent_id TEXT NOT NULL, user_id TEXT NOT NULL, summary TEXT NOT NULL, category TEXT NOT NULL, severity TEXT NOT NULL DEFAULT 'low', occurrence_count INTEGER NOT NULL DEFAULT 1, first_seen TEXT NOT NULL, last_seen TEXT NOT NULL, confidence REAL NOT NULL DEFAULT 0.25, status TEXT NOT NULL DEFAULT 'detected' ); CREATE TABLE IF NOT EXISTS pain_evidence ( id INTEGER PRIMARY KEY AUTOINCREMENT, pain_id TEXT NOT NULL REFERENCES pain_points(id) ON DELETE CASCADE, occurred_at TEXT NOT NULL, user_said TEXT NOT NULL, why_flagged TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS proposals ( id TEXT PRIMARY KEY, pain_point_id TEXT NOT NULL REFERENCES pain_points(id), title TEXT NOT NULL, description TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', confidence_at_creation REAL NOT NULL, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS proposal_steps ( id INTEGER PRIMARY KEY AUTOINCREMENT, proposal_id TEXT NOT NULL REFERENCES proposals(id) ON DELETE CASCADE, step_index INTEGER NOT NULL, action TEXT NOT NULL, detail TEXT NOT NULL, skill_hint TEXT ); CREATE INDEX IF NOT EXISTS idx_pain_agent ON pain_points(agent_id); CREATE INDEX IF NOT EXISTS idx_pain_status ON pain_points(status); CREATE INDEX IF NOT EXISTS idx_evidence_pain ON pain_evidence(pain_id); CREATE INDEX IF NOT EXISTS idx_proposal_pain ON proposals(pain_point_id); CREATE INDEX IF NOT EXISTS idx_steps_proposal ON proposal_steps(proposal_id); "#; // --------------------------------------------------------------------------- // Severity / Status helpers (enum <-> TEXT) // --------------------------------------------------------------------------- impl PainSeverity { fn as_db_str(&self) -> &'static str { match self { PainSeverity::Low => "low", PainSeverity::Medium => "medium", PainSeverity::High => "high", } } fn from_db_str(s: &str) -> Self { match s { "high" => PainSeverity::High, "medium" => PainSeverity::Medium, _ => PainSeverity::Low, } } } impl PainStatus { fn as_db_str(&self) -> &'static str { match self { PainStatus::Detected => "detected", PainStatus::Confirmed => "confirmed", PainStatus::Solving => "solving", PainStatus::Solved => "solved", PainStatus::Dismissed => "dismissed", } } fn from_db_str(s: &str) -> Self { match s { "confirmed" => PainStatus::Confirmed, "solving" => PainStatus::Solving, "solved" => PainStatus::Solved, "dismissed" => PainStatus::Dismissed, _ => PainStatus::Detected, } } } impl ProposalStatus { fn as_db_str(&self) -> &'static str { match self { ProposalStatus::Pending => "pending", ProposalStatus::Accepted => "accepted", ProposalStatus::Rejected => "rejected", ProposalStatus::Completed => "completed", } } fn from_db_str(s: &str) -> Self { match s { "accepted" => ProposalStatus::Accepted, "rejected" => ProposalStatus::Rejected, "completed" => ProposalStatus::Completed, _ => ProposalStatus::Pending, } } } // --------------------------------------------------------------------------- // DateTime <-> TEXT helpers // --------------------------------------------------------------------------- fn dt_to_db(dt: &DateTime) -> String { dt.to_rfc3339() } fn dt_from_db(s: &str) -> DateTime { // RFC 3339 is the preferred format; fall back to subsecond precision DateTime::parse_from_rfc3339(s) .map(|dt| dt.with_timezone(&Utc)) .unwrap_or_else(|_| { Utc.timestamp_opt(0, 0).single().unwrap_or_else(|| Utc::now()) }) } // --------------------------------------------------------------------------- // Sqlx row mapping structs (derived for query_as) // --------------------------------------------------------------------------- #[derive(Debug, sqlx::FromRow)] struct PainPointRow { id: String, agent_id: String, user_id: String, summary: String, category: String, severity: String, occurrence_count: i64, first_seen: String, last_seen: String, confidence: f64, status: String, } #[derive(Debug, sqlx::FromRow)] struct EvidenceRow { occurred_at: String, user_said: String, why_flagged: String, } #[derive(Debug, sqlx::FromRow)] struct ProposalRow { id: String, pain_point_id: String, title: String, description: String, status: String, confidence_at_creation: f64, created_at: String, updated_at: String, } #[derive(Debug, sqlx::FromRow)] struct StepRow { step_index: i64, action: String, detail: String, skill_hint: Option, } // --------------------------------------------------------------------------- // PainStorage // --------------------------------------------------------------------------- /// SQLite-backed persistence for pain points, evidence, proposals, and steps. pub struct PainStorage { pool: SqlitePool, } impl PainStorage { /// Create a new `PainStorage` backed by the given connection pool. pub fn new(pool: SqlitePool) -> Self { Self { pool } } /// Execute the DDL to create tables and indexes if they do not exist yet. pub async fn initialize_schema(&self) -> Result<()> { // SQLite must have foreign keys enabled explicitly per connection. sqlx::query("PRAGMA foreign_keys = ON") .execute(&self.pool) .await .map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?; sqlx::query(SCHEMA_SQL) .execute(&self.pool) .await .map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?; Ok(()) } // -- Pain points --------------------------------------------------------- /// Persist a pain point together with its evidence records. /// /// Uses `INSERT OR REPLACE` so that calling this for an existing pain point /// will update all scalar fields and re-insert the full evidence vector. pub async fn store_pain_point(&self, pain: &PainPoint) -> Result<()> { sqlx::query( "INSERT OR REPLACE INTO pain_points (id, agent_id, user_id, summary, category, severity, occurrence_count, first_seen, last_seen, confidence, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&pain.id) .bind(&pain.agent_id) .bind(&pain.user_id) .bind(&pain.summary) .bind(&pain.category) .bind(pain.severity.as_db_str()) .bind(pain.occurrence_count as i64) .bind(dt_to_db(&pain.first_seen)) .bind(dt_to_db(&pain.last_seen)) .bind(pain.confidence) .bind(pain.status.as_db_str()) .execute(&self.pool) .await .map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?; // Replace all evidence rows for this pain point. sqlx::query("DELETE FROM pain_evidence WHERE pain_id = ?") .bind(&pain.id) .execute(&self.pool) .await .map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?; for ev in &pain.evidence { sqlx::query( "INSERT INTO pain_evidence (pain_id, occurred_at, user_said, why_flagged) VALUES (?, ?, ?, ?)", ) .bind(&pain.id) .bind(dt_to_db(&ev.when)) .bind(&ev.user_said) .bind(&ev.why_flagged) .execute(&self.pool) .await .map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?; } Ok(()) } /// Load all pain points from the database, including their evidence. pub async fn load_all_pain_points(&self) -> Result> { let rows = sqlx::query_as::<_, PainPointRow>( "SELECT id, agent_id, user_id, summary, category, severity, occurrence_count, first_seen, last_seen, confidence, status FROM pain_points ORDER BY last_seen DESC", ) .fetch_all(&self.pool) .await .map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?; let mut points = Vec::with_capacity(rows.len()); for row in rows { let evidence = self.load_evidence(&row.id).await?; points.push(PainPoint { id: row.id, agent_id: row.agent_id, user_id: row.user_id, summary: row.summary, category: row.category, severity: PainSeverity::from_db_str(&row.severity), evidence, occurrence_count: row.occurrence_count as u32, first_seen: dt_from_db(&row.first_seen), last_seen: dt_from_db(&row.last_seen), confidence: row.confidence, status: PainStatus::from_db_str(&row.status), }); } Ok(points) } /// Update an existing pain point (delegates to `store_pain_point` which /// uses `INSERT OR REPLACE`). pub async fn update_pain_point(&self, pain: &PainPoint) -> Result<()> { self.store_pain_point(pain).await } // -- Proposals ----------------------------------------------------------- /// Persist a proposal together with its steps and evidence chain. pub async fn store_proposal(&self, proposal: &Proposal) -> Result<()> { sqlx::query( "INSERT OR REPLACE INTO proposals (id, pain_point_id, title, description, status, confidence_at_creation, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&proposal.id) .bind(&proposal.pain_point_id) .bind(&proposal.title) .bind(&proposal.description) .bind(proposal.status.as_db_str()) .bind(proposal.confidence_at_creation) .bind(dt_to_db(&proposal.created_at)) .bind(dt_to_db(&proposal.updated_at)) .execute(&self.pool) .await .map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?; // Replace steps. sqlx::query("DELETE FROM proposal_steps WHERE proposal_id = ?") .bind(&proposal.id) .execute(&self.pool) .await .map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?; for step in &proposal.steps { sqlx::query( "INSERT INTO proposal_steps (proposal_id, step_index, action, detail, skill_hint) VALUES (?, ?, ?, ?, ?)", ) .bind(&proposal.id) .bind(step.index as i64) .bind(&step.action) .bind(&step.detail) .bind(&step.skill_hint) .execute(&self.pool) .await .map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?; } Ok(()) } /// Load all proposals from the database, including their steps and evidence. /// /// The evidence chain is reconstructed by looking up the pain point's /// evidence records via the `pain_point_id` foreign key. pub async fn load_all_proposals(&self) -> Result> { let rows = sqlx::query_as::<_, ProposalRow>( "SELECT id, pain_point_id, title, description, status, confidence_at_creation, created_at, updated_at FROM proposals ORDER BY created_at DESC", ) .fetch_all(&self.pool) .await .map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?; let mut proposals = Vec::with_capacity(rows.len()); for row in rows { let steps = self.load_steps(&row.id).await?; let evidence_chain = self.load_evidence(&row.pain_point_id).await?; proposals.push(Proposal { id: row.id, pain_point_id: row.pain_point_id, title: row.title, description: row.description, steps, status: ProposalStatus::from_db_str(&row.status), evidence_chain, confidence_at_creation: row.confidence_at_creation, created_at: dt_from_db(&row.created_at), updated_at: dt_from_db(&row.updated_at), }); } Ok(proposals) } /// Update a proposal (delegates to `store_proposal`). pub async fn update_proposal(&self, proposal: &Proposal) -> Result<()> { self.store_proposal(proposal).await } // -- Private helpers ----------------------------------------------------- async fn load_evidence(&self, pain_id: &str) -> Result> { let rows = sqlx::query_as::<_, EvidenceRow>( "SELECT occurred_at, user_said, why_flagged FROM pain_evidence WHERE pain_id = ? ORDER BY occurred_at ASC", ) .bind(pain_id) .fetch_all(&self.pool) .await .map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?; Ok(rows .into_iter() .map(|r| PainEvidence { when: dt_from_db(&r.occurred_at), user_said: r.user_said, why_flagged: r.why_flagged, }) .collect()) } async fn load_steps(&self, proposal_id: &str) -> Result> { let rows = sqlx::query_as::<_, StepRow>( "SELECT step_index, action, detail, skill_hint FROM proposal_steps WHERE proposal_id = ? ORDER BY step_index ASC", ) .bind(proposal_id) .fetch_all(&self.pool) .await .map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?; Ok(rows .into_iter() .map(|r| ProposalStep { index: r.step_index as u32, action: r.action, detail: r.detail, skill_hint: r.skill_hint, }) .collect()) } } // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- #[cfg(test)] mod tests { use super::*; /// Helper: create an in-memory PainStorage with schema initialized. async fn test_storage() -> PainStorage { let pool = SqlitePool::connect("sqlite::memory:") .await .expect("in-memory pool"); let storage = PainStorage::new(pool); storage .initialize_schema() .await .expect("schema init"); storage } /// Helper: build a sample `PainPoint` with one evidence entry. fn sample_pain(id_suffix: &str) -> PainPoint { PainPoint::new( &format!("agent-{id_suffix}"), &format!("user-{id_suffix}"), &format!("summary-{id_suffix}"), "logistics", PainSeverity::Medium, "user said something", "flagged because recurring", ) } /// Helper: build a sample `Proposal` referencing a pain point. fn sample_proposal(pain: &PainPoint) -> Proposal { Proposal::from_pain_point(pain) } // -- Pain point round-trip ----------------------------------------------- #[tokio::test] async fn test_store_and_load_pain_point() { let storage = test_storage().await; let pain = sample_pain("1"); storage.store_pain_point(&pain).await.unwrap(); let loaded = storage.load_all_pain_points().await.unwrap(); assert_eq!(loaded.len(), 1); assert_eq!(loaded[0].id, pain.id); assert_eq!(loaded[0].agent_id, "agent-1"); assert_eq!(loaded[0].summary, "summary-1"); assert_eq!(loaded[0].category, "logistics"); assert_eq!(loaded[0].severity, PainSeverity::Medium); assert_eq!(loaded[0].occurrence_count, 1); assert_eq!(loaded[0].status, PainStatus::Detected); assert_eq!(loaded[0].evidence.len(), 1); assert_eq!(loaded[0].evidence[0].user_said, "user said something"); } // -- Update pain point --------------------------------------------------- #[tokio::test] async fn test_update_pain_point() { let storage = test_storage().await; let mut pain = sample_pain("2"); storage.store_pain_point(&pain).await.unwrap(); // Mutate and update. pain.occurrence_count = 3; pain.confidence = 0.85; pain.status = PainStatus::Confirmed; pain.severity = PainSeverity::High; pain.evidence.push(PainEvidence { when: Utc::now(), user_said: "second evidence".into(), why_flagged: "escalation".into(), }); storage.update_pain_point(&pain).await.unwrap(); let loaded = storage.load_all_pain_points().await.unwrap(); assert_eq!(loaded.len(), 1); assert_eq!(loaded[0].occurrence_count, 3); assert!((loaded[0].confidence - 0.85).abs() < f64::EPSILON); assert_eq!(loaded[0].status, PainStatus::Confirmed); assert_eq!(loaded[0].severity, PainSeverity::High); assert_eq!(loaded[0].evidence.len(), 2); } // -- Multiple pain points ------------------------------------------------ #[tokio::test] async fn test_store_multiple_pain_points() { let storage = test_storage().await; let p1 = sample_pain("a"); let p2 = sample_pain("b"); let p3 = sample_pain("c"); storage.store_pain_point(&p1).await.unwrap(); storage.store_pain_point(&p2).await.unwrap(); storage.store_pain_point(&p3).await.unwrap(); let loaded = storage.load_all_pain_points().await.unwrap(); assert_eq!(loaded.len(), 3); let ids: Vec<&str> = loaded.iter().map(|p| p.id.as_str()).collect(); assert!(ids.contains(&p1.id.as_str())); assert!(ids.contains(&p2.id.as_str())); assert!(ids.contains(&p3.id.as_str())); } // -- Proposal round-trip ------------------------------------------------- #[tokio::test] async fn test_store_and_load_proposal() { let storage = test_storage().await; let pain = sample_pain("3"); storage.store_pain_point(&pain).await.unwrap(); let proposal = sample_proposal(&pain); storage.store_proposal(&proposal).await.unwrap(); let loaded = storage.load_all_proposals().await.unwrap(); assert_eq!(loaded.len(), 1); assert_eq!(loaded[0].id, proposal.id); assert_eq!(loaded[0].pain_point_id, pain.id); assert_eq!(loaded[0].status, ProposalStatus::Pending); assert!(!loaded[0].steps.is_empty()); // Evidence chain is loaded from the pain point's evidence. assert!(!loaded[0].evidence_chain.is_empty()); } // -- Update proposal status ---------------------------------------------- #[tokio::test] async fn test_update_proposal() { let storage = test_storage().await; let pain = sample_pain("4"); storage.store_pain_point(&pain).await.unwrap(); let mut proposal = sample_proposal(&pain); storage.store_proposal(&proposal).await.unwrap(); // Accept the proposal. proposal.status = ProposalStatus::Accepted; proposal.updated_at = Utc::now(); storage.update_proposal(&proposal).await.unwrap(); let loaded = storage.load_all_proposals().await.unwrap(); assert_eq!(loaded.len(), 1); assert_eq!(loaded[0].status, ProposalStatus::Accepted); } // -- Severity round-trip ------------------------------------------------- #[tokio::test] async fn test_all_severity_round_trip() { let storage = test_storage().await; for (i, sev) in [PainSeverity::Low, PainSeverity::Medium, PainSeverity::High] .into_iter() .enumerate() { let mut pain = PainPoint::new( &format!("agent-{i}"), &format!("user-{i}"), "test", "general", sev, "evidence", "reason", ); // Force unique id by using a known suffix. pain.id = format!("pain-sev-{i}"); storage.store_pain_point(&pain).await.unwrap(); } let loaded = storage.load_all_pain_points().await.unwrap(); assert_eq!(loaded.len(), 3); let severities: Vec = loaded.iter().map(|p| p.severity).collect(); assert!(severities.contains(&PainSeverity::Low)); assert!(severities.contains(&PainSeverity::Medium)); assert!(severities.contains(&PainSeverity::High)); } // -- Status round-trip --------------------------------------------------- #[tokio::test] async fn test_all_pain_status_round_trip() { let storage = test_storage().await; let statuses = [ PainStatus::Detected, PainStatus::Confirmed, PainStatus::Solving, PainStatus::Solved, PainStatus::Dismissed, ]; for (i, status) in statuses.into_iter().enumerate() { let mut pain = sample_pain(&format!("st-{i}")); pain.id = format!("pain-st-{i}"); pain.status = status; storage.store_pain_point(&pain).await.unwrap(); } let loaded = storage.load_all_pain_points().await.unwrap(); assert_eq!(loaded.len(), 5); for expected in [ PainStatus::Detected, PainStatus::Confirmed, PainStatus::Solving, PainStatus::Solved, PainStatus::Dismissed, ] { assert!( loaded.iter().any(|p| p.status == expected), "expected status {:?} not found", expected ); } } // -- Proposal status round-trip ------------------------------------------ #[tokio::test] async fn test_all_proposal_status_round_trip() { let storage = test_storage().await; let statuses = [ ProposalStatus::Pending, ProposalStatus::Accepted, ProposalStatus::Rejected, ProposalStatus::Completed, ]; for (i, status) in statuses.into_iter().enumerate() { let mut pain = sample_pain(&format!("ps-{i}")); pain.id = format!("pain-ps-{i}"); storage.store_pain_point(&pain).await.unwrap(); let mut proposal = sample_proposal(&pain); proposal.id = format!("proposal-ps-{i}"); proposal.status = status; storage.store_proposal(&proposal).await.unwrap(); } let loaded = storage.load_all_proposals().await.unwrap(); assert_eq!(loaded.len(), 4); for expected in [ ProposalStatus::Pending, ProposalStatus::Accepted, ProposalStatus::Rejected, ProposalStatus::Completed, ] { assert!( loaded.iter().any(|p| p.status == expected), "expected proposal status {:?} not found", expected ); } } // -- Empty database returns empty vec ------------------------------------ #[tokio::test] async fn test_load_empty_returns_empty() { let storage = test_storage().await; let pains = storage.load_all_pain_points().await.unwrap(); assert!(pains.is_empty()); let proposals = storage.load_all_proposals().await.unwrap(); assert!(proposals.is_empty()); } // -- Proposal steps are ordered ------------------------------------------ #[tokio::test] async fn test_proposal_steps_ordering() { let storage = test_storage().await; let pain = sample_pain("order"); storage.store_pain_point(&pain).await.unwrap(); let proposal = sample_proposal(&pain); // The proposal from `from_pain_point` should have at least 1 step for "logistics". assert!(!proposal.steps.is_empty()); storage.store_proposal(&proposal).await.unwrap(); let loaded = storage.load_all_proposals().await.unwrap(); assert_eq!(loaded.len(), 1); // Steps should be in index order. let step_indices: Vec = loaded[0].steps.iter().map(|s| s.index).collect(); let mut sorted = step_indices.clone(); sorted.sort(); assert_eq!(step_indices, sorted, "steps should be ordered by index"); } // -- Schema is idempotent ------------------------------------------------ #[tokio::test] async fn test_schema_idempotent() { let storage = test_storage().await; // Running initialize_schema twice should not fail. storage.initialize_schema().await.unwrap(); storage.initialize_schema().await.unwrap(); let pain = sample_pain("idem"); storage.store_pain_point(&pain).await.unwrap(); let loaded = storage.load_all_pain_points().await.unwrap(); assert_eq!(loaded.len(), 1); } }