feat(hermes): implement intelligence pipeline — 4 chunks, 684 tests passing

Hermes Intelligence Pipeline closes gaps in ZCLAW's existing
intelligence components with 4 self-contained modules:

Chunk 1 — Self-improvement Loop:
- ExperienceStore (zclaw-growth): FTS5+TF-IDF wrapper with scope prefix
- ExperienceExtractor (desktop/intelligence): template-based extraction
  from successful proposals with implicit keyword detection
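The implicit keyword detection mentioned above can be sketched roughly as follows. The ExperienceExtractor itself lives in desktop/intelligence and is not part of this diff; the function name and stopword list below are illustrative assumptions, not the actual implementation.

```rust
use std::collections::HashSet;

/// Sketch of implicit keyword detection: keep tokens that are long enough
/// and not stopwords, deduplicated in order of first appearance.
fn detect_keywords(text: &str) -> Vec<String> {
    const STOPWORDS: &[&str] = &["the", "a", "an", "and", "or", "to", "of", "in", "with"];
    let mut seen = HashSet::new();
    text.split(|c: char| !c.is_alphanumeric())
        .map(|t| t.to_lowercase())
        .filter(|t| t.len() > 3 && !STOPWORDS.contains(&t.as_str()))
        .filter(|t| seen.insert(t.clone())) // dedupe, keep first occurrence
        .collect()
}

fn main() {
    let kws = detect_keywords("Check the export packaging regulations and use approved materials");
    println!("{:?}", kws);
}
```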

Chunk 2 — User Modeling:
- UserProfileStore (zclaw-memory): SQLite-backed structured profiles
  with industry/role/expertise/comm_style/recent_topics/pain_points
- UserProfiler (desktop/intelligence): fact classification by category
  (Preference/Knowledge/Behavior) with profile summary formatting
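The category classification above (Preference/Knowledge/Behavior) can be sketched as keyword-cue matching. The real UserProfiler in desktop/intelligence is not shown in this diff; the cue lists here are assumptions for demonstration only.

```rust
/// Sketch of keyword-cue fact classification into the three categories
/// named in the commit message. Cue lists are illustrative assumptions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FactCategory {
    Preference,
    Knowledge,
    Behavior,
}

fn classify_fact(fact: &str) -> FactCategory {
    let f = fact.to_lowercase();
    if ["prefer", "like", "favorite", "want"].iter().any(|c| f.contains(c)) {
        FactCategory::Preference
    } else if ["knows", "familiar with", "expert in", "works in"].iter().any(|c| f.contains(c)) {
        FactCategory::Knowledge
    } else {
        // Default bucket for observed actions and habits.
        FactCategory::Behavior
    }
}

fn main() {
    println!("{:?}", classify_fact("prefers concise answers"));
}
```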

Chunk 3 — NL Cron Chinese Time Parser:
- NlScheduleParser (zclaw-runtime): 6 pattern matchers for Chinese time
  expressions (每天/每周/工作日/间隔/每月/一次性) producing cron expressions
- Period-aware hour adjustment (下午3点→15, 晚上8点→20)
- Schedule intent detection + task description extraction
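The period-aware hour adjustment and one of the six matchers can be sketched as below. The real NlScheduleParser in zclaw-runtime is not part of this page; only the 每天 ("every day") pattern is shown, and the function names are assumptions, not the actual API.

```rust
/// Sketch of period-aware hour adjustment: 下午 (afternoon) and 晚上
/// (evening) shift hours 1-11 into 24-hour form, so 下午3点 → 15.
fn adjust_hour(period: &str, hour: u32) -> u32 {
    match period {
        "下午" | "晚上" if hour < 12 => hour + 12,
        _ => hour,
    }
}

/// "每天[period]<hour>点…" → "0 <hour> * * *" (minute hour dom month dow).
fn parse_daily(expr: &str) -> Option<String> {
    let rest = expr.strip_prefix("每天")?;
    let (period, rest) = if let Some(r) = rest.strip_prefix("下午") {
        ("下午", r)
    } else if let Some(r) = rest.strip_prefix("晚上") {
        ("晚上", r)
    } else {
        ("", rest)
    };
    let digits: String = rest.chars().take_while(|c| c.is_ascii_digit()).collect();
    let hour = adjust_hour(period, digits.parse().ok()?);
    Some(format!("0 {} * * *", hour))
}

fn main() {
    println!("{:?}", parse_daily("每天下午3点提醒我"));
}
```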

Chunk 4 — Trajectory Compression:
- TrajectoryStore (zclaw-memory): trajectory_events + compressed_trajectories
- TrajectoryRecorderMiddleware (zclaw-runtime/middleware): priority 650,
  async non-blocking event recording via tokio::spawn
- TrajectoryCompressor (desktop/intelligence): dedup, request classification,
  satisfaction detection, execution chain JSON
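The non-blocking recording pattern above can be sketched with a background writer and a channel. The actual TrajectoryRecorderMiddleware uses tokio::spawn; a std thread + channel stands in here so the example is self-contained, and the function name is an assumption.

```rust
use std::sync::mpsc;
use std::thread;

/// Sketch of fire-and-forget trajectory recording: the hot path only sends
/// on a channel; a background writer drains events. In the real middleware
/// each event would go to TrajectoryStore::insert_event via tokio::spawn.
fn record_events(events: Vec<String>) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();
    let writer = thread::spawn(move || rx.into_iter().collect::<Vec<_>>());
    for e in events {
        tx.send(e).expect("writer alive"); // send returns immediately
    }
    drop(tx); // closing the channel lets the writer finish
    writer.join().expect("writer panicked")
}

fn main() {
    let stored = record_events(vec![
        "tool_execution: search".into(),
        "llm_generation: answer".into(),
    ]);
    println!("recorded {} events", stored.len());
}
```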

Schema migrations: v2→v3 (user_profiles), v3→v4 (trajectory tables)
Author: iven
Date: 2026-04-09 17:47:43 +08:00
parent 0883bb28ff
commit 4b15ead8e7
15 changed files with 4225 additions and 0 deletions


@@ -0,0 +1,356 @@
//! ExperienceStore — CRUD wrapper over VikingStorage for agent experiences.
//!
//! Stores structured experiences extracted from successful solution proposals
//! using the scope prefix `agent://{agent_id}/experience/{pattern_hash}`.
//! Leverages existing FTS5 + TF-IDF + embedding retrieval via VikingAdapter.
use std::sync::Arc;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};
use uuid::Uuid;
use crate::types::{MemoryEntry, MemoryType};
use crate::viking_adapter::{FindOptions, VikingAdapter};
// ---------------------------------------------------------------------------
// Experience data model
// ---------------------------------------------------------------------------
/// A structured experience record representing a solved pain point.
///
/// Stored as JSON content inside a VikingStorage `MemoryEntry` with
/// `memory_type = Experience`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Experience {
/// Unique experience identifier.
pub id: String,
/// Owning agent.
pub agent_id: String,
/// Short pattern describing the pain that was solved (e.g. "logistics export packaging").
pub pain_pattern: String,
/// Context in which the problem occurred.
pub context: String,
/// Ordered steps that resolved the problem.
pub solution_steps: Vec<String>,
/// Verbal outcome reported by the user.
pub outcome: String,
/// How many times this experience has been reused as a reference.
pub reuse_count: u32,
/// Timestamp of initial creation.
pub created_at: DateTime<Utc>,
/// Timestamp of most recent reuse or update.
pub updated_at: DateTime<Utc>,
}
impl Experience {
/// Create a new experience with the given fields.
pub fn new(
agent_id: &str,
pain_pattern: &str,
context: &str,
solution_steps: Vec<String>,
outcome: &str,
) -> Self {
let now = Utc::now();
Self {
id: Uuid::new_v4().to_string(),
agent_id: agent_id.to_string(),
pain_pattern: pain_pattern.to_string(),
context: context.to_string(),
solution_steps,
outcome: outcome.to_string(),
reuse_count: 0,
created_at: now,
updated_at: now,
}
}
/// Deterministic URI for this experience, keyed on a stable hash of the
/// pain pattern so duplicate patterns overwrite the same entry.
pub fn uri(&self) -> String {
let hash = simple_hash(&self.pain_pattern);
format!("agent://{}/experience/{}", self.agent_id, hash)
}
}
/// FNV-1a-inspired stable 8-hex-char hash. Good enough for deduplication;
/// collisions are acceptable because the full `pain_pattern` is still stored.
fn simple_hash(s: &str) -> String {
let mut h: u32 = 2166136261;
for b in s.as_bytes() {
h ^= *b as u32;
h = h.wrapping_mul(16777619);
}
format!("{:08x}", h)
}
// ---------------------------------------------------------------------------
// ExperienceStore
// ---------------------------------------------------------------------------
/// CRUD wrapper that persists [`Experience`] records through [`VikingAdapter`].
pub struct ExperienceStore {
viking: Arc<VikingAdapter>,
}
impl ExperienceStore {
/// Create a new store backed by the given VikingAdapter.
pub fn new(viking: Arc<VikingAdapter>) -> Self {
Self { viking }
}
/// Store (or overwrite) an experience. The URI is derived from
/// `agent_id + pain_pattern`, ensuring one experience per pattern.
pub async fn store_experience(&self, exp: &Experience) -> zclaw_types::Result<()> {
let uri = exp.uri();
let content = serde_json::to_string(exp)?;
let mut keywords = vec![exp.pain_pattern.clone()];
keywords.extend(exp.solution_steps.iter().take(3).cloned());
let entry = MemoryEntry {
uri,
memory_type: MemoryType::Experience,
content,
keywords,
importance: 8,
access_count: 0,
created_at: exp.created_at,
last_accessed: exp.updated_at,
overview: Some(exp.pain_pattern.clone()),
abstract_summary: Some(exp.outcome.clone()),
};
self.viking.store(&entry).await?;
debug!("[ExperienceStore] Stored experience {} for agent {}", exp.id, exp.agent_id);
Ok(())
}
/// Find experiences whose pain pattern matches the given query.
pub async fn find_by_pattern(
&self,
agent_id: &str,
pattern_query: &str,
) -> zclaw_types::Result<Vec<Experience>> {
let scope = format!("agent://{}/experience/", agent_id);
let opts = FindOptions {
scope: Some(scope),
limit: Some(10),
min_similarity: None,
};
let entries = self.viking.find(pattern_query, opts).await?;
let mut results = Vec::with_capacity(entries.len());
for entry in entries {
match serde_json::from_str::<Experience>(&entry.content) {
Ok(exp) => results.push(exp),
Err(e) => warn!("[ExperienceStore] Failed to deserialize experience at {}: {}", entry.uri, e),
}
}
Ok(results)
}
/// Return all experiences for a given agent.
pub async fn find_by_agent(
&self,
agent_id: &str,
) -> zclaw_types::Result<Vec<Experience>> {
let prefix = format!("agent://{}/experience/", agent_id);
let entries = self.viking.find_by_prefix(&prefix).await?;
let mut results = Vec::with_capacity(entries.len());
for entry in entries {
match serde_json::from_str::<Experience>(&entry.content) {
Ok(exp) => results.push(exp),
Err(e) => warn!("[ExperienceStore] Failed to deserialize experience at {}: {}", entry.uri, e),
}
}
Ok(results)
}
/// Increment the reuse counter for an existing experience.
/// On failure, logs a warning but does **not** propagate the error so
/// callers are never blocked.
pub async fn increment_reuse(&self, exp: &Experience) {
let mut updated = exp.clone();
updated.reuse_count += 1;
updated.updated_at = Utc::now();
if let Err(e) = self.store_experience(&updated).await {
warn!("[ExperienceStore] Failed to increment reuse for {}: {}", exp.id, e);
}
}
/// Delete a single experience by its URI.
pub async fn delete(&self, exp: &Experience) -> zclaw_types::Result<()> {
let uri = exp.uri();
self.viking.delete(&uri).await?;
debug!("[ExperienceStore] Deleted experience {} for agent {}", exp.id, exp.agent_id);
Ok(())
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_experience_new() {
let exp = Experience::new(
"agent-1",
"logistics export packaging",
"export packaging rejected by customs",
vec!["check regulations".into(), "use approved materials".into()],
"packaging passed customs",
);
assert!(!exp.id.is_empty());
assert_eq!(exp.agent_id, "agent-1");
assert_eq!(exp.solution_steps.len(), 2);
assert_eq!(exp.reuse_count, 0);
}
#[test]
fn test_uri_deterministic() {
let exp1 = Experience::new(
"agent-1", "packaging issue", "ctx",
vec!["step1".into()], "ok",
);
// Second experience with same agent + pattern should produce the same URI.
let mut exp2 = exp1.clone();
exp2.id = "different-id".to_string();
assert_eq!(exp1.uri(), exp2.uri());
}
#[test]
fn test_uri_differs_for_different_patterns() {
let exp_a = Experience::new(
"agent-1", "packaging issue", "ctx",
vec!["step1".into()], "ok",
);
let exp_b = Experience::new(
"agent-1", "compliance gap", "ctx",
vec!["step1".into()], "ok",
);
assert_ne!(exp_a.uri(), exp_b.uri());
}
#[test]
fn test_simple_hash_stability() {
let h1 = simple_hash("hello world");
let h2 = simple_hash("hello world");
assert_eq!(h1, h2);
assert_eq!(h1.len(), 8);
}
#[tokio::test]
async fn test_store_and_find_by_agent() {
let viking = Arc::new(VikingAdapter::in_memory());
let store = ExperienceStore::new(viking);
let exp = Experience::new(
"agent-42",
"export document errors",
"recurring mistakes in export docs",
vec!["use template".into(), "auto-validate".into()],
"no more errors",
);
store.store_experience(&exp).await.unwrap();
let found = store.find_by_agent("agent-42").await.unwrap();
assert_eq!(found.len(), 1);
assert_eq!(found[0].pain_pattern, "export document errors");
assert_eq!(found[0].solution_steps.len(), 2);
}
#[tokio::test]
async fn test_store_overwrites_same_pattern() {
let viking = Arc::new(VikingAdapter::in_memory());
let store = ExperienceStore::new(viking);
let exp_v1 = Experience::new(
"agent-1", "packaging", "v1",
vec!["old step".into()], "ok",
);
store.store_experience(&exp_v1).await.unwrap();
let exp_v2 = Experience::new(
"agent-1", "packaging", "v2 updated",
vec!["new step".into()], "better",
);
// Force same URI by reusing the ID logic — same pattern → same URI.
store.store_experience(&exp_v2).await.unwrap();
let found = store.find_by_agent("agent-1").await.unwrap();
// Should be overwritten, not duplicated (same URI).
assert_eq!(found.len(), 1);
assert_eq!(found[0].context, "v2 updated");
}
#[tokio::test]
async fn test_find_by_pattern() {
let viking = Arc::new(VikingAdapter::in_memory());
let store = ExperienceStore::new(viking);
let exp = Experience::new(
"agent-1",
"logistics packaging compliance",
"export compliance issues",
vec!["check regulations".into()],
"passed audit",
);
store.store_experience(&exp).await.unwrap();
let found = store.find_by_pattern("agent-1", "packaging").await.unwrap();
assert_eq!(found.len(), 1);
}
#[tokio::test]
async fn test_increment_reuse() {
let viking = Arc::new(VikingAdapter::in_memory());
let store = ExperienceStore::new(viking);
let exp = Experience::new(
"agent-1", "packaging", "ctx",
vec!["step".into()], "ok",
);
store.store_experience(&exp).await.unwrap();
store.increment_reuse(&exp).await;
let found = store.find_by_agent("agent-1").await.unwrap();
assert_eq!(found[0].reuse_count, 1);
}
#[tokio::test]
async fn test_delete_experience() {
let viking = Arc::new(VikingAdapter::in_memory());
let store = ExperienceStore::new(viking);
let exp = Experience::new(
"agent-1", "packaging", "ctx",
vec!["step".into()], "ok",
);
store.store_experience(&exp).await.unwrap();
store.delete(&exp).await.unwrap();
let found = store.find_by_agent("agent-1").await.unwrap();
assert!(found.is_empty());
}
#[tokio::test]
async fn test_find_by_agent_filters_other_agents() {
let viking = Arc::new(VikingAdapter::in_memory());
let store = ExperienceStore::new(viking);
let exp_a = Experience::new("agent-a", "packaging", "ctx", vec!["s".into()], "ok");
let exp_b = Experience::new("agent-b", "compliance", "ctx", vec!["s".into()], "ok");
store.store_experience(&exp_a).await.unwrap();
store.store_experience(&exp_b).await.unwrap();
let found_a = store.find_by_agent("agent-a").await.unwrap();
assert_eq!(found_a.len(), 1);
assert_eq!(found_a[0].pain_pattern, "packaging");
}
}


@@ -64,6 +64,7 @@ pub mod viking_adapter;
pub mod storage;
pub mod retrieval;
pub mod summarizer;
pub mod experience_store;
// Re-export main types for convenience
pub use types::{
@@ -85,6 +86,7 @@ pub use injector::{InjectionFormat, PromptInjector};
pub use tracker::{AgentMetadata, GrowthTracker, LearningEvent};
pub use viking_adapter::{FindOptions, VikingAdapter, VikingLevel, VikingStorage};
pub use storage::SqliteStorage;
pub use experience_store::{Experience, ExperienceStore};
pub use retrieval::{EmbeddingClient, MemoryCache, QueryAnalyzer, SemanticScorer};
pub use summarizer::SummaryLlmDriver;


@@ -6,8 +6,15 @@ mod store;
mod session;
mod schema;
pub mod fact;
pub mod user_profile_store;
pub mod trajectory_store;
pub use store::*;
pub use session::*;
pub use schema::*;
pub use fact::{Fact, FactCategory, ExtractedFactBatch};
pub use user_profile_store::{UserProfileStore, UserProfile, Level, CommStyle};
pub use trajectory_store::{
TrajectoryEvent, TrajectoryStore, TrajectoryStepType,
CompressedTrajectory, CompletionStatus, SatisfactionSignal,
};


@@ -93,4 +93,47 @@ pub const MIGRATIONS: &[&str] = &[
// v1→v2: persist runtime state and message count
"ALTER TABLE agents ADD COLUMN state TEXT NOT NULL DEFAULT 'running'",
"ALTER TABLE agents ADD COLUMN message_count INTEGER NOT NULL DEFAULT 0",
// v2→v3: user profiles for structured user modeling
"CREATE TABLE IF NOT EXISTS user_profiles (
user_id TEXT PRIMARY KEY,
industry TEXT,
role TEXT,
expertise_level TEXT,
communication_style TEXT,
preferred_language TEXT DEFAULT 'zh-CN',
recent_topics TEXT DEFAULT '[]',
active_pain_points TEXT DEFAULT '[]',
preferred_tools TEXT DEFAULT '[]',
confidence REAL DEFAULT 0.0,
updated_at TEXT NOT NULL
)",
// v3→v4: trajectory recording for tool-call chain analysis
"CREATE TABLE IF NOT EXISTS trajectory_events (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
step_index INTEGER NOT NULL,
step_type TEXT NOT NULL,
input_summary TEXT,
output_summary TEXT,
duration_ms INTEGER DEFAULT 0,
timestamp TEXT NOT NULL
)",
"CREATE INDEX IF NOT EXISTS idx_trajectory_session ON trajectory_events(session_id)",
"CREATE TABLE IF NOT EXISTS compressed_trajectories (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
request_type TEXT NOT NULL,
tools_used TEXT,
outcome TEXT NOT NULL,
total_steps INTEGER DEFAULT 0,
total_duration_ms INTEGER DEFAULT 0,
total_tokens INTEGER DEFAULT 0,
execution_chain TEXT NOT NULL,
satisfaction_signal TEXT,
created_at TEXT NOT NULL
)",
"CREATE INDEX IF NOT EXISTS idx_ct_request_type ON compressed_trajectories(request_type)",
"CREATE INDEX IF NOT EXISTS idx_ct_outcome ON compressed_trajectories(outcome)",
];
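A versioned MIGRATIONS array like the one above is typically applied by tracking a schema version and running only the statements past it. A minimal dependency-free sketch, assuming a version counter persisted elsewhere (e.g. SQLite's PRAGMA user_version); the actual runner in zclaw-memory is not shown in this diff, and `exec` stands in for executing SQL against the pool.

```rust
/// Sketch of sequential migration application: version N means the first
/// N statements have already run, so only migrations[N..] are executed.
fn apply_migrations(migrations: &[&str], version: &mut usize, exec: &mut dyn FnMut(&str)) {
    let start = (*version).min(migrations.len());
    for stmt in &migrations[start..] {
        exec(stmt); // real store would run: sqlx::query(stmt).execute(&pool)
        *version += 1; // persist the bumped version alongside the data
    }
}

fn main() {
    let migrations = ["stmt v0->v1", "stmt v1->v2", "stmt v2->v3"];
    let mut version = 1; // a v1 database: only the last two statements run
    let mut ran = Vec::new();
    apply_migrations(&migrations, &mut version, &mut |s| ran.push(s.to_string()));
    println!("ran {} statements, now at v{}", ran.len(), version);
}
```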


@@ -0,0 +1,563 @@
//! Trajectory Store -- record and compress tool-call chains for analysis.
//!
//! Stores raw trajectory events (user requests, tool calls, LLM generations)
//! and compressed trajectory summaries. Used by the Hermes Intelligence Pipeline
//! to analyze agent behaviour patterns and improve routing over time.
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::SqlitePool;
use zclaw_types::{Result, ZclawError};
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/// Step type in a trajectory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TrajectoryStepType {
UserRequest,
IntentClassification,
SkillSelection,
ToolExecution,
LlmGeneration,
UserFeedback,
}
impl TrajectoryStepType {
/// Serialize to the string stored in SQLite.
pub fn as_str(&self) -> &'static str {
match self {
Self::UserRequest => "user_request",
Self::IntentClassification => "intent_classification",
Self::SkillSelection => "skill_selection",
Self::ToolExecution => "tool_execution",
Self::LlmGeneration => "llm_generation",
Self::UserFeedback => "user_feedback",
}
}
/// Deserialize from the SQLite string representation.
pub fn from_str_lossy(s: &str) -> Self {
match s {
"user_request" => Self::UserRequest,
"intent_classification" => Self::IntentClassification,
"skill_selection" => Self::SkillSelection,
"tool_execution" => Self::ToolExecution,
"llm_generation" => Self::LlmGeneration,
"user_feedback" => Self::UserFeedback,
_ => Self::UserRequest,
}
}
}
/// Single trajectory event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrajectoryEvent {
pub id: String,
pub session_id: String,
pub agent_id: String,
pub step_index: usize,
pub step_type: TrajectoryStepType,
/// Summarised input (max 200 chars).
pub input_summary: String,
/// Summarised output (max 200 chars).
pub output_summary: String,
pub duration_ms: u64,
pub timestamp: DateTime<Utc>,
}
/// Satisfaction signal inferred from user feedback.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SatisfactionSignal {
Positive,
Negative,
Neutral,
}
impl SatisfactionSignal {
pub fn as_str(&self) -> &'static str {
match self {
Self::Positive => "positive",
Self::Negative => "negative",
Self::Neutral => "neutral",
}
}
pub fn from_str_lossy(s: &str) -> Option<Self> {
match s {
"positive" => Some(Self::Positive),
"negative" => Some(Self::Negative),
"neutral" => Some(Self::Neutral),
_ => None,
}
}
}
/// Completion status of a compressed trajectory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CompletionStatus {
Success,
Partial,
Failed,
Abandoned,
}
impl CompletionStatus {
pub fn as_str(&self) -> &'static str {
match self {
Self::Success => "success",
Self::Partial => "partial",
Self::Failed => "failed",
Self::Abandoned => "abandoned",
}
}
pub fn from_str_lossy(s: &str) -> Self {
match s {
"success" => Self::Success,
"partial" => Self::Partial,
"failed" => Self::Failed,
"abandoned" => Self::Abandoned,
_ => Self::Success,
}
}
}
/// Compressed trajectory (generated at session end).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressedTrajectory {
pub id: String,
pub session_id: String,
pub agent_id: String,
pub request_type: String,
pub tools_used: Vec<String>,
pub outcome: CompletionStatus,
pub total_steps: usize,
pub total_duration_ms: u64,
pub total_tokens: u32,
/// Serialised JSON execution chain for analysis.
pub execution_chain: String,
pub satisfaction_signal: Option<SatisfactionSignal>,
pub created_at: DateTime<Utc>,
}
// ---------------------------------------------------------------------------
// Store
// ---------------------------------------------------------------------------
/// Persistent store for trajectory events and compressed trajectories.
pub struct TrajectoryStore {
pool: SqlitePool,
}
impl TrajectoryStore {
/// Create a new `TrajectoryStore` backed by the given SQLite pool.
pub fn new(pool: SqlitePool) -> Self {
Self { pool }
}
/// Create the required tables. Idempotent -- safe to call on startup.
pub async fn initialize_schema(&self) -> Result<()> {
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS trajectory_events (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
step_index INTEGER NOT NULL,
step_type TEXT NOT NULL,
input_summary TEXT,
output_summary TEXT,
duration_ms INTEGER DEFAULT 0,
timestamp TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_trajectory_session ON trajectory_events(session_id);
"#,
)
.execute(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(e.to_string()))?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS compressed_trajectories (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
request_type TEXT NOT NULL,
tools_used TEXT,
outcome TEXT NOT NULL,
total_steps INTEGER DEFAULT 0,
total_duration_ms INTEGER DEFAULT 0,
total_tokens INTEGER DEFAULT 0,
execution_chain TEXT NOT NULL,
satisfaction_signal TEXT,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_ct_request_type ON compressed_trajectories(request_type);
CREATE INDEX IF NOT EXISTS idx_ct_outcome ON compressed_trajectories(outcome);
"#,
)
.execute(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(e.to_string()))?;
Ok(())
}
/// Insert a raw trajectory event.
pub async fn insert_event(&self, event: &TrajectoryEvent) -> Result<()> {
sqlx::query(
r#"
INSERT INTO trajectory_events
(id, session_id, agent_id, step_index, step_type,
input_summary, output_summary, duration_ms, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
"#,
)
.bind(&event.id)
.bind(&event.session_id)
.bind(&event.agent_id)
.bind(event.step_index as i64)
.bind(event.step_type.as_str())
.bind(&event.input_summary)
.bind(&event.output_summary)
.bind(event.duration_ms as i64)
.bind(event.timestamp.to_rfc3339())
.execute(&self.pool)
.await
.map_err(|e| {
tracing::warn!("[TrajectoryStore] insert_event failed: {}", e);
ZclawError::StorageError(e.to_string())
})?;
Ok(())
}
/// Retrieve all raw events for a session, ordered by step_index.
pub async fn get_events_by_session(&self, session_id: &str) -> Result<Vec<TrajectoryEvent>> {
let rows = sqlx::query_as::<_, (String, String, String, i64, String, Option<String>, Option<String>, Option<i64>, String)>(
r#"
SELECT id, session_id, agent_id, step_index, step_type,
input_summary, output_summary, duration_ms, timestamp
FROM trajectory_events
WHERE session_id = ?
ORDER BY step_index ASC
"#,
)
.bind(session_id)
.fetch_all(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(e.to_string()))?;
let mut events = Vec::with_capacity(rows.len());
for (id, sid, aid, step_idx, stype, input_s, output_s, dur_ms, ts) in rows {
let timestamp = DateTime::parse_from_rfc3339(&ts)
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(|_| Utc::now());
events.push(TrajectoryEvent {
id,
session_id: sid,
agent_id: aid,
step_index: step_idx as usize,
step_type: TrajectoryStepType::from_str_lossy(&stype),
input_summary: input_s.unwrap_or_default(),
output_summary: output_s.unwrap_or_default(),
duration_ms: dur_ms.unwrap_or(0) as u64,
timestamp,
});
}
Ok(events)
}
/// Insert a compressed trajectory.
pub async fn insert_compressed(&self, trajectory: &CompressedTrajectory) -> Result<()> {
let tools_json = serde_json::to_string(&trajectory.tools_used)
.map_err(|e| ZclawError::StorageError(e.to_string()))?;
sqlx::query(
r#"
INSERT INTO compressed_trajectories
(id, session_id, agent_id, request_type, tools_used,
outcome, total_steps, total_duration_ms, total_tokens,
execution_chain, satisfaction_signal, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"#,
)
.bind(&trajectory.id)
.bind(&trajectory.session_id)
.bind(&trajectory.agent_id)
.bind(&trajectory.request_type)
.bind(&tools_json)
.bind(trajectory.outcome.as_str())
.bind(trajectory.total_steps as i64)
.bind(trajectory.total_duration_ms as i64)
.bind(trajectory.total_tokens as i64)
.bind(&trajectory.execution_chain)
.bind(trajectory.satisfaction_signal.map(|s| s.as_str()))
.bind(trajectory.created_at.to_rfc3339())
.execute(&self.pool)
.await
.map_err(|e| {
tracing::warn!("[TrajectoryStore] insert_compressed failed: {}", e);
ZclawError::StorageError(e.to_string())
})?;
Ok(())
}
/// Retrieve the compressed trajectory for a session, if any.
pub async fn get_compressed_by_session(&self, session_id: &str) -> Result<Option<CompressedTrajectory>> {
let row = sqlx::query_as::<_, (
String, String, String, String, Option<String>,
String, i64, i64, i64, String, Option<String>, String,
)>(
r#"
SELECT id, session_id, agent_id, request_type, tools_used,
outcome, total_steps, total_duration_ms, total_tokens,
execution_chain, satisfaction_signal, created_at
FROM compressed_trajectories
WHERE session_id = ?
"#,
)
.bind(session_id)
.fetch_optional(&self.pool)
.await
.map_err(|e| ZclawError::StorageError(e.to_string()))?;
match row {
Some((id, sid, aid, req_type, tools_json, outcome_str, steps, dur_ms, tokens, chain, sat, created)) => {
let tools_used: Vec<String> = tools_json
.as_deref()
.and_then(|j| serde_json::from_str(j).ok())
.unwrap_or_default();
let timestamp = DateTime::parse_from_rfc3339(&created)
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(|_| Utc::now());
Ok(Some(CompressedTrajectory {
id,
session_id: sid,
agent_id: aid,
request_type: req_type,
tools_used,
outcome: CompletionStatus::from_str_lossy(&outcome_str),
total_steps: steps as usize,
total_duration_ms: dur_ms as u64,
total_tokens: tokens as u32,
execution_chain: chain,
satisfaction_signal: sat.as_deref().and_then(SatisfactionSignal::from_str_lossy),
created_at: timestamp,
}))
}
None => Ok(None),
}
}
/// Delete raw trajectory events older than `days` days. Returns count deleted.
pub async fn delete_events_older_than(&self, days: i64) -> Result<u64> {
let result = sqlx::query(
r#"
DELETE FROM trajectory_events
WHERE timestamp < datetime('now', ?)
"#,
)
.bind(format!("-{} days", days))
.execute(&self.pool)
.await
.map_err(|e| {
tracing::warn!("[TrajectoryStore] delete_events_older_than failed: {}", e);
ZclawError::StorageError(e.to_string())
})?;
Ok(result.rows_affected())
}
/// Delete compressed trajectories older than `days` days. Returns count deleted.
pub async fn delete_compressed_older_than(&self, days: i64) -> Result<u64> {
let result = sqlx::query(
r#"
DELETE FROM compressed_trajectories
WHERE created_at < datetime('now', ?)
"#,
)
.bind(format!("-{} days", days))
.execute(&self.pool)
.await
.map_err(|e| {
tracing::warn!("[TrajectoryStore] delete_compressed_older_than failed: {}", e);
ZclawError::StorageError(e.to_string())
})?;
Ok(result.rows_affected())
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
async fn test_store() -> TrajectoryStore {
let pool = SqlitePool::connect("sqlite::memory:")
.await
.expect("in-memory pool");
let store = TrajectoryStore::new(pool);
store.initialize_schema().await.expect("schema init");
store
}
fn sample_event(index: usize) -> TrajectoryEvent {
TrajectoryEvent {
id: format!("evt-{}", index),
session_id: "sess-1".to_string(),
agent_id: "agent-1".to_string(),
step_index: index,
step_type: TrajectoryStepType::ToolExecution,
input_summary: "search query".to_string(),
output_summary: "3 results found".to_string(),
duration_ms: 150,
timestamp: Utc::now(),
}
}
#[tokio::test]
async fn test_insert_and_get_events() {
let store = test_store().await;
let e1 = sample_event(0);
let e2 = TrajectoryEvent {
id: "evt-1".to_string(),
step_index: 1,
step_type: TrajectoryStepType::LlmGeneration,
..sample_event(0)
};
store.insert_event(&e1).await.unwrap();
store.insert_event(&e2).await.unwrap();
let events = store.get_events_by_session("sess-1").await.unwrap();
assert_eq!(events.len(), 2);
assert_eq!(events[0].step_index, 0);
assert_eq!(events[1].step_index, 1);
assert_eq!(events[0].step_type, TrajectoryStepType::ToolExecution);
assert_eq!(events[1].step_type, TrajectoryStepType::LlmGeneration);
}
#[tokio::test]
async fn test_get_events_empty_session() {
let store = test_store().await;
let events = store.get_events_by_session("nonexistent").await.unwrap();
assert!(events.is_empty());
}
#[tokio::test]
async fn test_insert_and_get_compressed() {
let store = test_store().await;
let ct = CompressedTrajectory {
id: "ct-1".to_string(),
session_id: "sess-1".to_string(),
agent_id: "agent-1".to_string(),
request_type: "data_query".to_string(),
tools_used: vec!["search".to_string(), "calculate".to_string()],
outcome: CompletionStatus::Success,
total_steps: 5,
total_duration_ms: 1200,
total_tokens: 350,
execution_chain: r#"[{"step":0,"type":"tool_execution"}]"#.to_string(),
satisfaction_signal: Some(SatisfactionSignal::Positive),
created_at: Utc::now(),
};
store.insert_compressed(&ct).await.unwrap();
let loaded = store.get_compressed_by_session("sess-1").await.unwrap();
assert!(loaded.is_some());
let loaded = loaded.unwrap();
assert_eq!(loaded.id, "ct-1");
assert_eq!(loaded.request_type, "data_query");
assert_eq!(loaded.tools_used.len(), 2);
assert_eq!(loaded.outcome, CompletionStatus::Success);
assert_eq!(loaded.satisfaction_signal, Some(SatisfactionSignal::Positive));
}
#[tokio::test]
async fn test_get_compressed_nonexistent() {
let store = test_store().await;
let result = store.get_compressed_by_session("nonexistent").await.unwrap();
assert!(result.is_none());
}
#[tokio::test]
async fn test_step_type_roundtrip() {
let all_types = [
TrajectoryStepType::UserRequest,
TrajectoryStepType::IntentClassification,
TrajectoryStepType::SkillSelection,
TrajectoryStepType::ToolExecution,
TrajectoryStepType::LlmGeneration,
TrajectoryStepType::UserFeedback,
];
for st in all_types {
assert_eq!(TrajectoryStepType::from_str_lossy(st.as_str()), st);
}
}
#[tokio::test]
async fn test_satisfaction_signal_roundtrip() {
let signals = [SatisfactionSignal::Positive, SatisfactionSignal::Negative, SatisfactionSignal::Neutral];
for sig in signals {
assert_eq!(SatisfactionSignal::from_str_lossy(sig.as_str()), Some(sig));
}
assert_eq!(SatisfactionSignal::from_str_lossy("bogus"), None);
}
#[tokio::test]
async fn test_completion_status_roundtrip() {
let statuses = [CompletionStatus::Success, CompletionStatus::Partial, CompletionStatus::Failed, CompletionStatus::Abandoned];
for s in statuses {
assert_eq!(CompletionStatus::from_str_lossy(s.as_str()), s);
}
}
#[tokio::test]
async fn test_delete_events_older_than() {
let store = test_store().await;
// Insert an event with a timestamp far in the past
let old_event = TrajectoryEvent {
id: "old-evt".to_string(),
timestamp: Utc::now() - chrono::Duration::days(100),
..sample_event(0)
};
store.insert_event(&old_event).await.unwrap();
// Insert a recent event
let recent_event = TrajectoryEvent {
id: "recent-evt".to_string(),
step_index: 1,
..sample_event(0)
};
store.insert_event(&recent_event).await.unwrap();
let deleted = store.delete_events_older_than(30).await.unwrap();
assert_eq!(deleted, 1);
let remaining = store.get_events_by_session("sess-1").await.unwrap();
assert_eq!(remaining.len(), 1);
assert_eq!(remaining[0].id, "recent-evt");
}
}
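The `execution_chain` column in this file stores a serialized JSON array of step entries; the test fixture above uses `[{"step":0,"type":"tool_execution"}]`. A dependency-free sketch of building that string from (index, step type) pairs follows. The real TrajectoryCompressor lives in desktop/intelligence and is not part of this file; the function name is an assumption.

```rust
/// Sketch: build the execution_chain JSON stored in CompressedTrajectory
/// from (step_index, step_type) pairs, using plain string formatting so
/// the example needs no serde dependency.
fn build_execution_chain(steps: &[(usize, &str)]) -> String {
    let entries: Vec<String> = steps
        .iter()
        .map(|(i, t)| format!(r#"{{"step":{},"type":"{}"}}"#, i, t))
        .collect();
    format!("[{}]", entries.join(","))
}

fn main() {
    let chain = build_execution_chain(&[(0, "tool_execution"), (1, "llm_generation")]);
    println!("{}", chain);
}
```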


@@ -0,0 +1,592 @@
//! User Profile Store — structured user modeling from conversation patterns.
//!
//! Maintains a single `UserProfile` per user (desktop uses "default_user")
in a dedicated SQLite table. Vec fields (recent_topics, active_pain_points,
//! preferred_tools) are stored as JSON arrays and transparently
//! (de)serialised on read/write.
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::Row;
use sqlx::SqlitePool;
use zclaw_types::Result;
// ---------------------------------------------------------------------------
// Data types
// ---------------------------------------------------------------------------
/// Expertise level inferred from conversation patterns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Level {
Beginner,
Intermediate,
Expert,
}
impl Level {
pub fn as_str(&self) -> &'static str {
match self {
Level::Beginner => "beginner",
Level::Intermediate => "intermediate",
Level::Expert => "expert",
}
}
pub fn from_str_lossy(s: &str) -> Option<Self> {
match s {
"beginner" => Some(Level::Beginner),
"intermediate" => Some(Level::Intermediate),
"expert" => Some(Level::Expert),
_ => None,
}
}
}
/// Communication style preference.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CommStyle {
Concise,
Detailed,
Formal,
Casual,
}
impl CommStyle {
pub fn as_str(&self) -> &'static str {
match self {
CommStyle::Concise => "concise",
CommStyle::Detailed => "detailed",
CommStyle::Formal => "formal",
CommStyle::Casual => "casual",
}
}
pub fn from_str_lossy(s: &str) -> Option<Self> {
match s {
"concise" => Some(CommStyle::Concise),
"detailed" => Some(CommStyle::Detailed),
"formal" => Some(CommStyle::Formal),
"casual" => Some(CommStyle::Casual),
_ => None,
}
}
}
/// Structured user profile (one record per user).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserProfile {
pub user_id: String,
pub industry: Option<String>,
pub role: Option<String>,
pub expertise_level: Option<Level>,
pub communication_style: Option<CommStyle>,
pub preferred_language: String,
pub recent_topics: Vec<String>,
pub active_pain_points: Vec<String>,
pub preferred_tools: Vec<String>,
pub confidence: f32,
pub updated_at: DateTime<Utc>,
}
impl UserProfile {
/// Create a blank profile for the given user.
pub fn blank(user_id: &str) -> Self {
Self {
user_id: user_id.to_string(),
industry: None,
role: None,
expertise_level: None,
communication_style: None,
preferred_language: "zh-CN".to_string(),
recent_topics: Vec::new(),
active_pain_points: Vec::new(),
preferred_tools: Vec::new(),
confidence: 0.0,
updated_at: Utc::now(),
}
}
/// Default profile for single-user desktop mode ("default_user").
pub fn default_profile() -> Self {
Self::blank("default_user")
}
}
// ---------------------------------------------------------------------------
// DDL
// ---------------------------------------------------------------------------
const PROFILE_DDL: &str = r#"
CREATE TABLE IF NOT EXISTS user_profiles (
user_id TEXT PRIMARY KEY,
industry TEXT,
role TEXT,
expertise_level TEXT,
communication_style TEXT,
preferred_language TEXT DEFAULT 'zh-CN',
recent_topics TEXT DEFAULT '[]',
active_pain_points TEXT DEFAULT '[]',
preferred_tools TEXT DEFAULT '[]',
confidence REAL DEFAULT 0.0,
updated_at TEXT NOT NULL
)
"#;
// ---------------------------------------------------------------------------
// Row mapping
// ---------------------------------------------------------------------------
fn row_to_profile(row: &sqlx::sqlite::SqliteRow) -> Result<UserProfile> {
let recent_topics_json: String = row.try_get("recent_topics").unwrap_or_else(|_| "[]".to_string());
let pain_json: String = row.try_get("active_pain_points").unwrap_or_else(|_| "[]".to_string());
let tools_json: String = row.try_get("preferred_tools").unwrap_or_else(|_| "[]".to_string());
let recent_topics: Vec<String> = serde_json::from_str(&recent_topics_json)?;
let active_pain_points: Vec<String> = serde_json::from_str(&pain_json)?;
let preferred_tools: Vec<String> = serde_json::from_str(&tools_json)?;
let expertise_str: Option<String> = row.try_get("expertise_level").unwrap_or(None);
let comm_str: Option<String> = row.try_get("communication_style").unwrap_or(None);
let updated_at_str: String = row.try_get("updated_at").unwrap_or_else(|_| Utc::now().to_rfc3339());
let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(|_| Utc::now());
Ok(UserProfile {
user_id: row.try_get("user_id").unwrap_or_default(),
industry: row.try_get("industry").unwrap_or(None),
role: row.try_get("role").unwrap_or(None),
expertise_level: expertise_str.as_deref().and_then(Level::from_str_lossy),
communication_style: comm_str.as_deref().and_then(CommStyle::from_str_lossy),
preferred_language: row.try_get("preferred_language").unwrap_or_else(|_| "zh-CN".to_string()),
recent_topics,
active_pain_points,
preferred_tools,
confidence: row.try_get("confidence").unwrap_or(0.0),
updated_at,
})
}
// ---------------------------------------------------------------------------
// UserProfileStore
// ---------------------------------------------------------------------------
/// SQLite-backed store for user profiles.
pub struct UserProfileStore {
pool: SqlitePool,
}
impl UserProfileStore {
/// Create a new store backed by the given connection pool.
pub fn new(pool: SqlitePool) -> Self {
Self { pool }
}
/// Create tables. Idempotent — safe to call on every startup.
pub async fn initialize_schema(&self) -> Result<()> {
sqlx::query(PROFILE_DDL)
.execute(&self.pool)
.await
.map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?;
Ok(())
}
/// Fetch the profile for a user. Returns `None` when no row exists.
pub async fn get(&self, user_id: &str) -> Result<Option<UserProfile>> {
let row = sqlx::query(
"SELECT user_id, industry, role, expertise_level, communication_style, \
preferred_language, recent_topics, active_pain_points, preferred_tools, \
confidence, updated_at \
FROM user_profiles WHERE user_id = ?",
)
.bind(user_id)
.fetch_optional(&self.pool)
.await
.map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?;
match row {
Some(r) => Ok(Some(row_to_profile(&r)?)),
None => Ok(None),
}
}
/// Insert or replace the full profile.
pub async fn upsert(&self, profile: &UserProfile) -> Result<()> {
let topics = serde_json::to_string(&profile.recent_topics)?;
let pains = serde_json::to_string(&profile.active_pain_points)?;
let tools = serde_json::to_string(&profile.preferred_tools)?;
let expertise = profile.expertise_level.map(|l| l.as_str());
let comm = profile.communication_style.map(|c| c.as_str());
let updated = profile.updated_at.to_rfc3339();
sqlx::query(
"INSERT OR REPLACE INTO user_profiles \
(user_id, industry, role, expertise_level, communication_style, \
preferred_language, recent_topics, active_pain_points, preferred_tools, \
confidence, updated_at) \
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
)
.bind(&profile.user_id)
.bind(&profile.industry)
.bind(&profile.role)
.bind(expertise)
.bind(comm)
.bind(&profile.preferred_language)
.bind(&topics)
.bind(&pains)
.bind(&tools)
.bind(profile.confidence)
.bind(&updated)
.execute(&self.pool)
.await
.map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?;
Ok(())
}
/// Update a single scalar field by name.
///
/// `field` must be one of: industry, role, expertise_level,
/// communication_style, preferred_language, confidence.
/// Returns error for unrecognised field names (prevents SQL injection).
pub async fn update_field(&self, user_id: &str, field: &str, value: &str) -> Result<()> {
let sql = match field {
"industry" => "UPDATE user_profiles SET industry = ?, updated_at = ? WHERE user_id = ?",
"role" => "UPDATE user_profiles SET role = ?, updated_at = ? WHERE user_id = ?",
"expertise_level" => {
"UPDATE user_profiles SET expertise_level = ?, updated_at = ? WHERE user_id = ?"
}
"communication_style" => {
"UPDATE user_profiles SET communication_style = ?, updated_at = ? WHERE user_id = ?"
}
"preferred_language" => {
"UPDATE user_profiles SET preferred_language = ?, updated_at = ? WHERE user_id = ?"
}
"confidence" => {
"UPDATE user_profiles SET confidence = ?, updated_at = ? WHERE user_id = ?"
}
_ => {
return Err(zclaw_types::ZclawError::InvalidInput(format!(
"Unknown profile field: {}",
field
)));
}
};
let now = Utc::now().to_rfc3339();
// confidence is REAL; parse the value string.
if field == "confidence" {
let f: f32 = value.parse().map_err(|_| {
zclaw_types::ZclawError::InvalidInput(format!("Invalid confidence: {}", value))
})?;
sqlx::query(sql)
.bind(f)
.bind(&now)
.bind(user_id)
.execute(&self.pool)
.await
.map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?;
} else {
sqlx::query(sql)
.bind(value)
.bind(&now)
.bind(user_id)
.execute(&self.pool)
.await
.map_err(|e| zclaw_types::ZclawError::StorageError(e.to_string()))?;
}
Ok(())
}
/// Append a topic to `recent_topics`, trimming to `max_topics`.
/// Creates a default profile row if none exists.
pub async fn add_recent_topic(
&self,
user_id: &str,
topic: &str,
max_topics: usize,
) -> Result<()> {
let mut profile = self
.get(user_id)
.await?
.unwrap_or_else(|| UserProfile::blank(user_id));
// Deduplicate: remove if already present, then push to front.
profile.recent_topics.retain(|t| t != topic);
profile.recent_topics.insert(0, topic.to_string());
profile.recent_topics.truncate(max_topics);
profile.updated_at = Utc::now();
self.upsert(&profile).await
}
/// Append a pain point, trimming to `max_pains`.
/// Creates a default profile row if none exists.
pub async fn add_pain_point(
&self,
user_id: &str,
pain: &str,
max_pains: usize,
) -> Result<()> {
let mut profile = self
.get(user_id)
.await?
.unwrap_or_else(|| UserProfile::blank(user_id));
profile.active_pain_points.retain(|p| p != pain);
profile.active_pain_points.insert(0, pain.to_string());
profile.active_pain_points.truncate(max_pains);
profile.updated_at = Utc::now();
self.upsert(&profile).await
}
/// Append a preferred tool, trimming to `max_tools`.
/// Creates a default profile row if none exists.
pub async fn add_preferred_tool(
&self,
user_id: &str,
tool: &str,
max_tools: usize,
) -> Result<()> {
let mut profile = self
.get(user_id)
.await?
.unwrap_or_else(|| UserProfile::blank(user_id));
profile.preferred_tools.retain(|t| t != tool);
profile.preferred_tools.insert(0, tool.to_string());
profile.preferred_tools.truncate(max_tools);
profile.updated_at = Utc::now();
self.upsert(&profile).await
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
/// Helper: create an in-memory store with schema.
async fn test_store() -> UserProfileStore {
let pool = SqlitePool::connect("sqlite::memory:")
.await
.expect("in-memory pool");
let store = UserProfileStore::new(pool);
store.initialize_schema().await.expect("schema init");
store
}
#[tokio::test]
async fn test_initialize_schema_idempotent() {
let store = test_store().await;
// Second call should succeed without error.
store.initialize_schema().await.unwrap();
store.initialize_schema().await.unwrap();
}
#[tokio::test]
async fn test_get_returns_none_for_missing() {
let store = test_store().await;
let profile = store.get("nonexistent").await.unwrap();
assert!(profile.is_none());
}
#[tokio::test]
async fn test_upsert_and_get() {
let store = test_store().await;
let mut profile = UserProfile::blank("default_user");
profile.industry = Some("healthcare".to_string());
profile.role = Some("admin".to_string());
profile.expertise_level = Some(Level::Intermediate);
profile.communication_style = Some(CommStyle::Concise);
profile.recent_topics = vec!["reporting".to_string(), "compliance".to_string()];
profile.confidence = 0.65;
store.upsert(&profile).await.unwrap();
let loaded = store.get("default_user").await.unwrap().unwrap();
assert_eq!(loaded.user_id, "default_user");
assert_eq!(loaded.industry.as_deref(), Some("healthcare"));
assert_eq!(loaded.role.as_deref(), Some("admin"));
assert_eq!(loaded.expertise_level, Some(Level::Intermediate));
assert_eq!(loaded.communication_style, Some(CommStyle::Concise));
assert_eq!(loaded.recent_topics, vec!["reporting", "compliance"]);
assert!((loaded.confidence - 0.65).abs() < f32::EPSILON);
}
#[tokio::test]
async fn test_upsert_replaces_existing() {
let store = test_store().await;
let mut profile = UserProfile::blank("user1");
profile.industry = Some("tech".to_string());
store.upsert(&profile).await.unwrap();
profile.industry = Some("finance".to_string());
store.upsert(&profile).await.unwrap();
let loaded = store.get("user1").await.unwrap().unwrap();
assert_eq!(loaded.industry.as_deref(), Some("finance"));
}
#[tokio::test]
async fn test_update_field_scalar() {
let store = test_store().await;
let profile = UserProfile::blank("user2");
store.upsert(&profile).await.unwrap();
store
.update_field("user2", "industry", "education")
.await
.unwrap();
store
.update_field("user2", "role", "teacher")
.await
.unwrap();
let loaded = store.get("user2").await.unwrap().unwrap();
assert_eq!(loaded.industry.as_deref(), Some("education"));
assert_eq!(loaded.role.as_deref(), Some("teacher"));
}
#[tokio::test]
async fn test_update_field_confidence() {
let store = test_store().await;
let profile = UserProfile::blank("user3");
store.upsert(&profile).await.unwrap();
store
.update_field("user3", "confidence", "0.88")
.await
.unwrap();
let loaded = store.get("user3").await.unwrap().unwrap();
assert!((loaded.confidence - 0.88).abs() < f32::EPSILON);
}
#[tokio::test]
async fn test_update_field_rejects_unknown() {
let store = test_store().await;
let result = store.update_field("user", "evil_column", "oops").await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_add_recent_topic_auto_creates_profile() {
let store = test_store().await;
// No profile exists yet.
store
.add_recent_topic("new_user", "data analysis", 5)
.await
.unwrap();
let loaded = store.get("new_user").await.unwrap().unwrap();
assert_eq!(loaded.recent_topics, vec!["data analysis"]);
}
#[tokio::test]
async fn test_add_recent_topic_dedup_and_trim() {
let store = test_store().await;
let profile = UserProfile::blank("user");
store.upsert(&profile).await.unwrap();
store.add_recent_topic("user", "topic_a", 3).await.unwrap();
store.add_recent_topic("user", "topic_b", 3).await.unwrap();
store.add_recent_topic("user", "topic_c", 3).await.unwrap();
// Duplicate — should move to front, not add.
store.add_recent_topic("user", "topic_a", 3).await.unwrap();
let loaded = store.get("user").await.unwrap().unwrap();
assert_eq!(
loaded.recent_topics,
vec!["topic_a", "topic_c", "topic_b"]
);
}
#[tokio::test]
async fn test_add_pain_point_trim() {
let store = test_store().await;
for i in 0..5 {
store
.add_pain_point("user", &format!("pain_{}", i), 3)
.await
.unwrap();
}
let loaded = store.get("user").await.unwrap().unwrap();
assert_eq!(loaded.active_pain_points.len(), 3);
// Most recent first.
assert_eq!(loaded.active_pain_points[0], "pain_4");
}
#[tokio::test]
async fn test_add_preferred_tool_trim() {
let store = test_store().await;
store
.add_preferred_tool("user", "python", 5)
.await
.unwrap();
store
.add_preferred_tool("user", "rust", 5)
.await
.unwrap();
// Duplicate — moved to front.
store
.add_preferred_tool("user", "python", 5)
.await
.unwrap();
let loaded = store.get("user").await.unwrap().unwrap();
assert_eq!(loaded.preferred_tools, vec!["python", "rust"]);
}
#[test]
fn test_level_round_trip() {
for level in [Level::Beginner, Level::Intermediate, Level::Expert] {
assert_eq!(Level::from_str_lossy(level.as_str()), Some(level));
}
assert_eq!(Level::from_str_lossy("unknown"), None);
}
#[test]
fn test_comm_style_round_trip() {
for style in [
CommStyle::Concise,
CommStyle::Detailed,
CommStyle::Formal,
CommStyle::Casual,
] {
assert_eq!(CommStyle::from_str_lossy(style.as_str()), Some(style));
}
assert_eq!(CommStyle::from_str_lossy("unknown"), None);
}
#[test]
fn test_profile_serialization() {
let mut p = UserProfile::blank("test_user");
p.industry = Some("logistics".into());
p.expertise_level = Some(Level::Expert);
p.communication_style = Some(CommStyle::Detailed);
p.recent_topics = vec!["exports".into(), "customs".into()];
let json = serde_json::to_string(&p).unwrap();
let decoded: UserProfile = serde_json::from_str(&json).unwrap();
assert_eq!(decoded.user_id, "test_user");
assert_eq!(decoded.industry.as_deref(), Some("logistics"));
assert_eq!(decoded.expertise_level, Some(Level::Expert));
assert_eq!(decoded.communication_style, Some(CommStyle::Detailed));
assert_eq!(decoded.recent_topics, vec!["exports", "customs"]);
}
}
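The three `add_*` helpers above share one most-recently-used list discipline: drop any existing copy, push to the front, trim to a maximum length. A minimal standalone sketch (the helper name is hypothetical; the store applies this inline on each profile field):

```rust
/// Dedupe-insert-trim: remove `item` if present, push it to the front,
/// then keep at most `max_len` entries (most recent first).
fn push_front_dedup(items: &mut Vec<String>, item: &str, max_len: usize) {
    items.retain(|t| t != item);
    items.insert(0, item.to_string());
    items.truncate(max_len);
}
```

Re-adding an existing entry therefore moves it to the front rather than duplicating it, which is exactly what `test_add_recent_topic_dedup_and_trim` asserts.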


@@ -17,6 +17,7 @@ pub mod growth;
pub mod compaction;
pub mod middleware;
pub mod prompt;
pub mod nl_schedule;
// Re-export main types
pub use driver::{


@@ -278,3 +278,4 @@ pub mod title;
pub mod token_calibration;
pub mod tool_error;
pub mod tool_output_guard;
pub mod trajectory_recorder;


@@ -0,0 +1,231 @@
//! Trajectory Recorder Middleware — records tool-call chains for analysis.
//!
//! Priority 650 (telemetry range: after business middleware at 400-599,
//! before token_calibration at 700). Records events asynchronously via
//! `tokio::spawn` so the main conversation flow is never blocked.
use async_trait::async_trait;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::RwLock;
use zclaw_memory::trajectory_store::{
TrajectoryEvent, TrajectoryStepType, TrajectoryStore,
};
use zclaw_types::{Result, SessionId};
use crate::driver::ContentBlock;
use crate::middleware::{AgentMiddleware, MiddlewareContext, MiddlewareDecision};
// ---------------------------------------------------------------------------
// Step counter per session
// ---------------------------------------------------------------------------
/// Tracks step indices per session so events are ordered correctly.
struct StepCounter {
counters: RwLock<Vec<(String, Arc<AtomicU64>)>>,
}
impl StepCounter {
fn new() -> Self {
Self {
counters: RwLock::new(Vec::new()),
}
}
async fn next(&self, session_id: &str) -> usize {
let map = self.counters.read().await;
for (sid, counter) in map.iter() {
if sid == session_id {
return counter.fetch_add(1, Ordering::Relaxed) as usize;
}
}
drop(map);
let mut map = self.counters.write().await;
// Double-check after acquiring write lock
for (sid, counter) in map.iter() {
if sid == session_id {
return counter.fetch_add(1, Ordering::Relaxed) as usize;
}
}
let counter = Arc::new(AtomicU64::new(1));
map.push((session_id.to_string(), counter.clone()));
0
}
}
// ---------------------------------------------------------------------------
// TrajectoryRecorderMiddleware
// ---------------------------------------------------------------------------
/// Middleware that records agent loop events into `TrajectoryStore`.
///
/// Hooks:
/// - `before_completion` → records UserRequest step
/// - `after_tool_call` → records ToolExecution step
/// - `after_completion` → records LlmGeneration step
pub struct TrajectoryRecorderMiddleware {
store: Arc<TrajectoryStore>,
step_counter: StepCounter,
}
impl TrajectoryRecorderMiddleware {
pub fn new(store: Arc<TrajectoryStore>) -> Self {
Self {
store,
step_counter: StepCounter::new(),
}
}
/// Spawn an async write — fire-and-forget, non-blocking.
fn spawn_write(&self, event: TrajectoryEvent) {
let store = self.store.clone();
tokio::spawn(async move {
if let Err(e) = store.insert_event(&event).await {
tracing::warn!(
"[TrajectoryRecorder] Async write failed (non-fatal): {}",
e
);
}
});
}
/// Truncate to at most `max` characters (not bytes — summaries may contain
/// multi-byte Chinese text), appending an ellipsis when anything was cut.
fn truncate(s: &str, max: usize) -> String {
if s.chars().count() <= max {
s.to_string()
} else {
s.chars().take(max).collect::<String>() + "…"
}
}
}
#[async_trait]
impl AgentMiddleware for TrajectoryRecorderMiddleware {
fn name(&self) -> &str {
"trajectory_recorder"
}
fn priority(&self) -> i32 {
650
}
async fn before_completion(
&self,
ctx: &mut MiddlewareContext,
) -> Result<MiddlewareDecision> {
if ctx.user_input.is_empty() {
return Ok(MiddlewareDecision::Continue);
}
let step = self.step_counter.next(&ctx.session_id.to_string()).await;
let event = TrajectoryEvent {
id: uuid::Uuid::new_v4().to_string(),
session_id: ctx.session_id.to_string(),
agent_id: ctx.agent_id.to_string(),
step_index: step,
step_type: TrajectoryStepType::UserRequest,
input_summary: Self::truncate(&ctx.user_input, 200),
output_summary: String::new(),
duration_ms: 0,
timestamp: chrono::Utc::now(),
};
self.spawn_write(event);
Ok(MiddlewareDecision::Continue)
}
async fn after_tool_call(
&self,
ctx: &mut MiddlewareContext,
tool_name: &str,
result: &serde_json::Value,
) -> Result<()> {
let step = self.step_counter.next(&ctx.session_id.to_string()).await;
let result_summary = match result {
serde_json::Value::String(s) => Self::truncate(s, 200),
serde_json::Value::Object(_) => {
let s = serde_json::to_string(result).unwrap_or_default();
Self::truncate(&s, 200)
}
other => Self::truncate(&other.to_string(), 200),
};
let event = TrajectoryEvent {
id: uuid::Uuid::new_v4().to_string(),
session_id: ctx.session_id.to_string(),
agent_id: ctx.agent_id.to_string(),
step_index: step,
step_type: TrajectoryStepType::ToolExecution,
input_summary: Self::truncate(tool_name, 200),
output_summary: result_summary,
duration_ms: 0,
timestamp: chrono::Utc::now(),
};
self.spawn_write(event);
Ok(())
}
async fn after_completion(&self, ctx: &MiddlewareContext) -> Result<()> {
let step = self.step_counter.next(&ctx.session_id.to_string()).await;
let output_summary = ctx.response_content.iter()
.filter_map(|b| match b {
ContentBlock::Text { text } => Some(text.as_str()),
_ => None,
})
.collect::<Vec<_>>()
.join(" ");
let event = TrajectoryEvent {
id: uuid::Uuid::new_v4().to_string(),
session_id: ctx.session_id.to_string(),
agent_id: ctx.agent_id.to_string(),
step_index: step,
step_type: TrajectoryStepType::LlmGeneration,
input_summary: String::new(),
output_summary: Self::truncate(&output_summary, 200),
duration_ms: 0,
timestamp: chrono::Utc::now(),
};
self.spawn_write(event);
Ok(())
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_step_counter_sequential() {
let counter = StepCounter::new();
assert_eq!(counter.next("sess-1").await, 0);
assert_eq!(counter.next("sess-1").await, 1);
assert_eq!(counter.next("sess-1").await, 2);
}
#[tokio::test]
async fn test_step_counter_different_sessions() {
let counter = StepCounter::new();
assert_eq!(counter.next("sess-1").await, 0);
assert_eq!(counter.next("sess-2").await, 0);
assert_eq!(counter.next("sess-1").await, 1);
assert_eq!(counter.next("sess-2").await, 1);
}
#[test]
fn test_truncate_short() {
assert_eq!(TrajectoryRecorderMiddleware::truncate("hello", 10), "hello");
}
#[test]
fn test_truncate_long() {
let long: String = "长".repeat(300);
let truncated = TrajectoryRecorderMiddleware::truncate(&long, 200);
assert!(truncated.chars().count() <= 201); // 200 + …
}
}
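The char-boundary-safe truncation tested above can be sketched on its own; this is an illustrative standalone version of the `truncate` helper, showing why character count rather than byte length matters for CJK summaries:

```rust
/// Truncate to at most `max` characters, appending an ellipsis when cut.
/// Counting chars (not bytes) avoids slicing inside a multi-byte UTF-8
/// sequence, which would panic with a byte-index slice on Chinese text.
fn truncate_chars(s: &str, max: usize) -> String {
    if s.chars().count() <= max {
        s.to_string()
    } else {
        s.chars().take(max).collect::<String>() + "…"
    }
}
```

A 300-character Chinese string comes back as 200 kept characters plus the ellipsis, i.e. 201 chars total, matching the `<= 201` bound in the test.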


@@ -0,0 +1,593 @@
//! Natural Language Schedule Parser — transforms Chinese time expressions into cron.
//!
//! Three-layer fallback strategy:
//! 1. Regex pattern matching (covers ~80% of common expressions)
//! 2. LLM-assisted parsing (for ambiguous/complex expressions) — TODO: wire when Haiku driver available
//! 3. Interactive clarification (return `Unclear`)
//!
//! Lives in `zclaw-runtime` because it's a pure text→cron utility with no kernel dependency.
use chrono::{Datelike, Timelike};
use serde::{Deserialize, Serialize};
use zclaw_types::AgentId;
// ---------------------------------------------------------------------------
// Data structures
// ---------------------------------------------------------------------------
/// Result of parsing a natural language schedule expression.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParsedSchedule {
/// Cron expression, e.g. "0 9 * * *"
pub cron_expression: String,
/// Human-readable description of the schedule
pub natural_description: String,
/// Confidence of the parse (0.0 to 1.0)
pub confidence: f32,
/// What the task does (extracted from user input)
pub task_description: String,
/// What to trigger when the schedule fires
pub task_target: TaskTarget,
}
/// Target to trigger on schedule.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "id")]
pub enum TaskTarget {
/// Trigger a specific agent
Agent(String),
/// Trigger a specific hand
Hand(String),
/// Trigger a specific workflow
Workflow(String),
/// Generic reminder (no specific target)
Reminder,
}
/// Outcome of NL schedule parsing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ScheduleParseResult {
/// High-confidence single parse
Exact(ParsedSchedule),
/// Multiple possible interpretations
Ambiguous(Vec<ParsedSchedule>),
/// Unable to parse — needs user clarification
Unclear,
}
// ---------------------------------------------------------------------------
// Regex pattern library
// ---------------------------------------------------------------------------
/// A single pattern for matching Chinese time expressions.
struct SchedulePattern {
/// Regex pattern string
regex: &'static str,
/// Cron template — use {h} for hour, {m} for minute, {dow} for day-of-week, {dom} for day-of-month
cron_template: &'static str,
/// Human description template
description: &'static str,
/// Base confidence for this pattern
confidence: f32,
}
/// Chinese time period keywords → hour mapping
fn period_to_hour(period: &str) -> Option<u32> {
match period {
"凌晨" => Some(0),
"早上" | "早晨" | "上午" => Some(9),
"中午" => Some(12),
"下午" | "午后" => Some(15),
"傍晚" | "黄昏" => Some(18),
"晚上" | "晚间" | "夜里" | "夜晚" => Some(21),
"半夜" | "午夜" => Some(0),
_ => None,
}
}
/// Chinese weekday names → cron day-of-week
fn weekday_to_cron(day: &str) -> Option<&'static str> {
match day {
"一" | "周一" | "星期一" | "礼拜一" => Some("1"),
"二" | "周二" | "星期二" | "礼拜二" => Some("2"),
"三" | "周三" | "星期三" | "礼拜三" => Some("3"),
"四" | "周四" | "星期四" | "礼拜四" => Some("4"),
"五" | "周五" | "星期五" | "礼拜五" => Some("5"),
"六" | "周六" | "星期六" | "礼拜六" => Some("6"),
"日" | "周日" | "星期日" | "礼拜日" | "天" | "周天" | "星期天" | "礼拜天" => Some("0"),
_ => None,
}
}
// ---------------------------------------------------------------------------
// Parser implementation
// ---------------------------------------------------------------------------
/// Parse a natural language schedule expression into a cron expression.
///
/// Uses a series of regex-based pattern matchers covering common Chinese
/// time expressions. Returns `Unclear` if no pattern matches.
pub fn parse_nl_schedule(input: &str, default_agent_id: &AgentId) -> ScheduleParseResult {
let input = input.trim();
if input.is_empty() {
return ScheduleParseResult::Unclear;
}
// Extract task description (everything after keywords like "提醒我", "帮我")
let task_description = extract_task_description(input);
// --- Pattern 1: 每天 + 时间 ---
if let Some(result) = try_every_day(input, &task_description, default_agent_id) {
return result;
}
// --- Pattern 2: 每周N + 时间 ---
if let Some(result) = try_every_week(input, &task_description, default_agent_id) {
return result;
}
// --- Pattern 3: 工作日 + 时间 ---
if let Some(result) = try_workday(input, &task_description, default_agent_id) {
return result;
}
// --- Pattern 4: 每N小时/分钟 ---
if let Some(result) = try_interval(input, &task_description, default_agent_id) {
return result;
}
// --- Pattern 5: 每月N号 ---
if let Some(result) = try_monthly(input, &task_description, default_agent_id) {
return result;
}
// --- Pattern 6: 明天/后天 + 时间 (one-shot) ---
if let Some(result) = try_one_shot(input, &task_description, default_agent_id) {
return result;
}
ScheduleParseResult::Unclear
}
/// Extract task description from input, stripping schedule-related keywords.
fn extract_task_description(input: &str) -> String {
let strip_prefixes = [
"每天", "每日", "每周", "工作日", "每个工作日",
"每月", "定时", "定期",
"提醒我", "提醒", "帮我",
"明天", "后天", "大后天",
];
let mut desc = input.to_string();
// Strip prefixes + time expressions in alternating passes until stable
let time_re = regex::Regex::new(
r"^(?:凌晨|早上|早晨|上午|中午|下午|午后|傍晚|黄昏|晚上|晚间|夜里|夜晚|半夜|午夜)?\d{1,2}[点时:]\d{0,2}分?"
)
.expect("static time regex is valid");
for _ in 0..3 {
// Pass 1: strip prefixes
loop {
let mut stripped = false;
for prefix in &strip_prefixes {
if desc.starts_with(prefix) {
desc = desc[prefix.len()..].to_string();
stripped = true;
}
}
if !stripped { break; }
}
// Pass 2: strip time expressions
let new_desc = time_re.replace(&desc, "").to_string();
if new_desc == desc { break; }
desc = new_desc;
}
desc.trim().to_string()
}
// -- Pattern matchers --
/// Adjust hour based on time-of-day period. Chinese 12-hour convention:
/// 下午3点 = 15, 晚上8点 = 20, etc. Morning hours stay as-is.
fn adjust_hour_for_period(hour: u32, period: Option<&str>) -> u32 {
if let Some(p) = period {
match p {
"下午" | "午后" => { if hour < 12 { hour + 12 } else { hour } }
"晚上" | "晚间" | "夜里" | "夜晚" => { if hour < 12 { hour + 12 } else { hour } }
"傍晚" | "黄昏" => { if hour < 12 { hour + 12 } else { hour } }
"中午" => { if hour == 12 { 12 } else if hour < 12 { hour + 12 } else { hour } }
"半夜" | "午夜" => { if hour == 12 { 0 } else { hour } }
_ => hour,
}
} else {
hour
}
}
const PERIOD_PATTERN: &str = "(凌晨|早上|早晨|上午|中午|下午|午后|傍晚|黄昏|晚上|晚间|夜里|夜晚|半夜|午夜)?";
fn try_every_day(input: &str, task_desc: &str, agent_id: &AgentId) -> Option<ScheduleParseResult> {
let re = regex::Regex::new(
&format!(r"(?:每天|每日)(?:的)?{}(\d{{1,2}})[点时:](\d{{1,2}})?", PERIOD_PATTERN)
).ok()?;
if let Some(caps) = re.captures(input) {
let period = caps.get(1).map(|m| m.as_str());
let raw_hour: u32 = caps.get(2)?.as_str().parse().ok()?;
let minute: u32 = caps.get(3).map(|m| m.as_str().parse().unwrap_or(0)).unwrap_or(0);
let hour = adjust_hour_for_period(raw_hour, period);
if hour > 23 || minute > 59 {
return None;
}
return Some(ScheduleParseResult::Exact(ParsedSchedule {
cron_expression: format!("{} {} * * *", minute, hour),
natural_description: format!("每天{:02}:{:02}", hour, minute),
confidence: 0.95,
task_description: task_desc.to_string(),
task_target: TaskTarget::Agent(agent_id.to_string()),
}));
}
// "每天早上/下午..." without explicit hour
let re2 = regex::Regex::new(r"(?:每天|每日)(?:的)?(凌晨|早上|早晨|上午|中午|下午|午后|傍晚|黄昏|晚上|晚间|夜里|夜晚|半夜|午夜)").ok()?;
if let Some(caps) = re2.captures(input) {
let period = caps.get(1)?.as_str();
if let Some(hour) = period_to_hour(period) {
return Some(ScheduleParseResult::Exact(ParsedSchedule {
cron_expression: format!("0 {} * * *", hour),
natural_description: format!("每天{}", period),
confidence: 0.85,
task_description: task_desc.to_string(),
task_target: TaskTarget::Agent(agent_id.to_string()),
}));
}
}
None
}
fn try_every_week(input: &str, task_desc: &str, agent_id: &AgentId) -> Option<ScheduleParseResult> {
let re = regex::Regex::new(
&format!(r"(?:每周|每个?星期|每个?礼拜)(一|二|三|四|五|六|日|天|周一|周二|周三|周四|周五|周六|周日|周天|星期一|星期二|星期三|星期四|星期五|星期六|星期日|星期天|礼拜一|礼拜二|礼拜三|礼拜四|礼拜五|礼拜六|礼拜日|礼拜天)(?:的)?{}(\d{{1,2}})[点时:](\d{{1,2}})?", PERIOD_PATTERN)
).ok()?;
let caps = re.captures(input)?;
let day_str = caps.get(1)?.as_str();
let dow = weekday_to_cron(day_str)?;
let period = caps.get(2).map(|m| m.as_str());
let raw_hour: u32 = caps.get(3)?.as_str().parse().ok()?;
let minute: u32 = caps.get(4).map(|m| m.as_str().parse().unwrap_or(0)).unwrap_or(0);
let hour = adjust_hour_for_period(raw_hour, period);
if hour > 23 || minute > 59 {
return None;
}
Some(ScheduleParseResult::Exact(ParsedSchedule {
cron_expression: format!("{} {} * * {}", minute, hour, dow),
natural_description: format!("每周{} {:02}:{:02}", day_str, hour, minute),
confidence: 0.92,
task_description: task_desc.to_string(),
task_target: TaskTarget::Agent(agent_id.to_string()),
}))
}
fn try_workday(input: &str, task_desc: &str, agent_id: &AgentId) -> Option<ScheduleParseResult> {
let re = regex::Regex::new(
&format!(r"(?:工作日|每个?工作日|工作日(?:的)?){}(\d{{1,2}})[点时:](\d{{1,2}})?", PERIOD_PATTERN)
).ok()?;
if let Some(caps) = re.captures(input) {
let period = caps.get(1).map(|m| m.as_str());
let raw_hour: u32 = caps.get(2)?.as_str().parse().ok()?;
let minute: u32 = caps.get(3).map(|m| m.as_str().parse().unwrap_or(0)).unwrap_or(0);
let hour = adjust_hour_for_period(raw_hour, period);
if hour > 23 || minute > 59 {
return None;
}
return Some(ScheduleParseResult::Exact(ParsedSchedule {
cron_expression: format!("{} {} * * 1-5", minute, hour),
natural_description: format!("工作日{:02}:{:02}", hour, minute),
confidence: 0.90,
task_description: task_desc.to_string(),
task_target: TaskTarget::Agent(agent_id.to_string()),
}));
}
// "工作日下午3点" style
let re2 = regex::Regex::new(
r"(?:工作日|每个?工作日)(?:的)?(凌晨|早上|早晨|上午|中午|下午|午后|傍晚|黄昏|晚上|晚间|夜里|夜晚|半夜|午夜)"
).ok()?;
if let Some(caps) = re2.captures(input) {
let period = caps.get(1)?.as_str();
if let Some(hour) = period_to_hour(period) {
return Some(ScheduleParseResult::Exact(ParsedSchedule {
cron_expression: format!("0 {} * * 1-5", hour),
natural_description: format!("工作日{}", period),
confidence: 0.85,
task_description: task_desc.to_string(),
task_target: TaskTarget::Agent(agent_id.to_string()),
}));
}
}
None
}
fn try_interval(input: &str, task_desc: &str, agent_id: &AgentId) -> Option<ScheduleParseResult> {
// "每2小时", "每30分钟", "每N小时/分钟"
let re = regex::Regex::new(r"每(\d{1,2})(小时|分钟|分|钟|个小时)").ok()?;
if let Some(caps) = re.captures(input) {
let n: u32 = caps.get(1)?.as_str().parse().ok()?;
if n == 0 {
return None;
}
let unit = caps.get(2)?.as_str();
let (cron, desc) = if unit.contains("时") {
(format!("0 */{} * * *", n), format!("{}小时", n))
} else {
(format!("*/{} * * * *", n), format!("{}分钟", n))
};
return Some(ScheduleParseResult::Exact(ParsedSchedule {
cron_expression: cron,
natural_description: desc,
confidence: 0.90,
task_description: task_desc.to_string(),
task_target: TaskTarget::Agent(agent_id.to_string()),
}));
}
None
}
fn try_monthly(input: &str, task_desc: &str, agent_id: &AgentId) -> Option<ScheduleParseResult> {
let re = regex::Regex::new(
&format!(r"(?:每月|每个月)(?:的)?(\d{{1,2}})[号日](?:的)?{}(\d{{1,2}})?[点时:]?(\d{{1,2}})?", PERIOD_PATTERN)
).ok()?;
if let Some(caps) = re.captures(input) {
let day: u32 = caps.get(1)?.as_str().parse().ok()?;
let period = caps.get(2).map(|m| m.as_str());
let raw_hour: u32 = caps.get(3).map(|m| m.as_str().parse().unwrap_or(9)).unwrap_or(9);
let minute: u32 = caps.get(4).map(|m| m.as_str().parse().unwrap_or(0)).unwrap_or(0);
let hour = adjust_hour_for_period(raw_hour, period);
if day > 31 || hour > 23 || minute > 59 {
return None;
}
return Some(ScheduleParseResult::Exact(ParsedSchedule {
cron_expression: format!("{} {} {} * *", minute, hour, day),
natural_description: format!("每月{}号 {:02}:{:02}", day, hour, minute),
confidence: 0.90,
task_description: task_desc.to_string(),
task_target: TaskTarget::Agent(agent_id.to_string()),
}));
}
None
}
fn try_one_shot(input: &str, task_desc: &str, agent_id: &AgentId) -> Option<ScheduleParseResult> {
let re = regex::Regex::new(
&format!(r"(明天|后天|大后天)(?:的)?{}(\d{{1,2}})[点时:](\d{{1,2}})?", PERIOD_PATTERN)
).ok()?;
let caps = re.captures(input)?;
let day_offset = match caps.get(1)?.as_str() {
"明天" => 1,
"后天" => 2,
"大后天" => 3,
_ => return None,
};
let period = caps.get(2).map(|m| m.as_str());
let raw_hour: u32 = caps.get(3)?.as_str().parse().ok()?;
let minute: u32 = caps.get(4).map(|m| m.as_str().parse().unwrap_or(0)).unwrap_or(0);
let hour = adjust_hour_for_period(raw_hour, period);
if hour > 23 || minute > 59 {
return None;
}
let target = chrono::Utc::now()
.checked_add_signed(chrono::Duration::days(day_offset))
.unwrap_or_else(chrono::Utc::now)
.with_hour(hour)
.unwrap_or_else(chrono::Utc::now)
.with_minute(minute)
.unwrap_or_else(chrono::Utc::now)
.with_second(0)
.unwrap_or_else(chrono::Utc::now);
Some(ScheduleParseResult::Exact(ParsedSchedule {
cron_expression: target.to_rfc3339(),
natural_description: format!("{} {:02}:{:02}", caps.get(1)?.as_str(), hour, minute),
confidence: 0.88,
task_description: task_desc.to_string(),
task_target: TaskTarget::Agent(agent_id.to_string()),
}))
}
// ---------------------------------------------------------------------------
// Schedule intent detection
// ---------------------------------------------------------------------------
/// Keywords indicating the user wants to set a scheduled task.
const SCHEDULE_INTENT_KEYWORDS: &[&str] = &[
"提醒我", "提醒", "定时", "每天", "每日", "每周", "每月",
"工作日", "每隔", "定期", "到时候", "准时",
"闹钟", "闹铃", "日程", "日历",
];
/// Check if user input contains schedule intent.
pub fn has_schedule_intent(input: &str) -> bool {
let lower = input.to_lowercase();
SCHEDULE_INTENT_KEYWORDS.iter().any(|kw| lower.contains(kw))
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
fn default_agent() -> AgentId {
AgentId::new()
}
#[test]
fn test_every_day_explicit_time() {
let result = parse_nl_schedule("每天早上9点提醒我查房", &default_agent());
match result {
ScheduleParseResult::Exact(s) => {
assert_eq!(s.cron_expression, "0 9 * * *");
assert!(s.confidence >= 0.9);
}
_ => panic!("Expected Exact, got {:?}", result),
}
}
#[test]
fn test_every_day_with_minute() {
let result = parse_nl_schedule("每天下午3点30分提醒我", &default_agent());
match result {
ScheduleParseResult::Exact(s) => {
assert_eq!(s.cron_expression, "30 15 * * *");
}
_ => panic!("Expected Exact"),
}
}
#[test]
fn test_every_day_period_only() {
let result = parse_nl_schedule("每天早上提醒我看看报告", &default_agent());
match result {
ScheduleParseResult::Exact(s) => {
assert_eq!(s.cron_expression, "0 9 * * *");
}
_ => panic!("Expected Exact"),
}
}
#[test]
fn test_every_week_monday() {
let result = parse_nl_schedule("每周一上午10点提醒我开会", &default_agent());
match result {
ScheduleParseResult::Exact(s) => {
assert_eq!(s.cron_expression, "0 10 * * 1");
}
_ => panic!("Expected Exact"),
}
}
#[test]
fn test_every_week_friday() {
let result = parse_nl_schedule("每个星期五下午2点", &default_agent());
match result {
ScheduleParseResult::Exact(s) => {
assert_eq!(s.cron_expression, "0 14 * * 5");
}
_ => panic!("Expected Exact"),
}
}
#[test]
fn test_workday() {
let result = parse_nl_schedule("工作日下午3点提醒我写周报", &default_agent());
match result {
ScheduleParseResult::Exact(s) => {
assert_eq!(s.cron_expression, "0 15 * * 1-5");
}
_ => panic!("Expected Exact"),
}
}
#[test]
fn test_interval_hours() {
let result = parse_nl_schedule("每2小时提醒我喝水", &default_agent());
match result {
ScheduleParseResult::Exact(s) => {
assert_eq!(s.cron_expression, "0 */2 * * *");
}
_ => panic!("Expected Exact"),
}
}
#[test]
fn test_interval_minutes() {
let result = parse_nl_schedule("每30分钟检查一次", &default_agent());
match result {
ScheduleParseResult::Exact(s) => {
assert_eq!(s.cron_expression, "*/30 * * * *");
}
_ => panic!("Expected Exact"),
}
}
#[test]
fn test_monthly() {
let result = parse_nl_schedule("每月1号早上9点提醒我", &default_agent());
match result {
ScheduleParseResult::Exact(s) => {
assert_eq!(s.cron_expression, "0 9 1 * *");
}
_ => panic!("Expected Exact"),
}
}
#[test]
fn test_one_shot_tomorrow() {
let result = parse_nl_schedule("明天下午3点提醒我开会", &default_agent());
match result {
ScheduleParseResult::Exact(s) => {
assert!(s.cron_expression.contains('T'));
assert!(s.natural_description.contains("明天"));
}
_ => panic!("Expected Exact"),
}
}
#[test]
fn test_unclear_input() {
let result = parse_nl_schedule("今天天气怎么样", &default_agent());
assert!(matches!(result, ScheduleParseResult::Unclear));
}
#[test]
fn test_empty_input() {
let result = parse_nl_schedule("", &default_agent());
assert!(matches!(result, ScheduleParseResult::Unclear));
}
#[test]
fn test_schedule_intent_detection() {
assert!(has_schedule_intent("每天早上9点提醒我查房"));
assert!(has_schedule_intent("帮我设个定时任务"));
assert!(has_schedule_intent("工作日提醒我打卡"));
assert!(!has_schedule_intent("今天天气怎么样"));
assert!(!has_schedule_intent("帮我写个报告"));
}
#[test]
fn test_period_to_hour_mapping() {
assert_eq!(period_to_hour("凌晨"), Some(0));
assert_eq!(period_to_hour("早上"), Some(9));
assert_eq!(period_to_hour("中午"), Some(12));
assert_eq!(period_to_hour("下午"), Some(15));
assert_eq!(period_to_hour("晚上"), Some(21));
assert_eq!(period_to_hour("不知道"), None);
}
#[test]
fn test_weekday_to_cron_mapping() {
assert_eq!(weekday_to_cron("周一"), Some("1"));
assert_eq!(weekday_to_cron("周五"), Some("5"));
assert_eq!(weekday_to_cron("周日"), Some("0"));
assert_eq!(weekday_to_cron("星期三"), Some("3"));
assert_eq!(weekday_to_cron("礼拜天"), Some("0"));
assert_eq!(weekday_to_cron("未知"), None);
}
#[test]
fn test_task_description_extraction() {
assert_eq!(extract_task_description("每天早上9点提醒我查房"), "查房");
}
}
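For readers skimming the diff, the period-aware hour adjustment the commit message highlights (下午3点→15, 晚上8点→20) boils down to the following sketch. The function name and the exact PM period list are illustrative assumptions, not the shipped `adjust_hour_for_period`:

```rust
// Illustrative sketch of period-aware hour adjustment: PM-style period
// words shift a 1-11 o'clock hour into the afternoon/evening range.
// PM_PERIODS is an assumption; the source uses PERIOD_PATTERN for matching.
fn adjust_hour_for_period_sketch(raw_hour: u32, period: Option<&str>) -> u32 {
    const PM_PERIODS: [&str; 8] = ["下午", "午后", "傍晚", "黄昏", "晚上", "晚间", "夜里", "夜晚"];
    match period {
        Some(p) if PM_PERIODS.contains(&p) && raw_hour < 12 => raw_hour + 12,
        _ => raw_hour, // AM periods, 24h hours, or no period: keep as-is
    }
}

fn main() {
    assert_eq!(adjust_hour_for_period_sketch(3, Some("下午")), 15); // 下午3点 → 15:00
    assert_eq!(adjust_hour_for_period_sketch(8, Some("晚上")), 20); // 晚上8点 → 20:00
    assert_eq!(adjust_hour_for_period_sketch(9, Some("早上")), 9);  // AM hours unchanged
    assert_eq!(adjust_hour_for_period_sketch(15, Some("下午")), 15); // already 24h
}
```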

View File

@@ -0,0 +1,394 @@
//! Experience Extractor — transforms successful proposals into reusable experiences.
//!
//! Closes Breakpoint 3 (successful solution → structured experience) and
//! Breakpoint 4 (experience reuse injection) of the self-improvement loop.
//!
//! When a user confirms a proposal was helpful (explicitly or via implicit
//! keyword detection), the extractor creates an [`Experience`] record and
//! stores it through [`ExperienceStore`] for future retrieval.
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};
use uuid::Uuid;
use zclaw_growth::ExperienceStore;
use zclaw_types::Result;
use super::pain_aggregator::PainPoint;
use super::solution_generator::{Proposal, ProposalStatus};
// ---------------------------------------------------------------------------
// Shared completion status
// ---------------------------------------------------------------------------
/// Completion outcome — shared across experience and trajectory modules.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CompletionStatus {
Success,
Partial,
Failed,
Abandoned,
}
// ---------------------------------------------------------------------------
// Feedback & event types
// ---------------------------------------------------------------------------
/// User feedback on a proposal's effectiveness.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProposalFeedback {
pub proposal_id: String,
pub outcome: CompletionStatus,
pub user_comment: Option<String>,
pub detected_at: DateTime<Utc>,
}
/// Event emitted when a pain point reaches high confidence.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PainConfirmedEvent {
pub pain_point_id: String,
pub pattern: String,
pub confidence: f64,
}
// ---------------------------------------------------------------------------
// Implicit feedback detection
// ---------------------------------------------------------------------------
const POSITIVE_KEYWORDS: &[&str] = &[
"好了", "解决了", "可以了", "对了", "完美",
"谢谢", "很好", "不错", "成功了",
"行了", "搞定了", "OK", "ok", "搞定",
];
const NEGATIVE_KEYWORDS: &[&str] = &[
"没用", "不对", "还是不行", "错了", "差太远",
"不好使", "不管用", "没效果", "失败", "不行",
];
/// Detect implicit feedback from user messages.
/// Returns `Some(CompletionStatus)` if a clear signal is found.
pub fn detect_implicit_feedback(message: &str) -> Option<CompletionStatus> {
let lower = message.to_lowercase();
for kw in POSITIVE_KEYWORDS {
if lower.contains(kw) {
return Some(CompletionStatus::Success);
}
}
for kw in NEGATIVE_KEYWORDS {
if lower.contains(kw) {
return Some(CompletionStatus::Failed);
}
}
None
}
// ---------------------------------------------------------------------------
// ExperienceExtractor
// ---------------------------------------------------------------------------
/// Extracts structured experiences from successful proposals.
///
/// Two extraction strategies:
/// 1. **LLM-assisted** — uses LLM to summarise context + steps (when driver available)
/// 2. **Template fallback** — fixed-format extraction from proposal fields
pub struct ExperienceExtractor {
experience_store: std::sync::Arc<ExperienceStore>,
}
impl ExperienceExtractor {
pub fn new(experience_store: std::sync::Arc<ExperienceStore>) -> Self {
Self { experience_store }
}
/// Extract and store an experience from a successful proposal + pain point.
///
/// Uses template extraction as the default strategy. LLM-assisted extraction
/// can be added later by wiring a driver through the constructor.
pub async fn extract_from_proposal(
&self,
proposal: &Proposal,
pain: &PainPoint,
feedback: &ProposalFeedback,
) -> Result<()> {
if feedback.outcome != CompletionStatus::Success && feedback.outcome != CompletionStatus::Partial {
debug!(
"[ExperienceExtractor] Skipping non-success proposal {} ({:?})",
proposal.id, feedback.outcome
);
return Ok(());
}
let experience = self.template_extract(proposal, pain, feedback);
self.experience_store.store_experience(&experience).await?;
debug!(
"[ExperienceExtractor] Stored experience {} for pain '{}'",
experience.id, experience.pain_pattern
);
Ok(())
}
/// Template-based extraction — deterministic, no LLM required.
fn template_extract(
&self,
proposal: &Proposal,
pain: &PainPoint,
feedback: &ProposalFeedback,
) -> zclaw_growth::experience_store::Experience {
let solution_steps: Vec<String> = proposal.steps.iter()
.map(|s| {
if let Some(ref hint) = s.skill_hint {
format!("{} (工具: {})", s.detail, hint)
} else {
s.detail.clone()
}
})
.collect();
let context = format!(
"痛点: {} | 类别: {} | 出现{}次 | 证据: {}",
pain.summary,
pain.category,
pain.occurrence_count,
pain.evidence.iter()
.map(|e| e.user_said.as_str())
.collect::<Vec<_>>()
.join("、")
);
let outcome = match feedback.outcome {
CompletionStatus::Success => "成功解决",
CompletionStatus::Partial => "部分解决",
CompletionStatus::Failed => "未解决",
CompletionStatus::Abandoned => "已放弃",
};
zclaw_growth::experience_store::Experience::new(
&pain.agent_id,
&pain.summary,
&context,
solution_steps,
outcome,
)
}
/// Search for relevant experiences to inject into a conversation.
///
/// Returns experiences whose pain pattern matches the user's current input.
pub async fn find_relevant_experiences(
&self,
agent_id: &str,
user_input: &str,
) -> Vec<zclaw_growth::experience_store::Experience> {
match self.experience_store.find_by_pattern(agent_id, user_input).await {
Ok(experiences) => {
if !experiences.is_empty() {
// Increment reuse count for found experiences (fire-and-forget)
for exp in &experiences {
let store = self.experience_store.clone();
let exp_clone = exp.clone();
tokio::spawn(async move {
let _ = store.increment_reuse(&exp_clone).await;
});
}
}
experiences
}
Err(e) => {
warn!("[ExperienceExtractor] find_relevant failed: {}", e);
Vec::new()
}
}
}
/// Format experiences for system prompt injection.
/// Returns a concise block capped at ~200 Chinese characters.
pub fn format_for_injection(
experiences: &[zclaw_growth::experience_store::Experience],
) -> String {
if experiences.is_empty() {
return String::new();
}
let mut parts = Vec::new();
let mut total_chars = 0;
let max_chars = 200;
for exp in experiences {
if total_chars >= max_chars {
break;
}
let step_summary = exp.solution_steps.first()
.map(|s| truncate(s, 40))
.unwrap_or_default();
let line = format!(
"[过往经验] 类似「{}」做过:{},结果是{}",
truncate(&exp.pain_pattern, 30),
step_summary,
exp.outcome
);
total_chars += line.chars().count();
parts.push(line);
}
if parts.is_empty() {
return String::new();
}
format!("\n\n--- 过往经验参考 ---\n{}", parts.join("\n"))
}
}
fn truncate(s: &str, max_chars: usize) -> String {
if s.chars().count() <= max_chars {
s.to_string()
} else {
s.chars().take(max_chars).collect::<String>() + "…"
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
use crate::intelligence::pain_aggregator::PainSeverity;
fn sample_pain() -> PainPoint {
PainPoint::new(
"agent-1",
"user-1",
"出口包装不合格",
"logistics",
PainSeverity::High,
"又被退了",
"recurring packaging issue",
)
}
fn sample_proposal(pain: &PainPoint) -> Proposal {
Proposal::from_pain_point(pain)
}
#[test]
fn test_detect_positive_feedback() {
assert_eq!(
detect_implicit_feedback("好了,这下解决了"),
Some(CompletionStatus::Success)
);
assert_eq!(
detect_implicit_feedback("谢谢,完美"),
Some(CompletionStatus::Success)
);
}
#[test]
fn test_detect_negative_feedback() {
assert_eq!(
detect_implicit_feedback("还是不行"),
Some(CompletionStatus::Failed)
);
assert_eq!(
detect_implicit_feedback("没用啊"),
Some(CompletionStatus::Failed)
);
}
#[test]
fn test_no_feedback() {
assert_eq!(detect_implicit_feedback("今天天气怎么样"), None);
assert_eq!(detect_implicit_feedback("帮我查一下"), None);
}
#[test]
fn test_template_extract() {
let viking = std::sync::Arc::new(zclaw_growth::VikingAdapter::in_memory());
let store = std::sync::Arc::new(ExperienceStore::new(viking));
let extractor = ExperienceExtractor::new(store);
let pain = sample_pain();
let proposal = sample_proposal(&pain);
let feedback = ProposalFeedback {
proposal_id: proposal.id.clone(),
outcome: CompletionStatus::Success,
user_comment: Some("好了".into()),
detected_at: Utc::now(),
};
let exp = extractor.template_extract(&proposal, &pain, &feedback);
assert!(!exp.id.is_empty());
assert_eq!(exp.agent_id, "agent-1");
assert!(!exp.solution_steps.is_empty());
assert_eq!(exp.outcome, "成功解决");
}
#[test]
fn test_format_for_injection_empty() {
assert!(ExperienceExtractor::format_for_injection(&[]).is_empty());
}
#[test]
fn test_format_for_injection_with_data() {
let exp = zclaw_growth::experience_store::Experience::new(
"agent-1",
"出口包装问题",
"包装被退回",
vec!["检查法规".into(), "使用合规材料".into()],
"成功解决",
);
let formatted = ExperienceExtractor::format_for_injection(&[exp]);
assert!(formatted.contains("过往经验"));
assert!(formatted.contains("出口包装问题"));
}
#[tokio::test]
async fn test_extract_stores_experience() {
let viking = std::sync::Arc::new(zclaw_growth::VikingAdapter::in_memory());
let store = std::sync::Arc::new(ExperienceStore::new(viking));
let extractor = ExperienceExtractor::new(store.clone());
let pain = sample_pain();
let proposal = sample_proposal(&pain);
let feedback = ProposalFeedback {
proposal_id: proposal.id.clone(),
outcome: CompletionStatus::Success,
user_comment: Some("好了".into()),
detected_at: Utc::now(),
};
extractor.extract_from_proposal(&proposal, &pain, &feedback).await.unwrap();
let found = store.find_by_agent("agent-1").await.unwrap();
assert_eq!(found.len(), 1);
}
#[tokio::test]
async fn test_extract_skips_failed_feedback() {
let viking = std::sync::Arc::new(zclaw_growth::VikingAdapter::in_memory());
let store = std::sync::Arc::new(ExperienceStore::new(viking));
let extractor = ExperienceExtractor::new(store.clone());
let pain = sample_pain();
let proposal = sample_proposal(&pain);
let feedback = ProposalFeedback {
proposal_id: proposal.id.clone(),
outcome: CompletionStatus::Failed,
user_comment: Some("没用".into()),
detected_at: Utc::now(),
};
extractor.extract_from_proposal(&proposal, &pain, &feedback).await.unwrap();
let found = store.find_by_agent("agent-1").await.unwrap();
assert!(found.is_empty(), "Should not store experience for failed feedback");
}
#[test]
fn test_truncate() {
assert_eq!(truncate("hello", 10), "hello");
assert_eq!(truncate("这是一个很长的字符串用于测试截断", 10).chars().count(), 11); // 10 + …
}
}
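Per the commit message, ExperienceStore keys records under the scope prefix `agent://{agent_id}/experience/{pattern_hash}`. A minimal sketch of such a key builder, assuming a 64-bit hash of the pain pattern (the actual hash choice is not shown in this hunk):

```rust
// Sketch of the experience scope-key scheme from the module docs.
// DefaultHasher is an assumption for illustration; the shipped store
// may use a different, stable hash.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn experience_scope_key(agent_id: &str, pain_pattern: &str) -> String {
    let mut h = DefaultHasher::new();
    pain_pattern.hash(&mut h);
    // agent://{agent_id}/experience/{pattern_hash}
    format!("agent://{}/experience/{:016x}", agent_id, h.finish())
}

fn main() {
    let key = experience_scope_key("agent-1", "出口包装不合格");
    assert!(key.starts_with("agent://agent-1/experience/"));
    // Same pattern always yields the same key within one process.
    assert_eq!(key, experience_scope_key("agent-1", "出口包装不合格"));
}
```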

View File

@@ -36,6 +36,9 @@ pub mod pain_aggregator;
pub mod solution_generator;
pub mod personality_detector;
pub mod pain_storage;
pub mod experience;
pub mod user_profiler;
pub mod trajectory_compressor;
// Re-export main types for convenience
pub use heartbeat::HeartbeatEngineState;

View File

@@ -0,0 +1,328 @@
//! Trajectory Compressor — compresses raw events into structured trajectories.
//!
//! Takes a list of `TrajectoryEvent` records and produces a single
//! `CompressedTrajectory` summarising the session. Called at session end
//! (or compaction flush) to reduce storage and prepare data for analysis.
use chrono::Utc;
use zclaw_memory::trajectory_store::{
CompressedTrajectory, CompletionStatus, SatisfactionSignal, TrajectoryEvent, TrajectoryStepType,
};
// ---------------------------------------------------------------------------
// Satisfaction detection
// ---------------------------------------------------------------------------
const POSITIVE_SIGNALS: &[&str] = &[
"谢谢", "很好", "解决了", "可以了", "对了", "完美",
"不错", "成功了", "行了", "搞定",
];
const NEGATIVE_SIGNALS: &[&str] = &[
"不对", "没用", "还是不行", "错了", "差太远",
"不好使", "不管用", "没效果", "失败", "不行",
];
/// Detect user satisfaction from the last few messages.
pub fn detect_satisfaction(last_messages: &[String]) -> Option<SatisfactionSignal> {
if last_messages.is_empty() {
return None;
}
// Check the last user messages for satisfaction signals
for msg in last_messages.iter().rev().take(3) {
let lower = msg.to_lowercase();
for kw in POSITIVE_SIGNALS {
if lower.contains(kw) {
return Some(SatisfactionSignal::Positive);
}
}
for kw in NEGATIVE_SIGNALS {
if lower.contains(kw) {
return Some(SatisfactionSignal::Negative);
}
}
}
Some(SatisfactionSignal::Neutral)
}
// ---------------------------------------------------------------------------
// Compression
// ---------------------------------------------------------------------------
/// Compress a sequence of trajectory events into a single summary.
///
/// Returns `None` if the events list is empty.
pub fn compress(
events: Vec<TrajectoryEvent>,
satisfaction: Option<SatisfactionSignal>,
) -> Option<CompressedTrajectory> {
if events.is_empty() {
return None;
}
let session_id = events[0].session_id.clone();
let agent_id = events[0].agent_id.clone();
// Extract key steps (skip retries — consecutive same-type steps)
let key_events = deduplicate_steps(&events);
let request_type = infer_request_type(&key_events);
let tools_used = extract_tools(&key_events);
let total_steps = key_events.len();
let total_duration_ms: u64 = events.iter().map(|e| e.duration_ms).sum();
let outcome = infer_outcome(&key_events, satisfaction);
let execution_chain = build_chain_json(&key_events);
Some(CompressedTrajectory {
id: uuid::Uuid::new_v4().to_string(),
session_id,
agent_id,
request_type,
tools_used,
outcome,
total_steps,
total_duration_ms,
total_tokens: 0, // filled by middleware from context
execution_chain,
satisfaction_signal: satisfaction,
created_at: Utc::now(),
})
}
/// Remove consecutive duplicate step types (retries/error recovery).
fn deduplicate_steps(events: &[TrajectoryEvent]) -> Vec<&TrajectoryEvent> {
let mut result = Vec::new();
let mut last_type: Option<TrajectoryStepType> = None;
for event in events {
// Keep first occurrence of each step type change
if last_type != Some(event.step_type) {
result.push(event);
last_type = Some(event.step_type);
}
}
// If we deduplicated everything away, keep the first and last
if result.is_empty() && !events.is_empty() {
result.push(&events[0]);
if events.len() > 1 {
result.push(&events[events.len() - 1]);
}
}
result
}
/// Infer request type from the first user request event.
fn infer_request_type(events: &[&TrajectoryEvent]) -> String {
for event in events {
if event.step_type == TrajectoryStepType::UserRequest {
let input = &event.input_summary;
return classify_request(input);
}
}
"general".to_string()
}
fn classify_request(input: &str) -> String {
let lower = input.to_lowercase();
if ["报告", "数据", "统计", "报表", "汇总"].iter().any(|k| lower.contains(k)) {
return "data_report".into();
}
if ["政策", "法规", "合规", "标准"].iter().any(|k| lower.contains(k)) {
return "policy_query".into();
}
if ["查房", "巡房"].iter().any(|k| lower.contains(k)) {
return "inspection".into();
}
if ["排班", "值班"].iter().any(|k| lower.contains(k)) {
return "scheduling".into();
}
if ["会议", "日程", "安排", "提醒"].iter().any(|k| lower.contains(k)) {
return "meeting".into();
}
if ["检查"].iter().any(|k| lower.contains(k)) {
return "inspection".into();
}
"general".to_string()
}
/// Extract unique tool names from ToolExecution events.
fn extract_tools(events: &[&TrajectoryEvent]) -> Vec<String> {
let mut tools = Vec::new();
let mut seen = std::collections::HashSet::new();
for event in events {
if event.step_type == TrajectoryStepType::ToolExecution {
let tool = event.input_summary.clone();
if !tool.is_empty() && seen.insert(tool.clone()) {
tools.push(tool);
}
}
}
tools
}
/// Infer completion outcome from last step + satisfaction signal.
fn infer_outcome(
events: &[&TrajectoryEvent],
satisfaction: Option<SatisfactionSignal>,
) -> CompletionStatus {
match satisfaction {
Some(SatisfactionSignal::Positive) => CompletionStatus::Success,
Some(SatisfactionSignal::Negative) => CompletionStatus::Failed,
Some(SatisfactionSignal::Neutral) => {
// Check if last meaningful step was a successful LLM generation
if events.iter().any(|e| e.step_type == TrajectoryStepType::LlmGeneration) {
CompletionStatus::Partial
} else {
CompletionStatus::Abandoned
}
}
None => CompletionStatus::Partial,
}
}
/// Build JSON execution chain from key events.
fn build_chain_json(events: &[&TrajectoryEvent]) -> String {
let chain: Vec<serde_json::Value> = events.iter().map(|e| {
serde_json::json!({
"step": e.step_index,
"type": e.step_type.as_str(),
"input": truncate(&e.input_summary, 100),
"output": truncate(&e.output_summary, 100),
})
}).collect();
serde_json::to_string(&chain).unwrap_or_else(|_| "[]".to_string())
}
fn truncate(s: &str, max: usize) -> String {
if s.chars().count() <= max {
s.to_string()
} else {
s.chars().take(max).collect::<String>() + "…"
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
fn make_event(index: usize, step_type: TrajectoryStepType, input: &str, output: &str) -> TrajectoryEvent {
TrajectoryEvent {
id: format!("evt-{}", index),
session_id: "sess-1".to_string(),
agent_id: "agent-1".to_string(),
step_index: index,
step_type,
input_summary: input.to_string(),
output_summary: output.to_string(),
duration_ms: 100,
timestamp: Utc::now(),
}
}
#[test]
fn test_compress_empty() {
assert!(compress(vec![], None).is_none());
}
#[test]
fn test_compress_single_event() {
let events = vec![make_event(0, TrajectoryStepType::UserRequest, "帮我查数据", "")];
let ct = compress(events, None).unwrap();
assert_eq!(ct.session_id, "sess-1");
assert_eq!(ct.total_steps, 1);
}
#[test]
fn test_compress_full_chain() {
let events = vec![
make_event(0, TrajectoryStepType::UserRequest, "帮我生成月度报告", ""),
make_event(1, TrajectoryStepType::ToolExecution, "collector", "5条数据"),
make_event(2, TrajectoryStepType::LlmGeneration, "", "报告已生成"),
];
let ct = compress(events, Some(SatisfactionSignal::Positive)).unwrap();
assert_eq!(ct.request_type, "data_report");
assert_eq!(ct.tools_used, vec!["collector"]);
assert_eq!(ct.outcome, CompletionStatus::Success);
assert!(ct.execution_chain.starts_with('['));
}
#[test]
fn test_deduplicate_retries() {
let events = vec![
make_event(0, TrajectoryStepType::ToolExecution, "tool-a", "err"),
make_event(1, TrajectoryStepType::ToolExecution, "tool-a", "ok"),
make_event(2, TrajectoryStepType::LlmGeneration, "", "done"),
];
let deduped = deduplicate_steps(&events);
assert_eq!(deduped.len(), 2); // first ToolExecution + LlmGeneration
}
#[test]
fn test_classify_request() {
assert_eq!(classify_request("帮我生成月度报告"), "data_report");
assert_eq!(classify_request("最新的合规政策是什么"), "policy_query");
assert_eq!(classify_request("明天有什么会议"), "meeting");
assert_eq!(classify_request("查房安排"), "inspection");
assert_eq!(classify_request("你好"), "general");
}
#[test]
fn test_detect_satisfaction_positive() {
let msgs = vec!["谢谢,很好用".to_string()];
assert_eq!(detect_satisfaction(&msgs), Some(SatisfactionSignal::Positive));
}
#[test]
fn test_detect_satisfaction_negative() {
let msgs = vec!["还是不行啊".to_string()];
assert_eq!(detect_satisfaction(&msgs), Some(SatisfactionSignal::Negative));
}
#[test]
fn test_detect_satisfaction_neutral() {
let msgs = vec!["好的我知道了".to_string()];
assert_eq!(detect_satisfaction(&msgs), Some(SatisfactionSignal::Neutral));
}
#[test]
fn test_detect_satisfaction_empty() {
assert_eq!(detect_satisfaction(&[]), None);
}
#[test]
fn test_infer_outcome() {
let events = vec![make_event(0, TrajectoryStepType::LlmGeneration, "", "ok")];
assert_eq!(
infer_outcome(&events.iter().collect::<Vec<_>>(), Some(SatisfactionSignal::Positive)),
CompletionStatus::Success
);
assert_eq!(
infer_outcome(&events.iter().collect::<Vec<_>>(), Some(SatisfactionSignal::Negative)),
CompletionStatus::Failed
);
}
#[test]
fn test_extract_tools_dedup() {
let events = vec![
make_event(0, TrajectoryStepType::ToolExecution, "researcher", ""),
make_event(1, TrajectoryStepType::ToolExecution, "researcher", ""),
make_event(2, TrajectoryStepType::ToolExecution, "collector", ""),
];
let refs: Vec<&TrajectoryEvent> = events.iter().collect();
let tools = extract_tools(&refs);
assert_eq!(tools, vec!["researcher", "collector"]);
}
}
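`deduplicate_steps` above is a run-length collapse keyed on `step_type`: only the first event of each run of consecutive equal keys survives. The same idea written generically (names here are illustrative, not part of the module):

```rust
// Generic run-length collapse: keep the first item of every run of
// consecutive items whose key is equal, mirroring deduplicate_steps.
fn collapse_runs<T, K: PartialEq>(items: &[T], key: impl Fn(&T) -> K) -> Vec<&T> {
    let mut out: Vec<&T> = Vec::new();
    let mut last: Option<K> = None;
    for item in items {
        let k = key(item);
        if last.as_ref() != Some(&k) {
            out.push(item);
            last = Some(k);
        }
    }
    out
}

fn main() {
    // Two consecutive tool retries collapse into one kept step.
    let steps = ["tool", "tool", "llm", "tool"];
    let kept = collapse_runs(&steps, |s| *s);
    assert_eq!(kept.len(), 3); // tool, llm, tool
}
```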

View File

@@ -0,0 +1,369 @@
//! User Profiler — aggregates extracted facts into a structured user profile.
//!
//! Takes `ExtractedFactBatch` from the growth pipeline, classifies facts by
//! category, and updates the `UserProfile` via `UserProfileStore`.
//!
//! Desktop uses "default_user" as the single user ID.
use std::sync::Arc;
use chrono::Utc;
use tracing::{debug, warn};
use zclaw_memory::fact::{Fact, FactCategory};
use zclaw_memory::user_profile_store::{
CommStyle, Level, UserProfile, UserProfileStore,
};
use zclaw_types::Result;
/// Default user ID for single-user desktop mode.
const DEFAULT_USER: &str = "default_user";
// ---------------------------------------------------------------------------
// Classification helpers
// ---------------------------------------------------------------------------
/// Maps a fact category to the profile field it should update.
enum ProfileFieldUpdate {
Industry(String),
Role(String),
ExpertiseLevel(Level),
CommunicationStyle(CommStyle),
PreferredTool(String),
RecentTopic(String),
}
/// Classify a fact content into a profile update.
fn classify_fact_content(fact: &Fact) -> Option<ProfileFieldUpdate> {
let content = fact.content.to_lowercase();
// Communication style detection
if content.contains("简洁") || content.contains("简短") || content.contains("简单说") {
return Some(ProfileFieldUpdate::CommunicationStyle(CommStyle::Concise));
}
if content.contains("详细") || content.contains("展开说") || content.contains("多说点") {
return Some(ProfileFieldUpdate::CommunicationStyle(CommStyle::Detailed));
}
if content.contains("正式") || content.contains("专业") || content.contains("官方") {
return Some(ProfileFieldUpdate::CommunicationStyle(CommStyle::Formal));
}
if content.contains("随意") || content.contains("轻松") || content.contains("随便") {
return Some(ProfileFieldUpdate::CommunicationStyle(CommStyle::Casual));
}
// Industry / role detection
if content.contains("医疗") || content.contains("医院") || content.contains("诊所") {
return Some(ProfileFieldUpdate::Industry("医疗".into()));
}
if content.contains("制造") || content.contains("工厂") || content.contains("生产") {
return Some(ProfileFieldUpdate::Industry("制造业".into()));
}
if content.contains("教育") || content.contains("学校") || content.contains("教学") {
return Some(ProfileFieldUpdate::Industry("教育".into()));
}
if content.contains("行政") || content.contains("主任") || content.contains("管理") {
return Some(ProfileFieldUpdate::Role("行政管理".into()));
}
if content.contains("工程师") || content.contains("开发") || content.contains("技术") {
return Some(ProfileFieldUpdate::Role("技术人员".into()));
}
if content.contains("医生") || content.contains("护士") || content.contains("临床") {
return Some(ProfileFieldUpdate::Role("医务人员".into()));
}
// Expertise level
if content.contains("新手") || content.contains("不会") || content.contains("不了解") {
return Some(ProfileFieldUpdate::ExpertiseLevel(Level::Beginner));
}
if content.contains("熟练") || content.contains("熟悉") || content.contains("常用") {
return Some(ProfileFieldUpdate::ExpertiseLevel(Level::Expert));
}
// Tool preferences
if content.contains("用研究") || content.contains("帮我查") || content.contains("调研") {
return Some(ProfileFieldUpdate::PreferredTool("researcher".into()));
}
if content.contains("收集") || content.contains("整理") || content.contains("汇总") {
return Some(ProfileFieldUpdate::PreferredTool("collector".into()));
}
if content.contains("幻灯") || content.contains("演示") || content.contains("ppt") {
return Some(ProfileFieldUpdate::PreferredTool("slideshow".into()));
}
// Default: treat as a recent topic
if fact.confidence >= 0.6 {
let topic = truncate(&fact.content, 30);
return Some(ProfileFieldUpdate::RecentTopic(topic));
}
None
}
// ---------------------------------------------------------------------------
// UserProfiler
// ---------------------------------------------------------------------------
/// Aggregates extracted facts into a structured user profile.
pub struct UserProfiler {
store: Arc<UserProfileStore>,
}
impl UserProfiler {
pub fn new(store: Arc<UserProfileStore>) -> Self {
Self { store }
}
/// Main entry point: update profile from extracted facts.
pub async fn update_from_facts(
&self,
facts: &[Fact],
) -> Result<()> {
if facts.is_empty() {
return Ok(());
}
for fact in facts {
if let Some(update) = classify_fact_content(fact) {
if let Err(e) = self.apply_update(&update).await {
warn!("[UserProfiler] Failed to apply update: {}", e);
}
}
}
        // Recompute confidence from how many profile fields are now filled
self.update_confidence().await;
debug!("[UserProfiler] Updated profile from {} facts", facts.len());
Ok(())
}
/// Update active pain points in the profile.
pub async fn update_pain_points(
&self,
pains: Vec<String>,
) -> Result<()> {
// Replace all pain points by loading, modifying, and upserting
let mut profile = self.get_or_create_profile().await;
profile.active_pain_points = pains;
profile.updated_at = Utc::now();
self.store.upsert(&profile).await
}
/// Format relevant profile attributes for injection into system prompt.
/// Caps output at ~200 Chinese characters (≈100 tokens).
pub fn format_profile_summary(profile: &UserProfile, topic: &str) -> Option<String> {
let mut parts = Vec::new();
if let Some(ref industry) = profile.industry {
parts.push(format!("行业: {}", industry));
}
if let Some(ref role) = profile.role {
parts.push(format!("角色: {}", role));
}
if let Some(ref level) = profile.expertise_level {
let level_str = match level {
Level::Beginner => "入门",
Level::Intermediate => "中级",
Level::Expert => "专家",
};
parts.push(format!("水平: {}", level_str));
}
if let Some(ref style) = profile.communication_style {
let style_str = match style {
CommStyle::Concise => "简洁",
CommStyle::Detailed => "详细",
CommStyle::Formal => "正式",
CommStyle::Casual => "随意",
};
parts.push(format!("沟通风格: {}", style_str));
}
// Only add topics relevant to the current conversation
if !profile.recent_topics.is_empty() {
let relevant: Vec<&str> = profile.recent_topics.iter()
            .filter(|t| {
                // Coarse relevance heuristic: any character shared between the
                // stored topic and the current topic counts as overlap (works
                // for CJK text where single characters carry meaning).
                let t_lower = t.to_lowercase();
                let topic_lower = topic.to_lowercase();
                t_lower.chars().any(|c| topic_lower.contains(c))
            })
.take(3)
.map(|s| s.as_str())
.collect();
if !relevant.is_empty() {
parts.push(format!("近期话题: {}", relevant.join(", ")));
}
}
if parts.is_empty() {
return None;
}
let summary = format!("[用户画像] {}", parts.join(" | "));
if summary.chars().count() > 200 {
Some(truncate(&summary, 200))
} else {
Some(summary)
}
}
// -- internal helpers --
async fn apply_update(&self, update: &ProfileFieldUpdate) -> Result<()> {
match update {
ProfileFieldUpdate::Industry(v) => {
self.store.update_field(DEFAULT_USER, "industry", v).await
}
ProfileFieldUpdate::Role(v) => {
self.store.update_field(DEFAULT_USER, "role", v).await
}
ProfileFieldUpdate::ExpertiseLevel(v) => {
let val = match v {
Level::Beginner => "beginner",
Level::Intermediate => "intermediate",
Level::Expert => "expert",
};
self.store.update_field(DEFAULT_USER, "expertise_level", val).await
}
ProfileFieldUpdate::CommunicationStyle(v) => {
let val = match v {
CommStyle::Concise => "concise",
CommStyle::Detailed => "detailed",
CommStyle::Formal => "formal",
CommStyle::Casual => "casual",
};
self.store.update_field(DEFAULT_USER, "communication_style", val).await
}
ProfileFieldUpdate::PreferredTool(tool) => {
self.store.add_preferred_tool(DEFAULT_USER, tool, 5).await
}
ProfileFieldUpdate::RecentTopic(topic) => {
self.store.add_recent_topic(DEFAULT_USER, topic, 10).await
}
}
}
async fn update_confidence(&self) {
if let Ok(Some(profile)) = self.store.get(DEFAULT_USER).await {
let filled = [
profile.industry.is_some(),
profile.role.is_some(),
profile.expertise_level.is_some(),
profile.communication_style.is_some(),
!profile.recent_topics.is_empty(),
].iter().filter(|&&x| x).count() as f32;
let confidence = (filled / 5.0).min(1.0);
let conf_str = format!("{:.2}", confidence);
if let Err(e) = self.store.update_field(DEFAULT_USER, "confidence", &conf_str).await {
warn!("[UserProfiler] Failed to update confidence: {}", e);
}
}
}
async fn get_or_create_profile(&self) -> UserProfile {
match self.store.get(DEFAULT_USER).await {
Ok(Some(p)) => p,
_ => UserProfile::default_profile(),
}
}
}
fn truncate(s: &str, max_chars: usize) -> String {
if s.chars().count() <= max_chars {
s.to_string()
} else {
        s.chars().take(max_chars).collect::<String>() + "…"
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_classify_communication_style() {
let fact = Fact::new("喜欢简洁的回答".to_string(), FactCategory::Preference, 0.8);
let update = classify_fact_content(&fact);
assert!(matches!(update, Some(ProfileFieldUpdate::CommunicationStyle(CommStyle::Concise))));
let fact2 = Fact::new("请详细说明".to_string(), FactCategory::Preference, 0.8);
let update2 = classify_fact_content(&fact2);
assert!(matches!(update2, Some(ProfileFieldUpdate::CommunicationStyle(CommStyle::Detailed))));
}
#[test]
fn test_classify_industry() {
let fact = Fact::new("我在医院工作".to_string(), FactCategory::Knowledge, 0.8);
let update = classify_fact_content(&fact);
assert!(matches!(update, Some(ProfileFieldUpdate::Industry(ref s)) if s == "医疗"));
}
#[test]
fn test_classify_role() {
let fact = Fact::new("我是行政主任".to_string(), FactCategory::Knowledge, 0.8);
let update = classify_fact_content(&fact);
assert!(matches!(update, Some(ProfileFieldUpdate::Role(ref s)) if s == "行政管理"));
}
#[test]
fn test_classify_expertise() {
let fact = Fact::new("我是新手".to_string(), FactCategory::Knowledge, 0.8);
let update = classify_fact_content(&fact);
assert!(matches!(update, Some(ProfileFieldUpdate::ExpertiseLevel(Level::Beginner))));
}
#[test]
fn test_classify_tool() {
let fact = Fact::new("帮我调研一下市场".to_string(), FactCategory::Preference, 0.8);
let update = classify_fact_content(&fact);
assert!(matches!(update, Some(ProfileFieldUpdate::PreferredTool(ref s)) if s == "researcher"));
}
#[test]
fn test_classify_topic_fallback() {
let fact = Fact::new("关于季度报告的编制流程".to_string(), FactCategory::Behavior, 0.7);
let update = classify_fact_content(&fact);
assert!(matches!(update, Some(ProfileFieldUpdate::RecentTopic(_))));
}
#[test]
fn test_classify_low_confidence_ignored() {
let fact = Fact::new("关于季度报告的编制流程".to_string(), FactCategory::Behavior, 0.3);
let update = classify_fact_content(&fact);
assert!(update.is_none());
}
#[test]
fn test_format_profile_summary() {
let profile = UserProfile {
user_id: "default_user".to_string(),
industry: Some("医疗".to_string()),
role: Some("行政主任".to_string()),
expertise_level: Some(Level::Intermediate),
communication_style: Some(CommStyle::Concise),
preferred_language: "zh-CN".to_string(),
recent_topics: vec!["排班管理".to_string()],
active_pain_points: vec![],
preferred_tools: vec![],
confidence: 0.6,
updated_at: Utc::now(),
};
let summary = UserProfiler::format_profile_summary(&profile, "排班");
assert!(summary.is_some());
let text = summary.unwrap();
assert!(text.contains("医疗"));
assert!(text.contains("行政主任"));
assert!(text.contains("排班管理"));
}
#[test]
fn test_format_profile_empty() {
let profile = UserProfile::default_profile();
let summary = UserProfiler::format_profile_summary(&profile, "test");
assert!(summary.is_none());
}
}

View File

@@ -0,0 +1,742 @@
# Hermes Intelligence Pipeline Design

> Detailed design for absorbing 4 core ideas into ZCLAW, based on a competitive analysis of Hermes Agent (Nous Research).
> Architecture approach: Pipeline Closure — close the breakpoints in the existing pipeline without introducing new architectural layers.

## Context

Hermes Agent validated the "one butler + memory flywheel" direction. Its 4 core innovations are directly relevant to ZCLAW's post-release iteration:

1. **Self-improvement loop** — execute → evaluate → extract skills → improve → reuse
2. **User modeling** — three-tier memory stack + unified user profile
3. **Natural-language cron** — LLM parses natural language into scheduled tasks
4. **Trajectory compression** — tool-call chain → structured JSON → foundation for RL

**Key diagnosis:** What ZCLAW lacks is not modules but a connected pipeline. Existing components — PainAggregator, SolutionGenerator, Reflection, Heartbeat, MemoryExtractor — are already in place but disconnected from each other. This design closes those breakpoints.

**Scope constraints:**
- The butler router (wiring up ButlerRouterMiddleware + SemanticSkillRouter) is driven by another session; this design marks it as an external dependency
- Post-release iteration; does not affect the current release plan
- All 4 ideas are designed here, ordered by priority: self-improvement loop > user modeling > NL cron > trajectory compression

**Total code estimate:** ~2200 lines added/modified (~1700 added + ~500 modified)

### Type Conventions

This design uses the following ID type conventions:

```rust
// All native Rust structs use strong types
use uuid::Uuid;
use zclaw_types::{AgentId, SessionId};

// Type aliases for new entities; newtype wrappers are unwrapped to String at the Tauri command layer
type ExperienceId = String; // Uuid::new_v4().to_string()
type ProposalId = String;   // matches the existing Proposal.id
type TrajectoryId = String; // Uuid::new_v4().to_string()
```

Internal Rust structs use `AgentId` / `SessionId`; Tauri command boundaries use `String` (required by Tauri serialization).

### Unified Completion Status Enum

Sections 1 and 4 share a unified completion status:

```rust
/// Generic completion status, the basis for all Outcome enums
enum CompletionStatus {
    Success,
    Partial,
    Failed,
    Abandoned, // not used by Section 1 (a runtime convention, not a compile-time constraint)
}
```

Section 1's Experience uses `CompletionStatus` without Abandoned; Section 4's CompressedTrajectory uses the full set.

---
## Section 1: Self-improvement Loop

### Goal

User reports a pain point → auto-detect → auto-generate a solution → on success, extract the solution as a reusable experience → reuse it directly the next time a similar problem appears.

### Data Flow

```
User message → PainAggregator (existing)
    ↓ confidence >= 0.7
SolutionGenerator (existing, now auto-triggered)
    ↓ generates Proposal
Await user feedback (success/failure)
    ↓ success
ExperienceExtractor (new)
    ↓ produces structured experience
ExperienceStore (new, SQLite)
    ↓ next conversation
MemoryMiddleware (existing) injects relevant experiences
```

### Key Breakpoint Fixes

**Breakpoint 1: PainAggregator → SolutionGenerator (not auto-triggered)**
- File: `desktop/src-tauri/src/intelligence/pain_aggregator.rs`
- When `confidence >= 0.7`, auto-invoke `butler_generate_solution` via a Tauri event
- New `PainConfirmedEvent` event struct

**Breakpoint 2: Solution outcome feedback (no feedback mechanism)**
- New `ProposalFeedback` struct
- Detect implicit user feedback keywords in the chat stream ("好了" / "解决了" / "没用")
- New Tauri command `butler_submit_proposal_feedback`

**Breakpoint 3: Successful solution → structured experience (entirely missing)**
- New `ExperienceExtractor`: extracts experiences from successful solutions
- LLM-assisted extraction (reuses the existing LlmDriver), with fallback to template extraction
- Stored in VikingStorage under the scope prefix `agent://{agent_id}/experience/`

**Breakpoint 4: Experience reuse (entirely missing)**
- Extend `MemoryMiddleware`: on each new user message, retrieve relevant experiences via VikingStorage
- Scope filter on the `agent://{agent_id}/experience/` prefix + TF-IDF relevance matching
- When similarity exceeds the threshold, inject "past experience" into the system prompt
- Format: `[过往经验] 类似情况 X 做过 Y,结果是 Z` ("[Past experience] In similar situation X we did Y; the result was Z")
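The injection format above can be sketched as a small formatter. This is a minimal illustration only: the struct is trimmed to the fields the formatter needs, and the names (`format_experience`, `outcome_note`) are assumptions, not the real ExperienceExtractor API.

```rust
// Hypothetical trimmed-down Experience; the real struct carries more fields.
struct Experience {
    pain_pattern: String,
    solution_steps: Vec<String>,
    outcome_note: String,
}

/// Render one retrieved experience as a system-prompt line in the
/// "[过往经验] 类似情况 X 做过 Y,结果是 Z" format described above.
fn format_experience(exp: &Experience) -> String {
    format!(
        "[过往经验] 类似情况「{}」做过:{},结果是:{}",
        exp.pain_pattern,
        exp.solution_steps.join(" → "),
        exp.outcome_note
    )
}
```

MemoryMiddleware would append one such line per retrieved experience, subject to its token budget.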
### Data Structures

```rust
// New file: desktop/src-tauri/src/intelligence/experience.rs
use zclaw_types::AgentId;
use uuid::Uuid;

struct Experience {
    id: ExperienceId,
    agent_id: AgentId,
    pain_pattern: String,        // trigger pattern (keyword digest)
    context: String,             // problem context
    solution_steps: Vec<String>, // resolution steps
    outcome: CompletionStatus,   // Success | Partial (experiences only record successes)
    source_proposal_id: Option<ProposalId>,
    reuse_count: usize,
    created_at: DateTime,
}

struct ProposalFeedback {
    proposal_id: ProposalId,
    outcome: CompletionStatus, // Success | Failed | Partial
    user_comment: Option<String>,
    detected_at: DateTime,
}

struct PainConfirmedEvent {
    pain_point_id: String, // PainPoint.id (Uuid String)
    pattern: String,
    confidence: f32,
}
```

### Storage Strategy

Experiences live in the existing VikingStorage, distinguished by a scope prefix:

```rust
// An Experience is stored as a VikingStorage memory entry
scope: "agent://{agent_id}/experience/{pattern_hash}" // follows the OpenViking URI convention
content: JSON(Experience)                             // the serialized full Experience struct
```

**Why not a separate experiences table + FTS5 index:**
- VikingStorage already has a mature FTS5 + TF-IDF + embedding retrieval pipeline
- MemoryMiddleware is already integrated with VikingStorage; adding a scope prefix is enough to distinguish entries
- Avoids maintaining two independent FTS5 indexes

A separate `experience_store.rs` file handles VikingStorage CRUD + scope filtering; no new tables are created.
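Building the scope key can be sketched as follows. Note the hashing scheme is an assumption: the design only specifies a `{pattern_hash}` segment, so this sketch uses the std `DefaultHasher` (deterministic within a build) purely for illustration; the real implementation may hash differently.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Build the VikingStorage scope URI for an experience:
/// agent://{agent_id}/experience/{pattern_hash}
/// (hash function here is illustrative, not the real scheme)
fn experience_scope(agent_id: &str, pain_pattern: &str) -> String {
    let mut hasher = DefaultHasher::new();
    pain_pattern.hash(&mut hasher);
    format!("agent://{}/experience/{:016x}", agent_id, hasher.finish())
}
```

A stable hash keeps repeated extractions of the same pain pattern writing to the same scope, so upserts naturally deduplicate.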
### Migration Strategy

No new database tables or schema changes are needed. Experience data is stored through VikingStorage's existing memory table, distinguished by the scope prefix.

### Error Handling

- ExperienceExtractor LLM call fails → fall back to template extraction (extract solution_steps in a fixed format)
- ProposalFeedback detection fails → do not block the conversation; skip silently
- Experience injection fails → MemoryMiddleware logs a warn, skips injection, and normal conversation is unaffected
- All errors follow the codebase convention: non-critical paths use `log::warn!` / `log::error!` and never block the main flow

### Test Plan

| Test target | Location | Scenarios covered |
|-------------|----------|-------------------|
| ExperienceExtractor | inline `#[cfg(test)]` in `experience.rs` | LLM extraction success / failure fallback, template extraction |
| ExperienceStore | inline in `experience_store.rs` | CRUD round-trip, scope filtering, VikingStorage integration |
| PainConfirmedEvent trigger | extended tests in `pain_aggregator.rs` | event fires when confidence >= 0.7 |
| Experience injection | MemoryMiddleware tests | relevance filtering, token limit, empty-result handling |
| ProposalFeedback detection | extended tests in `solution_generator.rs` | implicit feedback keyword matching |

### File Inventory

| File | Purpose | Est. lines |
|------|---------|-----------|
| `desktop/src-tauri/src/intelligence/experience.rs` | ExperienceExtractor + logic | ~250 |
| `crates/zclaw-growth/src/experience_store.rs` | VikingStorage scope CRUD | ~120 |
| modify `pain_aggregator.rs` | auto-trigger SolutionGenerator | ~30 |
| modify `solution_generator.rs` | Proposal feedback slot | ~40 |
| modify `intelligence_hooks.rs` | new post-proposal-evaluation hook | ~50 |
| modify MemoryMiddleware | experience injection logic (scope filtering) | ~60 |
| modify `crates/zclaw-memory/src/lib.rs` | export new modules | ~5 |

**Estimate: ~555 lines added/modified**

---
## Section 2: User Modeling

### Goal

Continuously extract user traits from every conversation, aggregate them into a structured profile, and inject that profile into routing and generation.

### Data Flow

```
Conversation messages → MemoryExtractor (existing)
    ↓
UserProfiler (new)
    ↓ aggregates into UserProfile
UserProfileStore (new, SQLite)
    ├→ ButlerRouter (external dependency, another session)
    │   → routing decisions consider user preferences
    └→ MemoryMiddleware (existing)
        → injects a profile summary into the system prompt
```

### Design Decisions

**Why a new UserProfile instead of reusing IdentityManager.user_profile?**
The existing user_profile is unstructured markdown and cannot support conditional queries. Profile injection was deliberately disabled (`identity.rs:291-298`) because it made the model over-focus on stale topics. A structured profile is needed so injection can be relevance-filtered.

**Single-user desktop scenario:** The desktop build uses `"default_user"` as the user_id (consistent with PainAggregator) and maintains exactly one UserProfile record.

### Data Structures

```rust
// New file: crates/zclaw-memory/src/user_profile_store.rs
struct UserProfile {
    user_id: String,                        // "default_user" (single-user desktop)
    // Static attributes (low-frequency updates)
    industry: Option<String>,               // e.g. "医疗", "制造业"
    role: Option<String>,                   // e.g. "行政主任", "厂长"
    expertise_level: Option<Level>,         // Beginner / Intermediate / Expert
    communication_style: Option<CommStyle>, // Concise / Detailed / Formal / Casual
    preferred_language: String,             // "zh-CN"
    // Dynamic attributes (high-frequency updates)
    recent_topics: Vec<String>,             // topics from the last 7 days
    active_pain_points: Vec<String>,        // currently unresolved pain points
    preferred_tools: Vec<String>,           // frequently used skills/tools
    // Metadata
    updated_at: DateTime,
    confidence: f32,                        // profile confidence
}

enum Level { Beginner, Intermediate, Expert }
enum CommStyle { Concise, Detailed, Formal, Casual }
```

### Aggregation Logic (UserProfiler)

1. **MemoryExtractor output → classification**: extracted memories are classified as `UserPreference` / `UserFact` / `AgentLesson`
2. **Post-classification aggregation**:
   - `UserPreference` → updates `communication_style`, `preferred_tools`
   - `UserFact` → updates `industry`, `role`, `expertise_level`
   - `AgentLesson` → updates `recent_topics`
   - active pain points from PainAggregator → update `active_pain_points`
3. **Dedup + decay**: similar attributes are merged; attributes with no corroboration for 30 days lose confidence
4. **Storage**: single user, single record (upsert) in the SQLite `user_profiles` table
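The decay rule in step 3 can be sketched as a pure function. The 30-day threshold comes from the design above; the linear rate and the function name are assumptions for illustration, not the real UserProfiler API.

```rust
/// Decay an attribute's confidence once it has gone uncorroborated for
/// more than 30 days; fresher attributes keep their confidence unchanged.
fn decayed_confidence(confidence: f32, days_since_corroborated: u32) -> f32 {
    const DECAY_AFTER_DAYS: u32 = 30;   // from the design: 30 days without corroboration
    const DECAY_PER_DAY: f32 = 0.02;    // assumed linear rate; tune against real data
    if days_since_corroborated <= DECAY_AFTER_DAYS {
        confidence
    } else {
        let stale_days = (days_since_corroborated - DECAY_AFTER_DAYS) as f32;
        (confidence - stale_days * DECAY_PER_DAY).max(0.0)
    }
}
```

Attributes whose confidence decays to 0 are candidates for removal on the next upsert.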
### Injection Logic

```rust
// Added to MemoryMiddleware
fn inject_user_profile(&self, ctx: &mut MiddlewareContext, profile: &UserProfile) {
    // Only inject attributes relevant to the current topic
    let relevant = self.filter_by_relevance(profile, &ctx.user_input);
    if relevant.is_empty() { return; }
    // Format as a concise summary, at most 100 tokens
    let summary = format_user_profile_summary(&relevant);
    ctx.system_prompt.push_str(&summary);
}
```

**Key constraint:** Injected content must not exceed 100 tokens; inject only attributes relevant to the current topic.

### Collaboration with the Butler Router (External Dependency)

Once the butler router is wired up:
- ButlerRouterMiddleware can read UserProfile.industry and role
- Routing decisions consider the user's background
- This design only provides the data interface; routing logic is handled in another session

### Migration Strategy

Add a `user_profiles` table via the `MIGRATIONS` array in `schema.rs`, bumping the schema version. The initial version contains the CREATE TABLE plus a default "default_user" row.

```rust
// Added to the MIGRATIONS array in schema.rs (flat &str array, no down migrations,
// consistent with the existing format)
"CREATE TABLE IF NOT EXISTS user_profiles (...);",
```

### Error Handling

- UserProfileStore read/write failure → `log::warn!` + return None; do not block the conversation
- UserProfiler aggregation failure → keep the last valid profile; do not overwrite
- Profile injection failure → MemoryMiddleware degrades to no-profile mode
- All operations follow the rule: non-critical errors never block the main flow

### Test Plan

| Test target | Location | Scenarios covered |
|-------------|----------|-------------------|
| UserProfileStore | inline in `user_profile_store.rs` | CRUD round-trip, upsert dedup, JSON field serialization |
| UserProfiler aggregation | inline in `user_profiler.rs` | classification correctness, dedup, decay, empty input |
| Profile injection | extended MemoryMiddleware tests | relevance filtering, 100-token limit, empty profile |
| Migration | schema tests | fresh creation + upgrade path |

### Database Schema

```sql
CREATE TABLE IF NOT EXISTS user_profiles (
    user_id TEXT PRIMARY KEY,
    industry TEXT,
    role TEXT,
    expertise_level TEXT,     -- 'Beginner' | 'Intermediate' | 'Expert'
    communication_style TEXT, -- 'Concise' | 'Detailed' | 'Formal' | 'Casual'
    preferred_language TEXT DEFAULT 'zh-CN',
    recent_topics TEXT,       -- JSON array
    active_pain_points TEXT,  -- JSON array
    preferred_tools TEXT,     -- JSON array
    confidence REAL DEFAULT 0.0,
    updated_at TEXT NOT NULL
);
```

### File Inventory

| File | Purpose | Est. lines |
|------|---------|-----------|
| `crates/zclaw-memory/src/user_profile_store.rs` | UserProfile struct + SQLite CRUD | ~200 |
| `desktop/src-tauri/src/intelligence/user_profiler.rs` | aggregation logic | ~180 |
| modify `MemoryMiddleware` | profile injection (relevance filtering) | ~80 |
| modify `intelligence_hooks.rs` | post-extraction trigger for UserProfiler | ~30 |
| modify `crates/zclaw-memory/src/lib.rs` | export new modules | ~5 |

**Estimate: ~495 lines added/modified**

---
## Section 3: Natural-Language Cron

### Goal

The user says "每天早上9点提醒我查房" ("remind me to do rounds every day at 9 a.m.") → the system parses it into `0 9 * * *` → a scheduled task is created automatically.

### Data Flow

```
User message (with time intent)
    ↓
Intent classification (ButlerRouter / regex pre-check)
    ↓ "schedule/reminder" intent detected
NlScheduleParser (new, in zclaw-runtime)
    ↓ parses into ParsedSchedule
ScheduleConfirmDialog (new)
    ↓ user confirms "每天早上9点 → 0 9 * * *"
SchedulerService (existing, in zclaw-kernel)
    ↓ creates the scheduled task
TriggerManager (existing)
    ↓ fires on schedule
Hand executes (existing)
```

### Parsing Strategy (Three-layer Fallback)

**Layer 1: Regex pattern matching (covers ~80% of common cases)**

| Pattern | Example | Cron |
|---------|---------|------|
| 每天 + time (daily) | 每天早上9点 | `0 9 * * *` |
| 每周N + time (weekly) | 每周一上午10点 | `0 10 * * 1` |
| 工作日 + time (weekdays) | 工作日下午3点 | `0 15 * * 1-5` |
| 每N小时 (every N hours) | 每2小时 | `0 */2 * * *` |
| 每月N号 (day N monthly) | 每月1号 | `0 0 1 * *` |
| Relative time | 明天下午3点 | one-off ISO |
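The daily pattern and the period-aware hour adjustment (下午3点 → 15, 晚上8点 → 20) can be sketched with plain string matching, no regex crate needed. This is a minimal sketch of one Layer-1 matcher; the function names are illustrative and not the real NlScheduleParser API.

```rust
/// Shift an hour into 24h form based on the Chinese period word:
/// 下午 (afternoon) / 晚上 (evening) add 12 unless the hour is already 24h form.
fn adjust_hour(period: Option<&str>, hour: u32) -> u32 {
    match period {
        Some("下午") | Some("晚上") if hour < 12 => hour + 12,
        _ => hour,
    }
}

/// Parse "每天<period><hour>点"-style input into a cron expression.
fn parse_daily(input: &str) -> Option<String> {
    let rest = input.strip_prefix("每天")?;
    // Optional period word between 每天 and the hour
    let periods = ["早上", "上午", "中午", "下午", "晚上"];
    let (period, rest) = match periods.iter().find(|p| rest.starts_with(**p)) {
        Some(p) => (Some(*p), rest.strip_prefix(*p).unwrap()),
        None => (None, rest),
    };
    // ASCII digits before "点" form the hour
    let digits: String = rest.chars().take_while(|c| c.is_ascii_digit()).collect();
    if digits.is_empty() || !rest[digits.len()..].starts_with('点') {
        return None;
    }
    let hour = adjust_hour(period, digits.parse().ok()?);
    (hour < 24).then(|| format!("0 {hour} * * *"))
}
```

The other five matchers (每周/工作日/每N小时/每月/一次性) follow the same shape, each returning a cron string or falling through to the next layer.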
**Layer 2: LLM-assisted parsing (covers fuzzy/complex phrasing)**
- Uses Haiku (~50 input tokens, ~20 output tokens)
- Handles phrasing like "下个月开始每周二和周四提醒我" ("starting next month, remind me every Tuesday and Thursday")

**Layer 3: Interactive clarification (when parsing is inconclusive)**
- "我理解您想设置定时任务,请确认:..." ("I understand you want to set up a scheduled task, please confirm: ...")

### Data Structures

```rust
// New file: crates/zclaw-runtime/src/nl_schedule.rs
// Lives in the runtime layer: a pure text→cron utility with no kernel coordination dependency
use zclaw_types::AgentId;

struct ParsedSchedule {
    cron_expression: String,     // "0 9 * * *"
    natural_description: String, // "每天早上9点"
    confidence: f32,
    task_description: String,    // "查房提醒"
    task_target: TaskTarget,
}

/// Target of a scheduled task
enum TaskTarget {
    Agent(AgentId),   // trigger a specific agent
    Hand(String),     // trigger a specific hand (tool name)
    Workflow(String), // trigger a specific workflow (by name)
}

enum ScheduleParseResult {
    Exact(ParsedSchedule),          // high confidence, confirm directly
    Ambiguous(Vec<ParsedSchedule>), // multiple readings, user must choose
    Unclear,                        // needs clarification
}
```

### Confirmation Flow

1. User says "每天早上9点提醒我查房"
2. Parsed as `{ cron: "0 9 * * *", desc: "查房提醒" }`
3. System replies: "好的,我为您设置了:**每天早上 9:00** 提醒查房。确认吗?" ("OK, I've set a daily 9:00 a.m. rounds reminder. Confirm?")
4. User confirms → call the existing `SchedulerService.create_trigger()`
5. User corrects → re-parse or edit manually

### Migration Strategy

No new database tables. NlScheduleParser is a pure computation utility; its output is persisted through the existing `SchedulerService` + `TriggerManager`.

### Error Handling

- Regex match fails → try Layer 2 LLM parsing
- LLM parsing fails → return `ScheduleParseResult::Unclear`, triggering interactive clarification
- Scheduled-task creation fails → report the error to the user and suggest manual setup
- No error blocks the conversation flow

### Test Plan

| Test target | Location | Scenarios covered |
|-------------|----------|-------------------|
| Regex parsing | inline in `nl_schedule.rs` | 10+ Chinese time-expression patterns, boundary values, invalid input |
| LLM fallback | mock tests | tolerance when the LLM returns an invalid cron |
| ParsedSchedule | unit tests | serialization, field completeness |
| TaskTarget enum | unit tests | variants match existing types |
| Confirmation flow | integration tests | full parse → confirm → create chain |

### File Inventory

| File | Purpose | Est. lines |
|------|---------|-----------|
| `crates/zclaw-runtime/src/nl_schedule.rs` | NlScheduleParser + Chinese pattern library | ~300 |
| modify `intelligence_hooks.rs` | detect schedule intent and trigger parsing | ~40 |
| modify desktop store + component | confirmation dialog interaction | ~150 |
| modify `crates/zclaw-kernel/src/scheduler.rs` | accept cron-string input | ~20 |

**Estimate: ~510 lines added/modified**

---
## Section 4: Trajectory Compression

### Goal

Record the full tool-call chain (user request → intent classification → skill selection → execution steps → result → user satisfaction) and compress it into structured JSON, as the data foundation for future RL/improvement.

### Data Flow

```
User request
    ↓
AgentLoop (existing)
    ↓ each step recorded through middleware
TrajectoryRecorderMiddleware (new, implements the AgentMiddleware trait)
    ↓ async writes to the trajectory_events table
    ↓ on session end
TrajectoryCompressor (new)
    ↓ compresses into structured JSON
compressed_trajectories table
    ↓ optional
export to RL training-data format
```

### Key Design Decision: TrajectoryRecorder as Middleware

TrajectoryRecorder implements the `AgentMiddleware` trait (from `zclaw-runtime`), using the existing middleware hooks:
- `before_completion` → record a `UserRequest` step
- `after_tool_call` → record a `ToolExecution` step
- `after_completion` → record an `LlmGeneration` step + trigger compression on session end

**Why not a custom AgentLoop hook?**
- The existing middleware system already provides every hook point needed
- `MiddlewareContext` already exposes `agent_id`, `session_id`, `user_input`, `input_tokens`, `output_tokens`
- Fits the Pipeline Closure principle: no new architectural layers

Priority sits in the 600-799 range (telemetry category) so it runs after business middleware. Note that the existing `token_calibration` middleware already occupies priority 700; 650 is recommended.

### Data Structures

```rust
// New file: crates/zclaw-memory/src/trajectory_store.rs
use zclaw_types::{AgentId, SessionId};
use uuid::Uuid;

/// A single trajectory event (fine-grained, one per step)
struct TrajectoryEvent {
    id: TrajectoryId,
    session_id: SessionId,
    agent_id: AgentId,
    step_index: usize,
    step_type: TrajectoryStepType,
    input_summary: String,  // ≤200 chars
    output_summary: String, // ≤200 chars
    duration_ms: u64,
    timestamp: DateTime,
}

enum TrajectoryStepType {
    UserRequest,          // the user's original request
    IntentClassification, // intent classification result
    SkillSelection,       // which skill was chosen
    ToolExecution,        // tool call
    LlmGeneration,        // LLM generation
    UserFeedback,         // user feedback
}

/// The compressed full trajectory (produced at session end)
struct CompressedTrajectory {
    id: TrajectoryId,
    session_id: SessionId,
    agent_id: AgentId,
    request_type: String,      // e.g. "data_report", "policy_query"
    tools_used: Vec<String>,   // ["researcher", "collector"]
    outcome: CompletionStatus, // Success | Partial | Failed | Abandoned
    total_steps: usize,
    total_duration_ms: u64,
    total_tokens: u32,
    execution_chain: String,   // JSON: [{step, tool, result_summary}]
    satisfaction_signal: Option<SatisfactionSignal>,
    created_at: DateTime,
}

enum SatisfactionSignal {
    Positive, // "谢谢" / "很好" / "解决了"
    Negative, // "不对" / "没用" / "还是不行"
    Neutral,  // no clear signal
}
```
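Satisfaction detection over the final user messages can be sketched with the keyword lists from the enum comments above. This is a self-contained illustration (it re-declares a local `SatisfactionSignal` with test derives); `detect_satisfaction` is an assumed name, not the real TrajectoryCompressor API.

```rust
#[derive(Debug, PartialEq)]
enum SatisfactionSignal {
    Positive,
    Negative,
    Neutral,
}

/// Classify the closing user message by keyword. Negative keywords are
/// checked first so mixed messages like "谢谢,还是不行" count as negative.
fn detect_satisfaction(last_user_message: &str) -> SatisfactionSignal {
    const NEGATIVE: [&str; 3] = ["不对", "没用", "还是不行"];
    const POSITIVE: [&str; 3] = ["谢谢", "很好", "解决了"];
    if NEGATIVE.iter().any(|k| last_user_message.contains(k)) {
        SatisfactionSignal::Negative
    } else if POSITIVE.iter().any(|k| last_user_message.contains(k)) {
        SatisfactionSignal::Positive
    } else {
        SatisfactionSignal::Neutral
    }
}
```

Checking negative keywords first is a deliberate bias: a false Negative only suppresses experience extraction, while a false Positive would pollute the experience store.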
### Recording Strategy

**Low-overhead principle:** trajectory recording must not affect normal conversation performance.

1. **Event recording:** each step stores only `step_type + input_summary (≤200 chars) + output_summary (≤200 chars)`
2. **Async writes:** write to SQLite via `tokio::spawn`, never blocking the main flow
3. **Compression trigger:** on session end (compactor flush or session close), compress asynchronously
4. **Retention policy:** raw events are deleted after compression (kept 7 days); compressed trajectories are kept 90 days

### Compression Algorithm

```rust
fn compress(events: Vec<TrajectoryEvent>) -> CompressedTrajectory {
    // 1. Extract key steps (skip intermediate retries / error recovery)
    // 2. Merge consecutive steps of the same type
    // 3. Build the execution_chain JSON
    // 4. Infer the outcome (did the last step succeed + user feedback signal)
    // 5. Total up token usage and duration
}
```
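Step 2 of the algorithm above can be sketched in isolation. The types here are pared down to what the merge needs (a hypothetical `Event` with only `step_type` and `duration_ms`), not the real TrajectoryStore types.

```rust
#[derive(Debug, Clone, PartialEq)]
enum StepType {
    UserRequest,
    ToolExecution,
    LlmGeneration,
}

#[derive(Debug, Clone)]
struct Event {
    step_type: StepType,
    duration_ms: u64,
}

/// Merge runs of consecutive same-type events, summing their durations,
/// so e.g. three back-to-back tool calls compress to one chain entry.
fn merge_consecutive(events: &[Event]) -> Vec<Event> {
    let mut merged: Vec<Event> = Vec::new();
    for e in events {
        match merged.last_mut() {
            Some(last) if last.step_type == e.step_type => {
                last.duration_ms += e.duration_ms; // fold into the open run
            }
            _ => merged.push(e.clone()),
        }
    }
    merged
}
```

The merged sequence is what gets serialized into `execution_chain`; total duration and token counts are summed over the raw events before merging.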
### Collaboration with the Self-improvement Loop

When ExperienceExtractor runs, it:
- queries `compressed_trajectories` for historical trajectories of similar scenarios
- evaluates "how many steps did this solution take last time, and at what success rate?"
- provides data support for experience extraction

### Future RL Extension (Out of Scope for This Iteration)

- `execution_chain` converts directly to the Atropos/GEPA training format
- `satisfaction_signal` can serve as a reward signal
- The RL training pipeline is out of scope

### Migration Strategy

Bump the version via the `MIGRATIONS` array in `schema.rs` (flat `&[&str]` array format, consistent with existing code), adding the `trajectory_events` and `compressed_trajectories` tables.

```rust
// Added to the MIGRATIONS array in schema.rs (flat &str array, no down migrations)
&[
    "CREATE TABLE IF NOT EXISTS trajectory_events (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        ...
    );
    CREATE TABLE IF NOT EXISTS compressed_trajectories (
        ...
    );
    CREATE INDEX IF NOT EXISTS idx_trajectory_session ON trajectory_events(session_id);",
]
```

### Error Handling

- TrajectoryRecorder async write fails → `log::warn!`, no retry, the single event is dropped
- TrajectoryCompressor fails → `log::warn!`; raw events are auto-cleaned after their 7-day retention
- Compressed-trajectory query fails → ExperienceExtractor degrades to no-history mode
- All operations: non-critical errors never block the conversation

### Test Plan

| Test target | Location | Scenarios covered |
|-------------|----------|-------------------|
| TrajectoryStore CRUD | inline in `trajectory_store.rs` | insert/query/delete, session filtering |
| Compression algorithm | inline in `trajectory_compressor.rs` | normal compression, empty events, single-step events, merge dedup |
| TrajectoryRecorderMiddleware | middleware tests | before/after hook recording, empty-input skip |
| Retention policy | integration tests | 7-day cleanup, 90-day cleanup |
| Satisfaction detection | unit tests | positive/negative/neutral keyword matching |

### Database Schema

```sql
CREATE TABLE IF NOT EXISTS trajectory_events (
    id TEXT PRIMARY KEY,
    session_id TEXT NOT NULL,
    agent_id TEXT NOT NULL,
    step_index INTEGER NOT NULL,
    step_type TEXT NOT NULL,
    input_summary TEXT,
    output_summary TEXT,
    duration_ms INTEGER DEFAULT 0,
    timestamp TEXT NOT NULL
);
CREATE INDEX idx_trajectory_session ON trajectory_events(session_id);

CREATE TABLE IF NOT EXISTS compressed_trajectories (
    id TEXT PRIMARY KEY,
    session_id TEXT NOT NULL,
    agent_id TEXT NOT NULL,
    request_type TEXT NOT NULL,
    tools_used TEXT,             -- JSON array
    outcome TEXT NOT NULL,       -- 'Success'|'Partial'|'Failed'|'Abandoned'
    total_steps INTEGER DEFAULT 0,
    total_duration_ms INTEGER DEFAULT 0,
    total_tokens INTEGER DEFAULT 0,
    execution_chain TEXT NOT NULL, -- JSON
    satisfaction_signal TEXT,      -- 'Positive'|'Negative'|'Neutral'|NULL
    created_at TEXT NOT NULL
);
CREATE INDEX idx_ct_request_type ON compressed_trajectories(request_type);
CREATE INDEX idx_ct_outcome ON compressed_trajectories(outcome);
```

### File Inventory

| File | Purpose | Est. lines |
|------|---------|-----------|
| `crates/zclaw-memory/src/trajectory_store.rs` | TrajectoryEvent + CompressedTrajectory + SQLite CRUD | ~250 |
| `crates/zclaw-runtime/src/middleware/trajectory_recorder.rs` | AgentMiddleware implementation | ~150 |
| `desktop/src-tauri/src/intelligence/trajectory_compressor.rs` | compression algorithm | ~120 |
| modify `crates/zclaw-memory/src/lib.rs` | export new modules | ~5 |
| modify `crates/zclaw-kernel/src/kernel/mod.rs` | register middleware (priority 650) | ~10 |

**Estimate: ~535 lines added/modified**

---
## Overview

### Code Estimate Summary

| Idea | Added | Modified | Total | Priority |
|------|-------|----------|-------|----------|
| Self-improvement loop | ~400 | ~155 | ~555 | P1 |
| User modeling | ~380 | ~115 | ~495 | P2 |
| NL cron | ~320 | ~190 | ~510 | P3 |
| Trajectory compression | ~525 | ~15 | ~540 | P4 |
| **Total** | **~1625** | **~475** | **~2100** | — |

### Implementation Order and Dependencies

```
Section 1 (self-improvement loop)  ← start immediately
Section 2 (user modeling)          ← can run parallel to Section 1, no hard dependency
Section 3 (NL cron)                ← depends on Section 2's UserProfile (optional) + the butler router (external)
Section 4 (trajectory compression) ← can run parallel to Sections 1-3, no dependencies
```

Sections 1 and 2 can be developed in parallel. Section 3 should wait until the butler router is wired up. Section 4 is fully independent.

### External Dependencies

- Butler router (wiring up ButlerRouterMiddleware + SemanticSkillRouter) — driven by another session
- Pain-point persistence (in-memory → SQLite) — already planned in the pre-release strategy

### Managing intelligence_hooks.rs

`intelligence_hooks.rs` is currently ~281 lines. This design adds ~120 lines of hook code (Section 1: ~50, Section 2: ~30, Section 3: ~40).

If the file exceeds 400 lines, split it into a `hooks/` submodule:
- `hooks/pain.rs` — pain-point hooks
- `hooks/profile.rs` — user-profile hooks
- `hooks/schedule.rs` — schedule-intent detection
- `hooks/mod.rs` — unified registration

### Verification

Per-section verification steps:
1. **Self-improvement loop:** manually simulate a pain-point conversation → verify auto solution generation → verify experience extraction → verify experience-reuse injection
2. **User modeling:** multi-turn conversation → check that each UserProfile field aggregates correctly → verify the relevance of injected content
3. **NL cron:** test 10+ Chinese time expressions → verify cron output → verify scheduled-task creation
4. **Trajectory compression:** full conversation flow → check trajectory_events records → verify compression output → confirm async writes never block

### Verification Commands

```bash
# Rust compile check
cargo check --workspace --exclude zclaw-saas
# Rust tests
cargo test --workspace --exclude zclaw-saas
# TypeScript type check
cd desktop && pnpm tsc --noEmit
# Frontend tests
cd desktop && pnpm vitest run
```