Files
zclaw_openfang/crates/zclaw-memory/src/user_profile_store.rs
iven b726d0cd5e
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
fix(growth,memory,hands): 穷尽审计后 4 项修复 — 伪造时间戳+测试覆盖+注释纠正
CRITICAL:
- user_profile_store: find_active_pains_since 改为 find_active_pains,
  移除无意义 .filter(|_| true),不再伪造 created_at=since

HIGH:
- daily_report: 移除虚假的 "Emits Tauri event" 注释(事件发射是调用方职责)
- daily_report: chrono::Local → chrono::Utc 一致性修复
- 新增 8 个单元测试: PainPoint 系列测试 + find_since + get_events_since

验证: zclaw-memory 54 PASS, zclaw-growth 151 PASS, zclaw-hands 5 PASS
2026-04-21 18:45:10 +08:00

743 lines
25 KiB
Rust

//! User Profile Store — structured user modeling from conversation patterns.
//!
//! Maintains a single `UserProfile` per user (desktop uses "default_user")
//! in a dedicated SQLite table. Vec fields (recent_topics, pain points,
//! preferred_tools) are stored as JSON arrays and transparently
//! (de)serialised on read/write.
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::Row;
use sqlx::SqlitePool;
use zclaw_types::Result;
// ---------------------------------------------------------------------------
// Data types
// ---------------------------------------------------------------------------
/// Lifecycle state of a tracked pain point.
///
/// The serde representation is the lowercase variant name, which is the
/// same text returned by [`PainStatus::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum PainStatus {
    Active,
    Resolved,
    Deferred,
}

impl PainStatus {
    /// Lowercase name of this status.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Active => "active",
            Self::Resolved => "resolved",
            Self::Deferred => "deferred",
        }
    }

    /// Parse a lowercase status name.
    ///
    /// Matching is exact and case-sensitive. Anything other than
    /// `"resolved"` or `"deferred"` (including `"active"` and unknown text)
    /// yields [`PainStatus::Active`].
    pub fn from_str_lossy(s: &str) -> Self {
        [Self::Resolved, Self::Deferred]
            .iter()
            .copied()
            .find(|status| status.as_str() == s)
            .unwrap_or(Self::Active)
    }
}
/// A pain point together with its tracking metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PainPoint {
    /// Free-text description of the pain point.
    pub content: String,
    /// When the pain point was first recorded.
    pub created_at: DateTime<Utc>,
    /// When the pain point was most recently mentioned.
    pub last_mentioned_at: DateTime<Utc>,
    /// Current lifecycle state.
    pub status: PainStatus,
    /// How many times the pain point has been mentioned.
    pub occurrence_count: u32,
}

impl PainPoint {
    /// Build a freshly observed pain point.
    ///
    /// Both timestamps are set to the same "now". The status starts as
    /// `Active` with a single occurrence.
    pub fn new(content: &str) -> Self {
        let observed_at = Utc::now();
        PainPoint {
            content: content.to_owned(),
            created_at: observed_at,
            last_mentioned_at: observed_at,
            status: PainStatus::Active,
            occurrence_count: 1,
        }
    }
}
/// Expertise level inferred from conversation patterns.
///
/// The serde representation is the lowercase variant name, the same text
/// returned by [`Level::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Level {
    Beginner,
    Intermediate,
    Expert,
}

impl Level {
    /// Lowercase name of this level (also what the store writes to the
    /// `expertise_level` column).
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Beginner => "beginner",
            Self::Intermediate => "intermediate",
            Self::Expert => "expert",
        }
    }

    /// Parse a lowercase level name.
    ///
    /// Matching is exact and case-sensitive. Unrecognised text yields `None`.
    pub fn from_str_lossy(s: &str) -> Option<Self> {
        [Self::Beginner, Self::Intermediate, Self::Expert]
            .iter()
            .copied()
            .find(|level| level.as_str() == s)
    }
}
/// Preferred communication style.
///
/// The serde representation is the lowercase variant name, the same text
/// returned by [`CommStyle::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CommStyle {
    Concise,
    Detailed,
    Formal,
    Casual,
}

impl CommStyle {
    /// Lowercase name of this style (also what the store writes to the
    /// `communication_style` column).
    pub fn as_str(&self) -> &'static str {
        let name = match *self {
            Self::Concise => "concise",
            Self::Detailed => "detailed",
            Self::Formal => "formal",
            Self::Casual => "casual",
        };
        name
    }

    /// Parse a lowercase style name.
    ///
    /// Matching is exact and case-sensitive. Unrecognised text yields `None`.
    pub fn from_str_lossy(s: &str) -> Option<Self> {
        let style = match s {
            "concise" => Self::Concise,
            "detailed" => Self::Detailed,
            "formal" => Self::Formal,
            "casual" => Self::Casual,
            _ => return None,
        };
        Some(style)
    }
}
/// Structured user profile; the store keeps one row per `user_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserProfile {
    /// Primary key of the profile row.
    pub user_id: String,
    pub industry: Option<String>,
    pub role: Option<String>,
    pub expertise_level: Option<Level>,
    pub communication_style: Option<CommStyle>,
    /// Language tag; blank profiles start with `"zh-CN"`.
    pub preferred_language: String,
    /// Recent topics; `UserProfileStore::add_recent_topic` keeps these most-recent-first.
    pub recent_topics: Vec<String>,
    /// Active pain points as plain strings.
    pub active_pain_points: Vec<String>,
    pub preferred_tools: Vec<String>,
    /// Profile confidence score; blank profiles start at `0.0`.
    pub confidence: f32,
    /// Time of the last write to this profile.
    pub updated_at: DateTime<Utc>,
}

impl UserProfile {
    /// Build an empty profile for `user_id`.
    ///
    /// The profile has no inferred attributes, empty lists, zero confidence,
    /// language `"zh-CN"`, and is stamped with the current time.
    pub fn blank(user_id: &str) -> Self {
        UserProfile {
            user_id: String::from(user_id),
            industry: None,
            role: None,
            expertise_level: None,
            communication_style: None,
            preferred_language: String::from("zh-CN"),
            recent_topics: vec![],
            active_pain_points: vec![],
            preferred_tools: vec![],
            confidence: 0.0,
            updated_at: Utc::now(),
        }
    }

    /// Blank profile for the single-user desktop mode, keyed `"default_user"`.
    pub fn default_profile() -> Self {
        UserProfile::blank("default_user")
    }
}
// ---------------------------------------------------------------------------
// DDL
// ---------------------------------------------------------------------------
/// Schema for the `user_profiles` table (one row per `user_id`).
///
/// - List-valued profile fields (`recent_topics`, `active_pain_points`,
///   `preferred_tools`) are TEXT columns holding JSON arrays, defaulting to `'[]'`.
/// - `updated_at` is TEXT. `UserProfileStore::upsert` writes it as an RFC 3339 string.
/// - `CREATE TABLE IF NOT EXISTS` makes the statement safe to execute repeatedly.
const PROFILE_DDL: &str = r#"
CREATE TABLE IF NOT EXISTS user_profiles (
user_id TEXT PRIMARY KEY,
industry TEXT,
role TEXT,
expertise_level TEXT,
communication_style TEXT,
preferred_language TEXT DEFAULT 'zh-CN',
recent_topics TEXT DEFAULT '[]',
active_pain_points TEXT DEFAULT '[]',
preferred_tools TEXT DEFAULT '[]',
confidence REAL DEFAULT 0.0,
updated_at TEXT NOT NULL
)
"#;
// ---------------------------------------------------------------------------
// Row mapping
// ---------------------------------------------------------------------------
/// Decode one `user_profiles` row into a [`UserProfile`].
///
/// Decoding is deliberately lenient: a column that is missing or fails to
/// decode falls back to a default. The defaults are an empty list, `None`,
/// `"zh-CN"`, `0.0`, an empty `user_id`, or the current time for an absent or
/// non-RFC-3339 `updated_at`.
///
/// The only hard failure is a list column whose text is present but is not a
/// JSON array of strings. That `serde_json` error is propagated with `?`.
fn row_to_profile(row: &sqlx::sqlite::SqliteRow) -> Result<UserProfile> {
    // List columns hold JSON text; an unreadable column is treated as "[]".
    let json_text = |column: &str| -> String {
        row.try_get::<String, _>(column)
            .unwrap_or_else(|_| "[]".to_string())
    };
    // Decoded in this order so the first malformed column is the one reported.
    let recent_topics: Vec<String> = serde_json::from_str(&json_text("recent_topics"))?;
    let active_pain_points: Vec<String> =
        serde_json::from_str(&json_text("active_pain_points"))?;
    let preferred_tools: Vec<String> = serde_json::from_str(&json_text("preferred_tools"))?;

    // Nullable TEXT columns: both NULL and decode failures become `None`.
    let optional_text = |column: &str| -> Option<String> {
        row.try_get::<Option<String>, _>(column).ok().flatten()
    };

    let updated_at = row
        .try_get::<String, _>("updated_at")
        .ok()
        .and_then(|raw| DateTime::parse_from_rfc3339(&raw).ok())
        .map(|stamp| stamp.with_timezone(&Utc))
        .unwrap_or_else(Utc::now);

    Ok(UserProfile {
        user_id: row.try_get::<String, _>("user_id").unwrap_or_default(),
        industry: optional_text("industry"),
        role: optional_text("role"),
        expertise_level: optional_text("expertise_level")
            .as_deref()
            .and_then(Level::from_str_lossy),
        communication_style: optional_text("communication_style")
            .as_deref()
            .and_then(CommStyle::from_str_lossy),
        preferred_language: row
            .try_get::<String, _>("preferred_language")
            .unwrap_or_else(|_| "zh-CN".to_string()),
        recent_topics,
        active_pain_points,
        preferred_tools,
        confidence: row.try_get::<f32, _>("confidence").unwrap_or(0.0),
        updated_at,
    })
}
// ---------------------------------------------------------------------------
// UserProfileStore
// ---------------------------------------------------------------------------
/// SQLite-backed store for user profiles.
pub struct UserProfileStore {
pool: SqlitePool,
}
impl UserProfileStore {
/// Create a new store backed by the given connection pool.
pub fn new(pool: SqlitePool) -> Self {
Self { pool }
}
/// Create tables. Idempotent — safe to call on every startup.
pub async fn initialize_schema(&self) -> Result<()> {
sqlx::query(PROFILE_DDL)
.execute(&self.pool)
.await
.map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?;
Ok(())
}
/// Fetch the profile for a user. Returns `None` when no row exists.
pub async fn get(&self, user_id: &str) -> Result<Option<UserProfile>> {
let row = sqlx::query(
"SELECT user_id, industry, role, expertise_level, communication_style, \
preferred_language, recent_topics, active_pain_points, preferred_tools, \
confidence, updated_at \
FROM user_profiles WHERE user_id = ?",
)
.bind(user_id)
.fetch_optional(&self.pool)
.await
.map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?;
match row {
Some(r) => Ok(Some(row_to_profile(&r)?)),
None => Ok(None),
}
}
/// Insert or replace the full profile.
pub async fn upsert(&self, profile: &UserProfile) -> Result<()> {
let topics = serde_json::to_string(&profile.recent_topics)?;
let pains = serde_json::to_string(&profile.active_pain_points)?;
let tools = serde_json::to_string(&profile.preferred_tools)?;
let expertise = profile.expertise_level.map(|l| l.as_str());
let comm = profile.communication_style.map(|c| c.as_str());
let updated = profile.updated_at.to_rfc3339();
sqlx::query(
"INSERT OR REPLACE INTO user_profiles \
(user_id, industry, role, expertise_level, communication_style, \
preferred_language, recent_topics, active_pain_points, preferred_tools, \
confidence, updated_at) \
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
)
.bind(&profile.user_id)
.bind(&profile.industry)
.bind(&profile.role)
.bind(expertise)
.bind(comm)
.bind(&profile.preferred_language)
.bind(&topics)
.bind(&pains)
.bind(&tools)
.bind(profile.confidence)
.bind(&updated)
.execute(&self.pool)
.await
.map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?;
Ok(())
}
/// Update a single scalar field by name.
///
/// `field` must be one of: industry, role, expertise_level,
/// communication_style, preferred_language, confidence.
/// Returns error for unrecognised field names (prevents SQL injection).
pub async fn update_field(&self, user_id: &str, field: &str, value: &str) -> Result<()> {
let sql = match field {
"industry" => "UPDATE user_profiles SET industry = ?, updated_at = ? WHERE user_id = ?",
"role" => "UPDATE user_profiles SET role = ?, updated_at = ? WHERE user_id = ?",
"expertise_level" => {
"UPDATE user_profiles SET expertise_level = ?, updated_at = ? WHERE user_id = ?"
}
"communication_style" => {
"UPDATE user_profiles SET communication_style = ?, updated_at = ? WHERE user_id = ?"
}
"preferred_language" => {
"UPDATE user_profiles SET preferred_language = ?, updated_at = ? WHERE user_id = ?"
}
"confidence" => {
"UPDATE user_profiles SET confidence = ?, updated_at = ? WHERE user_id = ?"
}
_ => {
return Err(zclaw_types::ZclawError::InvalidInput(format!(
"Unknown profile field: {}",
field
)));
}
};
let now = Utc::now().to_rfc3339();
// confidence is REAL; parse the value string.
if field == "confidence" {
let f: f32 = value.parse().map_err(|_| {
zclaw_types::ZclawError::InvalidInput(format!("Invalid confidence: {}", value))
})?;
sqlx::query(sql)
.bind(f)
.bind(&now)
.bind(user_id)
.execute(&self.pool)
.await
.map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?;
} else {
sqlx::query(sql)
.bind(value)
.bind(&now)
.bind(user_id)
.execute(&self.pool)
.await
.map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?;
}
Ok(())
}
/// Append a topic to `recent_topics`, trimming to `max_topics`.
/// Creates a default profile row if none exists.
pub async fn add_recent_topic(
&self,
user_id: &str,
topic: &str,
max_topics: usize,
) -> Result<()> {
let mut profile = self
.get(user_id)
.await?
.unwrap_or_else(|| UserProfile::blank(user_id));
// Deduplicate: remove if already present, then push to front.
profile.recent_topics.retain(|t| t != topic);
profile.recent_topics.insert(0, topic.to_string());
profile.recent_topics.truncate(max_topics);
profile.updated_at = Utc::now();
self.upsert(&profile).await
}
/// Append a pain point, trimming to `max_pains`.
/// Creates a default profile row if none exists.
pub async fn add_pain_point(
&self,
user_id: &str,
pain: &str,
max_pains: usize,
) -> Result<()> {
let mut profile = self
.get(user_id)
.await?
.unwrap_or_else(|| UserProfile::blank(user_id));
profile.active_pain_points.retain(|p| p != pain);
profile.active_pain_points.insert(0, pain.to_string());
profile.active_pain_points.truncate(max_pains);
profile.updated_at = Utc::now();
self.upsert(&profile).await
}
/// Append a preferred tool, trimming to `max_tools`.
/// Creates a default profile row if none exists.
pub async fn add_preferred_tool(
&self,
user_id: &str,
tool: &str,
max_tools: usize,
) -> Result<()> {
let mut profile = self
.get(user_id)
.await?
.unwrap_or_else(|| UserProfile::blank(user_id));
profile.preferred_tools.retain(|t| t != tool);
profile.preferred_tools.insert(0, tool.to_string());
profile.preferred_tools.truncate(max_tools);
profile.updated_at = Utc::now();
self.upsert(&profile).await
}
/// Return all active pain points for a user as structured PainPoint objects.
///
/// Note: the existing schema stores pain points as flat strings without
/// timestamps. The returned `PainPoint.created_at` is set to the profile's
/// `updated_at` as the best available approximation. The `since` parameter
/// is accepted for API consistency but cannot truly filter by creation time
/// with the current schema.
pub async fn find_active_pains(
&self,
user_id: &str,
) -> Result<Vec<PainPoint>> {
let profile = self.get(user_id).await?;
Ok(match profile {
Some(p) => p
.active_pain_points
.into_iter()
.map(|content| PainPoint {
content,
created_at: p.updated_at,
last_mentioned_at: p.updated_at,
status: PainStatus::Active,
occurrence_count: 1,
})
.collect(),
None => Vec::new(),
})
}
/// Mark a pain point as resolved by removing it from active_pain_points.
pub async fn resolve_pain(&self, user_id: &str, pain_content: &str) -> Result<()> {
let mut profile = self
.get(user_id)
.await?
.unwrap_or_else(|| UserProfile::blank(user_id));
profile.active_pain_points.retain(|p| p != pain_content);
profile.updated_at = Utc::now();
self.upsert(&profile).await
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
    use super::*;

    /// Helper: create an in-memory store with schema.
    async fn test_store() -> UserProfileStore {
        let pool = SqlitePool::connect("sqlite::memory:")
            .await
            .expect("in-memory pool");
        let store = UserProfileStore::new(pool);
        store.initialize_schema().await.expect("schema init");
        store
    }

    #[tokio::test]
    async fn test_initialize_schema_idempotent() {
        let store = test_store().await;
        // Second call should succeed without error.
        store.initialize_schema().await.unwrap();
        store.initialize_schema().await.unwrap();
    }

    #[tokio::test]
    async fn test_get_returns_none_for_missing() {
        let store = test_store().await;
        let profile = store.get("nonexistent").await.unwrap();
        assert!(profile.is_none());
    }

    #[tokio::test]
    async fn test_upsert_and_get() {
        let store = test_store().await;
        let mut profile = UserProfile::blank("default_user");
        profile.industry = Some("healthcare".to_string());
        profile.role = Some("admin".to_string());
        profile.expertise_level = Some(Level::Intermediate);
        profile.communication_style = Some(CommStyle::Concise);
        profile.recent_topics = vec!["reporting".to_string(), "compliance".to_string()];
        profile.confidence = 0.65;
        store.upsert(&profile).await.unwrap();
        let loaded = store.get("default_user").await.unwrap().unwrap();
        assert_eq!(loaded.user_id, "default_user");
        assert_eq!(loaded.industry.as_deref(), Some("healthcare"));
        assert_eq!(loaded.role.as_deref(), Some("admin"));
        assert_eq!(loaded.expertise_level, Some(Level::Intermediate));
        assert_eq!(loaded.communication_style, Some(CommStyle::Concise));
        assert_eq!(loaded.recent_topics, vec!["reporting", "compliance"]);
        assert!((loaded.confidence - 0.65).abs() < f32::EPSILON);
    }

    #[tokio::test]
    async fn test_upsert_replaces_existing() {
        let store = test_store().await;
        let mut profile = UserProfile::blank("user1");
        profile.industry = Some("tech".to_string());
        store.upsert(&profile).await.unwrap();
        profile.industry = Some("finance".to_string());
        store.upsert(&profile).await.unwrap();
        let loaded = store.get("user1").await.unwrap().unwrap();
        assert_eq!(loaded.industry.as_deref(), Some("finance"));
    }

    #[tokio::test]
    async fn test_update_field_scalar() {
        let store = test_store().await;
        let profile = UserProfile::blank("user2");
        store.upsert(&profile).await.unwrap();
        store
            .update_field("user2", "industry", "education")
            .await
            .unwrap();
        store
            .update_field("user2", "role", "teacher")
            .await
            .unwrap();
        let loaded = store.get("user2").await.unwrap().unwrap();
        assert_eq!(loaded.industry.as_deref(), Some("education"));
        assert_eq!(loaded.role.as_deref(), Some("teacher"));
    }

    #[tokio::test]
    async fn test_update_field_confidence() {
        let store = test_store().await;
        let profile = UserProfile::blank("user3");
        store.upsert(&profile).await.unwrap();
        store
            .update_field("user3", "confidence", "0.88")
            .await
            .unwrap();
        let loaded = store.get("user3").await.unwrap().unwrap();
        assert!((loaded.confidence - 0.88).abs() < f32::EPSILON);
    }

    #[tokio::test]
    async fn test_update_field_rejects_unparseable_confidence() {
        let store = test_store().await;
        store.upsert(&UserProfile::blank("user")).await.unwrap();
        assert!(store
            .update_field("user", "confidence", "high")
            .await
            .is_err());
        // The stored value must be left untouched.
        let loaded = store.get("user").await.unwrap().unwrap();
        assert!(loaded.confidence.abs() < f32::EPSILON);
    }

    #[tokio::test]
    async fn test_update_field_rejects_unknown() {
        let store = test_store().await;
        let result = store.update_field("user", "evil_column", "oops").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_update_field_without_profile_does_not_create_row() {
        let store = test_store().await;
        // Plain UPDATE: succeeds but has nothing to change.
        store
            .update_field("ghost", "industry", "retail")
            .await
            .unwrap();
        assert!(store.get("ghost").await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_add_recent_topic_auto_creates_profile() {
        let store = test_store().await;
        // No profile exists yet.
        store
            .add_recent_topic("new_user", "data analysis", 5)
            .await
            .unwrap();
        let loaded = store.get("new_user").await.unwrap().unwrap();
        assert_eq!(loaded.recent_topics, vec!["data analysis"]);
    }

    #[tokio::test]
    async fn test_add_recent_topic_dedup_and_trim() {
        let store = test_store().await;
        let profile = UserProfile::blank("user");
        store.upsert(&profile).await.unwrap();
        store.add_recent_topic("user", "topic_a", 3).await.unwrap();
        store.add_recent_topic("user", "topic_b", 3).await.unwrap();
        store.add_recent_topic("user", "topic_c", 3).await.unwrap();
        // Duplicate — should move to front, not add.
        store.add_recent_topic("user", "topic_a", 3).await.unwrap();
        let loaded = store.get("user").await.unwrap().unwrap();
        assert_eq!(
            loaded.recent_topics,
            vec!["topic_a", "topic_c", "topic_b"]
        );
    }

    #[tokio::test]
    async fn test_add_pain_point_trim() {
        let store = test_store().await;
        for i in 0..5 {
            store
                .add_pain_point("user", &format!("pain_{}", i), 3)
                .await
                .unwrap();
        }
        let loaded = store.get("user").await.unwrap().unwrap();
        assert_eq!(loaded.active_pain_points.len(), 3);
        // Most recent first.
        assert_eq!(loaded.active_pain_points[0], "pain_4");
    }

    #[tokio::test]
    async fn test_add_pain_point_dedup_moves_to_front() {
        let store = test_store().await;
        store.add_pain_point("user", "pain_a", 5).await.unwrap();
        store.add_pain_point("user", "pain_b", 5).await.unwrap();
        // Duplicate — moved to front, not added twice.
        store.add_pain_point("user", "pain_a", 5).await.unwrap();
        let loaded = store.get("user").await.unwrap().unwrap();
        assert_eq!(loaded.active_pain_points, vec!["pain_a", "pain_b"]);
    }

    #[tokio::test]
    async fn test_add_preferred_tool_trim() {
        let store = test_store().await;
        store
            .add_preferred_tool("user", "python", 5)
            .await
            .unwrap();
        store
            .add_preferred_tool("user", "rust", 5)
            .await
            .unwrap();
        // Duplicate — moved to front.
        store
            .add_preferred_tool("user", "python", 5)
            .await
            .unwrap();
        let loaded = store.get("user").await.unwrap().unwrap();
        assert_eq!(loaded.preferred_tools, vec!["python", "rust"]);
    }

    #[test]
    fn test_level_round_trip() {
        for level in [Level::Beginner, Level::Intermediate, Level::Expert] {
            assert_eq!(Level::from_str_lossy(level.as_str()), Some(level));
        }
        assert_eq!(Level::from_str_lossy("unknown"), None);
    }

    #[test]
    fn test_comm_style_round_trip() {
        for style in [
            CommStyle::Concise,
            CommStyle::Detailed,
            CommStyle::Formal,
            CommStyle::Casual,
        ] {
            assert_eq!(CommStyle::from_str_lossy(style.as_str()), Some(style));
        }
        assert_eq!(CommStyle::from_str_lossy("unknown"), None);
    }

    #[test]
    fn test_enum_serde_is_lowercase() {
        assert_eq!(
            serde_json::to_string(&PainStatus::Resolved).unwrap(),
            "\"resolved\""
        );
        assert_eq!(serde_json::to_string(&Level::Expert).unwrap(), "\"expert\"");
        assert_eq!(
            serde_json::to_string(&CommStyle::Casual).unwrap(),
            "\"casual\""
        );
    }

    #[test]
    fn test_profile_serialization() {
        let mut p = UserProfile::blank("test_user");
        p.industry = Some("logistics".into());
        p.expertise_level = Some(Level::Expert);
        p.communication_style = Some(CommStyle::Detailed);
        p.recent_topics = vec!["exports".into(), "customs".into()];
        let json = serde_json::to_string(&p).unwrap();
        let decoded: UserProfile = serde_json::from_str(&json).unwrap();
        assert_eq!(decoded.user_id, "test_user");
        assert_eq!(decoded.industry.as_deref(), Some("logistics"));
        assert_eq!(decoded.expertise_level, Some(Level::Expert));
        assert_eq!(decoded.communication_style, Some(CommStyle::Detailed));
        assert_eq!(decoded.recent_topics, vec!["exports", "customs"]);
    }

    #[test]
    fn test_pain_status_roundtrip() {
        assert_eq!(PainStatus::from_str_lossy(PainStatus::Active.as_str()), PainStatus::Active);
        assert_eq!(PainStatus::from_str_lossy(PainStatus::Resolved.as_str()), PainStatus::Resolved);
        assert_eq!(PainStatus::from_str_lossy(PainStatus::Deferred.as_str()), PainStatus::Deferred);
        assert_eq!(PainStatus::from_str_lossy("unknown"), PainStatus::Active);
    }

    #[test]
    fn test_pain_point_new() {
        let pp = PainPoint::new("scheduling conflict");
        assert_eq!(pp.content, "scheduling conflict");
        assert_eq!(pp.status, PainStatus::Active);
        assert_eq!(pp.occurrence_count, 1);
    }

    #[tokio::test]
    async fn test_find_active_pains() {
        let store = test_store().await;
        store.add_pain_point("user", "pain_a", 5).await.unwrap();
        store.add_pain_point("user", "pain_b", 5).await.unwrap();
        let pains = store.find_active_pains("user").await.unwrap();
        assert_eq!(pains.len(), 2);
        assert!(pains.iter().any(|p| p.content == "pain_a"));
        assert!(pains.iter().any(|p| p.content == "pain_b"));
        // Stored most-recent-first by add_pain_point.
        assert_eq!(pains[0].content, "pain_b");
        assert_eq!(pains[0].status, PainStatus::Active);
        // Both timestamps are synthesized from the same profile stamp.
        assert_eq!(pains[0].created_at, pains[0].last_mentioned_at);
    }

    #[tokio::test]
    async fn test_find_active_pains_empty() {
        let store = test_store().await;
        let pains = store.find_active_pains("nonexistent").await.unwrap();
        assert!(pains.is_empty());
    }

    #[tokio::test]
    async fn test_resolve_pain() {
        let store = test_store().await;
        store.add_pain_point("user", "pain_a", 5).await.unwrap();
        store.add_pain_point("user", "pain_b", 5).await.unwrap();
        store.resolve_pain("user", "pain_a").await.unwrap();
        let loaded = store.get("user").await.unwrap().unwrap();
        assert_eq!(loaded.active_pain_points, vec!["pain_b"]);
    }

    #[tokio::test]
    async fn test_resolve_pain_nonexistent_is_noop() {
        let store = test_store().await;
        let profile = UserProfile::blank("user");
        store.upsert(&profile).await.unwrap();
        // Should not error when pain doesn't exist
        store.resolve_pain("user", "nonexistent_pain").await.unwrap();
    }
}