Files
zclaw_openfang/desktop/src-tauri/src/intelligence/compactor.rs
iven 30b2515f07
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
feat(audit): 审计修复第四轮 — 跨会话搜索、LLM压缩集成、Presentation渲染器
- S9: MessageSearch 新增 Session/Global 双模式,Global 调用 VikingStorage memory_search
- M4b: LLM 压缩器集成到 kernel AgentLoop,支持 use_llm 配置切换
- M4c: 压缩时自动提取记忆到 VikingStorage (runtime + tauri 双路径)
- H6: 新增 ChartRenderer(recharts)、Document/Slideshow 完整渲染
- 累计修复 23 项,整体完成度 ~72%,真实可用率 ~80%
2026-03-27 11:44:14 +08:00

720 lines
24 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! Context Compactor - Manages infinite-length conversations without losing key info
//!
//! Flow:
//! 1. Monitor token count against soft threshold
//! 2. When threshold approached: flush memories from old messages
//! 3. Summarize old messages into a compact system message
//! 4. Replace old messages with summary — user sees no interruption
//!
//! Phase 2 of Intelligence Layer Migration.
//! Reference: ZCLAW_AGENT_INTELLIGENCE_EVOLUTION.md §6.3.1
//!
//! NOTE: Some configuration methods are reserved for future dynamic adjustment.
// NOTE: #[tauri::command] functions are registered via invoke_handler! at runtime,
// which the Rust compiler does not track as "use". Module-level allow required
// for Tauri-commanded functions. Genuinely unused methods annotated individually.
#![allow(dead_code)]
use serde::{Deserialize, Serialize};
use regex::Regex;
// === Types ===
/// Compaction configuration
///
/// All thresholds are expressed in *estimated* tokens (see `estimate_tokens`).
/// The `default_*` helper functions below keep missing JSON fields aligned
/// with `CompactionConfig::default()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompactionConfig {
    /// Compaction is recommended once the estimate reaches this.
    #[serde(default = "default_soft_threshold")]
    pub soft_threshold_tokens: usize,
    /// Compaction is urgently required once the estimate reaches this.
    #[serde(default = "default_hard_threshold")]
    pub hard_threshold_tokens: usize,
    /// Tokens reserved for the model reply.
    /// NOTE(review): appears unused in this module — confirm whether the
    /// kernel loop consumes it.
    #[serde(default = "default_reserve")]
    pub reserve_tokens: usize,
    /// When true, old messages are flushed to memory storage before being
    /// summarized away (handled in the Tauri command layer).
    #[serde(default = "default_memory_flush")]
    pub memory_flush_enabled: bool,
    /// Number of most-recent messages kept verbatim after compaction.
    #[serde(default = "default_keep_recent")]
    pub keep_recent_messages: usize,
    /// Upper bound (estimated tokens) for the generated summary text.
    #[serde(default = "default_summary_max")]
    pub summary_max_tokens: usize,
    /// Use an LLM for the summary instead of the rule-based fallback.
    #[serde(default)]
    pub use_llm: bool,
    /// On LLM failure, fall back to the rule-based summary instead of
    /// emitting an error placeholder.
    #[serde(default = "default_llm_fallback")]
    pub llm_fallback_to_rules: bool,
}
// Serde default helpers — keep these in sync with `CompactionConfig::default()`.
fn default_soft_threshold() -> usize {
    15_000
}
fn default_hard_threshold() -> usize {
    20_000
}
fn default_reserve() -> usize {
    4_000
}
fn default_memory_flush() -> bool {
    true
}
fn default_keep_recent() -> usize {
    6
}
fn default_summary_max() -> usize {
    800
}
fn default_llm_fallback() -> bool {
    true
}
impl Default for CompactionConfig {
    // NOTE: these literals must stay in sync with the `default_*` serde helper
    // functions above — deserialization of a missing field and
    // `Default::default()` should agree on every value.
    fn default() -> Self {
        Self {
            soft_threshold_tokens: 15000,
            hard_threshold_tokens: 20000,
            reserve_tokens: 4000,
            memory_flush_enabled: true,
            keep_recent_messages: 6,
            summary_max_tokens: 800,
            use_llm: false,
            llm_fallback_to_rules: true,
        }
    }
}
/// Message that can be compacted
///
/// Mirrors the shape of a frontend chat message; `id` and `timestamp` are
/// optional so plain `{role, content}` payloads still deserialize.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompactableMessage {
    // "user", "assistant" or "system" (other values pass through unchanged).
    pub role: String,
    // Raw message text; may contain markdown / fenced code blocks.
    pub content: String,
    // Frontend message id, if one exists.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub id: Option<String>,
    // Timestamp string; summary messages produced here use RFC 3339.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timestamp: Option<String>,
}
/// Result of compaction
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompactionResult {
    // The summary system-message followed by the retained recent messages.
    pub compacted_messages: Vec<CompactableMessage>,
    // The generated summary text (also embedded in `compacted_messages[0]`).
    pub summary: String,
    // Message count before compaction.
    pub original_count: usize,
    // Message count after compaction (intended: summary + recent messages).
    pub retained_count: usize,
    // Memories flushed to storage before compaction (0 when flush disabled).
    pub flushed_memories: usize,
    // Estimated token counts before/after, for UI display.
    pub tokens_before_compaction: usize,
    pub tokens_after_compaction: usize,
}
/// Check result before compaction
///
/// `urgency` distinguishes a soft (recommended) trigger from a hard
/// (required) one; `threshold` reports which limit the count was compared
/// against.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompactionCheck {
    /// True when the current token estimate crosses a threshold.
    pub should_compact: bool,
    /// Estimated token count of the checked messages.
    pub current_tokens: usize,
    /// The threshold (soft or hard) used for the comparison.
    pub threshold: usize,
    // FIX: dropped `#[serde(rename = "urgency")]` — renaming a field to its
    // own name is a no-op; the serialized form is unchanged.
    pub urgency: CompactionUrgency,
}
/// Configuration for LLM-based summary generation
///
/// Forwarded field-for-field into `crate::llm::LlmConfig` by
/// `generate_llm_summary`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LlmSummaryConfig {
    // Provider id understood by `crate::llm::LlmClient`.
    pub provider: String,
    // API key — sensitive; avoid logging this struct verbatim.
    pub api_key: String,
    // Optional custom endpoint; presumably provider default when `None` —
    // confirm against crate::llm.
    pub endpoint: Option<String>,
    // Optional model override; same caveat as `endpoint`.
    pub model: Option<String>,
}
// Urgency level reported by `check_threshold`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")] // serialized as "none" / "soft" / "hard"
pub enum CompactionUrgency {
    /// Below the soft threshold; no action needed.
    None,
    /// Soft threshold crossed; compaction recommended.
    Soft,
    /// Hard threshold crossed; compaction should happen now.
    Hard,
}
// === Token Estimation ===
/// Heuristic token count estimation.
///
/// CJK characters ≈ 1.5 tokens each, English words ≈ 1.3 tokens each.
/// Intentionally conservative (overestimates) so callers stay clear of the
/// real model limits.
pub fn estimate_tokens(text: &str) -> usize {
    let total: f64 = text
        .chars()
        .map(|c| match c {
            // CJK unified ideographs
            '\u{4E00}'..='\u{9FFF}' => 1.5,
            // CJK Extension A
            '\u{3400}'..='\u{4DBF}' => 1.5,
            // CJK symbols and punctuation
            '\u{3000}'..='\u{303F}' => 1.0,
            // Whitespace
            ' ' | '\n' | '\t' => 0.25,
            // Everything else (ASCII ≈ 4 chars per token, counted generously)
            _ => 0.3,
        })
        .sum();
    total.ceil() as usize
}
/// Estimate total tokens for a list of messages.
///
/// Adds a fixed 4-token framing overhead per message (role and separators)
/// on top of each message body's estimate.
pub fn estimate_messages_tokens(messages: &[CompactableMessage]) -> usize {
    messages
        .iter()
        .map(|message| estimate_tokens(&message.content) + 4)
        .sum()
}
// === Context Compactor ===
/// Rule-based / LLM-assisted conversation compactor.
///
/// Holds only configuration; every method operates on caller-supplied
/// messages, so a single instance can serve many conversations.
pub struct ContextCompactor {
    config: CompactionConfig,
}
impl ContextCompactor {
pub fn new(config: Option<CompactionConfig>) -> Self {
Self {
config: config.unwrap_or_default(),
}
}
/// Check if compaction is needed based on current message token count
pub fn check_threshold(&self, messages: &[CompactableMessage]) -> CompactionCheck {
let current_tokens = estimate_messages_tokens(messages);
if current_tokens >= self.config.hard_threshold_tokens {
return CompactionCheck {
should_compact: true,
current_tokens,
threshold: self.config.hard_threshold_tokens,
urgency: CompactionUrgency::Hard,
};
}
if current_tokens >= self.config.soft_threshold_tokens {
return CompactionCheck {
should_compact: true,
current_tokens,
threshold: self.config.soft_threshold_tokens,
urgency: CompactionUrgency::Soft,
};
}
CompactionCheck {
should_compact: false,
current_tokens,
threshold: self.config.soft_threshold_tokens,
urgency: CompactionUrgency::None,
}
}
/// Execute compaction: summarize old messages, keep recent ones
pub fn compact(
&self,
messages: &[CompactableMessage],
_agent_id: &str,
_conversation_id: Option<&str>,
) -> CompactionResult {
let tokens_before_compaction = estimate_messages_tokens(messages);
let keep_count = self.config.keep_recent_messages.min(messages.len());
// Split: old messages to compact vs recent to keep
let split_index = messages.len().saturating_sub(keep_count);
let old_messages = &messages[..split_index];
let recent_messages = &messages[split_index..];
// Generate summary of old messages
let summary = self.generate_summary(old_messages);
// Build compacted message list
let summary_message = CompactableMessage {
role: "system".to_string(),
content: summary.clone(),
id: Some(format!("compaction_{}", chrono::Utc::now().timestamp())),
timestamp: Some(chrono::Utc::now().to_rfc3339()),
};
let mut compacted_messages = vec![summary_message];
compacted_messages.extend(recent_messages.to_vec());
let tokens_after_compaction = estimate_messages_tokens(&compacted_messages);
CompactionResult {
compacted_messages,
summary,
original_count: messages.len(),
retained_count: split_index + 1, // summary + recent
flushed_memories: 0, // Would be populated by memory flush
tokens_before_compaction,
tokens_after_compaction,
}
}
/// Generate summary using LLM when configured
///
/// Falls back to rule-based summary if:
/// - `use_llm` is false
/// - LLM config is not provided
/// - LLM call fails and `llm_fallback_to_rules` is true
pub async fn compact_with_llm(
&self,
messages: &[CompactableMessage],
_agent_id: &str,
_conversation_id: Option<&str>,
llm_config: Option<&LlmSummaryConfig>,
) -> CompactionResult {
let tokens_before_compaction = estimate_messages_tokens(messages);
let keep_count = self.config.keep_recent_messages.min(messages.len());
let split_index = messages.len().saturating_sub(keep_count);
let old_messages = &messages[..split_index];
let recent_messages = &messages[split_index..];
let summary = if self.config.use_llm {
match llm_config {
Some(config) => {
match self.generate_llm_summary(old_messages, config).await {
Ok(s) => s,
Err(e) => {
tracing::warn!(
"[Compactor] LLM summary failed, falling back to rules: {}",
e
);
if self.config.llm_fallback_to_rules {
self.generate_summary(old_messages)
} else {
format!("[摘要生成失败: {}]", e)
}
}
}
}
None => {
tracing::debug!("[Compactor] use_llm=true but no LLM config provided, using rules");
self.generate_summary(old_messages)
}
}
} else {
self.generate_summary(old_messages)
};
let summary_message = CompactableMessage {
role: "system".to_string(),
content: summary.clone(),
id: Some(format!("compaction_{}", chrono::Utc::now().timestamp())),
timestamp: Some(chrono::Utc::now().to_rfc3339()),
};
let mut compacted_messages = vec![summary_message];
compacted_messages.extend(recent_messages.to_vec());
let tokens_after_compaction = estimate_messages_tokens(&compacted_messages);
CompactionResult {
compacted_messages,
summary,
original_count: messages.len(),
retained_count: split_index + 1,
flushed_memories: 0,
tokens_before_compaction,
tokens_after_compaction,
}
}
/// Generate summary using LLM API
async fn generate_llm_summary(
&self,
messages: &[CompactableMessage],
config: &LlmSummaryConfig,
) -> Result<String, String> {
if messages.is_empty() {
return Ok("[对话开始]".to_string());
}
// Build conversation text for LLM
let mut conversation_text = String::new();
for msg in messages {
let role_label = match msg.role.as_str() {
"user" => "用户",
"assistant" => "助手",
"system" => "系统",
_ => &msg.role,
};
conversation_text.push_str(&format!("{}: {}\n", role_label, msg.content));
}
// Truncate if too long for LLM context
let max_chars = 12000;
if conversation_text.len() > max_chars {
conversation_text = format!("...(截断)...\n{}", &conversation_text[conversation_text.len() - max_chars..]);
}
let prompt = format!(
"请简洁地总结以下对话的关键内容,包括:\n\
1. 讨论的主要话题\n\
2. 达成的关键结论\n\
3. 重要的技术细节或决策\n\n\
对话内容:\n{}\n\n\
请用简洁的中文要点格式输出控制在200字以内。",
conversation_text
);
let llm_messages = vec![
crate::llm::LlmMessage {
role: "system".to_string(),
content: "你是一个对话摘要助手。请简洁地总结对话的关键信息。".to_string(),
},
crate::llm::LlmMessage {
role: "user".to_string(),
content: prompt,
},
];
let llm_config = crate::llm::LlmConfig {
provider: config.provider.clone(),
api_key: config.api_key.clone(),
endpoint: config.endpoint.clone(),
model: config.model.clone(),
};
let client = crate::llm::LlmClient::new(llm_config);
let response = client.complete(llm_messages).await?;
Ok(response.content)
}
/// Phase 2: Rule-based summary generation (fallback)
fn generate_summary(&self, messages: &[CompactableMessage]) -> String {
if messages.is_empty() {
return "[对话开始]".to_string();
}
let mut sections: Vec<String> = vec!["[以下是之前对话的摘要]".to_string()];
// Extract user questions/topics
let user_messages: Vec<_> = messages.iter().filter(|m| m.role == "user").collect();
let assistant_messages: Vec<_> = messages.iter().filter(|m| m.role == "assistant").collect();
// Summarize topics discussed
if !user_messages.is_empty() {
let topics: Vec<String> = user_messages
.iter()
.filter_map(|m| self.extract_topic(&m.content))
.collect();
if !topics.is_empty() {
sections.push(format!("讨论主题: {}", topics.join("; ")));
}
}
// Extract key decisions/conclusions from assistant
if !assistant_messages.is_empty() {
let conclusions: Vec<String> = assistant_messages
.iter()
.flat_map(|m| self.extract_conclusions(&m.content))
.take(5)
.collect();
if !conclusions.is_empty() {
let formatted: Vec<String> = conclusions.iter().map(|c| format!("- {}", c)).collect();
sections.push(format!("关键结论:\n{}", formatted.join("\n")));
}
}
// Extract technical context
let technical_context: Vec<String> = messages
.iter()
.filter(|m| m.content.contains("```") || m.content.contains("function ") || m.content.contains("class "))
.filter_map(|m| {
let re = Regex::new(r"```(\w+)?[\s\S]*?```").ok()?;
let cap = re.captures(&m.content)?;
let lang = cap.get(1).map(|m| m.as_str()).unwrap_or("code");
Some(format!("代码片段 ({})", lang))
})
.collect();
if !technical_context.is_empty() {
sections.push(format!("技术上下文: {}", technical_context.join(", ")));
}
// Message count summary
sections.push(format!(
"(已压缩 {} 条消息,其中用户 {} 条,助手 {} 条)",
messages.len(),
user_messages.len(),
assistant_messages.len()
));
let summary = sections.join("\n");
// Enforce token limit
let summary_tokens = estimate_tokens(&summary);
if summary_tokens > self.config.summary_max_tokens {
let max_chars = self.config.summary_max_tokens * 2;
return format!("{}...\n(摘要已截断)", &summary[..max_chars.min(summary.len())]);
}
summary
}
/// Extract the main topic from a user message
fn extract_topic(&self, content: &str) -> Option<String> {
let trimmed = content.trim();
// Find sentence end markers (byte position)
let sentence_end = trimmed.find(|c| c == '。' || c == '' || c == '' || c == '\n');
if let Some(byte_pos) = sentence_end {
if byte_pos <= 80 {
// Find the char boundary after the sentence end marker
// The marker itself is a single char (1-3 bytes for Chinese)
let end_boundary = byte_pos + trimmed[byte_pos..].chars().next().map(|c| c.len_utf8()).unwrap_or(1);
return Some(trimmed[..end_boundary].to_string());
}
}
if trimmed.chars().count() <= 50 {
return Some(trimmed.to_string());
}
// Use chars() to safely handle UTF-8 boundaries
Some(format!("{}...", trimmed.chars().take(50).collect::<String>()))
}
/// Extract key conclusions/decisions from assistant messages
fn extract_conclusions(&self, content: &str) -> Vec<String> {
let mut conclusions = Vec::new();
let patterns = vec![
Regex::new(r"(?:总结|结论|关键点|建议|方案)[:]\s*(.{10,100})").ok(),
Regex::new(r"(?:\d+\.\s+)(.{10,80})").ok(),
Regex::new(r"(?:需要|应该|可以|建议)(.{5,60})").ok(),
];
for pattern_opt in patterns {
if let Some(pattern) = pattern_opt {
for cap in pattern.captures_iter(content) {
if let Some(m) = cap.get(1) {
let text = m.as_str().trim();
if text.len() > 10 && text.len() < 100 {
conclusions.push(text.to_string());
}
}
}
}
}
conclusions.into_iter().take(3).collect()
}
/// Get current configuration
#[allow(dead_code)] // Reserved: no Tauri command yet
pub fn get_config(&self) -> &CompactionConfig {
&self.config
}
/// Update configuration
#[allow(dead_code)] // Reserved: no Tauri command yet
pub fn update_config(&mut self, updates: CompactionConfig) {
self.config = updates;
}
}
// === Tauri Commands ===
/// Estimate tokens for text
///
/// Thin Tauri wrapper over `estimate_tokens` for the frontend.
#[tauri::command]
pub fn compactor_estimate_tokens(text: String) -> usize {
    estimate_tokens(&text)
}
/// Estimate tokens for messages
///
/// Thin Tauri wrapper over `estimate_messages_tokens`.
#[tauri::command]
pub fn compactor_estimate_messages_tokens(messages: Vec<CompactableMessage>) -> usize {
    estimate_messages_tokens(&messages)
}
/// Check if compaction is needed
///
/// Builds a throwaway `ContextCompactor` (it holds only config), so
/// `config: None` means "use defaults".
#[tauri::command]
pub fn compactor_check_threshold(
    messages: Vec<CompactableMessage>,
    config: Option<CompactionConfig>,
) -> CompactionCheck {
    let compactor = ContextCompactor::new(config);
    compactor.check_threshold(&messages)
}
/// Execute compaction
///
/// Resolves the effective config once so that the memory-flush decision and
/// the compactor agree. FIX: previously a missing `config` disabled the flush
/// via `.unwrap_or(false)` even though `CompactionConfig::default()` (and
/// `default_memory_flush()`) enable it.
#[tauri::command]
pub async fn compactor_compact(
    messages: Vec<CompactableMessage>,
    agent_id: String,
    conversation_id: Option<String>,
    config: Option<CompactionConfig>,
) -> CompactionResult {
    let config = config.unwrap_or_default();
    // Flush old messages into long-term memory before they are summarized away.
    let flushed = if config.memory_flush_enabled {
        flush_old_messages_to_memory(&messages, &agent_id, conversation_id.as_deref()).await
    } else {
        0
    };
    let compactor = ContextCompactor::new(Some(config));
    let mut result = compactor.compact(&messages, &agent_id, conversation_id.as_deref());
    result.flushed_memories = flushed;
    result
}
/// Execute compaction with optional LLM-based summary
///
/// Same flow as `compactor_compact` but routes summary generation through
/// `compact_with_llm`. FIX: the effective config is resolved once so a
/// missing `compaction_config` follows `CompactionConfig::default()`
/// (memory flush enabled) instead of silently skipping the flush.
#[tauri::command]
pub async fn compactor_compact_llm(
    messages: Vec<CompactableMessage>,
    agent_id: String,
    conversation_id: Option<String>,
    compaction_config: Option<CompactionConfig>,
    llm_config: Option<LlmSummaryConfig>,
) -> CompactionResult {
    let compaction_config = compaction_config.unwrap_or_default();
    // Flush old messages into long-term memory before they are summarized away.
    let flushed = if compaction_config.memory_flush_enabled {
        flush_old_messages_to_memory(&messages, &agent_id, conversation_id.as_deref()).await
    } else {
        0
    };
    let compactor = ContextCompactor::new(Some(compaction_config));
    let mut result = compactor
        .compact_with_llm(&messages, &agent_id, conversation_id.as_deref(), llm_config.as_ref())
        .await;
    result.flushed_memories = flushed;
    result
}
/// Flush important messages from the old (pre-compaction) portion to VikingStorage.
///
/// Extracts user messages and key assistant responses as session memories
/// so that information is preserved even after messages are compacted away.
///
/// Selection rules (lengths are byte lengths; CJK chars count 3 bytes):
/// - user messages longer than 10 bytes → importance 4 (user intent/preferences)
/// - an assistant reply longer than 20 bytes that directly follows such a
///   user message → importance 3 (the answer)
///
/// Returns the number of memories stored; failures are logged and skipped.
async fn flush_old_messages_to_memory(
    messages: &[CompactableMessage],
    agent_id: &str,
    _conversation_id: Option<&str>,
) -> usize {
    let storage = match crate::viking_commands::get_storage().await {
        Ok(s) => s,
        Err(e) => {
            tracing::warn!("[Compactor] Cannot get storage for memory flush: {}", e);
            return 0;
        }
    };
    let mut flushed = 0usize;
    let mut prev_was_user = false;
    for msg in messages {
        // Decide whether (and how importantly) this message should be stored.
        let importance = match msg.role.as_str() {
            "user" if msg.content.len() > 10 => Some(4),
            "assistant" if prev_was_user && msg.content.len() > 20 => Some(3),
            _ => None,
        };
        if let Some(importance) = importance {
            let entry = zclaw_growth::MemoryEntry::new(
                agent_id,
                zclaw_growth::MemoryType::Session,
                "compaction_flush",
                msg.content.clone(),
            )
            .with_importance(importance);
            match zclaw_growth::VikingStorage::store(storage.as_ref(), &entry).await {
                Ok(_) => flushed += 1,
                Err(e) => {
                    // Best-effort: a failed store must not abort compaction.
                    tracing::debug!(
                        "[Compactor] Memory flush failed for {} msg: {}",
                        msg.role,
                        e
                    );
                }
            }
        }
        // Only a qualifying user message arms the "assistant answer" rule.
        prev_was_user = msg.role == "user" && msg.content.len() > 10;
    }
    if flushed > 0 {
        tracing::info!(
            "[Compactor] Flushed {} memories before compaction for agent {}",
            flushed,
            agent_id
        );
    }
    flushed
}
#[cfg(test)]
mod tests {
    use super::*;

    // Empty input must estimate to zero tokens.
    #[test]
    fn test_estimate_tokens_empty() {
        assert_eq!(estimate_tokens(""), 0);
    }

    #[test]
    fn test_estimate_tokens_english() {
        let text = "Hello world";
        let tokens = estimate_tokens(text);
        assert!(tokens > 0);
        assert!(tokens < 20); // Should be around 3-4 tokens
    }

    #[test]
    fn test_estimate_tokens_chinese() {
        let text = "你好世界";
        let tokens = estimate_tokens(text);
        assert_eq!(tokens, 6); // 4 chars * 1.5 = 6
    }

    #[test]
    fn test_compaction_check() {
        let compactor = ContextCompactor::new(None);
        // Small message list - no compaction needed
        let small_messages = vec![CompactableMessage {
            role: "user".to_string(),
            content: "Hello".to_string(),
            id: None,
            timestamp: None,
        }];
        let check = compactor.check_threshold(&small_messages);
        assert!(!check.should_compact);
        assert!(matches!(check.urgency, CompactionUrgency::None));
    }

    // A very large message must cross the default hard threshold (20k tokens).
    #[test]
    fn test_compaction_check_hard() {
        let compactor = ContextCompactor::new(None);
        // "word " ≈ 1.45 tokens per repeat → ~29k tokens total.
        let messages = vec![CompactableMessage {
            role: "user".to_string(),
            content: "word ".repeat(20_000),
            id: None,
            timestamp: None,
        }];
        let check = compactor.check_threshold(&messages);
        assert!(check.should_compact);
        assert!(matches!(check.urgency, CompactionUrgency::Hard));
    }

    #[test]
    fn test_generate_summary() {
        let compactor = ContextCompactor::new(None);
        let messages = vec![
            CompactableMessage {
                role: "user".to_string(),
                content: "什么是 Rust".to_string(),
                id: None,
                timestamp: None,
            },
            CompactableMessage {
                role: "assistant".to_string(),
                content: "Rust 是一门系统编程语言,专注于安全性和性能。".to_string(),
                id: None,
                timestamp: None,
            },
        ];
        let summary = compactor.generate_summary(&messages);
        assert!(summary.contains("讨论主题"));
    }
}