diff --git a/crates/zclaw-growth/src/experience_store.rs b/crates/zclaw-growth/src/experience_store.rs
new file mode 100644
index 0000000..fc26af2
--- /dev/null
+++ b/crates/zclaw-growth/src/experience_store.rs
@@ -0,0 +1,356 @@
+//! ExperienceStore — CRUD wrapper over VikingStorage for agent experiences.
+//!
+//! Stores structured experiences extracted from successful solution proposals
+//! using the scope prefix `agent://{agent_id}/experience/{pattern_hash}`.
+//! Leverages existing FTS5 + TF-IDF + embedding retrieval via VikingAdapter.
+
+use std::sync::Arc;
+
+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
+use tracing::{debug, warn};
+use uuid::Uuid;
+
+use crate::types::{MemoryEntry, MemoryType};
+use crate::viking_adapter::{FindOptions, VikingAdapter};
+
+// ---------------------------------------------------------------------------
+// Experience data model
+// ---------------------------------------------------------------------------
+
+/// A structured experience record representing a solved pain point.
+///
+/// Stored as JSON content inside a VikingStorage `MemoryEntry` with
+/// `memory_type = Experience`.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Experience {
+    /// Unique experience identifier.
+    pub id: String,
+    /// Owning agent.
+    pub agent_id: String,
+    /// Short pattern describing the pain that was solved (e.g. "logistics export packaging").
+    pub pain_pattern: String,
+    /// Context in which the problem occurred.
+    pub context: String,
+    /// Ordered steps that resolved the problem.
+    pub solution_steps: Vec<String>,
+    /// Verbal outcome reported by the user.
+    pub outcome: String,
+    /// How many times this experience has been reused as a reference.
+    pub reuse_count: u32,
+    /// Timestamp of initial creation.
+    pub created_at: DateTime<Utc>,
+    /// Timestamp of most recent reuse or update.
+    pub updated_at: DateTime<Utc>,
+}
+
+impl Experience {
+    /// Create a new experience with the given fields.
+    pub fn new(
+        agent_id: &str,
+        pain_pattern: &str,
+        context: &str,
+        solution_steps: Vec<String>,
+        outcome: &str,
+    ) -> Self {
+        let now = Utc::now();
+        Self {
+            id: Uuid::new_v4().to_string(),
+            agent_id: agent_id.to_string(),
+            pain_pattern: pain_pattern.to_string(),
+            context: context.to_string(),
+            solution_steps,
+            outcome: outcome.to_string(),
+            reuse_count: 0,
+            created_at: now,
+            updated_at: now,
+        }
+    }
+
+    /// Deterministic URI for this experience, keyed on a stable hash of the
+    /// pain pattern so duplicate patterns overwrite the same entry.
+    pub fn uri(&self) -> String {
+        let hash = simple_hash(&self.pain_pattern);
+        format!("agent://{}/experience/{}", self.agent_id, hash)
+    }
+}
+
+/// FNV-1a–inspired stable 8-hex-char hash. Good enough for deduplication;
+/// collisions are acceptable because the full `pain_pattern` is still stored.
+fn simple_hash(s: &str) -> String {
+    // FNV-1a offset basis / prime for 32-bit hashes.
+    let mut h: u32 = 2166136261;
+    for b in s.as_bytes() {
+        h ^= *b as u32;
+        h = h.wrapping_mul(16777619);
+    }
+    format!("{:08x}", h)
+}
+
+// ---------------------------------------------------------------------------
+// ExperienceStore
+// ---------------------------------------------------------------------------
+
+/// CRUD wrapper that persists [`Experience`] records through [`VikingAdapter`].
+pub struct ExperienceStore {
+    viking: Arc<VikingAdapter>,
+}
+
+impl ExperienceStore {
+    /// Create a new store backed by the given VikingAdapter.
+    pub fn new(viking: Arc<VikingAdapter>) -> Self {
+        Self { viking }
+    }
+
+    /// Store (or overwrite) an experience. The URI is derived from
+    /// `agent_id + pain_pattern`, ensuring one experience per pattern.
+    pub async fn store_experience(&self, exp: &Experience) -> zclaw_types::Result<()> {
+        let uri = exp.uri();
+        let content = serde_json::to_string(exp)?;
+        // Keywords drive FTS5 matching: the pattern itself plus the first few steps.
+        let mut keywords = vec![exp.pain_pattern.clone()];
+        keywords.extend(exp.solution_steps.iter().take(3).cloned());
+
+        let entry = MemoryEntry {
+            uri,
+            memory_type: MemoryType::Experience,
+            content,
+            keywords,
+            importance: 8,
+            access_count: 0,
+            created_at: exp.created_at,
+            last_accessed: exp.updated_at,
+            overview: Some(exp.pain_pattern.clone()),
+            abstract_summary: Some(exp.outcome.clone()),
+        };
+
+        self.viking.store(&entry).await?;
+        debug!("[ExperienceStore] Stored experience {} for agent {}", exp.id, exp.agent_id);
+        Ok(())
+    }
+
+    /// Find experiences whose pain pattern matches the given query.
+    pub async fn find_by_pattern(
+        &self,
+        agent_id: &str,
+        pattern_query: &str,
+    ) -> zclaw_types::Result<Vec<Experience>> {
+        let scope = format!("agent://{}/experience/", agent_id);
+        let opts = FindOptions {
+            scope: Some(scope),
+            limit: Some(10),
+            min_similarity: None,
+        };
+        let entries = self.viking.find(pattern_query, opts).await?;
+        let mut results = Vec::with_capacity(entries.len());
+        for entry in entries {
+            // Skip (but log) corrupt rows instead of failing the whole query.
+            match serde_json::from_str::<Experience>(&entry.content) {
+                Ok(exp) => results.push(exp),
+                Err(e) => warn!("[ExperienceStore] Failed to deserialize experience at {}: {}", entry.uri, e),
+            }
+        }
+        Ok(results)
+    }
+
+    /// Return all experiences for a given agent.
+    pub async fn find_by_agent(
+        &self,
+        agent_id: &str,
+    ) -> zclaw_types::Result<Vec<Experience>> {
+        let prefix = format!("agent://{}/experience/", agent_id);
+        let entries = self.viking.find_by_prefix(&prefix).await?;
+        let mut results = Vec::with_capacity(entries.len());
+        for entry in entries {
+            match serde_json::from_str::<Experience>(&entry.content) {
+                Ok(exp) => results.push(exp),
+                Err(e) => warn!("[ExperienceStore] Failed to deserialize experience at {}: {}", entry.uri, e),
+            }
+        }
+        Ok(results)
+    }
+
+    /// Increment the reuse counter for an existing experience.
+    /// On failure, logs a warning but does **not** propagate the error so
+    /// callers are never blocked.
+    pub async fn increment_reuse(&self, exp: &Experience) {
+        let mut updated = exp.clone();
+        updated.reuse_count += 1;
+        updated.updated_at = Utc::now();
+        if let Err(e) = self.store_experience(&updated).await {
+            warn!("[ExperienceStore] Failed to increment reuse for {}: {}", exp.id, e);
+        }
+    }
+
+    /// Delete a single experience by its URI.
+    pub async fn delete(&self, exp: &Experience) -> zclaw_types::Result<()> {
+        let uri = exp.uri();
+        self.viking.delete(&uri).await?;
+        debug!("[ExperienceStore] Deleted experience {} for agent {}", exp.id, exp.agent_id);
+        Ok(())
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Tests
+// ---------------------------------------------------------------------------
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_experience_new() {
+        let exp = Experience::new(
+            "agent-1",
+            "logistics export packaging",
+            "export packaging rejected by customs",
+            vec!["check regulations".into(), "use approved materials".into()],
+            "packaging passed customs",
+        );
+        assert!(!exp.id.is_empty());
+        assert_eq!(exp.agent_id, "agent-1");
+        assert_eq!(exp.solution_steps.len(), 2);
+        assert_eq!(exp.reuse_count, 0);
+    }
+
+    #[test]
+    fn test_uri_deterministic() {
+        let exp1 = Experience::new(
+            "agent-1", "packaging issue", "ctx",
+            vec!["step1".into()], "ok",
+        );
+        // Second experience with same agent + pattern should produce the same URI.
+        let mut exp2 = exp1.clone();
+        exp2.id = "different-id".to_string();
+        assert_eq!(exp1.uri(), exp2.uri());
+    }
+
+    #[test]
+    fn test_uri_differs_for_different_patterns() {
+        let exp_a = Experience::new(
+            "agent-1", "packaging issue", "ctx",
+            vec!["step1".into()], "ok",
+        );
+        let exp_b = Experience::new(
+            "agent-1", "compliance gap", "ctx",
+            vec!["step1".into()], "ok",
+        );
+        assert_ne!(exp_a.uri(), exp_b.uri());
+    }
+
+    #[test]
+    fn test_simple_hash_stability() {
+        let h1 = simple_hash("hello world");
+        let h2 = simple_hash("hello world");
+        assert_eq!(h1, h2);
+        assert_eq!(h1.len(), 8);
+    }
+
+    #[tokio::test]
+    async fn test_store_and_find_by_agent() {
+        let viking = Arc::new(VikingAdapter::in_memory());
+        let store = ExperienceStore::new(viking);
+
+        let exp = Experience::new(
+            "agent-42",
+            "export document errors",
+            "recurring mistakes in export docs",
+            vec!["use template".into(), "auto-validate".into()],
+            "no more errors",
+        );
+
+        store.store_experience(&exp).await.unwrap();
+
+        let found = store.find_by_agent("agent-42").await.unwrap();
+        assert_eq!(found.len(), 1);
+        assert_eq!(found[0].pain_pattern, "export document errors");
+        assert_eq!(found[0].solution_steps.len(), 2);
+    }
+
+    #[tokio::test]
+    async fn test_store_overwrites_same_pattern() {
+        let viking = Arc::new(VikingAdapter::in_memory());
+        let store = ExperienceStore::new(viking);
+
+        let exp_v1 = Experience::new(
+            "agent-1", "packaging", "v1",
+            vec!["old step".into()], "ok",
+        );
+        store.store_experience(&exp_v1).await.unwrap();
+
+        let exp_v2 = Experience::new(
+            "agent-1", "packaging", "v2 updated",
+            vec!["new step".into()], "better",
+        );
+        // Force same URI by reusing the ID logic — same pattern → same URI.
+        store.store_experience(&exp_v2).await.unwrap();
+
+        let found = store.find_by_agent("agent-1").await.unwrap();
+        // Should be overwritten, not duplicated (same URI).
+        assert_eq!(found.len(), 1);
+        assert_eq!(found[0].context, "v2 updated");
+    }
+
+    #[tokio::test]
+    async fn test_find_by_pattern() {
+        let viking = Arc::new(VikingAdapter::in_memory());
+        let store = ExperienceStore::new(viking);
+
+        let exp = Experience::new(
+            "agent-1",
+            "logistics packaging compliance",
+            "export compliance issues",
+            vec!["check regulations".into()],
+            "passed audit",
+        );
+        store.store_experience(&exp).await.unwrap();
+
+        let found = store.find_by_pattern("agent-1", "packaging").await.unwrap();
+        assert_eq!(found.len(), 1);
+    }
+
+    #[tokio::test]
+    async fn test_increment_reuse() {
+        let viking = Arc::new(VikingAdapter::in_memory());
+        let store = ExperienceStore::new(viking);
+
+        let exp = Experience::new(
+            "agent-1", "packaging", "ctx",
+            vec!["step".into()], "ok",
+        );
+        store.store_experience(&exp).await.unwrap();
+        store.increment_reuse(&exp).await;
+
+        let found = store.find_by_agent("agent-1").await.unwrap();
+        assert_eq!(found[0].reuse_count, 1);
+    }
+
+    #[tokio::test]
+    async fn test_delete_experience() {
+        let viking = Arc::new(VikingAdapter::in_memory());
+        let store = ExperienceStore::new(viking);
+
+        let exp = Experience::new(
+            "agent-1", "packaging", "ctx",
+            vec!["step".into()], "ok",
+        );
+        store.store_experience(&exp).await.unwrap();
+        store.delete(&exp).await.unwrap();
+
+        let found = store.find_by_agent("agent-1").await.unwrap();
+        assert!(found.is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_find_by_agent_filters_other_agents() {
+        let viking = Arc::new(VikingAdapter::in_memory());
+        let store = ExperienceStore::new(viking);
+
+        let exp_a = Experience::new("agent-a", "packaging", "ctx", vec!["s".into()], "ok");
+        let exp_b = Experience::new("agent-b", "compliance", "ctx", vec!["s".into()], "ok");
+        store.store_experience(&exp_a).await.unwrap();
+        store.store_experience(&exp_b).await.unwrap();
+
+        let found_a = store.find_by_agent("agent-a").await.unwrap();
+        assert_eq!(found_a.len(), 1);
+        assert_eq!(found_a[0].pain_pattern, "packaging");
+    }
+}
diff --git a/crates/zclaw-growth/src/lib.rs b/crates/zclaw-growth/src/lib.rs
index e3a5a95..65f5aaf 100644
--- a/crates/zclaw-growth/src/lib.rs
+++ b/crates/zclaw-growth/src/lib.rs
@@ -64,6 +64,7 @@ pub mod viking_adapter;
 pub mod storage;
 pub mod retrieval;
 pub mod summarizer;
+pub mod experience_store;
 
 // Re-export main types for convenience
 pub use types::{
@@ -85,6 +86,7 @@ pub use injector::{InjectionFormat, PromptInjector};
 pub use tracker::{AgentMetadata, GrowthTracker, LearningEvent};
 pub use viking_adapter::{FindOptions, VikingAdapter, VikingLevel, VikingStorage};
 pub use storage::SqliteStorage;
+pub use experience_store::{Experience, ExperienceStore};
 pub use retrieval::{EmbeddingClient, MemoryCache, QueryAnalyzer, SemanticScorer};
 pub use summarizer::SummaryLlmDriver;
diff --git a/crates/zclaw-memory/src/lib.rs b/crates/zclaw-memory/src/lib.rs
index 02202e1..30766e9 100644
--- a/crates/zclaw-memory/src/lib.rs
+++ b/crates/zclaw-memory/src/lib.rs
@@ -6,8 +6,15 @@ mod store;
 mod session;
 mod schema;
 pub mod fact;
+pub mod user_profile_store;
+pub mod trajectory_store;
 
 pub use store::*;
 pub use session::*;
 pub use schema::*;
 pub use fact::{Fact, FactCategory, ExtractedFactBatch};
+pub use user_profile_store::{UserProfileStore, UserProfile, Level, CommStyle};
+pub use trajectory_store::{
+    TrajectoryEvent, TrajectoryStore, TrajectoryStepType,
+    CompressedTrajectory, CompletionStatus, SatisfactionSignal,
+};
diff --git a/crates/zclaw-memory/src/schema.rs b/crates/zclaw-memory/src/schema.rs
index 15ade9d..1c1805e 100644
--- a/crates/zclaw-memory/src/schema.rs
+++ b/crates/zclaw-memory/src/schema.rs
@@ -93,4 +93,47 @@ pub const MIGRATIONS: &[&str] = &[
     // v1→v2: persist runtime state and message count
     "ALTER TABLE agents ADD COLUMN state TEXT NOT NULL DEFAULT 'running'",
     "ALTER TABLE agents ADD COLUMN message_count INTEGER NOT NULL DEFAULT 0",
+    // v2→v3: user profiles for structured user modeling
+    "CREATE TABLE IF NOT EXISTS user_profiles (
+        user_id TEXT PRIMARY KEY,
+        industry TEXT,
+        role TEXT,
+        expertise_level TEXT,
+        communication_style TEXT,
+        preferred_language TEXT DEFAULT 'zh-CN',
+        recent_topics TEXT DEFAULT '[]',
+        active_pain_points TEXT DEFAULT '[]',
+        preferred_tools TEXT DEFAULT '[]',
+        confidence REAL DEFAULT 0.0,
+        updated_at TEXT NOT NULL
+    )",
+    // v3→v4: trajectory recording for tool-call chain analysis
+    "CREATE TABLE IF NOT EXISTS trajectory_events (
+        id TEXT PRIMARY KEY,
+        session_id TEXT NOT NULL,
+        agent_id TEXT NOT NULL,
+        step_index INTEGER NOT NULL,
+        step_type TEXT NOT NULL,
+        input_summary TEXT,
+        output_summary TEXT,
+        duration_ms INTEGER DEFAULT 0,
+        timestamp TEXT NOT NULL
+    )",
+    "CREATE INDEX IF NOT EXISTS idx_trajectory_session ON trajectory_events(session_id)",
+    "CREATE TABLE IF NOT EXISTS compressed_trajectories (
+        id TEXT PRIMARY KEY,
+        session_id TEXT NOT NULL,
+        agent_id TEXT NOT NULL,
+        request_type TEXT NOT NULL,
+        tools_used TEXT,
+        outcome TEXT NOT NULL,
+        total_steps INTEGER DEFAULT 0,
+        total_duration_ms INTEGER DEFAULT 0,
+        total_tokens INTEGER DEFAULT 0,
+        execution_chain TEXT NOT NULL,
+        satisfaction_signal TEXT,
+        created_at TEXT NOT NULL
+    )",
+    "CREATE INDEX IF NOT EXISTS idx_ct_request_type ON compressed_trajectories(request_type)",
+    "CREATE INDEX IF NOT EXISTS idx_ct_outcome ON compressed_trajectories(outcome)",
 ];
diff --git a/crates/zclaw-memory/src/trajectory_store.rs b/crates/zclaw-memory/src/trajectory_store.rs
new file mode 100644
index 0000000..62f68c6
--- /dev/null
+++ b/crates/zclaw-memory/src/trajectory_store.rs
@@ -0,0 +1,563 @@
+//! Trajectory Store -- record and compress tool-call chains for analysis.
+//!
+//! Stores raw trajectory events (user requests, tool calls, LLM generations)
+//! and compressed trajectory summaries. Used by the Hermes Intelligence Pipeline
+//! to analyze agent behaviour patterns and improve routing over time.
+
+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
+use sqlx::SqlitePool;
+use zclaw_types::{Result, ZclawError};
+
+// ---------------------------------------------------------------------------
+// Types
+// ---------------------------------------------------------------------------
+
+/// Step type in a trajectory.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum TrajectoryStepType {
+    UserRequest,
+    IntentClassification,
+    SkillSelection,
+    ToolExecution,
+    LlmGeneration,
+    UserFeedback,
+}
+
+impl TrajectoryStepType {
+    /// Serialize to the string stored in SQLite.
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            Self::UserRequest => "user_request",
+            Self::IntentClassification => "intent_classification",
+            Self::SkillSelection => "skill_selection",
+            Self::ToolExecution => "tool_execution",
+            Self::LlmGeneration => "llm_generation",
+            Self::UserFeedback => "user_feedback",
+        }
+    }
+
+    /// Deserialize from the SQLite string representation.
+    /// Unknown strings fall back to `UserRequest` (lossy by design).
+    pub fn from_str_lossy(s: &str) -> Self {
+        match s {
+            "user_request" => Self::UserRequest,
+            "intent_classification" => Self::IntentClassification,
+            "skill_selection" => Self::SkillSelection,
+            "tool_execution" => Self::ToolExecution,
+            "llm_generation" => Self::LlmGeneration,
+            "user_feedback" => Self::UserFeedback,
+            _ => Self::UserRequest,
+        }
+    }
+}
+
+/// Single trajectory event.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct TrajectoryEvent {
+    pub id: String,
+    pub session_id: String,
+    pub agent_id: String,
+    pub step_index: usize,
+    pub step_type: TrajectoryStepType,
+    /// Summarised input (max 200 chars).
+    pub input_summary: String,
+    /// Summarised output (max 200 chars).
+    pub output_summary: String,
+    pub duration_ms: u64,
+    pub timestamp: DateTime<Utc>,
+}
+
+/// Satisfaction signal inferred from user feedback.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum SatisfactionSignal {
+    Positive,
+    Negative,
+    Neutral,
+}
+
+impl SatisfactionSignal {
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            Self::Positive => "positive",
+            Self::Negative => "negative",
+            Self::Neutral => "neutral",
+        }
+    }
+
+    pub fn from_str_lossy(s: &str) -> Option<Self> {
+        match s {
+            "positive" => Some(Self::Positive),
+            "negative" => Some(Self::Negative),
+            "neutral" => Some(Self::Neutral),
+            _ => None,
+        }
+    }
+}
+
+/// Completion status of a compressed trajectory.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum CompletionStatus {
+    Success,
+    Partial,
+    Failed,
+    Abandoned,
+}
+
+impl CompletionStatus {
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            Self::Success => "success",
+            Self::Partial => "partial",
+            Self::Failed => "failed",
+            Self::Abandoned => "abandoned",
+        }
+    }
+
+    /// Unknown strings fall back to `Success` (lossy by design).
+    pub fn from_str_lossy(s: &str) -> Self {
+        match s {
+            "success" => Self::Success,
+            "partial" => Self::Partial,
+            "failed" => Self::Failed,
+            "abandoned" => Self::Abandoned,
+            _ => Self::Success,
+        }
+    }
+}
+
+/// Compressed trajectory (generated at session end).
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CompressedTrajectory {
+    pub id: String,
+    pub session_id: String,
+    pub agent_id: String,
+    pub request_type: String,
+    pub tools_used: Vec<String>,
+    pub outcome: CompletionStatus,
+    pub total_steps: usize,
+    pub total_duration_ms: u64,
+    pub total_tokens: u32,
+    /// Serialised JSON execution chain for analysis.
+    pub execution_chain: String,
+    pub satisfaction_signal: Option<SatisfactionSignal>,
+    pub created_at: DateTime<Utc>,
+}
+
+// ---------------------------------------------------------------------------
+// Store
+// ---------------------------------------------------------------------------
+
+/// Persistent store for trajectory events and compressed trajectories.
+pub struct TrajectoryStore {
+    pool: SqlitePool,
+}
+
+impl TrajectoryStore {
+    /// Create a new `TrajectoryStore` backed by the given SQLite pool.
+    pub fn new(pool: SqlitePool) -> Self {
+        Self { pool }
+    }
+
+    /// Create the required tables. Idempotent -- safe to call on startup.
+    pub async fn initialize_schema(&self) -> Result<()> {
+        sqlx::query(
+            r#"
+            CREATE TABLE IF NOT EXISTS trajectory_events (
+                id TEXT PRIMARY KEY,
+                session_id TEXT NOT NULL,
+                agent_id TEXT NOT NULL,
+                step_index INTEGER NOT NULL,
+                step_type TEXT NOT NULL,
+                input_summary TEXT,
+                output_summary TEXT,
+                duration_ms INTEGER DEFAULT 0,
+                timestamp TEXT NOT NULL
+            );
+            CREATE INDEX IF NOT EXISTS idx_trajectory_session ON trajectory_events(session_id);
+            "#,
+        )
+        .execute(&self.pool)
+        .await
+        .map_err(|e| ZclawError::StorageError(e.to_string()))?;
+
+        sqlx::query(
+            r#"
+            CREATE TABLE IF NOT EXISTS compressed_trajectories (
+                id TEXT PRIMARY KEY,
+                session_id TEXT NOT NULL,
+                agent_id TEXT NOT NULL,
+                request_type TEXT NOT NULL,
+                tools_used TEXT,
+                outcome TEXT NOT NULL,
+                total_steps INTEGER DEFAULT 0,
+                total_duration_ms INTEGER DEFAULT 0,
+                total_tokens INTEGER DEFAULT 0,
+                execution_chain TEXT NOT NULL,
+                satisfaction_signal TEXT,
+                created_at TEXT NOT NULL
+            );
+            CREATE INDEX IF NOT EXISTS idx_ct_request_type ON compressed_trajectories(request_type);
+            CREATE INDEX IF NOT EXISTS idx_ct_outcome ON compressed_trajectories(outcome);
+            "#,
+        )
+        .execute(&self.pool)
+        .await
+        .map_err(|e| ZclawError::StorageError(e.to_string()))?;
+
+        Ok(())
+    }
+
+    /// Insert a raw trajectory event.
+    pub async fn insert_event(&self, event: &TrajectoryEvent) -> Result<()> {
+        sqlx::query(
+            r#"
+            INSERT INTO trajectory_events
+                (id, session_id, agent_id, step_index, step_type,
+                 input_summary, output_summary, duration_ms, timestamp)
+            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
+            "#,
+        )
+        .bind(&event.id)
+        .bind(&event.session_id)
+        .bind(&event.agent_id)
+        .bind(event.step_index as i64)
+        .bind(event.step_type.as_str())
+        .bind(&event.input_summary)
+        .bind(&event.output_summary)
+        .bind(event.duration_ms as i64)
+        .bind(event.timestamp.to_rfc3339())
+        .execute(&self.pool)
+        .await
+        .map_err(|e| {
+            tracing::warn!("[TrajectoryStore] insert_event failed: {}", e);
+            ZclawError::StorageError(e.to_string())
+        })?;
+
+        Ok(())
+    }
+
+    /// Retrieve all raw events for a session, ordered by step_index.
+    pub async fn get_events_by_session(&self, session_id: &str) -> Result<Vec<TrajectoryEvent>> {
+        let rows = sqlx::query_as::<_, (String, String, String, i64, String, Option<String>, Option<String>, Option<i64>, String)>(
+            r#"
+            SELECT id, session_id, agent_id, step_index, step_type,
+                   input_summary, output_summary, duration_ms, timestamp
+            FROM trajectory_events
+            WHERE session_id = ?
+            ORDER BY step_index ASC
+            "#,
+        )
+        .bind(session_id)
+        .fetch_all(&self.pool)
+        .await
+        .map_err(|e| ZclawError::StorageError(e.to_string()))?;
+
+        let mut events = Vec::with_capacity(rows.len());
+        for (id, sid, aid, step_idx, stype, input_s, output_s, dur_ms, ts) in rows {
+            // Unparsable timestamps degrade to "now" rather than failing the read.
+            let timestamp = DateTime::parse_from_rfc3339(&ts)
+                .map(|dt| dt.with_timezone(&Utc))
+                .unwrap_or_else(|_| Utc::now());
+
+            events.push(TrajectoryEvent {
+                id,
+                session_id: sid,
+                agent_id: aid,
+                step_index: step_idx as usize,
+                step_type: TrajectoryStepType::from_str_lossy(&stype),
+                input_summary: input_s.unwrap_or_default(),
+                output_summary: output_s.unwrap_or_default(),
+                duration_ms: dur_ms.unwrap_or(0) as u64,
+                timestamp,
+            });
+        }
+
+        Ok(events)
+    }
+
+    /// Insert a compressed trajectory.
+    pub async fn insert_compressed(&self, trajectory: &CompressedTrajectory) -> Result<()> {
+        let tools_json = serde_json::to_string(&trajectory.tools_used)
+            .map_err(|e| ZclawError::StorageError(e.to_string()))?;
+
+        sqlx::query(
+            r#"
+            INSERT INTO compressed_trajectories
+                (id, session_id, agent_id, request_type, tools_used,
+                 outcome, total_steps, total_duration_ms, total_tokens,
+                 execution_chain, satisfaction_signal, created_at)
+            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+            "#,
+        )
+        .bind(&trajectory.id)
+        .bind(&trajectory.session_id)
+        .bind(&trajectory.agent_id)
+        .bind(&trajectory.request_type)
+        .bind(&tools_json)
+        .bind(trajectory.outcome.as_str())
+        .bind(trajectory.total_steps as i64)
+        .bind(trajectory.total_duration_ms as i64)
+        .bind(trajectory.total_tokens as i64)
+        .bind(&trajectory.execution_chain)
+        .bind(trajectory.satisfaction_signal.map(|s| s.as_str()))
+        .bind(trajectory.created_at.to_rfc3339())
+        .execute(&self.pool)
+        .await
+        .map_err(|e| {
+            tracing::warn!("[TrajectoryStore] insert_compressed failed: {}", e);
+            ZclawError::StorageError(e.to_string())
+        })?;
+
+        Ok(())
+    }
+
+    /// Retrieve the compressed trajectory for a session, if any.
+    pub async fn get_compressed_by_session(&self, session_id: &str) -> Result<Option<CompressedTrajectory>> {
+        let row = sqlx::query_as::<_, (
+            String, String, String, String, Option<String>,
+            String, i64, i64, i64, String, Option<String>, String,
+        )>(
+            r#"
+            SELECT id, session_id, agent_id, request_type, tools_used,
+                   outcome, total_steps, total_duration_ms, total_tokens,
+                   execution_chain, satisfaction_signal, created_at
+            FROM compressed_trajectories
+            WHERE session_id = ?
+            "#,
+        )
+        .bind(session_id)
+        .fetch_optional(&self.pool)
+        .await
+        .map_err(|e| ZclawError::StorageError(e.to_string()))?;
+
+        match row {
+            Some((id, sid, aid, req_type, tools_json, outcome_str, steps, dur_ms, tokens, chain, sat, created)) => {
+                // Corrupt JSON degrades to an empty tool list rather than failing.
+                let tools_used: Vec<String> = tools_json
+                    .as_deref()
+                    .and_then(|j| serde_json::from_str(j).ok())
+                    .unwrap_or_default();
+
+                let timestamp = DateTime::parse_from_rfc3339(&created)
+                    .map(|dt| dt.with_timezone(&Utc))
+                    .unwrap_or_else(|_| Utc::now());
+
+                Ok(Some(CompressedTrajectory {
+                    id,
+                    session_id: sid,
+                    agent_id: aid,
+                    request_type: req_type,
+                    tools_used,
+                    outcome: CompletionStatus::from_str_lossy(&outcome_str),
+                    total_steps: steps as usize,
+                    total_duration_ms: dur_ms as u64,
+                    total_tokens: tokens as u32,
+                    execution_chain: chain,
+                    satisfaction_signal: sat.as_deref().and_then(SatisfactionSignal::from_str_lossy),
+                    created_at: timestamp,
+                }))
+            }
+            None => Ok(None),
+        }
+    }
+
+    /// Delete raw trajectory events older than `days` days. Returns count deleted.
+    pub async fn delete_events_older_than(&self, days: i64) -> Result<u64> {
+        let result = sqlx::query(
+            r#"
+            DELETE FROM trajectory_events
+            WHERE timestamp < datetime('now', ?)
+            "#,
+        )
+        .bind(format!("-{} days", days))
+        .execute(&self.pool)
+        .await
+        .map_err(|e| {
+            tracing::warn!("[TrajectoryStore] delete_events_older_than failed: {}", e);
+            ZclawError::StorageError(e.to_string())
+        })?;
+
+        Ok(result.rows_affected())
+    }
+
+    /// Delete compressed trajectories older than `days` days. Returns count deleted.
+    pub async fn delete_compressed_older_than(&self, days: i64) -> Result<u64> {
+        let result = sqlx::query(
+            r#"
+            DELETE FROM compressed_trajectories
+            WHERE created_at < datetime('now', ?)
+            "#,
+        )
+        .bind(format!("-{} days", days))
+        .execute(&self.pool)
+        .await
+        .map_err(|e| {
+            tracing::warn!("[TrajectoryStore] delete_compressed_older_than failed: {}", e);
+            ZclawError::StorageError(e.to_string())
+        })?;
+
+        Ok(result.rows_affected())
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Tests
+// ---------------------------------------------------------------------------
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    async fn test_store() -> TrajectoryStore {
+        let pool = SqlitePool::connect("sqlite::memory:")
+            .await
+            .expect("in-memory pool");
+        let store = TrajectoryStore::new(pool);
+        store.initialize_schema().await.expect("schema init");
+        store
+    }
+
+    fn sample_event(index: usize) -> TrajectoryEvent {
+        TrajectoryEvent {
+            id: format!("evt-{}", index),
+            session_id: "sess-1".to_string(),
+            agent_id: "agent-1".to_string(),
+            step_index: index,
+            step_type: TrajectoryStepType::ToolExecution,
+            input_summary: "search query".to_string(),
+            output_summary: "3 results found".to_string(),
+            duration_ms: 150,
+            timestamp: Utc::now(),
+        }
+    }
+
+    #[tokio::test]
+    async fn test_insert_and_get_events() {
+        let store = test_store().await;
+
+        let e1 = sample_event(0);
+        let e2 = TrajectoryEvent {
+            id: "evt-1".to_string(),
+            step_index: 1,
+            step_type: TrajectoryStepType::LlmGeneration,
+            ..sample_event(0)
+        };
+
+        store.insert_event(&e1).await.unwrap();
+        store.insert_event(&e2).await.unwrap();
+
+        let events = store.get_events_by_session("sess-1").await.unwrap();
+        assert_eq!(events.len(), 2);
+        assert_eq!(events[0].step_index, 0);
+        assert_eq!(events[1].step_index, 1);
+        assert_eq!(events[0].step_type, TrajectoryStepType::ToolExecution);
+        assert_eq!(events[1].step_type, TrajectoryStepType::LlmGeneration);
+    }
+
+    #[tokio::test]
+    async fn test_get_events_empty_session() {
+        let store = test_store().await;
+        let events = store.get_events_by_session("nonexistent").await.unwrap();
+        assert!(events.is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_insert_and_get_compressed() {
+        let store = test_store().await;
+
+        let ct = CompressedTrajectory {
+            id: "ct-1".to_string(),
+            session_id: "sess-1".to_string(),
+            agent_id: "agent-1".to_string(),
+            request_type: "data_query".to_string(),
+            tools_used: vec!["search".to_string(), "calculate".to_string()],
+            outcome: CompletionStatus::Success,
+            total_steps: 5,
+            total_duration_ms: 1200,
+            total_tokens: 350,
+            execution_chain: r#"[{"step":0,"type":"tool_execution"}]"#.to_string(),
+            satisfaction_signal: Some(SatisfactionSignal::Positive),
+            created_at: Utc::now(),
+        };
+
+        store.insert_compressed(&ct).await.unwrap();
+
+        let loaded = store.get_compressed_by_session("sess-1").await.unwrap();
+        assert!(loaded.is_some());
+
+        let loaded = loaded.unwrap();
+        assert_eq!(loaded.id, "ct-1");
+        assert_eq!(loaded.request_type, "data_query");
+        assert_eq!(loaded.tools_used.len(), 2);
+        assert_eq!(loaded.outcome, CompletionStatus::Success);
+        assert_eq!(loaded.satisfaction_signal, Some(SatisfactionSignal::Positive));
+    }
+
+    #[tokio::test]
+    async fn test_get_compressed_nonexistent() {
+        let store = test_store().await;
+        let result = store.get_compressed_by_session("nonexistent").await.unwrap();
+        assert!(result.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_step_type_roundtrip() {
+        let all_types = [
+            TrajectoryStepType::UserRequest,
+            TrajectoryStepType::IntentClassification,
+            TrajectoryStepType::SkillSelection,
+            TrajectoryStepType::ToolExecution,
+            TrajectoryStepType::LlmGeneration,
+            TrajectoryStepType::UserFeedback,
+        ];
+
+        for st in all_types {
+            assert_eq!(TrajectoryStepType::from_str_lossy(st.as_str()), st);
+        }
+    }
+
+    #[tokio::test]
+    async fn test_satisfaction_signal_roundtrip() {
+        let signals = [SatisfactionSignal::Positive, SatisfactionSignal::Negative, SatisfactionSignal::Neutral];
+        for sig in signals {
+            assert_eq!(SatisfactionSignal::from_str_lossy(sig.as_str()), Some(sig));
+        }
+        assert_eq!(SatisfactionSignal::from_str_lossy("bogus"), None);
+    }
+
+    #[tokio::test]
+    async fn test_completion_status_roundtrip() {
+        let statuses = [CompletionStatus::Success, CompletionStatus::Partial, CompletionStatus::Failed, CompletionStatus::Abandoned];
+        for s in statuses {
+            assert_eq!(CompletionStatus::from_str_lossy(s.as_str()), s);
+        }
+    }
+
+    #[tokio::test]
+    async fn test_delete_events_older_than() {
+        let store = test_store().await;
+
+        // Insert an event with a timestamp far in the past
+        let old_event = TrajectoryEvent {
+            id: "old-evt".to_string(),
+            timestamp: Utc::now() - chrono::Duration::days(100),
+            ..sample_event(0)
+        };
+        store.insert_event(&old_event).await.unwrap();
+
+        // Insert a recent event
+        let recent_event = TrajectoryEvent {
+            id: "recent-evt".to_string(),
+            step_index: 1,
+            ..sample_event(0)
+        };
+        store.insert_event(&recent_event).await.unwrap();
+
+        let deleted = store.delete_events_older_than(30).await.unwrap();
+        assert_eq!(deleted, 1);
+
+        let remaining = store.get_events_by_session("sess-1").await.unwrap();
+        assert_eq!(remaining.len(), 1);
+        assert_eq!(remaining[0].id, "recent-evt");
+    }
+}
diff --git a/crates/zclaw-memory/src/user_profile_store.rs b/crates/zclaw-memory/src/user_profile_store.rs
new file mode 100644
index 0000000..e4d2563
--- /dev/null
+++ b/crates/zclaw-memory/src/user_profile_store.rs
@@ -0,0 +1,592 @@
+//! User Profile Store — structured user modeling from conversation patterns.
+//!
+//! Maintains a single `UserProfile` per user (desktop uses "default_user")
+//! in a dedicated SQLite table. Vec<String> fields (recent_topics, pain points,
+//! preferred_tools) are stored as JSON arrays and transparently
+//! (de)serialised on read/write.
+
+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
+use sqlx::Row;
+use sqlx::SqlitePool;
+use zclaw_types::Result;
+
+// ---------------------------------------------------------------------------
+// Data types
+// ---------------------------------------------------------------------------
+
+/// Expertise level inferred from conversation patterns.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum Level {
+    Beginner,
+    Intermediate,
+    Expert,
+}
+
+impl Level {
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            Level::Beginner => "beginner",
+            Level::Intermediate => "intermediate",
+            Level::Expert => "expert",
+        }
+    }
+
+    pub fn from_str_lossy(s: &str) -> Option<Self> {
+        match s {
+            "beginner" => Some(Level::Beginner),
+            "intermediate" => Some(Level::Intermediate),
+            "expert" => Some(Level::Expert),
+            _ => None,
+        }
+    }
+}
+
+/// Communication style preference.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum CommStyle {
+    Concise,
+    Detailed,
+    Formal,
+    Casual,
+}
+
+impl CommStyle {
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            CommStyle::Concise => "concise",
+            CommStyle::Detailed => "detailed",
+            CommStyle::Formal => "formal",
+            CommStyle::Casual => "casual",
+        }
+    }
+
+    pub fn from_str_lossy(s: &str) -> Option<Self> {
+        match s {
+            "concise" => Some(CommStyle::Concise),
+            "detailed" => Some(CommStyle::Detailed),
+            "formal" => Some(CommStyle::Formal),
+            "casual" => Some(CommStyle::Casual),
+            _ => None,
+        }
+    }
+}
+
+/// Structured user profile (one record per user).
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UserProfile { + pub user_id: String, + pub industry: Option, + pub role: Option, + pub expertise_level: Option, + pub communication_style: Option, + pub preferred_language: String, + pub recent_topics: Vec, + pub active_pain_points: Vec, + pub preferred_tools: Vec, + pub confidence: f32, + pub updated_at: DateTime, +} + +impl UserProfile { + /// Create a blank profile for the given user. + pub fn blank(user_id: &str) -> Self { + Self { + user_id: user_id.to_string(), + industry: None, + role: None, + expertise_level: None, + communication_style: None, + preferred_language: "zh-CN".to_string(), + recent_topics: Vec::new(), + active_pain_points: Vec::new(), + preferred_tools: Vec::new(), + confidence: 0.0, + updated_at: Utc::now(), + } + } + + /// Default profile for single-user desktop mode ("default_user"). + pub fn default_profile() -> Self { + Self::blank("default_user") + } +} + +// --------------------------------------------------------------------------- +// DDL +// --------------------------------------------------------------------------- + +const PROFILE_DDL: &str = r#" +CREATE TABLE IF NOT EXISTS user_profiles ( + user_id TEXT PRIMARY KEY, + industry TEXT, + role TEXT, + expertise_level TEXT, + communication_style TEXT, + preferred_language TEXT DEFAULT 'zh-CN', + recent_topics TEXT DEFAULT '[]', + active_pain_points TEXT DEFAULT '[]', + preferred_tools TEXT DEFAULT '[]', + confidence REAL DEFAULT 0.0, + updated_at TEXT NOT NULL +) +"#; + +// --------------------------------------------------------------------------- +// Row mapping +// --------------------------------------------------------------------------- + +fn row_to_profile(row: &sqlx::sqlite::SqliteRow) -> Result { + let recent_topics_json: String = row.try_get("recent_topics").unwrap_or_else(|_| "[]".to_string()); + let pain_json: String = row.try_get("active_pain_points").unwrap_or_else(|_| "[]".to_string()); + let tools_json: String 
= row.try_get("preferred_tools").unwrap_or_else(|_| "[]".to_string()); + + let recent_topics: Vec = serde_json::from_str(&recent_topics_json)?; + let active_pain_points: Vec = serde_json::from_str(&pain_json)?; + let preferred_tools: Vec = serde_json::from_str(&tools_json)?; + + let expertise_str: Option = row.try_get("expertise_level").unwrap_or(None); + let comm_str: Option = row.try_get("communication_style").unwrap_or(None); + + let updated_at_str: String = row.try_get("updated_at").unwrap_or_else(|_| Utc::now().to_rfc3339()); + let updated_at = DateTime::parse_from_rfc3339(&updated_at_str) + .map(|dt| dt.with_timezone(&Utc)) + .unwrap_or_else(|_| Utc::now()); + + Ok(UserProfile { + user_id: row.try_get("user_id").unwrap_or_default(), + industry: row.try_get("industry").unwrap_or(None), + role: row.try_get("role").unwrap_or(None), + expertise_level: expertise_str.as_deref().and_then(Level::from_str_lossy), + communication_style: comm_str.as_deref().and_then(CommStyle::from_str_lossy), + preferred_language: row.try_get("preferred_language").unwrap_or_else(|_| "zh-CN".to_string()), + recent_topics, + active_pain_points, + preferred_tools, + confidence: row.try_get("confidence").unwrap_or(0.0), + updated_at, + }) +} + +// --------------------------------------------------------------------------- +// UserProfileStore +// --------------------------------------------------------------------------- + +/// SQLite-backed store for user profiles. +pub struct UserProfileStore { + pool: SqlitePool, +} + +impl UserProfileStore { + /// Create a new store backed by the given connection pool. + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } + + /// Create tables. Idempotent — safe to call on every startup. + pub async fn initialize_schema(&self) -> Result<()> { + sqlx::query(PROFILE_DDL) + .execute(&self.pool) + .await + .map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?; + Ok(()) + } + + /// Fetch the profile for a user. 
Returns `None` when no row exists. + pub async fn get(&self, user_id: &str) -> Result> { + let row = sqlx::query( + "SELECT user_id, industry, role, expertise_level, communication_style, \ + preferred_language, recent_topics, active_pain_points, preferred_tools, \ + confidence, updated_at \ + FROM user_profiles WHERE user_id = ?", + ) + .bind(user_id) + .fetch_optional(&self.pool) + .await + .map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?; + + match row { + Some(r) => Ok(Some(row_to_profile(&r)?)), + None => Ok(None), + } + } + + /// Insert or replace the full profile. + pub async fn upsert(&self, profile: &UserProfile) -> Result<()> { + let topics = serde_json::to_string(&profile.recent_topics)?; + let pains = serde_json::to_string(&profile.active_pain_points)?; + let tools = serde_json::to_string(&profile.preferred_tools)?; + let expertise = profile.expertise_level.map(|l| l.as_str()); + let comm = profile.communication_style.map(|c| c.as_str()); + let updated = profile.updated_at.to_rfc3339(); + + sqlx::query( + "INSERT OR REPLACE INTO user_profiles \ + (user_id, industry, role, expertise_level, communication_style, \ + preferred_language, recent_topics, active_pain_points, preferred_tools, \ + confidence, updated_at) \ + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(&profile.user_id) + .bind(&profile.industry) + .bind(&profile.role) + .bind(expertise) + .bind(comm) + .bind(&profile.preferred_language) + .bind(&topics) + .bind(&pains) + .bind(&tools) + .bind(profile.confidence) + .bind(&updated) + .execute(&self.pool) + .await + .map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?; + + Ok(()) + } + + /// Update a single scalar field by name. + /// + /// `field` must be one of: industry, role, expertise_level, + /// communication_style, preferred_language, confidence. + /// Returns error for unrecognised field names (prevents SQL injection). 
+    pub async fn update_field(&self, user_id: &str, field: &str, value: &str) -> Result<()> {
+        // Whitelist of updatable columns — the column name is baked into the
+        // SQL text, so only known-safe identifiers are ever interpolated.
+        let sql = match field {
+            "industry" => "UPDATE user_profiles SET industry = ?, updated_at = ? WHERE user_id = ?",
+            "role" => "UPDATE user_profiles SET role = ?, updated_at = ? WHERE user_id = ?",
+            "expertise_level" => {
+                "UPDATE user_profiles SET expertise_level = ?, updated_at = ? WHERE user_id = ?"
+            }
+            "communication_style" => {
+                "UPDATE user_profiles SET communication_style = ?, updated_at = ? WHERE user_id = ?"
+            }
+            "preferred_language" => {
+                "UPDATE user_profiles SET preferred_language = ?, updated_at = ? WHERE user_id = ?"
+            }
+            "confidence" => {
+                "UPDATE user_profiles SET confidence = ?, updated_at = ? WHERE user_id = ?"
+            }
+            _ => {
+                return Err(zclaw_types::ZclawError::InvalidInput(format!(
+                    "Unknown profile field: {}",
+                    field
+                )));
+            }
+        };
+
+        let now = Utc::now().to_rfc3339();
+
+        // confidence is REAL; parse the value string so it binds as f32.
+        if field == "confidence" {
+            let f: f32 = value.parse().map_err(|_| {
+                zclaw_types::ZclawError::InvalidInput(format!("Invalid confidence: {}", value))
+            })?;
+            sqlx::query(sql)
+                .bind(f)
+                .bind(&now)
+                .bind(user_id)
+                .execute(&self.pool)
+                .await
+                .map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?;
+        } else {
+            sqlx::query(sql)
+                .bind(value)
+                .bind(&now)
+                .bind(user_id)
+                .execute(&self.pool)
+                .await
+                .map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?;
+        }
+
+        Ok(())
+    }
+
+    /// Append a topic to `recent_topics`, trimming to `max_topics`.
+    /// Creates a default profile row if none exists.
+    pub async fn add_recent_topic(
+        &self,
+        user_id: &str,
+        topic: &str,
+        max_topics: usize,
+    ) -> Result<()> {
+        let mut profile = self
+            .get(user_id)
+            .await?
+            .unwrap_or_else(|| UserProfile::blank(user_id));
+
+        push_front_dedup_trim(&mut profile.recent_topics, topic, max_topics);
+        profile.updated_at = Utc::now();
+
+        self.upsert(&profile).await
+    }
+
+    /// Append a pain point, trimming to `max_pains`.
+    /// Creates a default profile row if none exists.
+    pub async fn add_pain_point(
+        &self,
+        user_id: &str,
+        pain: &str,
+        max_pains: usize,
+    ) -> Result<()> {
+        let mut profile = self
+            .get(user_id)
+            .await?
+            .unwrap_or_else(|| UserProfile::blank(user_id));
+
+        push_front_dedup_trim(&mut profile.active_pain_points, pain, max_pains);
+        profile.updated_at = Utc::now();
+
+        self.upsert(&profile).await
+    }
+
+    /// Append a preferred tool, trimming to `max_tools`.
+    /// Creates a default profile row if none exists.
+    pub async fn add_preferred_tool(
+        &self,
+        user_id: &str,
+        tool: &str,
+        max_tools: usize,
+    ) -> Result<()> {
+        let mut profile = self
+            .get(user_id)
+            .await?
+            .unwrap_or_else(|| UserProfile::blank(user_id));
+
+        push_front_dedup_trim(&mut profile.preferred_tools, tool, max_tools);
+        profile.updated_at = Utc::now();
+
+        self.upsert(&profile).await
+    }
+}
+
+/// Shared "most-recent-first" list update: remove any existing occurrence of
+/// `item` (dedup), push it to the front, and trim the list to `max` entries.
+fn push_front_dedup_trim(list: &mut Vec<String>, item: &str, max: usize) {
+    list.retain(|x| x != item);
+    list.insert(0, item.to_string());
+    list.truncate(max);
+}
+
+// ---------------------------------------------------------------------------
+// Tests
+// ---------------------------------------------------------------------------
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Helper: create an in-memory store with schema.
+    async fn test_store() -> UserProfileStore {
+        let pool = SqlitePool::connect("sqlite::memory:")
+            .await
+            .expect("in-memory pool");
+        let store = UserProfileStore::new(pool);
+        store.initialize_schema().await.expect("schema init");
+        store
+    }
+
+    #[tokio::test]
+    async fn test_initialize_schema_idempotent() {
+        let store = test_store().await;
+        // Second call should succeed without error.
+        store.initialize_schema().await.unwrap();
+        store.initialize_schema().await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_get_returns_none_for_missing() {
+        let store = test_store().await;
+        let profile = store.get("nonexistent").await.unwrap();
+        assert!(profile.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_upsert_and_get() {
+        let store = test_store().await;
+        let mut profile = UserProfile::blank("default_user");
+        profile.industry = Some("healthcare".to_string());
+        profile.role = Some("admin".to_string());
+        profile.expertise_level = Some(Level::Intermediate);
+        profile.communication_style = Some(CommStyle::Concise);
+        profile.recent_topics = vec!["reporting".to_string(), "compliance".to_string()];
+        profile.confidence = 0.65;
+
+        store.upsert(&profile).await.unwrap();
+
+        let loaded = store.get("default_user").await.unwrap().unwrap();
+        assert_eq!(loaded.user_id, "default_user");
+        assert_eq!(loaded.industry.as_deref(), Some("healthcare"));
+        assert_eq!(loaded.role.as_deref(), Some("admin"));
+        assert_eq!(loaded.expertise_level, Some(Level::Intermediate));
+        assert_eq!(loaded.communication_style, Some(CommStyle::Concise));
+        assert_eq!(loaded.recent_topics, vec!["reporting", "compliance"]);
+        assert!((loaded.confidence - 0.65).abs() < f32::EPSILON);
+    }
+
+    #[tokio::test]
+    async fn test_upsert_replaces_existing() {
+        let store = test_store().await;
+        let mut profile = UserProfile::blank("user1");
+        profile.industry = Some("tech".to_string());
+        store.upsert(&profile).await.unwrap();
+
+        profile.industry = Some("finance".to_string());
+        store.upsert(&profile).await.unwrap();
+
+        let loaded = store.get("user1").await.unwrap().unwrap();
+        assert_eq!(loaded.industry.as_deref(), Some("finance"));
+    }
+
+    #[tokio::test]
+    async fn test_update_field_scalar() {
+        let store = test_store().await;
+        let profile = UserProfile::blank("user2");
+        store.upsert(&profile).await.unwrap();
+
+        store
+            .update_field("user2", "industry", "education")
+            .await
+            .unwrap();
+        store
+            .update_field("user2", "role", "teacher")
+            .await
+            .unwrap();
+
+        let loaded = store.get("user2").await.unwrap().unwrap();
+        assert_eq!(loaded.industry.as_deref(), Some("education"));
+        assert_eq!(loaded.role.as_deref(), Some("teacher"));
+    }
+
+    #[tokio::test]
+    async fn test_update_field_confidence() {
+        let store = test_store().await;
+        let profile = UserProfile::blank("user3");
+        store.upsert(&profile).await.unwrap();
+
+        store
+            .update_field("user3", "confidence", "0.88")
+            .await
+            .unwrap();
+
+        let loaded = store.get("user3").await.unwrap().unwrap();
+        assert!((loaded.confidence - 0.88).abs() < f32::EPSILON);
+    }
+
+    #[tokio::test]
+    async fn test_update_field_rejects_unknown() {
+        let store = test_store().await;
+        let result = store.update_field("user", "evil_column", "oops").await;
+        assert!(result.is_err());
+    }
+
+    #[tokio::test]
+    async fn test_add_recent_topic_auto_creates_profile() {
+        let store = test_store().await;
+
+        // No profile exists yet.
+        store
+            .add_recent_topic("new_user", "data analysis", 5)
+            .await
+            .unwrap();
+
+        let loaded = store.get("new_user").await.unwrap().unwrap();
+        assert_eq!(loaded.recent_topics, vec!["data analysis"]);
+    }
+
+    #[tokio::test]
+    async fn test_add_recent_topic_dedup_and_trim() {
+        let store = test_store().await;
+        let profile = UserProfile::blank("user");
+        store.upsert(&profile).await.unwrap();
+
+        store.add_recent_topic("user", "topic_a", 3).await.unwrap();
+        store.add_recent_topic("user", "topic_b", 3).await.unwrap();
+        store.add_recent_topic("user", "topic_c", 3).await.unwrap();
+        // Duplicate — should move to front, not add.
+        store.add_recent_topic("user", "topic_a", 3).await.unwrap();
+
+        let loaded = store.get("user").await.unwrap().unwrap();
+        assert_eq!(
+            loaded.recent_topics,
+            vec!["topic_a", "topic_c", "topic_b"]
+        );
+    }
+
+    #[tokio::test]
+    async fn test_add_pain_point_trim() {
+        let store = test_store().await;
+
+        for i in 0..5 {
+            store
+                .add_pain_point("user", &format!("pain_{}", i), 3)
+                .await
+                .unwrap();
+        }
+
+        let loaded = store.get("user").await.unwrap().unwrap();
+        assert_eq!(loaded.active_pain_points.len(), 3);
+        // Most recent first.
+        assert_eq!(loaded.active_pain_points[0], "pain_4");
+    }
+
+    #[tokio::test]
+    async fn test_add_preferred_tool_trim() {
+        let store = test_store().await;
+
+        store
+            .add_preferred_tool("user", "python", 5)
+            .await
+            .unwrap();
+        store
+            .add_preferred_tool("user", "rust", 5)
+            .await
+            .unwrap();
+        // Duplicate — moved to front.
+        store
+            .add_preferred_tool("user", "python", 5)
+            .await
+            .unwrap();
+
+        let loaded = store.get("user").await.unwrap().unwrap();
+        assert_eq!(loaded.preferred_tools, vec!["python", "rust"]);
+    }
+
+    #[test]
+    fn test_level_round_trip() {
+        for level in [Level::Beginner, Level::Intermediate, Level::Expert] {
+            assert_eq!(Level::from_str_lossy(level.as_str()), Some(level));
+        }
+        assert_eq!(Level::from_str_lossy("unknown"), None);
+    }
+
+    #[test]
+    fn test_comm_style_round_trip() {
+        for style in [
+            CommStyle::Concise,
+            CommStyle::Detailed,
+            CommStyle::Formal,
+            CommStyle::Casual,
+        ] {
+            assert_eq!(CommStyle::from_str_lossy(style.as_str()), Some(style));
+        }
+        assert_eq!(CommStyle::from_str_lossy("unknown"), None);
+    }
+
+    #[test]
+    fn test_profile_serialization() {
+        let mut p = UserProfile::blank("test_user");
+        p.industry = Some("logistics".into());
+        p.expertise_level = Some(Level::Expert);
+        p.communication_style = Some(CommStyle::Detailed);
+        p.recent_topics = vec!["exports".into(), "customs".into()];
+
+        let json = serde_json::to_string(&p).unwrap();
+        let decoded: UserProfile = serde_json::from_str(&json).unwrap();
+        assert_eq!(decoded.user_id, "test_user");
+        assert_eq!(decoded.industry.as_deref(), Some("logistics"));
+        assert_eq!(decoded.expertise_level, Some(Level::Expert));
+        assert_eq!(decoded.communication_style, Some(CommStyle::Detailed));
+        assert_eq!(decoded.recent_topics, vec!["exports", "customs"]);
+    }
+}
diff --git a/crates/zclaw-runtime/src/lib.rs b/crates/zclaw-runtime/src/lib.rs
index fee6548..e86d753 100644
--- a/crates/zclaw-runtime/src/lib.rs
+++ b/crates/zclaw-runtime/src/lib.rs
@@ -17,6 +17,7 @@ pub mod growth;
 pub mod compaction;
 pub mod middleware;
 pub mod prompt;
+pub mod nl_schedule;
 
 // Re-export main types
 pub use driver::{
diff --git a/crates/zclaw-runtime/src/middleware.rs b/crates/zclaw-runtime/src/middleware.rs
index 4dc544c..539007f 100644
--- a/crates/zclaw-runtime/src/middleware.rs
+++ b/crates/zclaw-runtime/src/middleware.rs
@@ -278,3 +278,4 @@ pub mod title;
 pub mod token_calibration;
 pub mod tool_error;
 pub mod tool_output_guard;
+pub mod trajectory_recorder;
diff --git a/crates/zclaw-runtime/src/middleware/trajectory_recorder.rs b/crates/zclaw-runtime/src/middleware/trajectory_recorder.rs
new file mode 100644
index 0000000..6951d87
--- /dev/null
+++ b/crates/zclaw-runtime/src/middleware/trajectory_recorder.rs
@@ -0,0 +1,231 @@
+//! Trajectory Recorder Middleware — records tool-call chains for analysis.
+//!
+//! Priority 650 (telemetry range: after business middleware at 400-599,
+//! before token_calibration at 700). Records events asynchronously via
+//! `tokio::spawn` so the main conversation flow is never blocked.
+
+use async_trait::async_trait;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, Ordering};
+use tokio::sync::RwLock;
+use zclaw_memory::trajectory_store::{
+    TrajectoryEvent, TrajectoryStepType, TrajectoryStore,
+};
+use zclaw_types::{Result, SessionId};
+use crate::driver::ContentBlock;
+use crate::middleware::{AgentMiddleware, MiddlewareContext, MiddlewareDecision};
+
+// ---------------------------------------------------------------------------
+// Step counter per session
+// ---------------------------------------------------------------------------
+
+/// Tracks step indices per session so events are ordered correctly.
+struct StepCounter {
+    // Linear-scan Vec keyed by session id; fine for the small number of
+    // concurrently active sessions expected here.
+    counters: RwLock<Vec<(String, Arc<AtomicU64>)>>,
+}
+
+impl StepCounter {
+    fn new() -> Self {
+        Self {
+            counters: RwLock::new(Vec::new()),
+        }
+    }
+
+    /// Return the next 0-based step index for `session_id`.
+    ///
+    /// Fast path uses only the read lock; the write lock is taken once per
+    /// new session, with a re-check to survive the lock-upgrade race.
+    async fn next(&self, session_id: &str) -> usize {
+        let map = self.counters.read().await;
+        for (sid, counter) in map.iter() {
+            if sid == session_id {
+                return counter.fetch_add(1, Ordering::Relaxed) as usize;
+            }
+        }
+        drop(map);
+
+        let mut map = self.counters.write().await;
+        // Double-check after acquiring write lock: another task may have
+        // inserted this session between our read and write sections.
+        for (sid, counter) in map.iter() {
+            if sid == session_id {
+                return counter.fetch_add(1, Ordering::Relaxed) as usize;
+            }
+        }
+        // Store 1 (the *next* index) and hand out 0 for this first call.
+        let counter = Arc::new(AtomicU64::new(1));
+        map.push((session_id.to_string(), counter));
+        0
+    }
+}
+
+// ---------------------------------------------------------------------------
+// TrajectoryRecorderMiddleware
+// ---------------------------------------------------------------------------
+
+/// Middleware that records agent loop events into `TrajectoryStore`.
+/// +/// Hooks: +/// - `before_completion` → records UserRequest step +/// - `after_tool_call` → records ToolExecution step +/// - `after_completion` → records LlmGeneration step +pub struct TrajectoryRecorderMiddleware { + store: Arc, + step_counter: StepCounter, +} + +impl TrajectoryRecorderMiddleware { + pub fn new(store: Arc) -> Self { + Self { + store, + step_counter: StepCounter::new(), + } + } + + /// Spawn an async write — fire-and-forget, non-blocking. + fn spawn_write(&self, event: TrajectoryEvent) { + let store = self.store.clone(); + tokio::spawn(async move { + if let Err(e) = store.insert_event(&event).await { + tracing::warn!( + "[TrajectoryRecorder] Async write failed (non-fatal): {}", + e + ); + } + }); + } + + fn truncate(s: &str, max: usize) -> String { + if s.len() <= max { + s.to_string() + } else { + s.chars().take(max).collect::() + "…" + } + } +} + +#[async_trait] +impl AgentMiddleware for TrajectoryRecorderMiddleware { + fn name(&self) -> &str { + "trajectory_recorder" + } + + fn priority(&self) -> i32 { + 650 + } + + async fn before_completion( + &self, + ctx: &mut MiddlewareContext, + ) -> Result { + if ctx.user_input.is_empty() { + return Ok(MiddlewareDecision::Continue); + } + + let step = self.step_counter.next(&ctx.session_id.to_string()).await; + let event = TrajectoryEvent { + id: uuid::Uuid::new_v4().to_string(), + session_id: ctx.session_id.to_string(), + agent_id: ctx.agent_id.to_string(), + step_index: step, + step_type: TrajectoryStepType::UserRequest, + input_summary: Self::truncate(&ctx.user_input, 200), + output_summary: String::new(), + duration_ms: 0, + timestamp: chrono::Utc::now(), + }; + + self.spawn_write(event); + Ok(MiddlewareDecision::Continue) + } + + async fn after_tool_call( + &self, + ctx: &mut MiddlewareContext, + tool_name: &str, + result: &serde_json::Value, + ) -> Result<()> { + let step = self.step_counter.next(&ctx.session_id.to_string()).await; + let result_summary = match result { + 
serde_json::Value::String(s) => Self::truncate(s, 200), + serde_json::Value::Object(_) => { + let s = serde_json::to_string(result).unwrap_or_default(); + Self::truncate(&s, 200) + } + other => Self::truncate(&other.to_string(), 200), + }; + + let event = TrajectoryEvent { + id: uuid::Uuid::new_v4().to_string(), + session_id: ctx.session_id.to_string(), + agent_id: ctx.agent_id.to_string(), + step_index: step, + step_type: TrajectoryStepType::ToolExecution, + input_summary: Self::truncate(tool_name, 200), + output_summary: result_summary, + duration_ms: 0, + timestamp: chrono::Utc::now(), + }; + + self.spawn_write(event); + Ok(()) + } + + async fn after_completion(&self, ctx: &MiddlewareContext) -> Result<()> { + let step = self.step_counter.next(&ctx.session_id.to_string()).await; + let output_summary = ctx.response_content.iter() + .filter_map(|b| match b { + ContentBlock::Text { text } => Some(text.as_str()), + _ => None, + }) + .collect::>() + .join(" "); + + let event = TrajectoryEvent { + id: uuid::Uuid::new_v4().to_string(), + session_id: ctx.session_id.to_string(), + agent_id: ctx.agent_id.to_string(), + step_index: step, + step_type: TrajectoryStepType::LlmGeneration, + input_summary: String::new(), + output_summary: Self::truncate(&output_summary, 200), + duration_ms: 0, + timestamp: chrono::Utc::now(), + }; + + self.spawn_write(event); + Ok(()) + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_step_counter_sequential() { + let counter = StepCounter::new(); + assert_eq!(counter.next("sess-1").await, 0); + assert_eq!(counter.next("sess-1").await, 1); + assert_eq!(counter.next("sess-1").await, 2); + } + + #[tokio::test] + async fn test_step_counter_different_sessions() { + let counter = StepCounter::new(); + assert_eq!(counter.next("sess-1").await, 0); 
+ assert_eq!(counter.next("sess-2").await, 0); + assert_eq!(counter.next("sess-1").await, 1); + assert_eq!(counter.next("sess-2").await, 1); + } + + #[test] + fn test_truncate_short() { + assert_eq!(TrajectoryRecorderMiddleware::truncate("hello", 10), "hello"); + } + + #[test] + fn test_truncate_long() { + let long: String = "中".repeat(300); + let truncated = TrajectoryRecorderMiddleware::truncate(&long, 200); + assert!(truncated.chars().count() <= 201); // 200 + … + } +} diff --git a/crates/zclaw-runtime/src/nl_schedule.rs b/crates/zclaw-runtime/src/nl_schedule.rs new file mode 100644 index 0000000..b322a38 --- /dev/null +++ b/crates/zclaw-runtime/src/nl_schedule.rs @@ -0,0 +1,593 @@ +//! Natural Language Schedule Parser — transforms Chinese time expressions into cron. +//! +//! Three-layer fallback strategy: +//! 1. Regex pattern matching (covers ~80% of common expressions) +//! 2. LLM-assisted parsing (for ambiguous/complex expressions) — TODO: wire when Haiku driver available +//! 3. Interactive clarification (return `Unclear`) +//! +//! Lives in `zclaw-runtime` because it's a pure text→cron utility with no kernel dependency. + +use chrono::{Datelike, Timelike}; +use serde::{Deserialize, Serialize}; +use zclaw_types::AgentId; + +// --------------------------------------------------------------------------- +// Data structures +// --------------------------------------------------------------------------- + +/// Result of parsing a natural language schedule expression. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ParsedSchedule { + /// Cron expression, e.g. "0 9 * * *" + pub cron_expression: String, + /// Human-readable description of the schedule + pub natural_description: String, + /// Confidence of the parse (0.0–1.0) + pub confidence: f32, + /// What the task does (extracted from user input) + pub task_description: String, + /// What to trigger when the schedule fires + pub task_target: TaskTarget, +} + +/// Target to trigger on schedule. 
+#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", content = "id")] +pub enum TaskTarget { + /// Trigger a specific agent + Agent(String), + /// Trigger a specific hand + Hand(String), + /// Trigger a specific workflow + Workflow(String), + /// Generic reminder (no specific target) + Reminder, +} + +/// Outcome of NL schedule parsing. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ScheduleParseResult { + /// High-confidence single parse + Exact(ParsedSchedule), + /// Multiple possible interpretations + Ambiguous(Vec), + /// Unable to parse — needs user clarification + Unclear, +} + +// --------------------------------------------------------------------------- +// Regex pattern library +// --------------------------------------------------------------------------- + +/// A single pattern for matching Chinese time expressions. +struct SchedulePattern { + /// Regex pattern string + regex: &'static str, + /// Cron template — use {h} for hour, {m} for minute, {dow} for day-of-week, {dom} for day-of-month + cron_template: &'static str, + /// Human description template + description: &'static str, + /// Base confidence for this pattern + confidence: f32, +} + +/// Chinese time period keywords → hour mapping +fn period_to_hour(period: &str) -> Option { + match period { + "凌晨" => Some(0), + "早上" | "早晨" | "上午" => Some(9), + "中午" => Some(12), + "下午" | "午后" => Some(15), + "傍晚" | "黄昏" => Some(18), + "晚上" | "晚间" | "夜里" | "夜晚" => Some(21), + "半夜" | "午夜" => Some(0), + _ => None, + } +} + +/// Chinese weekday names → cron day-of-week +fn weekday_to_cron(day: &str) -> Option<&'static str> { + match day { + "一" | "周一" | "星期一" | "礼拜一" => Some("1"), + "二" | "周二" | "星期二" | "礼拜二" => Some("2"), + "三" | "周三" | "星期三" | "礼拜三" => Some("3"), + "四" | "周四" | "星期四" | "礼拜四" => Some("4"), + "五" | "周五" | "星期五" | "礼拜五" => Some("5"), + "六" | "周六" | "星期六" | "礼拜六" => Some("6"), + "日" | "周日" | "星期日" | "礼拜日" | "天" | "周天" | "星期天" | "礼拜天" => Some("0"), + _ => None, + } +} + 
+// --------------------------------------------------------------------------- +// Parser implementation +// --------------------------------------------------------------------------- + +/// Parse a natural language schedule expression into a cron expression. +/// +/// Uses a series of regex-based pattern matchers covering common Chinese +/// time expressions. Returns `Unclear` if no pattern matches. +pub fn parse_nl_schedule(input: &str, default_agent_id: &AgentId) -> ScheduleParseResult { + let input = input.trim(); + if input.is_empty() { + return ScheduleParseResult::Unclear; + } + + // Extract task description (everything after keywords like "提醒我", "帮我") + let task_description = extract_task_description(input); + + // --- Pattern 1: 每天 + 时间 --- + if let Some(result) = try_every_day(input, &task_description, default_agent_id) { + return result; + } + + // --- Pattern 2: 每周N + 时间 --- + if let Some(result) = try_every_week(input, &task_description, default_agent_id) { + return result; + } + + // --- Pattern 3: 工作日 + 时间 --- + if let Some(result) = try_workday(input, &task_description, default_agent_id) { + return result; + } + + // --- Pattern 4: 每N小时/分钟 --- + if let Some(result) = try_interval(input, &task_description, default_agent_id) { + return result; + } + + // --- Pattern 5: 每月N号 --- + if let Some(result) = try_monthly(input, &task_description, default_agent_id) { + return result; + } + + // --- Pattern 6: 明天/后天 + 时间 (one-shot) --- + if let Some(result) = try_one_shot(input, &task_description, default_agent_id) { + return result; + } + + ScheduleParseResult::Unclear +} + +/// Extract task description from input, stripping schedule-related keywords. 
+fn extract_task_description(input: &str) -> String { + let strip_prefixes = [ + "每天", "每日", "每周", "工作日", "每个工作日", + "每月", "每", "定时", "定期", + "提醒我", "提醒", "帮我", "帮", "请", + "明天", "后天", "大后天", + ]; + + let mut desc = input.to_string(); + + // Strip prefixes + time expressions in alternating passes until stable + let time_re = regex::Regex::new( + r"^(?:凌晨|早上|早晨|上午|中午|下午|午后|傍晚|黄昏|晚上|晚间|夜里|夜晚|半夜|午夜)?\d{1,2}[点时::]\d{0,2}分?" + ).unwrap_or_else(|_| regex::Regex::new("").unwrap()); + + for _ in 0..3 { + // Pass 1: strip prefixes + loop { + let mut stripped = false; + for prefix in &strip_prefixes { + if desc.starts_with(prefix) { + desc = desc[prefix.len()..].to_string(); + stripped = true; + } + } + if !stripped { break; } + } + // Pass 2: strip time expressions + let new_desc = time_re.replace(&desc, "").to_string(); + if new_desc == desc { break; } + desc = new_desc; + } + + desc.trim().to_string() +} + +// -- Pattern matchers -- + +/// Adjust hour based on time-of-day period. Chinese 12-hour convention: +/// 下午3点 = 15, 晚上8点 = 20, etc. Morning hours stay as-is. 
+fn adjust_hour_for_period(hour: u32, period: Option<&str>) -> u32 {
+    if let Some(p) = period {
+        match p {
+            // Afternoon/evening periods shift a 12-hour clock value forward.
+            "下午" | "午后" => { if hour < 12 { hour + 12 } else { hour } }
+            "晚上" | "晚间" | "夜里" | "夜晚" => { if hour < 12 { hour + 12 } else { hour } }
+            "傍晚" | "黄昏" => { if hour < 12 { hour + 12 } else { hour } }
+            "中午" => { if hour == 12 { 12 } else if hour < 12 { hour + 12 } else { hour } }
+            // 半夜12点 means midnight (00:00).
+            "半夜" | "午夜" => { if hour == 12 { 0 } else { hour } }
+            _ => hour,
+        }
+    } else {
+        hour
+    }
+}
+
+/// Optional time-of-day period capture group, shared by the matchers below.
+const PERIOD_PATTERN: &str = "(凌晨|早上|早晨|上午|中午|下午|午后|傍晚|黄昏|晚上|晚间|夜里|夜晚|半夜|午夜)?";
+
+/// Matcher for "每天/每日 + 时间" → daily cron.
+fn try_every_day(input: &str, task_desc: &str, agent_id: &AgentId) -> Option<ScheduleParseResult> {
+    let re = regex::Regex::new(
+        &format!(r"(?:每天|每日)(?:的)?{}(\d{{1,2}})[点时::](\d{{1,2}})?", PERIOD_PATTERN)
+    ).ok()?;
+    if let Some(caps) = re.captures(input) {
+        let period = caps.get(1).map(|m| m.as_str());
+        let raw_hour: u32 = caps.get(2)?.as_str().parse().ok()?;
+        let minute: u32 = caps.get(3).map(|m| m.as_str().parse().unwrap_or(0)).unwrap_or(0);
+        let hour = adjust_hour_for_period(raw_hour, period);
+        // Reject out-of-range clock values after period adjustment.
+        if hour > 23 || minute > 59 {
+            return None;
+        }
+        return Some(ScheduleParseResult::Exact(ParsedSchedule {
+            cron_expression: format!("{} {} * * *", minute, hour),
+            natural_description: format!("每天{:02}:{:02}", hour, minute),
+            confidence: 0.95,
+            task_description: task_desc.to_string(),
+            task_target: TaskTarget::Agent(agent_id.to_string()),
+        }));
+    }
+
+    // "每天早上/下午..." without explicit hour
+    let re2 = regex::Regex::new(r"(?:每天|每日)(?:的)?(凌晨|早上|早晨|上午|中午|下午|午后|傍晚|黄昏|晚上|晚间|夜里|夜晚|半夜|午夜)").ok()?;
+    if let Some(caps) = re2.captures(input) {
+        let period = caps.get(1)?.as_str();
+        if let Some(hour) = period_to_hour(period) {
+            return Some(ScheduleParseResult::Exact(ParsedSchedule {
+                cron_expression: format!("0 {} * * *", hour),
+                natural_description: format!("每天{}", period),
+                confidence: 0.85,
+                task_description: task_desc.to_string(),
+                task_target: TaskTarget::Agent(agent_id.to_string()),
+            }));
+        }
+    }
+
+    None
+}
+
+/// Matcher for "每周N + 时间" → weekly cron.
+fn try_every_week(input: &str, task_desc: &str, agent_id: &AgentId) -> Option<ScheduleParseResult> {
+    let re = regex::Regex::new(
+        &format!(r"(?:每周|每个?星期|每个?礼拜)(一|二|三|四|五|六|日|天|周一|周二|周三|周四|周五|周六|周日|周天|星期一|星期二|星期三|星期四|星期五|星期六|星期日|星期天|礼拜一|礼拜二|礼拜三|礼拜四|礼拜五|礼拜六|礼拜日|礼拜天)(?:的)?{}(\d{{1,2}})[点时::](\d{{1,2}})?", PERIOD_PATTERN)
+    ).ok()?;
+
+    let caps = re.captures(input)?;
+    let day_str = caps.get(1)?.as_str();
+    let dow = weekday_to_cron(day_str)?;
+    let period = caps.get(2).map(|m| m.as_str());
+    let raw_hour: u32 = caps.get(3)?.as_str().parse().ok()?;
+    let minute: u32 = caps.get(4).map(|m| m.as_str().parse().unwrap_or(0)).unwrap_or(0);
+    let hour = adjust_hour_for_period(raw_hour, period);
+    if hour > 23 || minute > 59 {
+        return None;
+    }
+
+    Some(ScheduleParseResult::Exact(ParsedSchedule {
+        cron_expression: format!("{} {} * * {}", minute, hour, dow),
+        natural_description: format!("每周{} {:02}:{:02}", day_str, hour, minute),
+        confidence: 0.92,
+        task_description: task_desc.to_string(),
+        task_target: TaskTarget::Agent(agent_id.to_string()),
+    }))
+}
+
+/// Matcher for "工作日 + 时间" → Mon–Fri cron (day-of-week 1-5).
+fn try_workday(input: &str, task_desc: &str, agent_id: &AgentId) -> Option<ScheduleParseResult> {
+    let re = regex::Regex::new(
+        &format!(r"(?:工作日|每个?工作日|工作日(?:的)?){}(\d{{1,2}})[点时::](\d{{1,2}})?", PERIOD_PATTERN)
+    ).ok()?;
+
+    if let Some(caps) = re.captures(input) {
+        let period = caps.get(1).map(|m| m.as_str());
+        let raw_hour: u32 = caps.get(2)?.as_str().parse().ok()?;
+        let minute: u32 = caps.get(3).map(|m| m.as_str().parse().unwrap_or(0)).unwrap_or(0);
+        let hour = adjust_hour_for_period(raw_hour, period);
+        if hour > 23 || minute > 59 {
+            return None;
+        }
+        return Some(ScheduleParseResult::Exact(ParsedSchedule {
+            cron_expression: format!("{} {} * * 1-5", minute, hour),
+            natural_description: format!("工作日{:02}:{:02}", hour, minute),
+            confidence: 0.90,
+            task_description: task_desc.to_string(),
+            task_target: TaskTarget::Agent(agent_id.to_string()),
+        }));
+    }
+
+    // "工作日下午3点" style
+    let re2 = regex::Regex::new(
+        r"(?:工作日|每个?工作日)(?:的)?(凌晨|早上|早晨|上午|中午|下午|午后|傍晚|黄昏|晚上|晚间|夜里|夜晚|半夜|午夜)"
+    ).ok()?;
+    if let Some(caps) = re2.captures(input) {
+        let period = caps.get(1)?.as_str();
+        if let Some(hour) = period_to_hour(period) {
+            return Some(ScheduleParseResult::Exact(ParsedSchedule {
+                cron_expression: format!("0 {} * * 1-5", hour),
+                natural_description: format!("工作日{}", period),
+                confidence: 0.85,
+                task_description: task_desc.to_string(),
+                task_target: TaskTarget::Agent(agent_id.to_string()),
+            }));
+        }
+    }
+
+    None
+}
+
+/// Matcher for "每N小时/每N分钟" → interval cron (step expressions).
+fn try_interval(input: &str, task_desc: &str, agent_id: &AgentId) -> Option<ScheduleParseResult> {
+    // "每2小时", "每30分钟", "每N小时/分钟"
+    let re = regex::Regex::new(r"每(\d{1,2})(小时|分钟|分|钟|个小时)").ok()?;
+    if let Some(caps) = re.captures(input) {
+        let n: u32 = caps.get(1)?.as_str().parse().ok()?;
+        // A zero step would be an invalid cron expression.
+        if n == 0 {
+            return None;
+        }
+        let unit = caps.get(2)?.as_str();
+        let (cron, desc) = if unit.contains("小") {
+            (format!("0 */{} * * *", n), format!("每{}小时", n))
+        } else {
+            (format!("*/{} * * * *", n), format!("每{}分钟", n))
+        };
+        return Some(ScheduleParseResult::Exact(ParsedSchedule {
+            cron_expression: cron,
+            natural_description: desc,
+            confidence: 0.90,
+            task_description: task_desc.to_string(),
+            task_target: TaskTarget::Agent(agent_id.to_string()),
+        }));
+    }
+
+    None
+}
+
+/// Matcher for "每月N号 [时间]" → monthly cron (hour defaults to 9).
+fn try_monthly(input: &str, task_desc: &str, agent_id: &AgentId) -> Option<ScheduleParseResult> {
+    let re = regex::Regex::new(
+        &format!(r"(?:每月|每个月)(?:的)?(\d{{1,2}})[号日](?:的)?{}(\d{{1,2}})?[点时::]?(\d{{1,2}})?", PERIOD_PATTERN)
+    ).ok()?;
+
+    if let Some(caps) = re.captures(input) {
+        let day: u32 = caps.get(1)?.as_str().parse().ok()?;
+        let period = caps.get(2).map(|m| m.as_str());
+        // No explicit hour → default to 09:00.
+        let raw_hour: u32 = caps.get(3).map(|m| m.as_str().parse().unwrap_or(9)).unwrap_or(9);
+        let minute: u32 = caps.get(4).map(|m| m.as_str().parse().unwrap_or(0)).unwrap_or(0);
+        let hour = adjust_hour_for_period(raw_hour, period);
+        if day > 31 || hour > 23 || minute > 59 {
+            return None;
+        }
+        return Some(ScheduleParseResult::Exact(ParsedSchedule {
+            cron_expression: format!("{} {} {} * *", minute, hour, day),
+            natural_description: format!("每月{}号 {:02}:{:02}", day, hour, minute),
+            confidence: 0.90,
+            task_description: task_desc.to_string(),
+            task_target: TaskTarget::Agent(agent_id.to_string()),
+        }));
+    }
+
+    None
+}
+
+/// Matcher for one-shot "明天/后天/大后天 + 时间" — note the result carries an
+/// RFC 3339 timestamp in `cron_expression`, not a cron string.
+fn try_one_shot(input: &str, task_desc: &str, agent_id: &AgentId) -> Option<ScheduleParseResult> {
+    let re = regex::Regex::new(
+        &format!(r"(明天|后天|大后天)(?:的)?{}(\d{{1,2}})[点时::](\d{{1,2}})?", PERIOD_PATTERN)
+    ).ok()?;
+
+    let caps = re.captures(input)?;
+    let day_offset = match caps.get(1)?.as_str() {
+        "明天" => 1,
+        "后天" => 2,
+        "大后天" => 3,
+        _ => return None,
+    };
+    let period = caps.get(2).map(|m| m.as_str());
+    let raw_hour: u32 = caps.get(3)?.as_str().parse().ok()?;
+    let minute: u32 = caps.get(4).map(|m| m.as_str().parse().unwrap_or(0)).unwrap_or(0);
+    let hour = adjust_hour_for_period(raw_hour, period);
+    if hour > 23 || minute > 59 {
+        return None;
+    }
+
+    // NOTE(review): target is built in UTC while the matched expression is a
+    // local-time phrase — confirm the scheduler applies the user timezone.
+    let target = chrono::Utc::now()
+        .checked_add_signed(chrono::Duration::days(day_offset))
+        .unwrap_or_else(chrono::Utc::now)
+        .with_hour(hour)
+        .unwrap_or_else(|| chrono::Utc::now())
+        .with_minute(minute)
+        .unwrap_or_else(|| chrono::Utc::now())
+        .with_second(0)
+        .unwrap_or_else(|| chrono::Utc::now());
+
+    Some(ScheduleParseResult::Exact(ParsedSchedule {
+        cron_expression: target.to_rfc3339(),
+        natural_description: format!("{} {:02}:{:02}",
caps.get(1)?.as_str(), hour, minute), + confidence: 0.88, + task_description: task_desc.to_string(), + task_target: TaskTarget::Agent(agent_id.to_string()), + })) +} + +// --------------------------------------------------------------------------- +// Schedule intent detection +// --------------------------------------------------------------------------- + +/// Keywords indicating the user wants to set a scheduled task. +const SCHEDULE_INTENT_KEYWORDS: &[&str] = &[ + "提醒我", "提醒", "定时", "每天", "每日", "每周", "每月", + "工作日", "每隔", "每", "定期", "到时候", "准时", + "闹钟", "闹铃", "日程", "日历", +]; + +/// Check if user input contains schedule intent. +pub fn has_schedule_intent(input: &str) -> bool { + let lower = input.to_lowercase(); + SCHEDULE_INTENT_KEYWORDS.iter().any(|kw| lower.contains(kw)) +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + fn default_agent() -> AgentId { + AgentId::new() + } + + #[test] + fn test_every_day_explicit_time() { + let result = parse_nl_schedule("每天早上9点提醒我查房", &default_agent()); + match result { + ScheduleParseResult::Exact(s) => { + assert_eq!(s.cron_expression, "0 9 * * *"); + assert!(s.confidence >= 0.9); + } + _ => panic!("Expected Exact, got {:?}", result), + } + } + + #[test] + fn test_every_day_with_minute() { + let result = parse_nl_schedule("每天下午3点30分提醒我", &default_agent()); + match result { + ScheduleParseResult::Exact(s) => { + assert_eq!(s.cron_expression, "30 15 * * *"); + } + _ => panic!("Expected Exact"), + } + } + + #[test] + fn test_every_day_period_only() { + let result = parse_nl_schedule("每天早上提醒我看看报告", &default_agent()); + match result { + ScheduleParseResult::Exact(s) => { + assert_eq!(s.cron_expression, "0 9 * * *"); + } + _ => panic!("Expected Exact"), + } + } + + #[test] + fn test_every_week_monday() { + let result = parse_nl_schedule("每周一上午10点提醒我开会", 
&default_agent()); + match result { + ScheduleParseResult::Exact(s) => { + assert_eq!(s.cron_expression, "0 10 * * 1"); + } + _ => panic!("Expected Exact"), + } + } + + #[test] + fn test_every_week_friday() { + let result = parse_nl_schedule("每个星期五下午2点", &default_agent()); + match result { + ScheduleParseResult::Exact(s) => { + assert_eq!(s.cron_expression, "0 14 * * 5"); + } + _ => panic!("Expected Exact"), + } + } + + #[test] + fn test_workday() { + let result = parse_nl_schedule("工作日下午3点提醒我写周报", &default_agent()); + match result { + ScheduleParseResult::Exact(s) => { + assert_eq!(s.cron_expression, "0 15 * * 1-5"); + } + _ => panic!("Expected Exact"), + } + } + + #[test] + fn test_interval_hours() { + let result = parse_nl_schedule("每2小时提醒我喝水", &default_agent()); + match result { + ScheduleParseResult::Exact(s) => { + assert_eq!(s.cron_expression, "0 */2 * * *"); + } + _ => panic!("Expected Exact"), + } + } + + #[test] + fn test_interval_minutes() { + let result = parse_nl_schedule("每30分钟检查一次", &default_agent()); + match result { + ScheduleParseResult::Exact(s) => { + assert_eq!(s.cron_expression, "*/30 * * * *"); + } + _ => panic!("Expected Exact"), + } + } + + #[test] + fn test_monthly() { + let result = parse_nl_schedule("每月1号早上9点提醒我", &default_agent()); + match result { + ScheduleParseResult::Exact(s) => { + assert_eq!(s.cron_expression, "0 9 1 * *"); + } + _ => panic!("Expected Exact"), + } + } + + #[test] + fn test_one_shot_tomorrow() { + let result = parse_nl_schedule("明天下午3点提醒我开会", &default_agent()); + match result { + ScheduleParseResult::Exact(s) => { + assert!(s.cron_expression.contains('T')); + assert!(s.natural_description.contains("明天")); + } + _ => panic!("Expected Exact"), + } + } + + #[test] + fn test_unclear_input() { + let result = parse_nl_schedule("今天天气怎么样", &default_agent()); + assert!(matches!(result, ScheduleParseResult::Unclear)); + } + + #[test] + fn test_empty_input() { + let result = parse_nl_schedule("", &default_agent()); + 
assert!(matches!(result, ScheduleParseResult::Unclear)); + } + + #[test] + fn test_schedule_intent_detection() { + assert!(has_schedule_intent("每天早上9点提醒我查房")); + assert!(has_schedule_intent("帮我设个定时任务")); + assert!(has_schedule_intent("工作日提醒我打卡")); + assert!(!has_schedule_intent("今天天气怎么样")); + assert!(!has_schedule_intent("帮我写个报告")); + } + + #[test] + fn test_period_to_hour_mapping() { + assert_eq!(period_to_hour("凌晨"), Some(0)); + assert_eq!(period_to_hour("早上"), Some(9)); + assert_eq!(period_to_hour("中午"), Some(12)); + assert_eq!(period_to_hour("下午"), Some(15)); + assert_eq!(period_to_hour("晚上"), Some(21)); + assert_eq!(period_to_hour("不知道"), None); + } + + #[test] + fn test_weekday_to_cron_mapping() { + assert_eq!(weekday_to_cron("一"), Some("1")); + assert_eq!(weekday_to_cron("五"), Some("5")); + assert_eq!(weekday_to_cron("日"), Some("0")); + assert_eq!(weekday_to_cron("星期三"), Some("3")); + assert_eq!(weekday_to_cron("礼拜天"), Some("0")); + assert_eq!(weekday_to_cron("未知"), None); + } + + #[test] + fn test_task_description_extraction() { + assert_eq!(extract_task_description("每天早上9点提醒我查房"), "查房"); + } +} diff --git a/desktop/src-tauri/src/intelligence/experience.rs b/desktop/src-tauri/src/intelligence/experience.rs new file mode 100644 index 0000000..91e84a8 --- /dev/null +++ b/desktop/src-tauri/src/intelligence/experience.rs @@ -0,0 +1,394 @@ +//! Experience Extractor — transforms successful proposals into reusable experiences. +//! +//! Closes Breakpoint 3 (successful solution → structured experience) and +//! Breakpoint 4 (experience reuse injection) of the self-improvement loop. +//! +//! When a user confirms a proposal was helpful (explicitly or via implicit +//! keyword detection), the extractor creates an [`Experience`] record and +//! stores it through [`ExperienceStore`] for future retrieval. 
+
+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
+use tracing::{debug, warn};
+use uuid::Uuid;
+use zclaw_growth::ExperienceStore;
+use zclaw_types::Result;
+
+use super::pain_aggregator::PainPoint;
+use super::solution_generator::{Proposal, ProposalStatus};
+
+// ---------------------------------------------------------------------------
+// Shared completion status
+// ---------------------------------------------------------------------------
+
+/// Completion outcome — shared across experience and trajectory modules.
+///
+/// Serialized in lowercase (`"success"`, `"partial"`, …) for a stable JSON
+/// wire representation.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum CompletionStatus {
+    /// The proposal fully resolved the pain point.
+    Success,
+    /// The proposal helped but did not fully resolve the pain point.
+    Partial,
+    /// The proposal did not help.
+    Failed,
+    /// The user stopped pursuing the proposal before any outcome.
+    Abandoned,
+}
+
+// ---------------------------------------------------------------------------
+// Feedback & event types
+// ---------------------------------------------------------------------------
+
+/// User feedback on a proposal's effectiveness.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ProposalFeedback {
+    /// The proposal this feedback refers to.
+    pub proposal_id: String,
+    /// Outcome stated explicitly or inferred from conversation keywords.
+    pub outcome: CompletionStatus,
+    /// Verbatim user message that produced the signal, when available.
+    pub user_comment: Option<String>,
+    /// When the feedback signal was detected.
+    pub detected_at: DateTime<Utc>,
+}
+
+/// Event emitted when a pain point reaches high confidence.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PainConfirmedEvent {
+    pub pain_point_id: String,
+    pub pattern: String,
+    pub confidence: f64,
+}
+
+// ---------------------------------------------------------------------------
+// Implicit feedback detection
+// ---------------------------------------------------------------------------
+
+// NOTE: matching is performed against a lowercased message, so every keyword
+// here must be lowercase. The former "OK" entry could never match for that
+// reason and has been removed; "ok" already covers it.
+const POSITIVE_KEYWORDS: &[&str] = &[
+    "好了", "解决了", "可以了", "对了", "完美",
+    "谢谢", "很好", "棒", "不错", "成功了",
+    "行了", "搞定了", "ok", "搞定",
+];
+
+const NEGATIVE_KEYWORDS: &[&str] = &[
+    "没用", "不对", "还是不行", "错了", "差太远",
+    "不好使", "不管用", "没效果", "失败", "不行",
+];
+
+/// Detect implicit feedback from user messages.
+/// Returns `Some(CompletionStatus)` if a clear signal is found.
+pub fn detect_implicit_feedback(message: &str) -> Option { + let lower = message.to_lowercase(); + for kw in POSITIVE_KEYWORDS { + if lower.contains(kw) { + return Some(CompletionStatus::Success); + } + } + for kw in NEGATIVE_KEYWORDS { + if lower.contains(kw) { + return Some(CompletionStatus::Failed); + } + } + None +} + +// --------------------------------------------------------------------------- +// ExperienceExtractor +// --------------------------------------------------------------------------- + +/// Extracts structured experiences from successful proposals. +/// +/// Two extraction strategies: +/// 1. **LLM-assisted** — uses LLM to summarise context + steps (when driver available) +/// 2. **Template fallback** — fixed-format extraction from proposal fields +pub struct ExperienceExtractor { + experience_store: std::sync::Arc, +} + +impl ExperienceExtractor { + pub fn new(experience_store: std::sync::Arc) -> Self { + Self { experience_store } + } + + /// Extract and store an experience from a successful proposal + pain point. + /// + /// Uses template extraction as the default strategy. LLM-assisted extraction + /// can be added later by wiring a driver through the constructor. + pub async fn extract_from_proposal( + &self, + proposal: &Proposal, + pain: &PainPoint, + feedback: &ProposalFeedback, + ) -> Result<()> { + if feedback.outcome != CompletionStatus::Success && feedback.outcome != CompletionStatus::Partial { + debug!( + "[ExperienceExtractor] Skipping non-success proposal {} ({:?})", + proposal.id, feedback.outcome + ); + return Ok(()); + } + + let experience = self.template_extract(proposal, pain, feedback); + self.experience_store.store_experience(&experience).await?; + debug!( + "[ExperienceExtractor] Stored experience {} for pain '{}'", + experience.id, experience.pain_pattern + ); + Ok(()) + } + + /// Template-based extraction — deterministic, no LLM required. 
+    fn template_extract(
+        &self,
+        proposal: &Proposal,
+        pain: &PainPoint,
+        feedback: &ProposalFeedback,
+    ) -> zclaw_growth::experience_store::Experience {
+        // One step line per proposal step; annotate with the tool hint when present.
+        let solution_steps: Vec<String> = proposal.steps.iter()
+            .map(|s| {
+                if let Some(ref hint) = s.skill_hint {
+                    format!("{} (工具: {})", s.detail, hint)
+                } else {
+                    s.detail.clone()
+                }
+            })
+            .collect();
+
+        // Flatten pain metadata + verbatim evidence into a single context line.
+        let context = format!(
+            "痛点: {} | 类别: {} | 出现{}次 | 证据: {}",
+            pain.summary,
+            pain.category,
+            pain.occurrence_count,
+            pain.evidence.iter()
+                .map(|e| e.user_said.as_str())
+                .collect::<Vec<_>>()
+                .join(";")
+        );
+
+        let outcome = match feedback.outcome {
+            CompletionStatus::Success => "成功解决",
+            CompletionStatus::Partial => "部分解决",
+            CompletionStatus::Failed => "未解决",
+            CompletionStatus::Abandoned => "已放弃",
+        };
+
+        zclaw_growth::experience_store::Experience::new(
+            &pain.agent_id,
+            &pain.summary,
+            &context,
+            solution_steps,
+            outcome,
+        )
+    }
+
+    /// Search for relevant experiences to inject into a conversation.
+    ///
+    /// Returns experiences whose pain pattern matches the user's current input.
+    /// Lookup failures are logged and reported as an empty list, never an error.
+    pub async fn find_relevant_experiences(
+        &self,
+        agent_id: &str,
+        user_input: &str,
+    ) -> Vec<zclaw_growth::experience_store::Experience> {
+        match self.experience_store.find_by_pattern(agent_id, user_input).await {
+            Ok(experiences) => {
+                // Increment reuse count for found experiences. Fire-and-forget:
+                // reuse bookkeeping must never delay or fail retrieval.
+                // NOTE(review): if `increment_reuse` returns a Result, its error
+                // is silently dropped here — confirm that is acceptable.
+                for exp in &experiences {
+                    let store = self.experience_store.clone();
+                    let exp_clone = exp.clone();
+                    tokio::spawn(async move {
+                        store.increment_reuse(&exp_clone).await;
+                    });
+                }
+                experiences
+            }
+            Err(e) => {
+                warn!("[ExperienceExtractor] find_relevant failed: {}", e);
+                Vec::new()
+            }
+        }
+    }
+
+    /// Format experiences for system prompt injection.
+    /// Returns a concise block capped at ~200 Chinese characters.
+ pub fn format_for_injection( + experiences: &[zclaw_growth::experience_store::Experience], + ) -> String { + if experiences.is_empty() { + return String::new(); + } + + let mut parts = Vec::new(); + let mut total_chars = 0; + let max_chars = 200; + + for exp in experiences { + if total_chars >= max_chars { + break; + } + let step_summary = exp.solution_steps.first() + .map(|s| truncate(s, 40)) + .unwrap_or_default(); + let line = format!( + "[过往经验] 类似「{}」做过:{},结果是{}", + truncate(&exp.pain_pattern, 30), + step_summary, + exp.outcome + ); + total_chars += line.chars().count(); + parts.push(line); + } + + if parts.is_empty() { + return String::new(); + } + + format!("\n\n--- 过往经验参考 ---\n{}", parts.join("\n")) + } +} + +fn truncate(s: &str, max_chars: usize) -> String { + if s.chars().count() <= max_chars { + s.to_string() + } else { + s.chars().take(max_chars).collect::() + "…" + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use crate::intelligence::pain_aggregator::PainSeverity; + + fn sample_pain() -> PainPoint { + PainPoint::new( + "agent-1", + "user-1", + "出口包装不合格", + "logistics", + PainSeverity::High, + "又被退了", + "recurring packaging issue", + ) + } + + fn sample_proposal(pain: &PainPoint) -> Proposal { + Proposal::from_pain_point(pain) + } + + #[test] + fn test_detect_positive_feedback() { + assert_eq!( + detect_implicit_feedback("好了,这下解决了"), + Some(CompletionStatus::Success) + ); + assert_eq!( + detect_implicit_feedback("谢谢,完美"), + Some(CompletionStatus::Success) + ); + } + + #[test] + fn test_detect_negative_feedback() { + assert_eq!( + detect_implicit_feedback("还是不行"), + Some(CompletionStatus::Failed) + ); + assert_eq!( + detect_implicit_feedback("没用啊"), + Some(CompletionStatus::Failed) + ); + } + + #[test] + fn test_no_feedback() { + assert_eq!(detect_implicit_feedback("今天天气怎么样"), 
None); + assert_eq!(detect_implicit_feedback("帮我查一下"), None); + } + + #[test] + fn test_template_extract() { + let viking = std::sync::Arc::new(zclaw_growth::VikingAdapter::in_memory()); + let store = std::sync::Arc::new(ExperienceStore::new(viking)); + let extractor = ExperienceExtractor::new(store); + + let pain = sample_pain(); + let proposal = sample_proposal(&pain); + let feedback = ProposalFeedback { + proposal_id: proposal.id.clone(), + outcome: CompletionStatus::Success, + user_comment: Some("好了".into()), + detected_at: Utc::now(), + }; + + let exp = extractor.template_extract(&proposal, &pain, &feedback); + assert!(!exp.id.is_empty()); + assert_eq!(exp.agent_id, "agent-1"); + assert!(!exp.solution_steps.is_empty()); + assert_eq!(exp.outcome, "成功解决"); + } + + #[test] + fn test_format_for_injection_empty() { + assert!(ExperienceExtractor::format_for_injection(&[]).is_empty()); + } + + #[test] + fn test_format_for_injection_with_data() { + let exp = zclaw_growth::experience_store::Experience::new( + "agent-1", + "出口包装问题", + "包装被退回", + vec!["检查法规".into(), "使用合规材料".into()], + "成功解决", + ); + let formatted = ExperienceExtractor::format_for_injection(&[exp]); + assert!(formatted.contains("过往经验")); + assert!(formatted.contains("出口包装问题")); + } + + #[tokio::test] + async fn test_extract_stores_experience() { + let viking = std::sync::Arc::new(zclaw_growth::VikingAdapter::in_memory()); + let store = std::sync::Arc::new(ExperienceStore::new(viking)); + let extractor = ExperienceExtractor::new(store.clone()); + + let pain = sample_pain(); + let proposal = sample_proposal(&pain); + let feedback = ProposalFeedback { + proposal_id: proposal.id.clone(), + outcome: CompletionStatus::Success, + user_comment: Some("好了".into()), + detected_at: Utc::now(), + }; + + extractor.extract_from_proposal(&proposal, &pain, &feedback).await.unwrap(); + + let found = store.find_by_agent("agent-1").await.unwrap(); + assert_eq!(found.len(), 1); + } + + #[tokio::test] + async fn 
test_extract_skips_failed_feedback() { + let viking = std::sync::Arc::new(zclaw_growth::VikingAdapter::in_memory()); + let store = std::sync::Arc::new(ExperienceStore::new(viking)); + let extractor = ExperienceExtractor::new(store.clone()); + + let pain = sample_pain(); + let proposal = sample_proposal(&pain); + let feedback = ProposalFeedback { + proposal_id: proposal.id.clone(), + outcome: CompletionStatus::Failed, + user_comment: Some("没用".into()), + detected_at: Utc::now(), + }; + + extractor.extract_from_proposal(&proposal, &pain, &feedback).await.unwrap(); + + let found = store.find_by_agent("agent-1").await.unwrap(); + assert!(found.is_empty(), "Should not store experience for failed feedback"); + } + + #[test] + fn test_truncate() { + assert_eq!(truncate("hello", 10), "hello"); + assert_eq!(truncate("这是一个很长的字符串用于测试截断", 10).chars().count(), 11); // 10 + … + } +} diff --git a/desktop/src-tauri/src/intelligence/mod.rs b/desktop/src-tauri/src/intelligence/mod.rs index 46fcec5..7c04e9f 100644 --- a/desktop/src-tauri/src/intelligence/mod.rs +++ b/desktop/src-tauri/src/intelligence/mod.rs @@ -36,6 +36,9 @@ pub mod pain_aggregator; pub mod solution_generator; pub mod personality_detector; pub mod pain_storage; +pub mod experience; +pub mod user_profiler; +pub mod trajectory_compressor; // Re-export main types for convenience pub use heartbeat::HeartbeatEngineState; diff --git a/desktop/src-tauri/src/intelligence/trajectory_compressor.rs b/desktop/src-tauri/src/intelligence/trajectory_compressor.rs new file mode 100644 index 0000000..0dae515 --- /dev/null +++ b/desktop/src-tauri/src/intelligence/trajectory_compressor.rs @@ -0,0 +1,328 @@ +//! Trajectory Compressor — compresses raw events into structured trajectories. +//! +//! Takes a list of `TrajectoryEvent` records and produces a single +//! `CompressedTrajectory` summarising the session. Called at session end +//! (or compaction flush) to reduce storage and prepare data for analysis. 
+
+use chrono::Utc;
+use zclaw_memory::trajectory_store::{
+    CompressedTrajectory, CompletionStatus, SatisfactionSignal, TrajectoryEvent, TrajectoryStepType,
+};
+
+// ---------------------------------------------------------------------------
+// Satisfaction detection
+// ---------------------------------------------------------------------------
+
+// NOTE: matching is performed against lowercased messages, so every keyword
+// here must be lowercase.
+const POSITIVE_SIGNALS: &[&str] = &[
+    "谢谢", "很好", "解决了", "可以了", "对了", "完美",
+    "棒", "不错", "成功了", "行了", "搞定",
+];
+
+const NEGATIVE_SIGNALS: &[&str] = &[
+    "不对", "没用", "还是不行", "错了", "差太远",
+    "不好使", "不管用", "没效果", "失败", "不行",
+];
+
+/// Detect user satisfaction from the last few messages.
+///
+/// Scans the most recent three messages, newest first; the first message that
+/// contains a signal keyword decides the result. Returns `None` only when
+/// `last_messages` is empty; otherwise falls back to `Neutral`.
+pub fn detect_satisfaction(last_messages: &[String]) -> Option<SatisfactionSignal> {
+    if last_messages.is_empty() {
+        return None;
+    }
+
+    // Check the last user messages for satisfaction signals.
+    for msg in last_messages.iter().rev().take(3) {
+        let lower = msg.to_lowercase();
+        // Negative keywords are tested first within each message: negated
+        // phrases often embed a positive keyword (e.g. "不行,搞定不了"
+        // contains "搞定"), so positive-first matching would misread them
+        // as Positive.
+        for kw in NEGATIVE_SIGNALS {
+            if lower.contains(kw) {
+                return Some(SatisfactionSignal::Negative);
+            }
+        }
+        for kw in POSITIVE_SIGNALS {
+            if lower.contains(kw) {
+                return Some(SatisfactionSignal::Positive);
+            }
+        }
+    }
+
+    Some(SatisfactionSignal::Neutral)
+}
+
+// ---------------------------------------------------------------------------
+// Compression
+// ---------------------------------------------------------------------------
+
+/// Compress a sequence of trajectory events into a single summary.
+///
+/// Returns `None` if the events list is empty.
+pub fn compress( + events: Vec, + satisfaction: Option, +) -> Option { + if events.is_empty() { + return None; + } + + let session_id = events[0].session_id.clone(); + let agent_id = events[0].agent_id.clone(); + + // Extract key steps (skip retries — consecutive same-type steps) + let key_events = deduplicate_steps(&events); + + let request_type = infer_request_type(&key_events); + let tools_used = extract_tools(&key_events); + let total_steps = key_events.len(); + let total_duration_ms: u64 = events.iter().map(|e| e.duration_ms).sum(); + let outcome = infer_outcome(&key_events, satisfaction); + let execution_chain = build_chain_json(&key_events); + + Some(CompressedTrajectory { + id: uuid::Uuid::new_v4().to_string(), + session_id, + agent_id, + request_type, + tools_used, + outcome, + total_steps, + total_duration_ms, + total_tokens: 0, // filled by middleware from context + execution_chain, + satisfaction_signal: satisfaction, + created_at: Utc::now(), + }) +} + +/// Remove consecutive duplicate step types (retries/error recovery). +fn deduplicate_steps(events: &[TrajectoryEvent]) -> Vec<&TrajectoryEvent> { + let mut result = Vec::new(); + let mut last_type: Option = None; + + for event in events { + // Keep first occurrence of each step type change + if last_type != Some(event.step_type) { + result.push(event); + last_type = Some(event.step_type); + } + } + + // If we deduplicated everything away, keep the first and last + if result.is_empty() && !events.is_empty() { + result.push(&events[0]); + if events.len() > 1 { + result.push(&events[events.len() - 1]); + } + } + + result +} + +/// Infer request type from the first user request event. 
+fn infer_request_type(events: &[&TrajectoryEvent]) -> String {
+    // Classify based on the FIRST UserRequest event only; later requests in
+    // the same session are ignored.
+    for event in events {
+        if event.step_type == TrajectoryStepType::UserRequest {
+            return classify_request(&event.input_summary);
+        }
+    }
+    "general".to_string()
+}
+
+/// Map a raw user request to a coarse category by keyword matching.
+///
+/// Branch order matters: more specific categories are tested first, and the
+/// generic "检查" probe is deliberately last so that e.g. "检查会议安排"
+/// still classifies as `meeting`.
+fn classify_request(input: &str) -> String {
+    let lower = input.to_lowercase();
+    if ["报告", "数据", "统计", "报表", "汇总"].iter().any(|k| lower.contains(k)) {
+        return "data_report".into();
+    }
+    if ["政策", "法规", "合规", "标准"].iter().any(|k| lower.contains(k)) {
+        return "policy_query".into();
+    }
+    if ["查房", "巡房"].iter().any(|k| lower.contains(k)) {
+        return "inspection".into();
+    }
+    if ["排班", "值班"].iter().any(|k| lower.contains(k)) {
+        return "scheduling".into();
+    }
+    if ["会议", "日程", "安排", "提醒"].iter().any(|k| lower.contains(k)) {
+        return "meeting".into();
+    }
+    if ["检查"].iter().any(|k| lower.contains(k)) {
+        return "inspection".into();
+    }
+    "general".to_string()
+}
+
+/// Extract unique tool names from ToolExecution events.
+///
+/// First-use order is preserved. Assumes `input_summary` holds the tool name
+/// for ToolExecution events — confirm against the event producer.
+fn extract_tools(events: &[&TrajectoryEvent]) -> Vec<String> {
+    let mut tools = Vec::new();
+    let mut seen = std::collections::HashSet::new();
+
+    for event in events {
+        if event.step_type == TrajectoryStepType::ToolExecution {
+            let tool = event.input_summary.clone();
+            // `HashSet::insert` returns false for duplicates, keeping `tools`
+            // unique while preserving first-seen order.
+            if !tool.is_empty() && seen.insert(tool.clone()) {
+                tools.push(tool);
+            }
+        }
+    }
+
+    tools
+}
+
+/// Infer completion outcome from last step + satisfaction signal.
+fn infer_outcome( + events: &[&TrajectoryEvent], + satisfaction: Option, +) -> CompletionStatus { + match satisfaction { + Some(SatisfactionSignal::Positive) => CompletionStatus::Success, + Some(SatisfactionSignal::Negative) => CompletionStatus::Failed, + Some(SatisfactionSignal::Neutral) => { + // Check if last meaningful step was a successful LLM generation + if events.iter().any(|e| e.step_type == TrajectoryStepType::LlmGeneration) { + CompletionStatus::Partial + } else { + CompletionStatus::Abandoned + } + } + None => CompletionStatus::Partial, + } +} + +/// Build JSON execution chain from key events. +fn build_chain_json(events: &[&TrajectoryEvent]) -> String { + let chain: Vec = events.iter().map(|e| { + serde_json::json!({ + "step": e.step_index, + "type": e.step_type.as_str(), + "input": truncate(&e.input_summary, 100), + "output": truncate(&e.output_summary, 100), + }) + }).collect(); + + serde_json::to_string(&chain).unwrap_or_else(|_| "[]".to_string()) +} + +fn truncate(s: &str, max: usize) -> String { + if s.chars().count() <= max { + s.to_string() + } else { + s.chars().take(max).collect::() + "…" + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Utc; + + fn make_event(index: usize, step_type: TrajectoryStepType, input: &str, output: &str) -> TrajectoryEvent { + TrajectoryEvent { + id: format!("evt-{}", index), + session_id: "sess-1".to_string(), + agent_id: "agent-1".to_string(), + step_index: index, + step_type, + input_summary: input.to_string(), + output_summary: output.to_string(), + duration_ms: 100, + timestamp: Utc::now(), + } + } + + #[test] + fn test_compress_empty() { + assert!(compress(vec![], None).is_none()); + } + + #[test] + fn test_compress_single_event() { + let events = vec![make_event(0, TrajectoryStepType::UserRequest, "帮我查数据", "")]; + let ct = 
compress(events, None).unwrap(); + assert_eq!(ct.session_id, "sess-1"); + assert_eq!(ct.total_steps, 1); + } + + #[test] + fn test_compress_full_chain() { + let events = vec![ + make_event(0, TrajectoryStepType::UserRequest, "帮我生成月度报告", ""), + make_event(1, TrajectoryStepType::ToolExecution, "collector", "5条数据"), + make_event(2, TrajectoryStepType::LlmGeneration, "", "报告已生成"), + ]; + let ct = compress(events, Some(SatisfactionSignal::Positive)).unwrap(); + assert_eq!(ct.request_type, "data_report"); + assert_eq!(ct.tools_used, vec!["collector"]); + assert_eq!(ct.outcome, CompletionStatus::Success); + assert!(ct.execution_chain.starts_with('[')); + } + + #[test] + fn test_deduplicate_retries() { + let events = vec![ + make_event(0, TrajectoryStepType::ToolExecution, "tool-a", "err"), + make_event(1, TrajectoryStepType::ToolExecution, "tool-a", "ok"), + make_event(2, TrajectoryStepType::LlmGeneration, "", "done"), + ]; + let deduped = deduplicate_steps(&events); + assert_eq!(deduped.len(), 2); // first ToolExecution + LlmGeneration + } + + #[test] + fn test_classify_request() { + assert_eq!(classify_request("帮我生成月度报告"), "data_report"); + assert_eq!(classify_request("最新的合规政策是什么"), "policy_query"); + assert_eq!(classify_request("明天有什么会议"), "meeting"); + assert_eq!(classify_request("查房安排"), "inspection"); + assert_eq!(classify_request("你好"), "general"); + } + + #[test] + fn test_detect_satisfaction_positive() { + let msgs = vec!["谢谢,很好用".to_string()]; + assert_eq!(detect_satisfaction(&msgs), Some(SatisfactionSignal::Positive)); + } + + #[test] + fn test_detect_satisfaction_negative() { + let msgs = vec!["还是不行啊".to_string()]; + assert_eq!(detect_satisfaction(&msgs), Some(SatisfactionSignal::Negative)); + } + + #[test] + fn test_detect_satisfaction_neutral() { + let msgs = vec!["好的我知道了".to_string()]; + assert_eq!(detect_satisfaction(&msgs), Some(SatisfactionSignal::Neutral)); + } + + #[test] + fn test_detect_satisfaction_empty() { + assert_eq!(detect_satisfaction(&[]), 
None); + } + + #[test] + fn test_infer_outcome() { + let events = vec![make_event(0, TrajectoryStepType::LlmGeneration, "", "ok")]; + assert_eq!( + infer_outcome(&events.iter().collect::>(), Some(SatisfactionSignal::Positive)), + CompletionStatus::Success + ); + assert_eq!( + infer_outcome(&events.iter().collect::>(), Some(SatisfactionSignal::Negative)), + CompletionStatus::Failed + ); + } + + #[test] + fn test_extract_tools_dedup() { + let events = vec![ + make_event(0, TrajectoryStepType::ToolExecution, "researcher", ""), + make_event(1, TrajectoryStepType::ToolExecution, "researcher", ""), + make_event(2, TrajectoryStepType::ToolExecution, "collector", ""), + ]; + let refs: Vec<&TrajectoryEvent> = events.iter().collect(); + let tools = extract_tools(&refs); + assert_eq!(tools, vec!["researcher", "collector"]); + } +} diff --git a/desktop/src-tauri/src/intelligence/user_profiler.rs b/desktop/src-tauri/src/intelligence/user_profiler.rs new file mode 100644 index 0000000..f64e170 --- /dev/null +++ b/desktop/src-tauri/src/intelligence/user_profiler.rs @@ -0,0 +1,369 @@ +//! User Profiler — aggregates extracted facts into a structured user profile. +//! +//! Takes `ExtractedFactBatch` from the growth pipeline, classifies facts by +//! category, and updates the `UserProfile` via `UserProfileStore`. +//! +//! Desktop uses "default_user" as the single user ID. + +use std::sync::Arc; + +use chrono::Utc; +use tracing::{debug, warn}; +use zclaw_memory::fact::{Fact, FactCategory}; +use zclaw_memory::user_profile_store::{ + CommStyle, Level, UserProfile, UserProfileStore, +}; +use zclaw_types::Result; + +/// Default user ID for single-user desktop mode. +const DEFAULT_USER: &str = "default_user"; + +// --------------------------------------------------------------------------- +// Classification helpers +// --------------------------------------------------------------------------- + +/// Maps a fact category to the profile field it should update. 
+enum ProfileFieldUpdate { + Industry(String), + Role(String), + ExpertiseLevel(Level), + CommunicationStyle(CommStyle), + PreferredTool(String), + RecentTopic(String), +} + +/// Classify a fact content into a profile update. +fn classify_fact_content(fact: &Fact) -> Option { + let content = fact.content.to_lowercase(); + + // Communication style detection + if content.contains("简洁") || content.contains("简短") || content.contains("简单说") { + return Some(ProfileFieldUpdate::CommunicationStyle(CommStyle::Concise)); + } + if content.contains("详细") || content.contains("展开说") || content.contains("多说点") { + return Some(ProfileFieldUpdate::CommunicationStyle(CommStyle::Detailed)); + } + if content.contains("正式") || content.contains("专业") || content.contains("官方") { + return Some(ProfileFieldUpdate::CommunicationStyle(CommStyle::Formal)); + } + if content.contains("随意") || content.contains("轻松") || content.contains("随便") { + return Some(ProfileFieldUpdate::CommunicationStyle(CommStyle::Casual)); + } + + // Industry / role detection + if content.contains("医疗") || content.contains("医院") || content.contains("诊所") { + return Some(ProfileFieldUpdate::Industry("医疗".into())); + } + if content.contains("制造") || content.contains("工厂") || content.contains("生产") { + return Some(ProfileFieldUpdate::Industry("制造业".into())); + } + if content.contains("教育") || content.contains("学校") || content.contains("教学") { + return Some(ProfileFieldUpdate::Industry("教育".into())); + } + if content.contains("行政") || content.contains("主任") || content.contains("管理") { + return Some(ProfileFieldUpdate::Role("行政管理".into())); + } + if content.contains("工程师") || content.contains("开发") || content.contains("技术") { + return Some(ProfileFieldUpdate::Role("技术人员".into())); + } + if content.contains("医生") || content.contains("护士") || content.contains("临床") { + return Some(ProfileFieldUpdate::Role("医务人员".into())); + } + + // Expertise level + if content.contains("新手") || content.contains("不会") || 
content.contains("不了解") { + return Some(ProfileFieldUpdate::ExpertiseLevel(Level::Beginner)); + } + if content.contains("熟练") || content.contains("熟悉") || content.contains("常用") { + return Some(ProfileFieldUpdate::ExpertiseLevel(Level::Expert)); + } + + // Tool preferences + if content.contains("用研究") || content.contains("帮我查") || content.contains("调研") { + return Some(ProfileFieldUpdate::PreferredTool("researcher".into())); + } + if content.contains("收集") || content.contains("整理") || content.contains("汇总") { + return Some(ProfileFieldUpdate::PreferredTool("collector".into())); + } + if content.contains("幻灯") || content.contains("演示") || content.contains("ppt") { + return Some(ProfileFieldUpdate::PreferredTool("slideshow".into())); + } + + // Default: treat as a recent topic + if fact.confidence >= 0.6 { + let topic = truncate(&fact.content, 30); + return Some(ProfileFieldUpdate::RecentTopic(topic)); + } + + None +} + +// --------------------------------------------------------------------------- +// UserProfiler +// --------------------------------------------------------------------------- + +/// Aggregates extracted facts into a structured user profile. +pub struct UserProfiler { + store: Arc, +} + +impl UserProfiler { + pub fn new(store: Arc) -> Self { + Self { store } + } + + /// Main entry point: update profile from extracted facts. + pub async fn update_from_facts( + &self, + facts: &[Fact], + ) -> Result<()> { + if facts.is_empty() { + return Ok(()); + } + + for fact in facts { + if let Some(update) = classify_fact_content(fact) { + if let Err(e) = self.apply_update(&update).await { + warn!("[UserProfiler] Failed to apply update: {}", e); + } + } + } + + // Update confidence based on number of classified facts + self.update_confidence().await; + debug!("[UserProfiler] Updated profile from {} facts", facts.len()); + Ok(()) + } + + /// Update active pain points in the profile. 
+ pub async fn update_pain_points( + &self, + pains: Vec, + ) -> Result<()> { + // Replace all pain points by loading, modifying, and upserting + let mut profile = self.get_or_create_profile().await; + profile.active_pain_points = pains; + profile.updated_at = Utc::now(); + self.store.upsert(&profile).await + } + + /// Format relevant profile attributes for injection into system prompt. + /// Caps output at ~200 Chinese characters (≈100 tokens). + pub fn format_profile_summary(profile: &UserProfile, topic: &str) -> Option { + let mut parts = Vec::new(); + + if let Some(ref industry) = profile.industry { + parts.push(format!("行业: {}", industry)); + } + if let Some(ref role) = profile.role { + parts.push(format!("角色: {}", role)); + } + if let Some(ref level) = profile.expertise_level { + let level_str = match level { + Level::Beginner => "入门", + Level::Intermediate => "中级", + Level::Expert => "专家", + }; + parts.push(format!("水平: {}", level_str)); + } + if let Some(ref style) = profile.communication_style { + let style_str = match style { + CommStyle::Concise => "简洁", + CommStyle::Detailed => "详细", + CommStyle::Formal => "正式", + CommStyle::Casual => "随意", + }; + parts.push(format!("沟通风格: {}", style_str)); + } + + // Only add topics relevant to the current conversation + if !profile.recent_topics.is_empty() { + let relevant: Vec<&str> = profile.recent_topics.iter() + .filter(|t| { + let t_lower = t.to_lowercase(); + let topic_lower = topic.to_lowercase(); + t_lower.chars().any(|c| topic_lower.contains(c)) + || topic_lower.chars().any(|c| t_lower.contains(c)) + }) + .take(3) + .map(|s| s.as_str()) + .collect(); + if !relevant.is_empty() { + parts.push(format!("近期话题: {}", relevant.join(", "))); + } + } + + if parts.is_empty() { + return None; + } + + let summary = format!("[用户画像] {}", parts.join(" | ")); + if summary.chars().count() > 200 { + Some(truncate(&summary, 200)) + } else { + Some(summary) + } + } + + // -- internal helpers -- + + async fn apply_update(&self, 
update: &ProfileFieldUpdate) -> Result<()> { + match update { + ProfileFieldUpdate::Industry(v) => { + self.store.update_field(DEFAULT_USER, "industry", v).await + } + ProfileFieldUpdate::Role(v) => { + self.store.update_field(DEFAULT_USER, "role", v).await + } + ProfileFieldUpdate::ExpertiseLevel(v) => { + let val = match v { + Level::Beginner => "beginner", + Level::Intermediate => "intermediate", + Level::Expert => "expert", + }; + self.store.update_field(DEFAULT_USER, "expertise_level", val).await + } + ProfileFieldUpdate::CommunicationStyle(v) => { + let val = match v { + CommStyle::Concise => "concise", + CommStyle::Detailed => "detailed", + CommStyle::Formal => "formal", + CommStyle::Casual => "casual", + }; + self.store.update_field(DEFAULT_USER, "communication_style", val).await + } + ProfileFieldUpdate::PreferredTool(tool) => { + self.store.add_preferred_tool(DEFAULT_USER, tool, 5).await + } + ProfileFieldUpdate::RecentTopic(topic) => { + self.store.add_recent_topic(DEFAULT_USER, topic, 10).await + } + } + } + + async fn update_confidence(&self) { + if let Ok(Some(profile)) = self.store.get(DEFAULT_USER).await { + let filled = [ + profile.industry.is_some(), + profile.role.is_some(), + profile.expertise_level.is_some(), + profile.communication_style.is_some(), + !profile.recent_topics.is_empty(), + ].iter().filter(|&&x| x).count() as f32; + + let confidence = (filled / 5.0).min(1.0); + let conf_str = format!("{:.2}", confidence); + if let Err(e) = self.store.update_field(DEFAULT_USER, "confidence", &conf_str).await { + warn!("[UserProfiler] Failed to update confidence: {}", e); + } + } + } + + async fn get_or_create_profile(&self) -> UserProfile { + match self.store.get(DEFAULT_USER).await { + Ok(Some(p)) => p, + _ => UserProfile::default_profile(), + } + } +} + +fn truncate(s: &str, max_chars: usize) -> String { + if s.chars().count() <= max_chars { + s.to_string() + } else { + s.chars().take(max_chars).collect::() + "…" + } +} + +// 
--------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_classify_communication_style() { + let fact = Fact::new("喜欢简洁的回答".to_string(), FactCategory::Preference, 0.8); + let update = classify_fact_content(&fact); + assert!(matches!(update, Some(ProfileFieldUpdate::CommunicationStyle(CommStyle::Concise)))); + + let fact2 = Fact::new("请详细说明".to_string(), FactCategory::Preference, 0.8); + let update2 = classify_fact_content(&fact2); + assert!(matches!(update2, Some(ProfileFieldUpdate::CommunicationStyle(CommStyle::Detailed)))); + } + + #[test] + fn test_classify_industry() { + let fact = Fact::new("我在医院工作".to_string(), FactCategory::Knowledge, 0.8); + let update = classify_fact_content(&fact); + assert!(matches!(update, Some(ProfileFieldUpdate::Industry(ref s)) if s == "医疗")); + } + + #[test] + fn test_classify_role() { + let fact = Fact::new("我是行政主任".to_string(), FactCategory::Knowledge, 0.8); + let update = classify_fact_content(&fact); + assert!(matches!(update, Some(ProfileFieldUpdate::Role(ref s)) if s == "行政管理")); + } + + #[test] + fn test_classify_expertise() { + let fact = Fact::new("我是新手".to_string(), FactCategory::Knowledge, 0.8); + let update = classify_fact_content(&fact); + assert!(matches!(update, Some(ProfileFieldUpdate::ExpertiseLevel(Level::Beginner)))); + } + + #[test] + fn test_classify_tool() { + let fact = Fact::new("帮我调研一下市场".to_string(), FactCategory::Preference, 0.8); + let update = classify_fact_content(&fact); + assert!(matches!(update, Some(ProfileFieldUpdate::PreferredTool(ref s)) if s == "researcher")); + } + + #[test] + fn test_classify_topic_fallback() { + let fact = Fact::new("关于季度报告的编制流程".to_string(), FactCategory::Behavior, 0.7); + let update = classify_fact_content(&fact); + assert!(matches!(update, Some(ProfileFieldUpdate::RecentTopic(_)))); + } + + #[test] + 
fn test_classify_low_confidence_ignored() { + let fact = Fact::new("关于季度报告的编制流程".to_string(), FactCategory::Behavior, 0.3); + let update = classify_fact_content(&fact); + assert!(update.is_none()); + } + + #[test] + fn test_format_profile_summary() { + let profile = UserProfile { + user_id: "default_user".to_string(), + industry: Some("医疗".to_string()), + role: Some("行政主任".to_string()), + expertise_level: Some(Level::Intermediate), + communication_style: Some(CommStyle::Concise), + preferred_language: "zh-CN".to_string(), + recent_topics: vec!["排班管理".to_string()], + active_pain_points: vec![], + preferred_tools: vec![], + confidence: 0.6, + updated_at: Utc::now(), + }; + + let summary = UserProfiler::format_profile_summary(&profile, "排班"); + assert!(summary.is_some()); + let text = summary.unwrap(); + assert!(text.contains("医疗")); + assert!(text.contains("行政主任")); + assert!(text.contains("排班管理")); + } + + #[test] + fn test_format_profile_empty() { + let profile = UserProfile::default_profile(); + let summary = UserProfiler::format_profile_summary(&profile, "test"); + assert!(summary.is_none()); + } +} diff --git a/docs/superpowers/specs/2026-04-09-hermes-intelligence-pipeline-design.md b/docs/superpowers/specs/2026-04-09-hermes-intelligence-pipeline-design.md new file mode 100644 index 0000000..6310692 --- /dev/null +++ b/docs/superpowers/specs/2026-04-09-hermes-intelligence-pipeline-design.md @@ -0,0 +1,742 @@ +# Hermes Intelligence Pipeline Design + +> 基于 Hermes Agent (Nous Research) 竞品分析,吸收 4 个核心理念到 ZCLAW 的详细设计方案。 +> 架构方案:Pipeline Closure — 闭合现有管线断点,不引入新架构层。 + +## Context + +Hermes Agent 验证了"一个管家 + 记忆飞轮"的方向,其 4 个核心创新对 ZCLAW 发布后迭代有直接参考价值: + +1. **自我改进闭环** — 执行 → 评估 → 提取技能 → 改进 → 复用 +2. **用户建模** — 三层记忆栈 + 统一用户画像 +3. **自然语言 Cron** — LLM 解析自然语言为定时任务 +4. 
**轨迹压缩** — 工具调用链 → 结构化 JSON → RL 基础 + +**关键诊断:** ZCLAW 缺的不是模块,是管线没接通。现有 PainAggregator、SolutionGenerator、Reflection、Heartbeat、MemoryExtractor 等组件已就位,但彼此断开。本设计闭合这些断点。 + +**范围约束:** +- 管家路由器(ButlerRouterMiddleware + SemanticSkillRouter 接通)由另一个会话推进,本设计标注为外部依赖 +- 发布后迭代,不影响当前发布计划 +- 4 个理念全部设计,按优先级排序:自我改进闭环 > 用户建模 > NL Cron > 轨迹压缩 + +**总代码量估算:** ~2200 行新增/修改(~1700 新增 + ~500 修改) + +### 类型约定 + +本设计使用以下 ID 类型约定: + +```rust +// 所有 Rust 原生结构体使用强类型 +use uuid::Uuid; +use zclaw_types::{AgentId, SessionId}; + +// 为新实体定义类型别名(newtype wrapper 在 Tauri 命令层解包为 String) +type ExperienceId = String; // Uuid::new_v4().to_string() +type ProposalId = String; // 与现有 Proposal.id 一致 +type TrajectoryId = String; // Uuid::new_v4().to_string() +``` + +Rust 内部结构体使用 `AgentId`、`SessionId`;Tauri 命令边界使用 `String`(Tauri serialize 要求)。 + +### 统一完成状态枚举 + +跨 Section 1/4 使用统一的完成状态: + +```rust +/// 通用完成状态,所有 Outcome 枚举的基础 +enum CompletionStatus { + Success, + Partial, + Failed, + Abandoned, // Section 1 不使用此变体(运行时约定,非编译时约束) +} +``` + +Section 1 的 Experience 使用 `CompletionStatus`(不含 Abandoned),Section 4 的 CompressedTrajectory 使用完整版。 + +--- + +## Section 1: 自我改进闭环 + +### 目标 + +用户反馈痛点 → 自动识别 → 自动生成方案 → 方案成功后提取为可复用经验 → 下次类似问题直接复用。 + +### 数据流 + +``` +用户消息 → PainAggregator(已有) + ↓ confidence >= 0.7 + SolutionGenerator(已有,改为自动触发) + ↓ 生成 Proposal + 等待用户反馈(成功/失败) + ↓ 成功 + ExperienceExtractor(新增) + ↓ 生成结构化经验 + ExperienceStore(新增,SQLite) + ↓ 下次对话 + MemoryMiddleware(已有)注入相关经验 +``` + +### 关键断点修复 + +**断点 1:PainAggregator → SolutionGenerator(未自动触发)** + +- 文件:`desktop/src-tauri/src/intelligence/pain_aggregator.rs` +- 当 `confidence >= 0.7` 时,通过 Tauri event 自动调用 `butler_generate_solution` +- 新增 `PainConfirmedEvent` 事件结构体 + +**断点 2:方案结果反馈(无反馈机制)** + +- 新增 `ProposalFeedback` 结构体 +- 在聊天流中检测用户隐式反馈关键词("好了""解决了""没用") +- 新增 Tauri 命令 `butler_submit_proposal_feedback` + +**断点 3:成功方案 → 结构化经验(完全缺失)** + +- 新增 `ExperienceExtractor`:从成功方案中提取经验 +- LLM 辅助提取(复用现有 LlmDriver),fallback 到模板提取 +- 存入 VikingStorage(使用 scope 前缀 
`experience://{agent_id}/`) + +**断点 4:经验复用(完全缺失)** + +- 扩展 `MemoryMiddleware`:用户新消息时,通过 VikingStorage 检索相关经验 +- 使用 scope 过滤 `experience://` 前缀 + TF-IDF 相关性匹配 +- 相似度 > 阈值时,注入"过往经验"到 system prompt +- 格式:`[过往经验] 类似情况 X 做过 Y,结果是 Z` + +### 数据结构 + +```rust +// 新增文件:desktop/src-tauri/src/intelligence/experience.rs + +use zclaw_types::AgentId; +use uuid::Uuid; + +struct Experience { + id: ExperienceId, + agent_id: AgentId, + pain_pattern: String, // 触发模式(关键词摘要) + context: String, // 问题上下文 + solution_steps: Vec, // 解决步骤 + outcome: CompletionStatus, // Success | Partial(经验只记录成功的) + source_proposal_id: Option, + reuse_count: usize, + created_at: DateTime, +} + +struct ProposalFeedback { + proposal_id: ProposalId, + outcome: CompletionStatus, // Success | Failed | Partial + user_comment: Option, + detected_at: DateTime, +} + +struct PainConfirmedEvent { + pain_point_id: String, // PainPoint.id (Uuid String) + pattern: String, + confidence: f32, +} +``` + +### 存储策略 + +经验存储在现有 VikingStorage 中,使用 scope 前缀区分: + +```rust +// Experience 存储为 VikingStorage memory entry +scope: "agent://{agent_id}/experience/{pattern_hash}" // 遵循 OpenViking URI 约定 +content: JSON(Experience) // 序列化的完整 Experience 结构体 +``` + +**为什么不用独立的 experiences + FTS5 表:** +- VikingStorage 已有成熟的 FTS5 + TF-IDF + embedding 检索管道 +- MemoryMiddleware 已与 VikingStorage 集成,增加 scope 前缀即可区分 +- 避免维护两套独立的 FTS5 索引 + +独立的 `experience_store.rs` 文件负责 VikingStorage CRUD + scope 过滤,不创建新表。 + +### 迁移策略 + +不需要新数据库表或 schema 变更。经验数据通过 VikingStorage 的现有 memory 表存储,使用 scope 前缀区分。 + +### 错误处理 + +- ExperienceExtractor LLM 调用失败 → fallback 到模板提取(固定格式提取 solution_steps) +- ProposalFeedback 检测失败 → 不阻塞对话,静默跳过 +- 经验注入失败 → MemoryMiddleware 记录 warn 日志,不注入,不影响正常对话 +- 所有错误遵循代码库约定:非关键路径使用 `log::warn!` / `log::error!`,不阻塞主流程 + +### 测试计划 + +| 测试目标 | 文件位置 | 覆盖场景 | +|----------|---------|---------| +| ExperienceExtractor | `experience.rs` 内联 `#[cfg(test)]` | LLM 提取成功/failure fallback、模板提取 | +| ExperienceStore | `experience_store.rs` 内联 | CRUD 往返、scope 
过滤、VikingStorage 集成 | +| PainConfirmedEvent 触发 | `pain_aggregator.rs` 测试扩展 | confidence >= 0.7 触发事件 | +| 经验注入 | MemoryMiddleware 测试 | 相关性过滤、token 限制、空结果处理 | +| ProposalFeedback 检测 | `solution_generator.rs` 测试扩展 | 隐式反馈关键词匹配 | + +### 文件清单 + +| 文件 | 用途 | 预估行数 | +|------|------|---------| +| `desktop/src-tauri/src/intelligence/experience.rs` | ExperienceExtractor + 逻辑 | ~250 | +| `crates/zclaw-growth/src/experience_store.rs` | VikingStorage scope CRUD | ~120 | +| 改动 `pain_aggregator.rs` | 自动触发 SolutionGenerator | ~30 | +| 改动 `solution_generator.rs` | Proposal feedback 槽位 | ~40 | +| 改动 `intelligence_hooks.rs` | 新增 post-proposal-evaluation hook | ~50 | +| 改动 MemoryMiddleware | 经验注入逻辑(scope 过滤) | ~60 | +| 改动 `crates/zclaw-memory/src/lib.rs` | 导出新模块 | ~5 | + +**预估:~555 行新增/修改** + +--- + +## Section 2: 用户建模 + +### 目标 + +从每次对话中持续提取用户特征,聚合为结构化画像,注入到路由和生成环节。 + +### 数据流 + +``` +对话消息 → MemoryExtractor(已有) + ↓ + UserProfiler(新增) + ↓ 聚合到 UserProfile + UserProfileStore(新增,SQLite) + ↓ + ├→ ButlerRouter(外部依赖,另一个会话) + │ → 路由决策考虑用户偏好 + └→ MemoryMiddleware(已有) + → system prompt 注入用户画像摘要 +``` + +### 设计决策 + +**为什么新建 UserProfile 而不沿用 IdentityManager.user_profile?** + +现有 user_profile 是非结构化 markdown,无法做条件查询。Profile injection 已被有意禁用(`identity.rs:291-298`),因为它导致模型过度关注旧话题。需要结构化画像做相关性过滤后注入。 + +**单用户桌面场景:** 桌面版使用 `"default_user"` 作为 user_id(与 PainAggregator 一致),仅维护一条 UserProfile 记录。 + +### 数据结构 + +```rust +// 新增文件:crates/zclaw-memory/src/user_profile_store.rs + +struct UserProfile { + user_id: String, // "default_user"(桌面版单用户) + + // 静态属性(低频更新) + industry: Option, // "医疗" "制造业" + role: Option, // "行政主任" "厂长" + expertise_level: Option, // Beginner / Intermediate / Expert + communication_style: Option, // Concise / Detailed / Formal / Casual + preferred_language: String, // "zh-CN" + + // 动态属性(高频更新) + recent_topics: Vec, // 最近 7 天的话题 + active_pain_points: Vec, // 当前未解决痛点 + preferred_tools: Vec, // 常用技能/工具 + + // 元数据 + updated_at: DateTime, + confidence: f32, // 画像置信度 +} + +enum Level { Beginner, 
Intermediate, Expert } +enum CommStyle { Concise, Detailed, Formal, Casual } +``` + +### 聚合逻辑(UserProfiler) + +1. **MemoryExtractor 输出 → 分类**:已提取的记忆按 `UserPreference` / `UserFact` / `AgentLesson` 分类 +2. **分类后聚合**: + - `UserPreference` → 更新 `communication_style`, `preferred_tools` + - `UserFact` → 更新 `industry`, `role`, `expertise_level` + - `AgentLesson` → 更新 `recent_topics` + - PainAggregator 的活跃痛点 → 更新 `active_pain_points` +3. **去重 + 衰减**:相似属性合并,超过 30 天无佐证的属性降低 confidence +4. **存储**:单用户单条记录(upsert),SQLite `user_profiles` 表 + +### 注入逻辑 + +```rust +// 在 MemoryMiddleware 中新增 +fn inject_user_profile(&self, ctx: &mut MiddlewareContext, profile: &UserProfile) { + // 只注入与当前话题相关的属性 + let relevant = self.filter_by_relevance(profile, &ctx.user_input); + if relevant.is_empty() { return; } + + // 格式化为简洁摘要,不超过 100 tokens + let summary = format_user_profile_summary(&relevant); + ctx.system_prompt.push_str(&summary); +} +``` + +**关键约束:** 注入内容不超过 100 tokens,只注入与当前话题相关的属性。 + +### 与管家路由器的协作(外部依赖) + +当管家路由器接通后: +- ButlerRouterMiddleware 可读取 UserProfile.industry 和 role +- 路由时考虑用户背景 +- 本设计只提供数据接口,路由逻辑由另一个会话处理 + +### 迁移策略 + +新增 `user_profiles` 表,通过 `schema.rs` 的 `MIGRATIONS` 数组递增版本。初始版本包含 CREATE TABLE + 默认 "default_user" 行。 + +```rust +// 在 schema.rs MIGRATIONS 数组新增 +("CREATE TABLE IF NOT EXISTS user_profiles (...)", "DROP TABLE IF EXISTS user_profiles") +``` + +### 错误处理 + +- UserProfileStore 读写失败 → `log::warn!` + 返回 None,不阻塞对话 +- UserProfiler 聚合失败 → 保留上次有效画像,不覆盖 +- Profile 注入失败 → MemoryMiddleware 降级到无 profile 注入模式 +- 所有操作遵循:非关键路径错误不阻塞主流程 + +### 测试计划 + +| 测试目标 | 文件位置 | 覆盖场景 | +|----------|---------|---------| +| UserProfileStore | `user_profile_store.rs` 内联 | CRUD 往返、upsert 去重、JSON 字段序列化 | +| UserProfiler 聚合 | `user_profiler.rs` 内联 | 分类正确性、去重、衰减、空输入 | +| Profile 注入 | MemoryMiddleware 测试扩展 | 相关性过滤、100 token 限制、空 profile | +| 迁移 | schema 测试 | 新建 + 升级路径 | + +### 数据库 Schema + +```sql +CREATE TABLE IF NOT EXISTS user_profiles ( + user_id TEXT PRIMARY KEY, + industry TEXT, + role TEXT, + 
expertise_level TEXT, -- 'Beginner' | 'Intermediate' | 'Expert' + communication_style TEXT, -- 'Concise' | 'Detailed' | 'Formal' | 'Casual' + preferred_language TEXT DEFAULT 'zh-CN', + recent_topics TEXT, -- JSON array + active_pain_points TEXT, -- JSON array + preferred_tools TEXT, -- JSON array + confidence REAL DEFAULT 0.0, + updated_at TEXT NOT NULL +); +``` + +### 文件清单 + +| 文件 | 用途 | 预估行数 | +|------|------|---------| +| `crates/zclaw-memory/src/user_profile_store.rs` | UserProfile 结构体 + SQLite CRUD | ~200 | +| `desktop/src-tauri/src/intelligence/user_profiler.rs` | 聚合逻辑 | ~180 | +| 改动 `MemoryMiddleware` | profile 注入(相关性过滤) | ~80 | +| 改动 `intelligence_hooks.rs` | post-extraction 触发 UserProfiler | ~30 | +| 改动 `crates/zclaw-memory/src/lib.rs` | 导出新模块 | ~5 | + +**预估:~495 行新增/修改** + +--- + +## Section 3: 自然语言 Cron + +### 目标 + +用户说"每天早上9点提醒我查房" → 系统解析为 `0 9 * * *` → 自动创建定时任务。 + +### 数据流 + +``` +用户消息(含时间意图) + ↓ +意图分类(ButlerRouter / 正则预检) + ↓ 检测到"定时/提醒"意图 +NlScheduleParser(新增,位于 zclaw-runtime) + ↓ 解析为 ParsedSchedule +ScheduleConfirmDialog(新增) + ↓ 用户确认 "每天早上9点 → 0 9 * * *" +SchedulerService(已有,位于 zclaw-kernel) + ↓ 创建定时任务 +TriggerManager(已有) + ↓ 到时触发 +Hand 执行(已有) +``` + +### 解析策略(三层 fallback) + +**Layer 1: 正则模式匹配(覆盖 80% 常见场景)** + +| 模式 | 示例 | Cron | +|------|------|------| +| 每天 + 时间 | 每天早上9点 | `0 9 * * *` | +| 每周N + 时间 | 每周一上午10点 | `0 10 * * 1` | +| 工作日 + 时间 | 工作日下午3点 | `0 15 * * 1-5` | +| 每N小时 | 每2小时 | `0 */2 * * *` | +| 每月N号 | 每月1号 | `0 0 1 * *` | +| 相对时间 | 明天下午3点 | 一次性 ISO | + +**Layer 2: LLM 辅助解析(覆盖模糊/复杂表述)** + +- 使用 Haiku(~50 tokens 输入,~20 tokens 输出) +- 处理如"下个月开始每周二和周四提醒我" + +**Layer 3: 交互澄清(无法确定时)** + +- "我理解您想设置定时任务,请确认:..." 
+ +### 数据结构 + +```rust +// 新增文件:crates/zclaw-runtime/src/nl_schedule.rs +// 放在 runtime 层因为这是纯文本→cron工具,不依赖 kernel 协调 + +use zclaw_types::AgentId; + +struct ParsedSchedule { + cron_expression: String, // "0 9 * * *" + natural_description: String, // "每天早上9点" + confidence: f32, + task_description: String, // "查房提醒" + task_target: TaskTarget, +} + +/// 定时任务目标 +enum TaskTarget { + Agent(AgentId), // 触发指定 agent + Hand(String), // 触发指定 hand(工具名) + Workflow(String), // 触发指定 workflow(名称) +} + +enum ScheduleParseResult { + Exact(ParsedSchedule), // 高置信度,直接确认 + Ambiguous(Vec), // 多种理解,需选择 + Unclear, // 需要澄清 +} +``` + +### 确认流程 + +1. 用户说"每天早上9点提醒我查房" +2. 解析为 `{ cron: "0 9 * * *", desc: "查房提醒" }` +3. 系统回复:"好的,我为您设置了:**每天早上 9:00** 提醒查房。确认吗?" +4. 用户确认 → 调用已有 `SchedulerService.create_trigger()` +5. 用户修正 → 重新解析或手动编辑 + +### 迁移策略 + +不需要新数据库表。NlScheduleParser 是纯计算工具,输出通过现有 `SchedulerService` + `TriggerManager` 存储。 + +### 错误处理 + +- 正则匹配失败 → 尝试 Layer 2 LLM 解析 +- LLM 解析失败 → 返回 `ScheduleParseResult::Unclear`,触发交互澄清 +- 定时任务创建失败 → 向用户报告错误,建议手动设置 +- 所有错误不阻塞对话流程 + +### 测试计划 + +| 测试目标 | 文件位置 | 覆盖场景 | +|----------|---------|---------| +| 正则解析 | `nl_schedule.rs` 内联 | 10+ 中文时间表述模式、边界值、无效输入 | +| LLM fallback | mock 测试 | LLM 返回无效 cron 时的容错 | +| ParsedSchedule | 单元测试 | 序列化、字段完整性 | +| TaskTarget 枚举 | 单元测试 | 各变体匹配现有类型 | +| 确认流程 | 集成测试 | 完整 parse → confirm → create 链路 | + +### 文件清单 + +| 文件 | 用途 | 预估行数 | +|------|------|---------| +| `crates/zclaw-runtime/src/nl_schedule.rs` | NlScheduleParser + 中文模式库 | ~300 | +| 改动 `intelligence_hooks.rs` | 检测定时意图并触发解析 | ~40 | +| 改动 desktop store + component | 确认对话框交互 | ~150 | +| 改动 `crates/zclaw-kernel/src/scheduler.rs` | 接受 cron 字符串输入 | ~20 | + +**预估:~510 行新增/修改** + +--- + +## Section 4: 轨迹压缩 + +### 目标 + +记录完整的工具调用链(用户请求 → 意图分类 → 技能选择 → 执行步骤 → 结果 → 用户满意度),压缩为结构化 JSON,作为未来 RL/改进的基础数据。 + +### 数据流 + +``` +用户请求 + ↓ +AgentLoop(已有) + ↓ 每步通过中间件记录 +TrajectoryRecorderMiddleware(新增,实现 AgentMiddleware trait) + ↓ 异步写入 trajectory_events 表 + ↓ 会话结束时 +TrajectoryCompressor(新增) + ↓ 
压缩为结构化 JSON +compressed_trajectories 表 + ↓ 可选 +导出为 RL 训练数据格式 +``` + +### 关键设计决策:TrajectoryRecorder 作为中间件 + +TrajectoryRecorder 实现 `AgentMiddleware` trait(来自 `zclaw-runtime`),利用现有中间件钩子: + +- `before_completion` → 记录 `UserRequest` 步骤 +- `after_tool_call` → 记录 `ToolExecution` 步骤 +- `after_completion` → 记录 `LlmGeneration` 步骤 + 会话结束时触发压缩 + +**为什么不用自定义 AgentLoop hook:** +- 现有中间件系统已提供所有需要的钩子点 +- `MiddlewareContext` 已暴露 `agent_id`、`session_id`、`user_input`、`input_tokens`、`output_tokens` +- 符合 Pipeline Closure 原则:不引入新架构层 + +优先级设置:600-799 范围(遥测类别),确保在业务中间件之后运行。注意现有 `token_calibration` 中间件已占用优先级 700,推荐使用 650。 + +### 数据结构 + +```rust +// 新增文件:crates/zclaw-memory/src/trajectory_store.rs + +use zclaw_types::{AgentId, SessionId}; +use uuid::Uuid; + +/// 单条轨迹事件(细粒度,按步骤记录) +struct TrajectoryEvent { + id: TrajectoryId, + session_id: SessionId, + agent_id: AgentId, + step_index: usize, + step_type: TrajectoryStepType, + input_summary: String, // ≤200 字 + output_summary: String, // ≤200 字 + duration_ms: u64, + timestamp: DateTime, +} + +enum TrajectoryStepType { + UserRequest, // 用户原始请求 + IntentClassification, // 意图分类结果 + SkillSelection, // 选择了哪个技能 + ToolExecution, // 工具调用 + LlmGeneration, // LLM 生成 + UserFeedback, // 用户反馈 +} + +/// 压缩后的完整轨迹(会话结束时生成) +struct CompressedTrajectory { + id: TrajectoryId, + session_id: SessionId, + agent_id: AgentId, + + request_type: String, // "data_report" "policy_query" + tools_used: Vec, // ["researcher", "collector"] + outcome: CompletionStatus, // Success | Partial | Failed | Abandoned + total_steps: usize, + total_duration_ms: u64, + total_tokens: u32, + + execution_chain: String, // JSON: [{step, tool, result_summary}] + + satisfaction_signal: Option, + + created_at: DateTime, +} + +enum SatisfactionSignal { + Positive, // "谢谢""很好""解决了" + Negative, // "不对""没用""还是不行" + Neutral, // 无明显信号 +} +``` + +### 记录策略 + +**低开销原则:** 轨迹记录不能影响正常对话性能。 + +1. **事件记录:** 每步只存 `step_type + input_summary(≤200字) + output_summary(≤200字)` +2. 
**异步写入:** 通过 `tokio::spawn` 异步写入 SQLite,不阻塞主流程 +3. **压缩触发:** 会话结束时(compactor flush 或 session close),异步压缩 +4. **保留策略:** 压缩后删除原始事件(保留 7 天),压缩轨迹保留 90 天 + +### 压缩算法 + +```rust +fn compress(events: Vec) -> CompressedTrajectory { + // 1. 提取关键步骤(跳过中间重试/错误恢复) + // 2. 合并连续相同类型的步骤 + // 3. 生成 execution_chain JSON + // 4. 推断 outcome(最后一步是否成功 + 用户反馈信号) + // 5. 统计 token 用量和耗时 +} +``` + +### 与自我改进闭环的协作 + +当 ExperienceExtractor 运行时: +- 查询 `compressed_trajectories` 找到类似场景的历史轨迹 +- 评估"这个方案上次用了几步?成功率多少?" +- 为经验提取提供数据支撑 + +### 未来 RL 扩展(本次不实施) + +- `execution_chain` 可直接转换为 Atropos/GEPA 训练格式 +- `satisfaction_signal` 可作为 reward signal +- RL 训练管道不在本次范围内 + +### 迁移策略 + +通过 `schema.rs` 的 `MIGRATIONS` 数组递增版本(使用 `&[&str]` 扁平数组格式,与现有代码一致),新增 `trajectory_events` 和 `compressed_trajectories` 两张表。 + +```rust +// 在 schema.rs MIGRATIONS 数组新增(扁平 &str 数组,无 down migration) +&[ + "CREATE TABLE IF NOT EXISTS trajectory_events ( + id TEXT PRIMARY KEY, + session_id TEXT NOT NULL, + ... + ); + CREATE TABLE IF NOT EXISTS compressed_trajectories ( + ... 
+ ); + CREATE INDEX IF NOT EXISTS idx_trajectory_session ON trajectory_events(session_id);", +] +``` + +### 错误处理 + +- TrajectoryRecorder 异步写入失败 → `log::warn!`,不重试,丢弃单条事件 +- TrajectoryCompressor 压缩失败 → `log::warn!`,原始事件保留 7 天后自动清理 +- 压缩轨迹查询失败 → ExperienceExtractor 降级到无历史数据模式 +- 所有操作:非关键路径错误不阻塞对话 + +### 测试计划 + +| 测试目标 | 文件位置 | 覆盖场景 | +|----------|---------|---------| +| TrajectoryStore CRUD | `trajectory_store.rs` 内联 | 插入/查询/删除、session 过滤 | +| 压缩算法 | `trajectory_compressor.rs` 内联 | 正常压缩、空事件、单步事件、合并去重 | +| TrajectoryRecorderMiddleware | 中间件测试 | before/after 钩子记录、空输入跳过 | +| 保留策略 | 集成测试 | 7 天清理、90 天清理 | +| 满意度检测 | 单元测试 | 正/负/中性关键词匹配 | + +### 数据库 Schema + +```sql +CREATE TABLE IF NOT EXISTS trajectory_events ( + id TEXT PRIMARY KEY, + session_id TEXT NOT NULL, + agent_id TEXT NOT NULL, + step_index INTEGER NOT NULL, + step_type TEXT NOT NULL, + input_summary TEXT, + output_summary TEXT, + duration_ms INTEGER DEFAULT 0, + timestamp TEXT NOT NULL +); + +CREATE INDEX idx_trajectory_session ON trajectory_events(session_id); + +CREATE TABLE IF NOT EXISTS compressed_trajectories ( + id TEXT PRIMARY KEY, + session_id TEXT NOT NULL, + agent_id TEXT NOT NULL, + request_type TEXT NOT NULL, + tools_used TEXT, -- JSON array + outcome TEXT NOT NULL, -- 'Success'|'Partial'|'Failed'|'Abandoned' + total_steps INTEGER DEFAULT 0, + total_duration_ms INTEGER DEFAULT 0, + total_tokens INTEGER DEFAULT 0, + execution_chain TEXT NOT NULL, -- JSON + satisfaction_signal TEXT, -- 'Positive'|'Negative'|'Neutral'|NULL + created_at TEXT NOT NULL +); + +CREATE INDEX idx_ct_request_type ON compressed_trajectories(request_type); +CREATE INDEX idx_ct_outcome ON compressed_trajectories(outcome); +``` + +### 文件清单 + +| 文件 | 用途 | 预估行数 | +|------|------|---------| +| `crates/zclaw-memory/src/trajectory_store.rs` | TrajectoryEvent + CompressedTrajectory + SQLite CRUD | ~250 | +| `crates/zclaw-runtime/src/middleware/trajectory_recorder.rs` | AgentMiddleware 实现 | ~150 | +| 
`desktop/src-tauri/src/intelligence/trajectory_compressor.rs` | 压缩算法 | ~120 | +| 改动 `crates/zclaw-memory/src/lib.rs` | 导出新模块 | ~5 | +| 改动 `crates/zclaw-kernel/src/kernel/mod.rs` | 注册中间件(priority 650) | ~10 | + +**预估:~535 行新增/修改** + +--- + +## 总览 + +### 代码量汇总 + +| 理念 | 新增 | 修改 | 总计 | 优先级 | +|------|------|------|------|--------| +| 自我改进闭环 | ~400 | ~155 | ~555 | P1 | +| 用户建模 | ~380 | ~115 | ~495 | P2 | +| 自然语言 Cron | ~320 | ~190 | ~510 | P3 | +| 轨迹压缩 | ~525 | ~15 | ~540 | P4 | +| **总计** | **~1625** | **~475** | **~2100** | — | + +### 实施顺序和依赖关系 + +``` +Section 1 (自我改进闭环) ← 立即开始 + ↓ +Section 2 (用户建模) ← 可与 Section 1 并行,无强依赖 + ↓ +Section 3 (NL Cron) ← 依赖 Section 2 的 UserProfile(可选)+ 管家路由器(外部) + ↓ +Section 4 (轨迹压缩) ← 可与 Section 1-3 并行,无依赖 +``` + +Section 1 和 2 可以并行开发。Section 3 建议在管家路由器接通后实施。Section 4 完全独立。 + +### 外部依赖 + +- 管家路由器(ButlerRouterMiddleware + SemanticSkillRouter 接通)— 另一个会话推进 +- 痛点数据持久化(内存 → SQLite)— 已在 pre-release strategy 中规划 + +### intelligence_hooks.rs 管理 + +当前 `intelligence_hooks.rs` 约 281 行。本设计新增约 120 行钩子代码(Section 1: ~50, Section 2: ~30, Section 3: ~40)。 +如果文件超过 400 行,应拆分为 `hooks/` 子模块: +- `hooks/pain.rs` — 痛点相关钩子 +- `hooks/profile.rs` — 用户画像钩子 +- `hooks/schedule.rs` — 定时任务意图检测 +- `hooks/mod.rs` — 统一注册 + +### 验证方式 + +每个 Section 完成后的验证步骤: + +1. **自我改进闭环:** 人工模拟痛点对话 → 验证自动生成方案 → 验证经验提取 → 验证经验复用注入 +2. **用户建模:** 多轮对话 → 检查 UserProfile 各字段是否正确聚合 → 验证注入内容相关性 +3. **NL Cron:** 测试 10+ 种中文时间表述 → 验证 cron 输出 → 验证定时任务创建 +4. **轨迹压缩:** 完整对话流程 → 检查 trajectory_events 记录 → 验证压缩结果 → 检查异步写入无阻塞 + +### 验证命令 + +```bash +# Rust 编译检查 +cargo check --workspace --exclude zclaw-saas + +# Rust 测试 +cargo test --workspace --exclude zclaw-saas + +# TypeScript 类型检查 +cd desktop && pnpm tsc --noEmit + +# 前端测试 +cd desktop && pnpm vitest run +```