Hermes Intelligence Pipeline closes breakpoints in ZCLAW's existing intelligence components with 4 self-contained modules: Chunk 1 — Self-improvement Loop: - ExperienceStore (zclaw-growth): FTS5+TF-IDF wrapper with scope prefix - ExperienceExtractor (desktop/intelligence): template-based extraction from successful proposals with implicit keyword detection Chunk 2 — User Modeling: - UserProfileStore (zclaw-memory): SQLite-backed structured profiles with industry/role/expertise/comm_style/recent_topics/pain_points - UserProfiler (desktop/intelligence): fact classification by category (Preference/Knowledge/Behavior) with profile summary formatting Chunk 3 — NL Cron Chinese Time Parser: - NlScheduleParser (zclaw-runtime): 6 pattern matchers for Chinese time expressions (每天/每周/工作日/间隔/每月/一次性) producing cron expressions - Period-aware hour adjustment (下午3点→15, 晚上8点→20) - Schedule intent detection + task description extraction Chunk 4 — Trajectory Compression: - TrajectoryStore (zclaw-memory): trajectory_events + compressed_trajectories - TrajectoryRecorderMiddleware (zclaw-runtime/middleware): priority 650, async non-blocking event recording via tokio::spawn - TrajectoryCompressor (desktop/intelligence): dedup, request classification, satisfaction detection, execution chain JSON Schema migrations: v2→v3 (user_profiles), v3→v4 (trajectory tables)
329 lines
11 KiB
Rust
329 lines
11 KiB
Rust
//! Trajectory Compressor — compresses raw events into structured trajectories.
|
|
//!
|
|
//! Takes a list of `TrajectoryEvent` records and produces a single
|
|
//! `CompressedTrajectory` summarising the session. Called at session end
|
|
//! (or compaction flush) to reduce storage and prepare data for analysis.
|
|
|
|
use chrono::Utc;
|
|
use zclaw_memory::trajectory_store::{
|
|
CompressedTrajectory, CompletionStatus, SatisfactionSignal, TrajectoryEvent, TrajectoryStepType,
|
|
};
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Satisfaction detection
|
|
// ---------------------------------------------------------------------------
|
|
|
|
const POSITIVE_SIGNALS: &[&str] = &[
|
|
"谢谢", "很好", "解决了", "可以了", "对了", "完美",
|
|
"棒", "不错", "成功了", "行了", "搞定",
|
|
];
|
|
|
|
const NEGATIVE_SIGNALS: &[&str] = &[
|
|
"不对", "没用", "还是不行", "错了", "差太远",
|
|
"不好使", "不管用", "没效果", "失败", "不行",
|
|
];
|
|
|
|
/// Detect user satisfaction from the last few messages.
|
|
pub fn detect_satisfaction(last_messages: &[String]) -> Option<SatisfactionSignal> {
|
|
if last_messages.is_empty() {
|
|
return None;
|
|
}
|
|
|
|
// Check the last user messages for satisfaction signals
|
|
for msg in last_messages.iter().rev().take(3) {
|
|
let lower = msg.to_lowercase();
|
|
for kw in POSITIVE_SIGNALS {
|
|
if lower.contains(kw) {
|
|
return Some(SatisfactionSignal::Positive);
|
|
}
|
|
}
|
|
for kw in NEGATIVE_SIGNALS {
|
|
if lower.contains(kw) {
|
|
return Some(SatisfactionSignal::Negative);
|
|
}
|
|
}
|
|
}
|
|
|
|
Some(SatisfactionSignal::Neutral)
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Compression
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Compress a sequence of trajectory events into a single summary.
|
|
///
|
|
/// Returns `None` if the events list is empty.
|
|
pub fn compress(
|
|
events: Vec<TrajectoryEvent>,
|
|
satisfaction: Option<SatisfactionSignal>,
|
|
) -> Option<CompressedTrajectory> {
|
|
if events.is_empty() {
|
|
return None;
|
|
}
|
|
|
|
let session_id = events[0].session_id.clone();
|
|
let agent_id = events[0].agent_id.clone();
|
|
|
|
// Extract key steps (skip retries — consecutive same-type steps)
|
|
let key_events = deduplicate_steps(&events);
|
|
|
|
let request_type = infer_request_type(&key_events);
|
|
let tools_used = extract_tools(&key_events);
|
|
let total_steps = key_events.len();
|
|
let total_duration_ms: u64 = events.iter().map(|e| e.duration_ms).sum();
|
|
let outcome = infer_outcome(&key_events, satisfaction);
|
|
let execution_chain = build_chain_json(&key_events);
|
|
|
|
Some(CompressedTrajectory {
|
|
id: uuid::Uuid::new_v4().to_string(),
|
|
session_id,
|
|
agent_id,
|
|
request_type,
|
|
tools_used,
|
|
outcome,
|
|
total_steps,
|
|
total_duration_ms,
|
|
total_tokens: 0, // filled by middleware from context
|
|
execution_chain,
|
|
satisfaction_signal: satisfaction,
|
|
created_at: Utc::now(),
|
|
})
|
|
}
|
|
|
|
/// Remove consecutive duplicate step types (retries/error recovery).
|
|
fn deduplicate_steps(events: &[TrajectoryEvent]) -> Vec<&TrajectoryEvent> {
|
|
let mut result = Vec::new();
|
|
let mut last_type: Option<TrajectoryStepType> = None;
|
|
|
|
for event in events {
|
|
// Keep first occurrence of each step type change
|
|
if last_type != Some(event.step_type) {
|
|
result.push(event);
|
|
last_type = Some(event.step_type);
|
|
}
|
|
}
|
|
|
|
// If we deduplicated everything away, keep the first and last
|
|
if result.is_empty() && !events.is_empty() {
|
|
result.push(&events[0]);
|
|
if events.len() > 1 {
|
|
result.push(&events[events.len() - 1]);
|
|
}
|
|
}
|
|
|
|
result
|
|
}
|
|
|
|
/// Infer request type from the first user request event.
|
|
fn infer_request_type(events: &[&TrajectoryEvent]) -> String {
|
|
for event in events {
|
|
if event.step_type == TrajectoryStepType::UserRequest {
|
|
let input = &event.input_summary;
|
|
return classify_request(input);
|
|
}
|
|
}
|
|
"general".to_string()
|
|
}
|
|
|
|
fn classify_request(input: &str) -> String {
|
|
let lower = input.to_lowercase();
|
|
if ["报告", "数据", "统计", "报表", "汇总"].iter().any(|k| lower.contains(k)) {
|
|
return "data_report".into();
|
|
}
|
|
if ["政策", "法规", "合规", "标准"].iter().any(|k| lower.contains(k)) {
|
|
return "policy_query".into();
|
|
}
|
|
if ["查房", "巡房"].iter().any(|k| lower.contains(k)) {
|
|
return "inspection".into();
|
|
}
|
|
if ["排班", "值班"].iter().any(|k| lower.contains(k)) {
|
|
return "scheduling".into();
|
|
}
|
|
if ["会议", "日程", "安排", "提醒"].iter().any(|k| lower.contains(k)) {
|
|
return "meeting".into();
|
|
}
|
|
if ["检查"].iter().any(|k| lower.contains(k)) {
|
|
return "inspection".into();
|
|
}
|
|
"general".to_string()
|
|
}
|
|
|
|
/// Extract unique tool names from ToolExecution events.
|
|
fn extract_tools(events: &[&TrajectoryEvent]) -> Vec<String> {
|
|
let mut tools = Vec::new();
|
|
let mut seen = std::collections::HashSet::new();
|
|
|
|
for event in events {
|
|
if event.step_type == TrajectoryStepType::ToolExecution {
|
|
let tool = event.input_summary.clone();
|
|
if !tool.is_empty() && seen.insert(tool.clone()) {
|
|
tools.push(tool);
|
|
}
|
|
}
|
|
}
|
|
|
|
tools
|
|
}
|
|
|
|
/// Infer completion outcome from last step + satisfaction signal.
|
|
fn infer_outcome(
|
|
events: &[&TrajectoryEvent],
|
|
satisfaction: Option<SatisfactionSignal>,
|
|
) -> CompletionStatus {
|
|
match satisfaction {
|
|
Some(SatisfactionSignal::Positive) => CompletionStatus::Success,
|
|
Some(SatisfactionSignal::Negative) => CompletionStatus::Failed,
|
|
Some(SatisfactionSignal::Neutral) => {
|
|
// Check if last meaningful step was a successful LLM generation
|
|
if events.iter().any(|e| e.step_type == TrajectoryStepType::LlmGeneration) {
|
|
CompletionStatus::Partial
|
|
} else {
|
|
CompletionStatus::Abandoned
|
|
}
|
|
}
|
|
None => CompletionStatus::Partial,
|
|
}
|
|
}
|
|
|
|
/// Build JSON execution chain from key events.
|
|
fn build_chain_json(events: &[&TrajectoryEvent]) -> String {
|
|
let chain: Vec<serde_json::Value> = events.iter().map(|e| {
|
|
serde_json::json!({
|
|
"step": e.step_index,
|
|
"type": e.step_type.as_str(),
|
|
"input": truncate(&e.input_summary, 100),
|
|
"output": truncate(&e.output_summary, 100),
|
|
})
|
|
}).collect();
|
|
|
|
serde_json::to_string(&chain).unwrap_or_else(|_| "[]".to_string())
|
|
}
|
|
|
|
fn truncate(s: &str, max: usize) -> String {
|
|
if s.chars().count() <= max {
|
|
s.to_string()
|
|
} else {
|
|
s.chars().take(max).collect::<String>() + "…"
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Tests
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use chrono::Utc;
|
|
|
|
fn make_event(index: usize, step_type: TrajectoryStepType, input: &str, output: &str) -> TrajectoryEvent {
|
|
TrajectoryEvent {
|
|
id: format!("evt-{}", index),
|
|
session_id: "sess-1".to_string(),
|
|
agent_id: "agent-1".to_string(),
|
|
step_index: index,
|
|
step_type,
|
|
input_summary: input.to_string(),
|
|
output_summary: output.to_string(),
|
|
duration_ms: 100,
|
|
timestamp: Utc::now(),
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn test_compress_empty() {
|
|
assert!(compress(vec![], None).is_none());
|
|
}
|
|
|
|
#[test]
|
|
fn test_compress_single_event() {
|
|
let events = vec![make_event(0, TrajectoryStepType::UserRequest, "帮我查数据", "")];
|
|
let ct = compress(events, None).unwrap();
|
|
assert_eq!(ct.session_id, "sess-1");
|
|
assert_eq!(ct.total_steps, 1);
|
|
}
|
|
|
|
#[test]
|
|
fn test_compress_full_chain() {
|
|
let events = vec![
|
|
make_event(0, TrajectoryStepType::UserRequest, "帮我生成月度报告", ""),
|
|
make_event(1, TrajectoryStepType::ToolExecution, "collector", "5条数据"),
|
|
make_event(2, TrajectoryStepType::LlmGeneration, "", "报告已生成"),
|
|
];
|
|
let ct = compress(events, Some(SatisfactionSignal::Positive)).unwrap();
|
|
assert_eq!(ct.request_type, "data_report");
|
|
assert_eq!(ct.tools_used, vec!["collector"]);
|
|
assert_eq!(ct.outcome, CompletionStatus::Success);
|
|
assert!(ct.execution_chain.starts_with('['));
|
|
}
|
|
|
|
#[test]
|
|
fn test_deduplicate_retries() {
|
|
let events = vec![
|
|
make_event(0, TrajectoryStepType::ToolExecution, "tool-a", "err"),
|
|
make_event(1, TrajectoryStepType::ToolExecution, "tool-a", "ok"),
|
|
make_event(2, TrajectoryStepType::LlmGeneration, "", "done"),
|
|
];
|
|
let deduped = deduplicate_steps(&events);
|
|
assert_eq!(deduped.len(), 2); // first ToolExecution + LlmGeneration
|
|
}
|
|
|
|
#[test]
|
|
fn test_classify_request() {
|
|
assert_eq!(classify_request("帮我生成月度报告"), "data_report");
|
|
assert_eq!(classify_request("最新的合规政策是什么"), "policy_query");
|
|
assert_eq!(classify_request("明天有什么会议"), "meeting");
|
|
assert_eq!(classify_request("查房安排"), "inspection");
|
|
assert_eq!(classify_request("你好"), "general");
|
|
}
|
|
|
|
#[test]
|
|
fn test_detect_satisfaction_positive() {
|
|
let msgs = vec!["谢谢,很好用".to_string()];
|
|
assert_eq!(detect_satisfaction(&msgs), Some(SatisfactionSignal::Positive));
|
|
}
|
|
|
|
#[test]
|
|
fn test_detect_satisfaction_negative() {
|
|
let msgs = vec!["还是不行啊".to_string()];
|
|
assert_eq!(detect_satisfaction(&msgs), Some(SatisfactionSignal::Negative));
|
|
}
|
|
|
|
#[test]
|
|
fn test_detect_satisfaction_neutral() {
|
|
let msgs = vec!["好的我知道了".to_string()];
|
|
assert_eq!(detect_satisfaction(&msgs), Some(SatisfactionSignal::Neutral));
|
|
}
|
|
|
|
#[test]
|
|
fn test_detect_satisfaction_empty() {
|
|
assert_eq!(detect_satisfaction(&[]), None);
|
|
}
|
|
|
|
#[test]
|
|
fn test_infer_outcome() {
|
|
let events = vec![make_event(0, TrajectoryStepType::LlmGeneration, "", "ok")];
|
|
assert_eq!(
|
|
infer_outcome(&events.iter().collect::<Vec<_>>(), Some(SatisfactionSignal::Positive)),
|
|
CompletionStatus::Success
|
|
);
|
|
assert_eq!(
|
|
infer_outcome(&events.iter().collect::<Vec<_>>(), Some(SatisfactionSignal::Negative)),
|
|
CompletionStatus::Failed
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn test_extract_tools_dedup() {
|
|
let events = vec![
|
|
make_event(0, TrajectoryStepType::ToolExecution, "researcher", ""),
|
|
make_event(1, TrajectoryStepType::ToolExecution, "researcher", ""),
|
|
make_event(2, TrajectoryStepType::ToolExecution, "collector", ""),
|
|
];
|
|
let refs: Vec<&TrajectoryEvent> = events.iter().collect();
|
|
let tools = extract_tools(&refs);
|
|
assert_eq!(tools, vec!["researcher", "collector"]);
|
|
}
|
|
}
|