Files
zclaw_openfang/desktop/src-tauri/src/intelligence/pain_aggregator.rs
iven a4c89ec6f1 feat(intelligence): persist pain points and proposals to SQLite
PainAggregator and SolutionGenerator were in-memory only, losing all
data on restart. Add PainStorage module with SQLite backend (4 tables),
dual-write strategy (hot cache + durable), and startup cache warming.

- New: pain_storage.rs — SQLite CRUD for pain_points, pain_evidence,
  proposals, proposal_steps with schema initialization
- Modified: pain_aggregator.rs — global PAIN_STORAGE singleton,
  init_pain_storage() for startup, dual-write in merge_or_create/update
- Modified: solution_generator.rs — same dual-write pattern via
  global PAIN_STORAGE
- 20 tests passing (10 storage + 10 aggregator)
2026-04-09 09:15:15 +08:00

640 lines
22 KiB
Rust

//! Pain Point Aggregator — detect, merge, and score recurring user difficulties.
//!
//! Collects pain signals from conversations, merges similar ones with increasing
//! confidence, and surfaces high-confidence pain points for solution generation.
use std::sync::Arc;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tracing::debug;
use uuid::Uuid;
use zclaw_types::Result;
use super::pain_storage::PainStorage;
// ---------------------------------------------------------------------------
// Data structures
// ---------------------------------------------------------------------------
/// A recurring difficulty or systemic problem the user encounters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PainPoint {
pub id: String,
pub agent_id: String,
pub user_id: String,
pub summary: String,
pub category: String,
pub severity: PainSeverity,
pub evidence: Vec<PainEvidence>,
pub occurrence_count: u32,
pub first_seen: DateTime<Utc>,
pub last_seen: DateTime<Utc>,
pub confidence: f64,
pub status: PainStatus,
}
/// A single piece of evidence supporting a pain point.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PainEvidence {
pub when: DateTime<Utc>,
pub user_said: String,
pub why_flagged: String,
}
/// How severe the pain point is.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum PainSeverity {
#[default]
Low,
Medium,
High,
}
/// Lifecycle status of a pain point.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum PainStatus {
Detected,
Confirmed,
Solving,
Solved,
Dismissed,
}
impl PainPoint {
/// Create a new pain point with a single evidence entry.
pub fn new(
agent_id: &str,
user_id: &str,
summary: &str,
category: &str,
severity: PainSeverity,
user_said: &str,
why_flagged: &str,
) -> Self {
let now = Utc::now();
Self {
id: Uuid::new_v4().to_string(),
agent_id: agent_id.to_string(),
user_id: user_id.to_string(),
summary: summary.to_string(),
category: category.to_string(),
severity,
evidence: vec![PainEvidence {
when: now,
user_said: user_said.to_string(),
why_flagged: why_flagged.to_string(),
}],
occurrence_count: 1,
first_seen: now,
last_seen: now,
confidence: 0.25,
status: PainStatus::Detected,
}
}
}
// ---------------------------------------------------------------------------
// LLM analysis result
// ---------------------------------------------------------------------------
/// Result of LLM-based pain point analysis on a conversation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PainAnalysisResult {
pub found: bool,
#[serde(default)]
pub summary: String,
#[serde(default)]
pub category: String,
#[serde(default)]
pub severity: PainSeverity,
#[serde(default)]
pub evidence: String,
}
// ---------------------------------------------------------------------------
// PainAggregator — cross-session merge + confidence scoring
// ---------------------------------------------------------------------------
/// Aggregates pain points across conversations, merging similar ones
/// and escalating confidence as evidence accumulates.
///
/// When the global `PAIN_STORAGE` is initialized (via `init_pain_storage`),
/// writes are dual: memory Vec (hot cache) + SQLite (durable).
pub struct PainAggregator {
pain_points: Arc<RwLock<Vec<PainPoint>>>,
}
impl PainAggregator {
pub fn new() -> Self {
Self {
pain_points: Arc::new(RwLock::new(Vec::new())),
}
}
/// Get the global pain storage, if initialized.
fn get_storage() -> Option<Arc<PainStorage>> {
PAIN_STORAGE.get().cloned()
}
/// Load all persisted pain points from storage into the in-memory cache.
/// Call this once during app startup after `init_pain_storage()`.
pub async fn load_from_storage(&self) -> Result<()> {
if let Some(ref storage) = Self::get_storage() {
let persisted = storage.load_all_pain_points().await?;
let mut points = self.pain_points.write().await;
*points = persisted;
debug!("[PainAggregator] Loaded {} pain points from storage", points.len());
}
Ok(())
}
/// Merge a new pain point with an existing similar one, or create a new entry.
/// Persists to SQLite if storage is configured.
pub async fn merge_or_create(&self, new_pain: PainPoint) -> Result<PainPoint> {
let mut points = self.pain_points.write().await;
let similar_idx = points.iter().position(|p| {
p.agent_id == new_pain.agent_id
&& p.category == new_pain.category
&& Self::summaries_similar(&p.summary, &new_pain.summary)
});
let result = if let Some(idx) = similar_idx {
let existing = &mut points[idx];
existing.evidence.extend(new_pain.evidence);
existing.occurrence_count += 1;
existing.last_seen = Utc::now();
existing.confidence = Self::calculate_confidence(existing);
if new_pain.severity as u8 > existing.severity as u8 {
existing.severity = new_pain.severity;
}
if existing.occurrence_count >= 2 && existing.status == PainStatus::Detected {
existing.status = PainStatus::Confirmed;
}
existing.clone()
} else {
let result = new_pain.clone();
points.push(new_pain);
result
};
// Dual-write: persist to SQLite
if let Some(storage) = Self::get_storage() {
if let Err(e) = storage.store_pain_point(&result).await {
debug!("[PainAggregator] Failed to persist pain point: {}", e);
}
}
Ok(result)
}
/// Get all high-confidence pain points for an agent.
pub async fn get_high_confidence_pains(
&self,
agent_id: &str,
min_confidence: f64,
) -> Vec<PainPoint> {
let points = self.pain_points.read().await;
points
.iter()
.filter(|p| {
p.agent_id == agent_id
&& p.confidence >= min_confidence
&& p.status != PainStatus::Solved
&& p.status != PainStatus::Dismissed
})
.cloned()
.collect()
}
/// List all pain points for an agent.
pub async fn list_pain_points(&self, agent_id: &str) -> Vec<PainPoint> {
let points = self.pain_points.read().await;
points
.iter()
.filter(|p| p.agent_id == agent_id)
.cloned()
.collect()
}
/// List all pain points across all agents (for internal use).
pub async fn list_all_pain_points(&self) -> Vec<PainPoint> {
let points = self.pain_points.read().await;
points.clone()
}
/// Update the status of a pain point. Persists to SQLite if configured.
pub async fn update_status(&self, pain_id: &str, status: PainStatus) -> Result<()> {
let mut points = self.pain_points.write().await;
if let Some(p) = points.iter_mut().find(|p| p.id == pain_id) {
p.status = status;
// Dual-write
if let Some(storage) = Self::get_storage() {
if let Err(e) = storage.store_pain_point(p).await {
debug!("[PainAggregator] Failed to persist status update: {}", e);
}
}
}
Ok(())
}
// -- internal helpers ----------------------------------------------------
fn summaries_similar(a: &str, b: &str) -> bool {
let a_lower = a.to_lowercase();
let b_lower = b.to_lowercase();
if a_lower.contains(&b_lower) || b_lower.contains(&a_lower) {
return true;
}
let a_ngrams = Self::char_ngrams(&a_lower, 2);
let b_ngrams = Self::char_ngrams(&b_lower, 2);
if a_ngrams.is_empty() || b_ngrams.is_empty() {
return false;
}
let shared = a_ngrams.iter().filter(|ng| b_ngrams.contains(ng)).count();
let overlap_ratio = shared as f64 / a_ngrams.len().min(b_ngrams.len()) as f64;
overlap_ratio >= 0.4
}
fn char_ngrams(text: &str, n: usize) -> Vec<String> {
let chars: Vec<char> = text.chars().collect();
if chars.len() < n {
return Vec::new();
}
chars.windows(n).map(|w| w.iter().collect()).collect()
}
fn calculate_confidence(pain: &PainPoint) -> f64 {
let base = (pain.occurrence_count as f64 * 0.25).min(0.75);
let days_since_last = (Utc::now() - pain.last_seen).num_days().max(0) as f64;
let decay = 1.0 - (days_since_last / 90.0);
let severity_boost = match pain.severity {
PainSeverity::High => 0.1,
PainSeverity::Medium => 0.05,
PainSeverity::Low => 0.0,
};
(base * decay.max(0.0) + severity_boost).min(1.0)
}
}
impl Default for PainAggregator {
fn default() -> Self {
Self::new()
}
}
// ---------------------------------------------------------------------------
// Rule-based pain signal detection
// ---------------------------------------------------------------------------
const FRUSTRATION_SIGNALS: &[&str] = &[
"烦死了", "又来了", "每次都", "受不了", "还是不行",
"又出错", "第三次", "第四次", "第五次", "怎么又",
"老是", "一直", "还是没解决", "退了", "被退",
"又要改", "搞不定", "没办法",
];
/// Analyze messages for pain signals using rule-based detection.
pub fn analyze_for_pain_signals(messages: &[zclaw_types::Message]) -> Option<PainAnalysisResult> {
let mut user_frustrations: Vec<String> = Vec::new();
for msg in messages {
if let zclaw_types::Message::User { content } = msg {
for signal in FRUSTRATION_SIGNALS {
if content.contains(signal) {
user_frustrations.push(content.clone());
break;
}
}
}
}
if user_frustrations.is_empty() {
return None;
}
let combined = user_frustrations.join(" ");
let severity = if combined.contains("烦死了")
|| combined.contains("受不了")
|| user_frustrations.len() >= 3
{
PainSeverity::High
} else if user_frustrations.len() >= 2 {
PainSeverity::Medium
} else {
PainSeverity::Low
};
let summary = extract_summary(&user_frustrations[0]);
let category = classify_category(&combined);
Some(PainAnalysisResult {
found: true,
summary,
category,
severity,
evidence: user_frustrations.join("; "),
})
}
fn extract_summary(text: &str) -> String {
let trimmed = text.trim();
if trimmed.chars().count() <= 50 {
trimmed.to_string()
} else {
let end = trimmed.char_indices().take_while(|(i, _)| *i < 50).last();
match end {
Some((i, c)) => trimmed[..i + c.len_utf8()].to_string() + "",
None => trimmed.chars().take(50).collect::<String>() + "",
}
}
}
fn classify_category(text: &str) -> String {
let lower = text.to_lowercase();
if ["", "", "出口", "退"].iter().any(|k| lower.contains(k)) {
"logistics".to_string()
} else if ["合规", "法规", "标准"].iter().any(|k| lower.contains(k)) {
"compliance".to_string()
} else if ["客户", "投诉", "反馈"].iter().any(|k| lower.contains(k)) {
"customer".to_string()
} else if ["报价", "价格", "成本"].iter().any(|k| lower.contains(k)) {
"pricing".to_string()
} else if ["系统", "软件", "电脑"].iter().any(|k| lower.contains(k)) {
"technology".to_string()
} else {
"general".to_string()
}
}
// ---------------------------------------------------------------------------
// Tauri commands
// ---------------------------------------------------------------------------
use std::sync::OnceLock;
use super::solution_generator::{Proposal, ProposalStatus, SolutionGenerator};
static PAIN_AGGREGATOR: OnceLock<Arc<PainAggregator>> = OnceLock::new();
static SOLUTION_GENERATOR: OnceLock<Arc<SolutionGenerator>> = OnceLock::new();
pub(crate) static PAIN_STORAGE: OnceLock<Arc<PainStorage>> = OnceLock::new();
fn pain_store() -> Arc<PainAggregator> {
PAIN_AGGREGATOR.get_or_init(|| Arc::new(PainAggregator::new())).clone()
}
fn solution_store() -> Arc<SolutionGenerator> {
SOLUTION_GENERATOR.get_or_init(|| Arc::new(SolutionGenerator::new())).clone()
}
/// Initialize pain point persistence with a SQLite pool.
///
/// Creates the schema, sets the global storage, and loads any previously
/// persisted data into the in-memory caches.
///
/// Should be called once during app startup, before any pain-related operations.
pub async fn init_pain_storage(pool: sqlx::SqlitePool) -> Result<()> {
let storage = Arc::new(PainStorage::new(pool));
storage.initialize_schema().await?;
// Set global storage (must succeed on first call)
PAIN_STORAGE.set(storage.clone()).map_err(|_| zclaw_types::ZclawError::StorageError("PainStorage already initialized".into()))?;
// Warm the in-memory caches from SQLite
let aggregator = pain_store();
aggregator.load_from_storage().await?;
let generator = solution_store();
generator.load_from_storage().await?;
debug!("[init_pain_storage] Pain storage initialized successfully");
Ok(())
}
/// List all pain points for an agent.
// @reserved: no frontend UI yet
#[tauri::command]
pub async fn butler_list_pain_points(agent_id: String) -> std::result::Result<Vec<PainPoint>, String> {
Ok(pain_store().list_pain_points(&agent_id).await)
}
/// Record a new pain point from conversation analysis.
// @reserved: no frontend UI yet
#[tauri::command]
pub async fn butler_record_pain_point(
agent_id: String,
user_id: String,
summary: String,
category: String,
severity: String,
user_said: String,
why_flagged: String,
) -> std::result::Result<PainPoint, String> {
let sev = match severity.as_str() {
"high" => PainSeverity::High,
"medium" => PainSeverity::Medium,
_ => PainSeverity::Low,
};
let pain = PainPoint::new(&agent_id, &user_id, &summary, &category, sev, &user_said, &why_flagged);
pain_store().merge_or_create(pain).await.map_err(|e| e.to_string())
}
/// List all proposals for an agent.
// @reserved: no frontend UI yet
#[tauri::command]
pub async fn butler_list_proposals(agent_id: String) -> std::result::Result<Vec<Proposal>, String> {
let store = pain_store();
let gen = solution_store();
let pains = store.list_pain_points(&agent_id).await;
let mut all_proposals = Vec::new();
for pain in &pains {
let proposals = gen.list_proposals(&[pain.id.clone()]).await;
all_proposals.extend(proposals);
}
Ok(all_proposals)
}
/// Generate a solution for a high-confidence pain point.
// @reserved: no frontend UI yet
#[tauri::command]
pub async fn butler_generate_solution(pain_id: String) -> std::result::Result<Proposal, String> {
let store = pain_store();
let gen = solution_store();
let all_pains = store.list_all_pain_points().await;
let pain = all_pains
.into_iter()
.find(|p| p.id == pain_id)
.ok_or_else(|| format!("Pain point {} not found", pain_id))?;
if pain.confidence < 0.7 {
return Err("Pain point confidence too low to generate solution".into());
}
let proposal = gen.generate_solution(&pain).await;
store
.update_status(&pain_id, PainStatus::Solving)
.await
.map_err(|e| e.to_string())?;
Ok(proposal)
}
/// Update the status of a proposal.
// @reserved: no frontend UI yet
#[tauri::command]
pub async fn butler_update_proposal_status(
proposal_id: String,
status: String,
) -> std::result::Result<(), String> {
let new_status = match status.as_str() {
"accepted" => ProposalStatus::Accepted,
"rejected" => ProposalStatus::Rejected,
"completed" => ProposalStatus::Completed,
_ => ProposalStatus::Pending,
};
solution_store()
.update_status(&proposal_id, new_status)
.await
.ok_or_else(|| format!("Proposal {} not found", proposal_id))?;
Ok(())
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
use zclaw_types::Message;
#[test]
fn test_pain_point_serialization() {
let pp = PainPoint::new(
"agent-1", "user-1", "出口包装合规反复出错",
"compliance", PainSeverity::High, "又要改包装", "用户第三次提到包装问题",
);
assert_eq!(pp.summary, "出口包装合规反复出错");
assert_eq!(pp.occurrence_count, 1);
assert_eq!(pp.status, PainStatus::Detected);
let json = serde_json::to_string(&pp).unwrap();
let decoded: PainPoint = serde_json::from_str(&json).unwrap();
assert_eq!(decoded.summary, "出口包装合规反复出错");
assert_eq!(decoded.severity, PainSeverity::High);
}
#[test]
fn test_merge_similar_pain_points() {
let aggregator = PainAggregator::new();
let rt = tokio::runtime::Runtime::new().unwrap();
let p1 = PainPoint::new("agent-1", "user-1", "出口包装不合格",
"compliance", PainSeverity::Medium, "包装被退了", "用户首次提到");
let merged1 = rt.block_on(aggregator.merge_or_create(p1)).unwrap();
assert_eq!(merged1.occurrence_count, 1);
let p2 = PainPoint::new("agent-1", "user-1", "包装不合格又要改",
"compliance", PainSeverity::High, "又要改包装", "用户第二次提到");
let merged2 = rt.block_on(aggregator.merge_or_create(p2)).unwrap();
assert_eq!(merged2.occurrence_count, 2);
assert!(merged2.confidence > 0.3);
assert_eq!(merged2.status, PainStatus::Confirmed);
}
#[test]
fn test_different_categories_not_merged() {
let aggregator = PainAggregator::new();
let rt = tokio::runtime::Runtime::new().unwrap();
let p1 = PainPoint::new("agent-1", "user-1", "出口包装不合格",
"compliance", PainSeverity::Medium, "包装被退", "包装问题");
let p2 = PainPoint::new("agent-1", "user-1", "客户投诉服务态度",
"customer", PainSeverity::High, "客户又投诉了", "客户问题");
rt.block_on(aggregator.merge_or_create(p1)).unwrap();
let merged2 = rt.block_on(aggregator.merge_or_create(p2)).unwrap();
assert_eq!(merged2.occurrence_count, 1);
}
#[test]
fn test_confidence_calculation() {
let mut pain = PainPoint::new("agent-1", "user-1", "test",
"general", PainSeverity::High, "test", "test");
pain.occurrence_count = 3;
pain.last_seen = Utc::now();
let conf = PainAggregator::calculate_confidence(&pain);
assert!(conf > 0.5, "3 occurrences + High severity should be > 0.5, got {}", conf);
assert!(conf <= 1.0);
}
#[test]
fn test_analyze_frustration_signals() {
let messages = vec![
Message::user("这批货又被退了"),
Message::assistant("很抱歉听到这个消息"),
Message::user("又要改包装,烦死了,第三次了"),
];
let result = analyze_for_pain_signals(&messages);
assert!(result.is_some());
let analysis = result.unwrap();
assert!(analysis.found);
assert_eq!(analysis.severity, PainSeverity::High);
assert!(!analysis.summary.is_empty());
}
#[test]
fn test_analyze_no_frustration() {
let messages = vec![
Message::user("今天天气不错"),
Message::assistant("是的,天气很好"),
];
assert!(analyze_for_pain_signals(&messages).is_none());
}
#[test]
fn test_classify_category() {
assert_eq!(classify_category("出口包装被退了"), "logistics");
assert_eq!(classify_category("合规标准变了"), "compliance");
assert_eq!(classify_category("客户投诉很多"), "customer");
assert_eq!(classify_category("报价太低了"), "pricing");
assert_eq!(classify_category("系统又崩了"), "technology");
assert_eq!(classify_category("随便聊聊天"), "general");
}
#[test]
fn test_severity_ordering() {
let messages = vec![
Message::user("这又来了"),
Message::user("还是不行"),
];
let result = analyze_for_pain_signals(&messages);
assert!(result.is_some());
assert_eq!(result.unwrap().severity, PainSeverity::Medium);
}
#[test]
fn test_get_high_confidence_filters() {
let aggregator = PainAggregator::new();
let rt = tokio::runtime::Runtime::new().unwrap();
for i in 0..3 {
let p = PainPoint::new("agent-1", "user-1", "包装不合格反复出错",
"compliance", PainSeverity::High,
&format!("{}次出现", i + 1), "recurring");
rt.block_on(aggregator.merge_or_create(p)).unwrap();
}
let high = rt.block_on(aggregator.get_high_confidence_pains("agent-1", 0.5));
assert!(!high.is_empty());
assert!(high[0].confidence >= 0.5);
assert_eq!(high[0].occurrence_count, 3);
}
}