Files
zclaw_openfang/desktop/src-tauri/src/intelligence/trajectory_compressor.rs
iven 4b15ead8e7 feat(hermes): implement intelligence pipeline — 4 chunks, 684 tests passing
Hermes Intelligence Pipeline closes breakpoints in ZCLAW's existing
intelligence components with 4 self-contained modules:

Chunk 1 — Self-improvement Loop:
- ExperienceStore (zclaw-growth): FTS5+TF-IDF wrapper with scope prefix
- ExperienceExtractor (desktop/intelligence): template-based extraction
  from successful proposals with implicit keyword detection

Chunk 2 — User Modeling:
- UserProfileStore (zclaw-memory): SQLite-backed structured profiles
  with industry/role/expertise/comm_style/recent_topics/pain_points
- UserProfiler (desktop/intelligence): fact classification by category
  (Preference/Knowledge/Behavior) with profile summary formatting

Chunk 3 — NL Cron Chinese Time Parser:
- NlScheduleParser (zclaw-runtime): 6 pattern matchers for Chinese time
  expressions (每天/每周/工作日/间隔/每月/一次性) producing cron expressions
- Period-aware hour adjustment (下午3点→15, 晚上8点→20)
- Schedule intent detection + task description extraction

Chunk 4 — Trajectory Compression:
- TrajectoryStore (zclaw-memory): trajectory_events + compressed_trajectories
- TrajectoryRecorderMiddleware (zclaw-runtime/middleware): priority 650,
  async non-blocking event recording via tokio::spawn
- TrajectoryCompressor (desktop/intelligence): dedup, request classification,
  satisfaction detection, execution chain JSON

Schema migrations: v2→v3 (user_profiles), v3→v4 (trajectory tables)
2026-04-09 17:47:43 +08:00

329 lines
11 KiB
Rust

//! Trajectory Compressor — compresses raw events into structured trajectories.
//!
//! Takes a list of `TrajectoryEvent` records and produces a single
//! `CompressedTrajectory` summarising the session. Called at session end
//! (or compaction flush) to reduce storage and prepare data for analysis.
use chrono::Utc;
use zclaw_memory::trajectory_store::{
CompressedTrajectory, CompletionStatus, SatisfactionSignal, TrajectoryEvent, TrajectoryStepType,
};
// ---------------------------------------------------------------------------
// Satisfaction detection
// ---------------------------------------------------------------------------
const POSITIVE_SIGNALS: &[&str] = &[
"谢谢", "很好", "解决了", "可以了", "对了", "完美",
"", "不错", "成功了", "行了", "搞定",
];
const NEGATIVE_SIGNALS: &[&str] = &[
"不对", "没用", "还是不行", "错了", "差太远",
"不好使", "不管用", "没效果", "失败", "不行",
];
/// Detect user satisfaction from the last few messages.
pub fn detect_satisfaction(last_messages: &[String]) -> Option<SatisfactionSignal> {
if last_messages.is_empty() {
return None;
}
// Check the last user messages for satisfaction signals
for msg in last_messages.iter().rev().take(3) {
let lower = msg.to_lowercase();
for kw in POSITIVE_SIGNALS {
if lower.contains(kw) {
return Some(SatisfactionSignal::Positive);
}
}
for kw in NEGATIVE_SIGNALS {
if lower.contains(kw) {
return Some(SatisfactionSignal::Negative);
}
}
}
Some(SatisfactionSignal::Neutral)
}
// ---------------------------------------------------------------------------
// Compression
// ---------------------------------------------------------------------------
/// Compress a sequence of trajectory events into a single summary.
///
/// Returns `None` if the events list is empty.
pub fn compress(
events: Vec<TrajectoryEvent>,
satisfaction: Option<SatisfactionSignal>,
) -> Option<CompressedTrajectory> {
if events.is_empty() {
return None;
}
let session_id = events[0].session_id.clone();
let agent_id = events[0].agent_id.clone();
// Extract key steps (skip retries — consecutive same-type steps)
let key_events = deduplicate_steps(&events);
let request_type = infer_request_type(&key_events);
let tools_used = extract_tools(&key_events);
let total_steps = key_events.len();
let total_duration_ms: u64 = events.iter().map(|e| e.duration_ms).sum();
let outcome = infer_outcome(&key_events, satisfaction);
let execution_chain = build_chain_json(&key_events);
Some(CompressedTrajectory {
id: uuid::Uuid::new_v4().to_string(),
session_id,
agent_id,
request_type,
tools_used,
outcome,
total_steps,
total_duration_ms,
total_tokens: 0, // filled by middleware from context
execution_chain,
satisfaction_signal: satisfaction,
created_at: Utc::now(),
})
}
/// Remove consecutive duplicate step types (retries/error recovery).
fn deduplicate_steps(events: &[TrajectoryEvent]) -> Vec<&TrajectoryEvent> {
let mut result = Vec::new();
let mut last_type: Option<TrajectoryStepType> = None;
for event in events {
// Keep first occurrence of each step type change
if last_type != Some(event.step_type) {
result.push(event);
last_type = Some(event.step_type);
}
}
// If we deduplicated everything away, keep the first and last
if result.is_empty() && !events.is_empty() {
result.push(&events[0]);
if events.len() > 1 {
result.push(&events[events.len() - 1]);
}
}
result
}
/// Infer request type from the first user request event.
fn infer_request_type(events: &[&TrajectoryEvent]) -> String {
for event in events {
if event.step_type == TrajectoryStepType::UserRequest {
let input = &event.input_summary;
return classify_request(input);
}
}
"general".to_string()
}
fn classify_request(input: &str) -> String {
let lower = input.to_lowercase();
if ["报告", "数据", "统计", "报表", "汇总"].iter().any(|k| lower.contains(k)) {
return "data_report".into();
}
if ["政策", "法规", "合规", "标准"].iter().any(|k| lower.contains(k)) {
return "policy_query".into();
}
if ["查房", "巡房"].iter().any(|k| lower.contains(k)) {
return "inspection".into();
}
if ["排班", "值班"].iter().any(|k| lower.contains(k)) {
return "scheduling".into();
}
if ["会议", "日程", "安排", "提醒"].iter().any(|k| lower.contains(k)) {
return "meeting".into();
}
if ["检查"].iter().any(|k| lower.contains(k)) {
return "inspection".into();
}
"general".to_string()
}
/// Extract unique tool names from ToolExecution events.
fn extract_tools(events: &[&TrajectoryEvent]) -> Vec<String> {
let mut tools = Vec::new();
let mut seen = std::collections::HashSet::new();
for event in events {
if event.step_type == TrajectoryStepType::ToolExecution {
let tool = event.input_summary.clone();
if !tool.is_empty() && seen.insert(tool.clone()) {
tools.push(tool);
}
}
}
tools
}
/// Infer completion outcome from last step + satisfaction signal.
fn infer_outcome(
events: &[&TrajectoryEvent],
satisfaction: Option<SatisfactionSignal>,
) -> CompletionStatus {
match satisfaction {
Some(SatisfactionSignal::Positive) => CompletionStatus::Success,
Some(SatisfactionSignal::Negative) => CompletionStatus::Failed,
Some(SatisfactionSignal::Neutral) => {
// Check if last meaningful step was a successful LLM generation
if events.iter().any(|e| e.step_type == TrajectoryStepType::LlmGeneration) {
CompletionStatus::Partial
} else {
CompletionStatus::Abandoned
}
}
None => CompletionStatus::Partial,
}
}
/// Build JSON execution chain from key events.
fn build_chain_json(events: &[&TrajectoryEvent]) -> String {
let chain: Vec<serde_json::Value> = events.iter().map(|e| {
serde_json::json!({
"step": e.step_index,
"type": e.step_type.as_str(),
"input": truncate(&e.input_summary, 100),
"output": truncate(&e.output_summary, 100),
})
}).collect();
serde_json::to_string(&chain).unwrap_or_else(|_| "[]".to_string())
}
fn truncate(s: &str, max: usize) -> String {
if s.chars().count() <= max {
s.to_string()
} else {
s.chars().take(max).collect::<String>() + ""
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
fn make_event(index: usize, step_type: TrajectoryStepType, input: &str, output: &str) -> TrajectoryEvent {
TrajectoryEvent {
id: format!("evt-{}", index),
session_id: "sess-1".to_string(),
agent_id: "agent-1".to_string(),
step_index: index,
step_type,
input_summary: input.to_string(),
output_summary: output.to_string(),
duration_ms: 100,
timestamp: Utc::now(),
}
}
#[test]
fn test_compress_empty() {
assert!(compress(vec![], None).is_none());
}
#[test]
fn test_compress_single_event() {
let events = vec![make_event(0, TrajectoryStepType::UserRequest, "帮我查数据", "")];
let ct = compress(events, None).unwrap();
assert_eq!(ct.session_id, "sess-1");
assert_eq!(ct.total_steps, 1);
}
#[test]
fn test_compress_full_chain() {
let events = vec![
make_event(0, TrajectoryStepType::UserRequest, "帮我生成月度报告", ""),
make_event(1, TrajectoryStepType::ToolExecution, "collector", "5条数据"),
make_event(2, TrajectoryStepType::LlmGeneration, "", "报告已生成"),
];
let ct = compress(events, Some(SatisfactionSignal::Positive)).unwrap();
assert_eq!(ct.request_type, "data_report");
assert_eq!(ct.tools_used, vec!["collector"]);
assert_eq!(ct.outcome, CompletionStatus::Success);
assert!(ct.execution_chain.starts_with('['));
}
#[test]
fn test_deduplicate_retries() {
let events = vec![
make_event(0, TrajectoryStepType::ToolExecution, "tool-a", "err"),
make_event(1, TrajectoryStepType::ToolExecution, "tool-a", "ok"),
make_event(2, TrajectoryStepType::LlmGeneration, "", "done"),
];
let deduped = deduplicate_steps(&events);
assert_eq!(deduped.len(), 2); // first ToolExecution + LlmGeneration
}
#[test]
fn test_classify_request() {
assert_eq!(classify_request("帮我生成月度报告"), "data_report");
assert_eq!(classify_request("最新的合规政策是什么"), "policy_query");
assert_eq!(classify_request("明天有什么会议"), "meeting");
assert_eq!(classify_request("查房安排"), "inspection");
assert_eq!(classify_request("你好"), "general");
}
#[test]
fn test_detect_satisfaction_positive() {
let msgs = vec!["谢谢,很好用".to_string()];
assert_eq!(detect_satisfaction(&msgs), Some(SatisfactionSignal::Positive));
}
#[test]
fn test_detect_satisfaction_negative() {
let msgs = vec!["还是不行啊".to_string()];
assert_eq!(detect_satisfaction(&msgs), Some(SatisfactionSignal::Negative));
}
#[test]
fn test_detect_satisfaction_neutral() {
let msgs = vec!["好的我知道了".to_string()];
assert_eq!(detect_satisfaction(&msgs), Some(SatisfactionSignal::Neutral));
}
#[test]
fn test_detect_satisfaction_empty() {
assert_eq!(detect_satisfaction(&[]), None);
}
#[test]
fn test_infer_outcome() {
let events = vec![make_event(0, TrajectoryStepType::LlmGeneration, "", "ok")];
assert_eq!(
infer_outcome(&events.iter().collect::<Vec<_>>(), Some(SatisfactionSignal::Positive)),
CompletionStatus::Success
);
assert_eq!(
infer_outcome(&events.iter().collect::<Vec<_>>(), Some(SatisfactionSignal::Negative)),
CompletionStatus::Failed
);
}
#[test]
fn test_extract_tools_dedup() {
let events = vec![
make_event(0, TrajectoryStepType::ToolExecution, "researcher", ""),
make_event(1, TrajectoryStepType::ToolExecution, "researcher", ""),
make_event(2, TrajectoryStepType::ToolExecution, "collector", ""),
];
let refs: Vec<&TrajectoryEvent> = events.iter().collect();
let tools = extract_tools(&refs);
assert_eq!(tools, vec!["researcher", "collector"]);
}
}