Files
zclaw_openfang/desktop/src-tauri/src/intelligence/compactor.rs
iven 9a77fd4645
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
fix(intelligence): 精确化 dead_code 标注并实现 LLM 上下文压缩
- 将 intelligence/llm/memory/browser 模块的 dead_code 注释从模糊的
  "reserved for future" 改为明确说明 Tauri invoke_handler 运行时注册机制
- 为 identity.rs 中 3 个真正未使用的方法添加 #[allow(dead_code)]
- 实现 compactor use_llm: true 功能:新增 compact_with_llm 方法和
  compactor_compact_llm Tauri 命令,支持 LLM 驱动的对话摘要生成
- 将 pipeline_commands.rs 中 40+ 处 println!/eprintln! 调试输出替换为
  tracing::debug!/warn!/error! 结构化日志
- 移除 intelligence/mod.rs 中不必要的 #[allow(unused_imports)]
2026-03-27 00:43:14 +08:00

623 lines
21 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! Context Compactor - Manages infinite-length conversations without losing key info
//!
//! Flow:
//! 1. Monitor token count against soft threshold
//! 2. When threshold approached: flush memories from old messages
//! 3. Summarize old messages into a compact system message
//! 4. Replace old messages with summary — user sees no interruption
//!
//! Phase 2 of Intelligence Layer Migration.
//! Reference: ZCLAW_AGENT_INTELLIGENCE_EVOLUTION.md §6.3.1
//!
//! NOTE: Some configuration methods are reserved for future dynamic adjustment.
// NOTE: #[tauri::command] functions are registered via invoke_handler! at runtime,
// which the Rust compiler does not track as "use". Module-level allow required
// for Tauri-commanded functions. Genuinely unused methods annotated individually.
#![allow(dead_code)]
use serde::{Deserialize, Serialize};
use regex::Regex;
// === Types ===
/// Compaction configuration
///
/// Every field carries a serde default so partially-specified configs
/// deserialize cleanly from the frontend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompactionConfig {
    /// Token count at which compaction becomes advisable (soft urgency).
    #[serde(default = "default_soft_threshold")]
    pub soft_threshold_tokens: usize,
    /// Token count at which compaction is required (hard urgency).
    #[serde(default = "default_hard_threshold")]
    pub hard_threshold_tokens: usize,
    /// Reserved token headroom. NOTE(review): not read anywhere in this file —
    /// presumably headroom for the model's reply; confirm with callers.
    #[serde(default = "default_reserve")]
    pub reserve_tokens: usize,
    /// Whether to flush memories from old messages before summarizing.
    /// NOTE(review): not yet consulted here — `flushed_memories` is always 0.
    #[serde(default = "default_memory_flush")]
    pub memory_flush_enabled: bool,
    /// How many most-recent messages are kept verbatim during compaction.
    #[serde(default = "default_keep_recent")]
    pub keep_recent_messages: usize,
    /// Token budget for the generated summary (rule-based path truncates to it).
    #[serde(default = "default_summary_max")]
    pub summary_max_tokens: usize,
    /// Use an LLM to write the summary (defaults to false → rule-based).
    #[serde(default)]
    pub use_llm: bool,
    /// On LLM failure, fall back to the rule-based summary instead of erroring.
    #[serde(default = "default_llm_fallback")]
    pub llm_fallback_to_rules: bool,
}
// Serde default helpers for `CompactionConfig`.
// NOTE: keep these values in sync with `impl Default for CompactionConfig`.
fn default_soft_threshold() -> usize {
    15_000
}
fn default_hard_threshold() -> usize {
    20_000
}
fn default_reserve() -> usize {
    4_000
}
fn default_memory_flush() -> bool {
    true
}
fn default_keep_recent() -> usize {
    6
}
fn default_summary_max() -> usize {
    800
}
fn default_llm_fallback() -> bool {
    true
}
impl Default for CompactionConfig {
    fn default() -> Self {
        // NOTE(review): these literals duplicate the serde `default_*` helper
        // functions above — keep both sets of values in sync when tuning.
        Self {
            soft_threshold_tokens: 15000,
            hard_threshold_tokens: 20000,
            reserve_tokens: 4000,
            memory_flush_enabled: true,
            keep_recent_messages: 6,
            summary_max_tokens: 800,
            use_llm: false, // LLM-based summarization is opt-in
            llm_fallback_to_rules: true,
        }
    }
}
/// Message that can be compacted
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompactableMessage {
    /// Conversation role: the code distinguishes "user", "assistant", "system".
    pub role: String,
    /// Message text; token estimates are computed from this.
    pub content: String,
    /// Optional client-side id (summary messages are given "compaction_<ts>").
    #[serde(skip_serializing_if = "Option::is_none")]
    pub id: Option<String>,
    /// Optional timestamp (summary messages use RFC 3339).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timestamp: Option<String>,
}
/// Result of compaction
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompactionResult {
    /// Summary system message followed by the retained recent messages.
    pub compacted_messages: Vec<CompactableMessage>,
    /// The generated summary text (also embedded in the first message above).
    pub summary: String,
    /// Number of messages before compaction.
    pub original_count: usize,
    /// Number of messages reported as retained after compaction.
    pub retained_count: usize,
    /// Memories flushed during compaction (currently always 0 — not wired in).
    pub flushed_memories: usize,
    /// Estimated token count before compaction.
    pub tokens_before_compaction: usize,
    /// Estimated token count after compaction.
    pub tokens_after_compaction: usize,
}
/// Check result before compaction
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompactionCheck {
    /// True when `current_tokens` has reached the soft or hard threshold.
    pub should_compact: bool,
    /// Estimated token count of the inspected messages.
    pub current_tokens: usize,
    /// The threshold compared against (hard when exceeded, otherwise soft).
    pub threshold: usize,
    /// How urgent compaction is.
    // The former `#[serde(rename = "urgency")]` was a no-op (the field is
    // already named "urgency") and has been removed; wire format is unchanged.
    pub urgency: CompactionUrgency,
}
/// Configuration for LLM-based summary generation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LlmSummaryConfig {
    /// Provider identifier, forwarded to `crate::llm::LlmConfig`.
    pub provider: String,
    /// API key for the provider. NOTE(review): this struct derives Serialize,
    /// so the key round-trips in plain text — confirm it is never logged.
    pub api_key: String,
    /// Optional custom endpoint override.
    pub endpoint: Option<String>,
    /// Optional model override; provider default is used when None.
    pub model: Option<String>,
}
/// Urgency level of a pending compaction.
///
/// Serializes as "none" / "soft" / "hard" via `rename_all = "lowercase"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CompactionUrgency {
    /// Below the soft threshold — no action needed.
    None,
    /// Soft threshold reached — compaction advisable.
    Soft,
    /// Hard threshold reached — compaction required.
    Hard,
}
// === Token Estimation ===
/// Heuristic token count estimation.
///
/// CJK ideographs ≈ 1.5 tokens each, CJK punctuation ≈ 1, whitespace ≈ 0.25,
/// everything else ≈ 0.3 (roughly 3-4 ASCII chars per token). The estimate is
/// intentionally conservative (overestimates) to avoid hitting real limits.
pub fn estimate_tokens(text: &str) -> usize {
    if text.is_empty() {
        return 0;
    }
    // Sum fractional per-character weights, then round up once at the end
    // (identical accumulation order to a plain loop, so results are unchanged).
    let total: f64 = text
        .chars()
        .map(|ch| match ch {
            // CJK Unified Ideographs
            '\u{4E00}'..='\u{9FFF}' => 1.5,
            // CJK Extension A
            '\u{3400}'..='\u{4DBF}' => 1.5,
            // CJK Symbols and Punctuation
            '\u{3000}'..='\u{303F}' => 1.0,
            // Whitespace compresses well in BPE vocabularies
            ' ' | '\n' | '\t' => 0.25,
            // Everything else, dominated by ASCII text
            _ => 0.3,
        })
        .sum();
    total.ceil() as usize
}
/// Estimate total tokens for a list of messages.
///
/// Each message costs its content estimate plus a flat 4-token framing
/// overhead (role marker and separators).
pub fn estimate_messages_tokens(messages: &[CompactableMessage]) -> usize {
    messages
        .iter()
        .map(|msg| estimate_tokens(&msg.content) + 4)
        .sum()
}
// === Context Compactor ===
/// Stateless conversation compactor driven by a `CompactionConfig`.
pub struct ContextCompactor {
    /// Thresholds and knobs controlling when/how compaction happens.
    config: CompactionConfig,
}
impl ContextCompactor {
/// Build a compactor, falling back to the default configuration when
/// none is supplied.
pub fn new(config: Option<CompactionConfig>) -> Self {
    let config = config.unwrap_or_default();
    Self { config }
}
/// Check if compaction is needed based on the current message token count.
///
/// The hard threshold takes precedence over the soft one; below both,
/// no compaction is suggested.
pub fn check_threshold(&self, messages: &[CompactableMessage]) -> CompactionCheck {
    let current_tokens = estimate_messages_tokens(messages);
    let hard = self.config.hard_threshold_tokens;
    let soft = self.config.soft_threshold_tokens;
    // Classify once, then assemble the result from the classification.
    let (should_compact, threshold, urgency) = if current_tokens >= hard {
        (true, hard, CompactionUrgency::Hard)
    } else if current_tokens >= soft {
        (true, soft, CompactionUrgency::Soft)
    } else {
        (false, soft, CompactionUrgency::None)
    };
    CompactionCheck {
        should_compact,
        current_tokens,
        threshold,
        urgency,
    }
}
/// Execute compaction: summarize old messages, keep recent ones.
///
/// Splits the conversation into "old" (everything but the last
/// `keep_recent_messages`) and "recent", replaces the old part with a single
/// rule-based summary system message, and reports token counts before/after.
/// `_agent_id` / `_conversation_id` are accepted for interface stability but
/// not yet used (memory flush is not wired in).
pub fn compact(
    &self,
    messages: &[CompactableMessage],
    _agent_id: &str,
    _conversation_id: Option<&str>,
) -> CompactionResult {
    let tokens_before_compaction = estimate_messages_tokens(messages);
    let keep_count = self.config.keep_recent_messages.min(messages.len());
    // Split: old messages to compact vs recent ones kept verbatim.
    let split_index = messages.len().saturating_sub(keep_count);
    let old_messages = &messages[..split_index];
    let recent_messages = &messages[split_index..];
    // Generate a rule-based summary of the old messages.
    let summary = self.generate_summary(old_messages);
    // The summary becomes a synthetic system message at the front.
    let summary_message = CompactableMessage {
        role: "system".to_string(),
        content: summary.clone(),
        id: Some(format!("compaction_{}", chrono::Utc::now().timestamp())),
        timestamp: Some(chrono::Utc::now().to_rfc3339()),
    };
    let mut compacted_messages = vec![summary_message];
    compacted_messages.extend(recent_messages.to_vec());
    let tokens_after_compaction = estimate_messages_tokens(&compacted_messages);
    // BUGFIX: retained_count previously reported `split_index + 1`, i.e. the
    // number of *discarded* messages plus one. Report what was actually kept:
    // the summary message plus the retained recent messages.
    let retained_count = compacted_messages.len();
    CompactionResult {
        compacted_messages,
        summary,
        original_count: messages.len(),
        retained_count,
        flushed_memories: 0, // memory flush not implemented yet
        tokens_before_compaction,
        tokens_after_compaction,
    }
}
/// Generate summary using LLM when configured.
///
/// Falls back to rule-based summary if:
/// - `use_llm` is false
/// - LLM config is not provided
/// - LLM call fails and `llm_fallback_to_rules` is true
///
/// When the LLM fails and fallback is disabled, the failure is embedded in
/// the summary text rather than returned as an error, so compaction always
/// completes.
pub async fn compact_with_llm(
    &self,
    messages: &[CompactableMessage],
    _agent_id: &str,
    _conversation_id: Option<&str>,
    llm_config: Option<&LlmSummaryConfig>,
) -> CompactionResult {
    let tokens_before_compaction = estimate_messages_tokens(messages);
    let keep_count = self.config.keep_recent_messages.min(messages.len());
    let split_index = messages.len().saturating_sub(keep_count);
    let old_messages = &messages[..split_index];
    let recent_messages = &messages[split_index..];
    // Choose the summary strategy: LLM (when enabled and configured) or rules.
    let summary = if self.config.use_llm {
        match llm_config {
            Some(config) => {
                match self.generate_llm_summary(old_messages, config).await {
                    Ok(s) => s,
                    Err(e) => {
                        tracing::warn!(
                            "[Compactor] LLM summary failed, falling back to rules: {}",
                            e
                        );
                        if self.config.llm_fallback_to_rules {
                            self.generate_summary(old_messages)
                        } else {
                            format!("[摘要生成失败: {}]", e)
                        }
                    }
                }
            }
            None => {
                tracing::debug!("[Compactor] use_llm=true but no LLM config provided, using rules");
                self.generate_summary(old_messages)
            }
        }
    } else {
        self.generate_summary(old_messages)
    };
    // Same assembly as `compact`: summary system message + recent tail.
    let summary_message = CompactableMessage {
        role: "system".to_string(),
        content: summary.clone(),
        id: Some(format!("compaction_{}", chrono::Utc::now().timestamp())),
        timestamp: Some(chrono::Utc::now().to_rfc3339()),
    };
    let mut compacted_messages = vec![summary_message];
    compacted_messages.extend(recent_messages.to_vec());
    let tokens_after_compaction = estimate_messages_tokens(&compacted_messages);
    // BUGFIX: retained_count previously reported `split_index + 1` (discarded
    // count plus one), not what was kept. Report the actual kept-list size.
    let retained_count = compacted_messages.len();
    CompactionResult {
        compacted_messages,
        summary,
        original_count: messages.len(),
        retained_count,
        flushed_memories: 0,
        tokens_before_compaction,
        tokens_after_compaction,
    }
}
/// Generate summary using the LLM API configured in `config`.
///
/// Returns an error string on any LLM client failure; the caller decides
/// whether to fall back to the rule-based summary.
async fn generate_llm_summary(
    &self,
    messages: &[CompactableMessage],
    config: &LlmSummaryConfig,
) -> Result<String, String> {
    if messages.is_empty() {
        return Ok("[对话开始]".to_string());
    }
    // Build a plain-text transcript for the LLM prompt.
    let mut conversation_text = String::new();
    for msg in messages {
        let role_label = match msg.role.as_str() {
            "user" => "用户",
            "assistant" => "助手",
            "system" => "系统",
            _ => &msg.role,
        };
        conversation_text.push_str(&format!("{}: {}\n", role_label, msg.content));
    }
    // Keep only the tail if the transcript is too long for the LLM context.
    // BUGFIX: the previous byte-offset slice (`len - max_chars..`) could land
    // inside a multi-byte UTF-8 sequence — very likely with CJK content — and
    // panic. Advance the cut point to the next char boundary first.
    let max_chars = 12000;
    if conversation_text.len() > max_chars {
        let mut start = conversation_text.len() - max_chars;
        while !conversation_text.is_char_boundary(start) {
            start += 1;
        }
        conversation_text = format!("...(截断)...\n{}", &conversation_text[start..]);
    }
    let prompt = format!(
        "请简洁地总结以下对话的关键内容,包括:\n\
        1. 讨论的主要话题\n\
        2. 达成的关键结论\n\
        3. 重要的技术细节或决策\n\n\
        对话内容:\n{}\n\n\
        请用简洁的中文要点格式输出控制在200字以内。",
        conversation_text
    );
    let llm_messages = vec![
        crate::llm::LlmMessage {
            role: "system".to_string(),
            content: "你是一个对话摘要助手。请简洁地总结对话的关键信息。".to_string(),
        },
        crate::llm::LlmMessage {
            role: "user".to_string(),
            content: prompt,
        },
    ];
    let llm_config = crate::llm::LlmConfig {
        provider: config.provider.clone(),
        api_key: config.api_key.clone(),
        endpoint: config.endpoint.clone(),
        model: config.model.clone(),
    };
    let client = crate::llm::LlmClient::new(llm_config);
    let response = client.complete(llm_messages).await?;
    Ok(response.content)
}
/// Phase 2: Rule-based summary generation (fallback when the LLM is off or
/// unavailable).
///
/// Builds a bullet-style summary from: user topics (first sentences),
/// assistant conclusions (regex heuristics), and code-block markers, then
/// enforces the configured summary token budget.
fn generate_summary(&self, messages: &[CompactableMessage]) -> String {
    if messages.is_empty() {
        return "[对话开始]".to_string();
    }
    let mut sections: Vec<String> = vec!["[以下是之前对话的摘要]".to_string()];
    // Partition by role once; both lists are reused below.
    let user_messages: Vec<_> = messages.iter().filter(|m| m.role == "user").collect();
    let assistant_messages: Vec<_> = messages.iter().filter(|m| m.role == "assistant").collect();
    // Summarize topics discussed (first sentence of each user message).
    if !user_messages.is_empty() {
        let topics: Vec<String> = user_messages
            .iter()
            .filter_map(|m| self.extract_topic(&m.content))
            .collect();
        if !topics.is_empty() {
            sections.push(format!("讨论主题: {}", topics.join("; ")));
        }
    }
    // Extract key decisions/conclusions from assistant replies (at most 5).
    if !assistant_messages.is_empty() {
        let conclusions: Vec<String> = assistant_messages
            .iter()
            .flat_map(|m| self.extract_conclusions(&m.content))
            .take(5)
            .collect();
        if !conclusions.is_empty() {
            let formatted: Vec<String> = conclusions.iter().map(|c| format!("- {}", c)).collect();
            sections.push(format!("关键结论:\n{}", formatted.join("\n")));
        }
    }
    // Note which messages contained fenced code blocks, tagged by language.
    // PERF: compile the regex once per call instead of once per matching
    // message (it was previously rebuilt inside the filter_map closure).
    let code_re = Regex::new(r"```(\w+)?[\s\S]*?```").ok();
    let technical_context: Vec<String> = messages
        .iter()
        .filter(|m| m.content.contains("```") || m.content.contains("function ") || m.content.contains("class "))
        .filter_map(|m| {
            let cap = code_re.as_ref()?.captures(&m.content)?;
            let lang = cap.get(1).map(|g| g.as_str()).unwrap_or("code");
            Some(format!("代码片段 ({})", lang))
        })
        .collect();
    if !technical_context.is_empty() {
        sections.push(format!("技术上下文: {}", technical_context.join(", ")));
    }
    // Message count summary
    sections.push(format!(
        "(已压缩 {} 条消息,其中用户 {} 条,助手 {} 条)",
        messages.len(),
        user_messages.len(),
        assistant_messages.len()
    ));
    let summary = sections.join("\n");
    // Enforce the configured token budget on the summary itself.
    let summary_tokens = estimate_tokens(&summary);
    if summary_tokens > self.config.summary_max_tokens {
        // BUGFIX: the previous byte-offset slice could split a multi-byte
        // UTF-8 character (CJK summaries make this likely) and panic; snap
        // the cut point down to the nearest char boundary first.
        let mut cut = (self.config.summary_max_tokens * 2).min(summary.len());
        while !summary.is_char_boundary(cut) {
            cut -= 1;
        }
        return format!("{}...\n(摘要已截断)", &summary[..cut]);
    }
    summary
}
/// Extract the main topic from a user message
fn extract_topic(&self, content: &str) -> Option<String> {
let trimmed = content.trim();
// Find sentence end markers (byte position)
let sentence_end = trimmed.find(|c| c == '。' || c == '' || c == '' || c == '\n');
if let Some(byte_pos) = sentence_end {
if byte_pos <= 80 {
// Find the char boundary after the sentence end marker
// The marker itself is a single char (1-3 bytes for Chinese)
let end_boundary = byte_pos + trimmed[byte_pos..].chars().next().map(|c| c.len_utf8()).unwrap_or(1);
return Some(trimmed[..end_boundary].to_string());
}
}
if trimmed.chars().count() <= 50 {
return Some(trimmed.to_string());
}
// Use chars() to safely handle UTF-8 boundaries
Some(format!("{}...", trimmed.chars().take(50).collect::<String>()))
}
/// Extract key conclusions/decisions from assistant messages
///
/// Runs three heuristic regexes (summary/conclusion markers, numbered list
/// items, recommendation phrases) over the content and returns at most 3
/// matched snippets.
fn extract_conclusions(&self, content: &str) -> Vec<String> {
    let mut conclusions = Vec::new();
    // NOTE(review): these patterns are recompiled on every call (per assistant
    // message during summarization); consider hoisting into a LazyLock if this
    // ever shows up in profiles.
    let patterns = vec![
        Regex::new(r"(?:总结|结论|关键点|建议|方案)[:]\s*(.{10,100})").ok(),
        Regex::new(r"(?:\d+\.\s+)(.{10,80})").ok(),
        Regex::new(r"(?:需要|应该|可以|建议)(.{5,60})").ok(),
    ];
    for pattern_opt in patterns {
        if let Some(pattern) = pattern_opt {
            for cap in pattern.captures_iter(content) {
                if let Some(m) = cap.get(1) {
                    let text = m.as_str().trim();
                    // NOTE(review): len() is a *byte* length, so CJK snippets
                    // (3 bytes/char) pass with roughly 4-33 chars — confirm
                    // this range is intentional.
                    if text.len() > 10 && text.len() < 100 {
                        conclusions.push(text.to_string());
                    }
                }
            }
        }
    }
    // Cap the result at 3 conclusions per message.
    conclusions.into_iter().take(3).collect()
}
/// Get current configuration
#[allow(dead_code)] // Reserved: no Tauri command yet
pub fn get_config(&self) -> &CompactionConfig {
    &self.config
}
/// Update configuration
///
/// Replaces the entire config; there is no per-field merge.
#[allow(dead_code)] // Reserved: no Tauri command yet
pub fn update_config(&mut self, updates: CompactionConfig) {
    self.config = updates;
}
}
// === Tauri Commands ===
/// Estimate tokens for a single piece of text (thin Tauri wrapper over
/// `estimate_tokens`).
#[tauri::command]
pub fn compactor_estimate_tokens(text: String) -> usize {
    estimate_tokens(text.as_str())
}
/// Estimate tokens for a message list (thin Tauri wrapper over
/// `estimate_messages_tokens`).
#[tauri::command]
pub fn compactor_estimate_messages_tokens(messages: Vec<CompactableMessage>) -> usize {
    estimate_messages_tokens(messages.as_slice())
}
/// Check if compaction is needed for the given messages, using the supplied
/// config or defaults.
#[tauri::command]
pub fn compactor_check_threshold(
    messages: Vec<CompactableMessage>,
    config: Option<CompactionConfig>,
) -> CompactionCheck {
    ContextCompactor::new(config).check_threshold(&messages)
}
/// Execute rule-based compaction, using the supplied config or defaults.
#[tauri::command]
pub fn compactor_compact(
    messages: Vec<CompactableMessage>,
    agent_id: String,
    conversation_id: Option<String>,
    config: Option<CompactionConfig>,
) -> CompactionResult {
    ContextCompactor::new(config).compact(&messages, &agent_id, conversation_id.as_deref())
}
/// Execute compaction with an optional LLM-generated summary; falls back to
/// rules per the compactor's configuration.
#[tauri::command]
pub async fn compactor_compact_llm(
    messages: Vec<CompactableMessage>,
    agent_id: String,
    conversation_id: Option<String>,
    compaction_config: Option<CompactionConfig>,
    llm_config: Option<LlmSummaryConfig>,
) -> CompactionResult {
    ContextCompactor::new(compaction_config)
        .compact_with_llm(&messages, &agent_id, conversation_id.as_deref(), llm_config.as_ref())
        .await
}
#[cfg(test)]
mod tests {
    use super::*;
    // Short English text: 0.3 tokens/char heuristic gives a handful of tokens.
    #[test]
    fn test_estimate_tokens_english() {
        let text = "Hello world";
        let tokens = estimate_tokens(text);
        assert!(tokens > 0);
        assert!(tokens < 20); // Should be around 3-4 tokens
    }
    // CJK ideographs are weighted at 1.5 tokens each.
    #[test]
    fn test_estimate_tokens_chinese() {
        let text = "你好世界";
        let tokens = estimate_tokens(text);
        assert_eq!(tokens, 6); // 4 chars * 1.5 = 6
    }
    // A tiny conversation must stay far below the default soft threshold.
    #[test]
    fn test_compaction_check() {
        let compactor = ContextCompactor::new(None);
        // Small message list - no compaction needed
        let small_messages = vec![CompactableMessage {
            role: "user".to_string(),
            content: "Hello".to_string(),
            id: None,
            timestamp: None,
        }];
        let check = compactor.check_threshold(&small_messages);
        assert!(!check.should_compact);
    }
    // Rule-based summary must surface the user's topic section.
    #[test]
    fn test_generate_summary() {
        let compactor = ContextCompactor::new(None);
        let messages = vec![
            CompactableMessage {
                role: "user".to_string(),
                content: "什么是 Rust".to_string(),
                id: None,
                timestamp: None,
            },
            CompactableMessage {
                role: "assistant".to_string(),
                content: "Rust 是一门系统编程语言,专注于安全性和性能。".to_string(),
                id: None,
                timestamp: None,
            },
        ];
        let summary = compactor.generate_summary(&messages);
        assert!(summary.contains("讨论主题"));
    }
}
}