Files
zclaw_openfang/crates/zclaw-runtime/src/compaction.rs
iven 52bdafa633 refactor(crates): kernel/generation module split + DeerFlow optimizations + middleware + dead code cleanup
- Split zclaw-kernel/kernel.rs (1486 lines) into 9 domain modules
- Split zclaw-kernel/generation.rs (1080 lines) into 3 modules
- Add DeerFlow-inspired middleware: DanglingTool, SubagentLimit, ToolError, ToolOutputGuard
- Add PromptBuilder for structured system prompt assembly
- Add FactStore (zclaw-memory) for persistent fact extraction
- Add task builtin tool for agent task management
- Driver improvements: Anthropic/OpenAI extended thinking, Gemini safety settings
- Replace let _ = with proper log::warn! across SaaS handlers
- Remove unused dependency (url) from zclaw-hands
2026-04-03 00:28:03 +08:00

704 lines
23 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! Context compaction for the agent loop.
//!
//! Provides rule-based token estimation and message compaction to prevent
//! conversations from exceeding LLM context windows. When the estimated
//! token count exceeds the configured threshold, older messages are
//! summarized into a single system message and only recent messages are
//! retained.
//!
//! Supports two compaction modes:
//! - **Rule-based**: Heuristic topic extraction (default, no LLM needed)
//! - **LLM-based**: Uses an LLM driver to generate higher-quality summaries
//!
//! Optionally flushes old messages to the growth/memory system before discarding.
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use zclaw_types::{AgentId, Message, SessionId};
use crate::driver::{CompletionRequest, ContentBlock, LlmDriver};
use crate::growth::GrowthIntegration;
/// Number of recent messages to preserve after compaction.
///
/// Used by `maybe_compact` and `maybe_compact_with_config`; direct callers
/// of `compact_messages` supply their own `keep_recent` value.
const DEFAULT_KEEP_RECENT: usize = 6;
/// Heuristic token count estimation.
///
/// CJK characters ≈ 1.5 tokens each, English words ≈ 1.3 tokens each.
/// Intentionally conservative (overestimates) to avoid hitting real limits.
pub fn estimate_tokens(text: &str) -> usize {
    if text.is_empty() {
        return 0;
    }
    // Per-character fractional weight; weights are summed and rounded up once
    // at the end so sub-token characters still contribute.
    fn char_weight(c: char) -> f64 {
        if matches!(c, ' ' | '\n' | '\t') {
            // Whitespace is cheap.
            return 0.25;
        }
        match c as u32 {
            // CJK ideographs (basic, ext-A, ext-B, compatibility) — ~1.5 tokens
            0x4E00..=0x9FFF | 0x3400..=0x4DBF | 0x20000..=0x2A6DF | 0xF900..=0xFAFF => 1.5,
            // Korean Hangul syllables + Jamo — ~1.5 tokens
            0xAC00..=0xD7AF | 0x1100..=0x11FF => 1.5,
            // Japanese Hiragana + Katakana — ~1.5 tokens
            0x3040..=0x309F | 0x30A0..=0x30FF => 1.5,
            // CJK / fullwidth punctuation — ~1.0 token
            0x3000..=0x303F | 0xFF00..=0xFFEF => 1.0,
            // Emoji & symbols — ~2.0 tokens
            0x1F000..=0x1FAFF | 0x2600..=0x27BF => 2.0,
            // ASCII / Latin characters — roughly 4 chars per token
            _ => 0.3,
        }
    }
    text.chars().map(char_weight).sum::<f64>().ceil() as usize
}
/// Estimate total tokens for a list of messages (including framing overhead).
pub fn estimate_messages_tokens(messages: &[Message]) -> usize {
let mut total = 0;
for msg in messages {
match msg {
Message::User { content } => {
total += estimate_tokens(content);
total += 4;
}
Message::Assistant { content, thinking } => {
total += estimate_tokens(content);
if let Some(th) = thinking {
total += estimate_tokens(th);
}
total += 4;
}
Message::System { content } => {
total += estimate_tokens(content);
total += 4;
}
Message::ToolUse { input, .. } => {
total += estimate_tokens(&input.to_string());
total += 4;
}
Message::ToolResult { output, .. } => {
total += estimate_tokens(&output.to_string());
total += 4;
}
}
}
total
}
// ============================================================
// Calibration: adjust heuristic estimates using API feedback
// ============================================================
const F64_1_0_BITS: u64 = 4607182418800017408u64; // 1.0f64.to_bits()
/// Global calibration factor for token estimation (stored as f64 bits).
///
/// Stored as raw bits because the standard library has no `AtomicF64`;
/// readers and writers round-trip through `f64::from_bits`/`to_bits`.
/// Updated via exponential moving average when API returns actual token counts.
/// Initial value is 1.0 (no adjustment).
static CALIBRATION_FACTOR_BITS: AtomicU64 = AtomicU64::new(F64_1_0_BITS);
/// Get the current calibration factor.
///
/// Loads the global atomic and reinterprets the stored bits as an `f64`.
pub fn get_calibration_factor() -> f64 {
    let bits = CALIBRATION_FACTOR_BITS.load(Ordering::Relaxed);
    f64::from_bits(bits)
}
/// Update calibration factor using exponential moving average.
///
/// Compares estimated tokens with actual tokens from API response:
/// - `ratio = actual / estimated` so underestimates push factor UP
/// - EMA: `new = current * 0.7 + ratio * 0.3`
/// - Clamped to [0.5, 2.0] to prevent runaway values
pub fn update_calibration(estimated: usize, actual: u32) {
    // Skip degenerate samples: a zero on either side would poison the ratio.
    if estimated == 0 || actual == 0 {
        return;
    }
    let ratio = actual as f64 / estimated as f64;
    let current = get_calibration_factor();
    let blended = current * 0.7 + ratio * 0.3;
    let new_factor = blended.clamp(0.5, 2.0);
    CALIBRATION_FACTOR_BITS.store(new_factor.to_bits(), Ordering::Relaxed);
    tracing::debug!(
        "[Compaction] Calibration: estimated={}, actual={}, ratio={:.2}, factor {:.2} → {:.2}",
        estimated, actual, ratio, current, new_factor
    );
}
/// Estimate total tokens for messages with calibration applied.
fn estimate_messages_tokens_calibrated(messages: &[Message]) -> usize {
    let raw = estimate_messages_tokens(messages);
    let factor = get_calibration_factor();
    // Skip the float round-trip entirely while the factor is still exactly 1.0.
    if (factor - 1.0).abs() < f64::EPSILON {
        return raw;
    }
    (raw as f64 * factor).ceil() as usize
}
/// Compact a message list by summarizing old messages and keeping recent ones.
///
/// When `messages.len() > keep_recent`, everything before the retained tail
/// (including any leading system messages, which may hold summaries from
/// prior compaction runs) is collapsed into a single rule-based summary that
/// becomes the new first system message.
///
/// Returns the compacted message list and the number of original messages removed.
pub fn compact_messages(messages: Vec<Message>, keep_recent: usize) -> (Vec<Message>, usize) {
    if messages.len() <= keep_recent {
        return (messages, 0);
    }
    // Count system messages at the head so they never end up inside the
    // retained tail — they belong on the summarized side of the split.
    let leading_system_count = messages
        .iter()
        .take_while(|m| matches!(m, Message::System { .. }))
        .count();
    // Size of the tail we keep verbatim: at most `keep_recent`, and never
    // eating into the leading system prefix.
    let tail_len = keep_recent.min(messages.len().saturating_sub(leading_system_count));
    // Everything before `split_index` is summarized; clamp so the leading
    // system messages always fall on the summarized side.
    let split_index = messages
        .len()
        .saturating_sub(tail_len)
        .max(leading_system_count);
    if split_index == 0 {
        return (messages, 0);
    }
    let (old_messages, recent_messages) = messages.split_at(split_index);
    let summary = generate_summary(old_messages);
    let removed_count = old_messages.len();
    let mut compacted = Vec::with_capacity(recent_messages.len() + 1);
    compacted.push(Message::system(summary));
    compacted.extend_from_slice(recent_messages);
    (compacted, removed_count)
}
/// Check if compaction should be triggered and perform it if needed.
///
/// Returns the (possibly compacted) message list.
pub fn maybe_compact(messages: Vec<Message>, threshold: usize) -> Vec<Message> {
    let estimated = estimate_messages_tokens_calibrated(&messages);
    if estimated < threshold {
        // Under budget — hand the list back untouched.
        return messages;
    }
    tracing::info!(
        "[Compaction] Triggered: {} tokens > {} threshold, {} messages",
        estimated,
        threshold,
        messages.len(),
    );
    let (compacted, removed) = compact_messages(messages, DEFAULT_KEEP_RECENT);
    tracing::info!(
        "[Compaction] Removed {} messages, {} remain",
        removed,
        compacted.len(),
    );
    compacted
}
/// Configuration for compaction behavior.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Use LLM for generating summaries instead of rule-based extraction.
    /// Requires a driver passed to `maybe_compact_with_config`; without one
    /// the rule-based path is used and a warning is logged.
    pub use_llm: bool,
    /// Fall back to rule-based summary if LLM fails.
    /// When `false`, a failed LLM call leaves the messages uncompacted.
    pub llm_fallback_to_rules: bool,
    /// Flush memories from old messages before discarding them.
    /// Only effective when a growth integration is supplied.
    pub memory_flush_enabled: bool,
    /// Maximum tokens for LLM-generated summary.
    pub summary_max_tokens: u32,
}
impl Default for CompactionConfig {
    /// Conservative defaults: rule-based summaries only, no memory flush,
    /// rules fallback enabled in case LLM mode is switched on later.
    fn default() -> Self {
        Self {
            use_llm: false,
            llm_fallback_to_rules: true,
            memory_flush_enabled: false,
            summary_max_tokens: 500,
        }
    }
}
/// Outcome of an async compaction operation.
#[derive(Debug, Clone)]
pub struct CompactionOutcome {
    /// The (possibly compacted) message list.
    pub messages: Vec<Message>,
    /// Number of messages removed during compaction (0 when no compaction ran).
    pub removed_count: usize,
    /// Number of memories flushed to the growth system.
    pub flushed_memories: usize,
    /// Whether LLM was used for summary generation.
    pub used_llm: bool,
}
/// Async compaction with optional LLM summary and memory flushing.
///
/// When `messages` exceed `threshold` tokens:
/// 1. If `memory_flush_enabled`, extract memories from old messages via growth system
/// 2. Generate summary (LLM or rule-based depending on config)
/// 3. Replace old messages with summary + keep recent messages
///
/// The returned `used_llm` flag is `true` only when an LLM-generated summary
/// was actually used — not when the LLM call failed and the rule-based
/// fallback produced the summary.
pub async fn maybe_compact_with_config(
    messages: Vec<Message>,
    threshold: usize,
    config: &CompactionConfig,
    agent_id: &AgentId,
    session_id: &SessionId,
    driver: Option<&Arc<dyn LlmDriver>>,
    growth: Option<&GrowthIntegration>,
) -> CompactionOutcome {
    let tokens = estimate_messages_tokens_calibrated(&messages);
    if tokens < threshold {
        // Under budget — nothing to do.
        return CompactionOutcome {
            messages,
            removed_count: 0,
            flushed_memories: 0,
            used_llm: false,
        };
    }
    tracing::info!(
        "[Compaction] Triggered: {} tokens > {} threshold, {} messages",
        tokens,
        threshold,
        messages.len(),
    );
    // Step 1: Flush memories before compaction discards context.
    // NOTE(review): the *entire* message list is handed to the growth system
    // here, not just the prefix about to be discarded — confirm intentional.
    let flushed_memories = if config.memory_flush_enabled {
        if let Some(growth) = growth {
            match growth
                .process_conversation(agent_id, &messages, session_id.clone())
                .await
            {
                Ok(count) => {
                    tracing::info!(
                        "[Compaction] Flushed {} memories before compaction",
                        count
                    );
                    count
                }
                Err(e) => {
                    // Memory flush is best-effort: log and keep compacting.
                    tracing::warn!("[Compaction] Memory flush failed: {}", e);
                    0
                }
            }
        } else {
            tracing::debug!("[Compaction] Memory flush requested but no growth integration available");
            0
        }
    } else {
        0
    };
    // Step 2: Determine split point (same logic as compact_messages)
    let leading_system_count = messages
        .iter()
        .take_while(|m| matches!(m, Message::System { .. }))
        .count();
    let keep_from_end = DEFAULT_KEEP_RECENT
        .min(messages.len().saturating_sub(leading_system_count));
    let split_index = messages.len().saturating_sub(keep_from_end);
    let split_index = split_index.max(leading_system_count);
    if split_index == 0 {
        return CompactionOutcome {
            messages,
            removed_count: 0,
            flushed_memories,
            used_llm: false,
        };
    }
    let old_messages = &messages[..split_index];
    let recent_messages = &messages[split_index..];
    let removed_count = old_messages.len();
    // Step 3: Generate summary (LLM or rule-based).
    // FIX: `used_llm` was previously computed as `config.use_llm &&
    // driver.is_some()`, which reported `true` even when the LLM call failed
    // and the rules fallback produced the summary. Track actual success.
    let mut used_llm = false;
    let summary = if config.use_llm {
        if let Some(driver) = driver {
            match generate_llm_summary(driver, old_messages, config.summary_max_tokens).await {
                Ok(llm_summary) => {
                    tracing::info!(
                        "[Compaction] Generated LLM summary ({} chars)",
                        llm_summary.len()
                    );
                    used_llm = true;
                    llm_summary
                }
                Err(e) => {
                    if config.llm_fallback_to_rules {
                        tracing::warn!(
                            "[Compaction] LLM summary failed: {}, falling back to rules",
                            e
                        );
                        generate_summary(old_messages)
                    } else {
                        // No fallback allowed: leave the conversation as-is.
                        tracing::warn!(
                            "[Compaction] LLM summary failed: {}, returning original messages",
                            e
                        );
                        return CompactionOutcome {
                            messages,
                            removed_count: 0,
                            flushed_memories,
                            used_llm: false,
                        };
                    }
                }
            }
        } else {
            tracing::warn!(
                "[Compaction] LLM compaction requested but no driver available, using rules"
            );
            generate_summary(old_messages)
        }
    } else {
        generate_summary(old_messages)
    };
    // Step 4: Build compacted message list
    let mut compacted = Vec::with_capacity(1 + recent_messages.len());
    compacted.push(Message::system(summary));
    compacted.extend(recent_messages.iter().cloned());
    tracing::info!(
        "[Compaction] Removed {} messages, {} remain (llm={})",
        removed_count,
        compacted.len(),
        used_llm,
    );
    CompactionOutcome {
        messages: compacted,
        removed_count,
        flushed_memories,
        used_llm,
    }
}
/// Generate a summary using an LLM driver.
///
/// Flattens the conversation into a plain-text transcript, bounds the
/// transcript size, and asks the driver for a short Chinese summary.
///
/// # Errors
/// Returns an error string when the driver call fails or the driver returns
/// no text content.
async fn generate_llm_summary(
    driver: &Arc<dyn LlmDriver>,
    messages: &[Message],
    max_tokens: u32,
) -> Result<String, String> {
    let mut conversation_text = String::new();
    for msg in messages {
        match msg {
            Message::User { content } => {
                conversation_text.push_str(&format!("用户: {}\n", content))
            }
            Message::Assistant { content, .. } => {
                conversation_text.push_str(&format!("助手: {}\n", content))
            }
            Message::System { content } => {
                // Skip summaries injected by earlier compaction passes.
                if !content.starts_with("[以下是之前对话的摘要]") {
                    conversation_text.push_str(&format!("[系统]: {}\n", content))
                }
            }
            Message::ToolUse { tool, input, .. } => {
                conversation_text.push_str(&format!(
                    "[工具调用 {}]: {}\n",
                    tool.as_str(),
                    input
                ))
            }
            Message::ToolResult { output, .. } => {
                conversation_text.push_str(&format!("[工具结果]: {}\n", output))
            }
        }
    }
    // Truncate conversation text if too long for the prompt itself.
    // FIX: `String::truncate` panics when the cut index lands inside a
    // multi-byte character — very likely here, since the transcript is full
    // of CJK text — so walk back to the nearest char boundary first.
    let max_conversation_bytes = 8000;
    if conversation_text.len() > max_conversation_bytes {
        let mut cut = max_conversation_bytes;
        while !conversation_text.is_char_boundary(cut) {
            cut -= 1;
        }
        conversation_text.truncate(cut);
        conversation_text.push_str("\n...(对话已截断)");
    }
    let prompt = format!(
        "请用简洁的中文总结以下对话的关键信息。保留重要的讨论主题、决策、结论和待办事项。\
        输出格式为段落式摘要不超过200字。\n\n{}",
        conversation_text
    );
    let request = CompletionRequest {
        model: String::new(),
        system: Some(
            "你是一个对话摘要助手。只输出摘要内容,不要添加额外解释。".to_string(),
        ),
        messages: vec![Message::user(&prompt)],
        tools: Vec::new(),
        max_tokens: Some(max_tokens),
        temperature: Some(0.3), // low temperature for stable, factual summaries
        stop: Vec::new(),
        stream: false,
        thinking_enabled: false,
        reasoning_effort: None,
        plan_mode: false,
    };
    let response = driver
        .complete(request)
        .await
        .map_err(|e| format!("{}", e))?;
    // Extract text from content blocks, ignoring non-text blocks.
    let text_parts: Vec<String> = response
        .content
        .iter()
        .filter_map(|block| match block {
            ContentBlock::Text { text } => Some(text.clone()),
            _ => None,
        })
        .collect();
    let summary = text_parts.join("");
    if summary.is_empty() {
        return Err("LLM returned empty response".to_string());
    }
    Ok(summary)
}
/// Generate a rule-based summary of old messages.
fn generate_summary(messages: &[Message]) -> String {
    if messages.is_empty() {
        return "[对话开始]".to_string();
    }
    let mut sections: Vec<String> = vec!["[以下是之前对话的摘要]".to_string()];
    let mut user_count = 0;
    let mut assistant_count = 0;
    let mut topics: Vec<String> = Vec::new();
    for msg in messages {
        match msg {
            Message::User { content } => {
                user_count += 1;
                if let Some(topic) = extract_topic(content) {
                    topics.push(topic);
                }
            }
            Message::Assistant { .. } => assistant_count += 1,
            Message::System { content } => {
                // Summaries from earlier compaction passes are dropped,
                // not re-summarized.
                if !content.starts_with("[以下是之前对话的摘要]") {
                    sections.push(format!("系统提示: {}", truncate(content, 60)));
                }
            }
            Message::ToolUse { tool, .. } => {
                sections.push(format!("工具调用: {}", tool.as_str()));
            }
            // Tool results are noisy; omit them from the summary.
            Message::ToolResult { .. } => {}
        }
    }
    if !topics.is_empty() {
        // Cap at the first 8 topics to keep the summary compact.
        let shown: Vec<String> = topics.into_iter().take(8).collect();
        sections.push(format!("讨论主题: {}", shown.join("; ")));
    }
    sections.push(format!(
        "(已压缩 {} 条消息,其中用户 {} 条,助手 {} 条)",
        messages.len(),
        user_count,
        assistant_count,
    ));
    let summary = sections.join("\n");
    // Enforce max length, counting chars (not bytes) so CJK text is safe.
    let max_chars = 800;
    if summary.chars().count() <= max_chars {
        summary
    } else {
        let head: String = summary.chars().take(max_chars).collect();
        format!("{}...\n(摘要已截断)", head)
    }
}
/// Extract the main topic from a user message (first sentence or first 50 chars).
///
/// Returns `None` for whitespace-only input.
fn extract_topic(content: &str) -> Option<String> {
    let trimmed = content.trim();
    if trimmed.is_empty() {
        return None;
    }
    // FIX: the sentence-end char literals were corrupted (empty literals) in
    // the original; restored as the fullwidth CJK question/exclamation marks
    // (U+FF1F / U+FF01), the standard markers alongside '。' — verify against
    // version history.
    for (i, ch) in trimmed.char_indices() {
        if ch == '。' || ch == '?' || ch == '!' || ch == '\n' {
            let end = i + ch.len_utf8();
            // Only use the first sentence if it ends within the first 80 bytes.
            if end <= 80 {
                return Some(trimmed[..end].trim().to_string());
            }
            break;
        }
    }
    // No usable sentence break: short messages pass through whole…
    if trimmed.chars().count() <= 50 {
        return Some(trimmed.to_string());
    }
    // …long ones are clipped to 50 chars with an ellipsis.
    Some(format!("{}...", trimmed.chars().take(50).collect::<String>()))
}
/// Truncate text to max_chars at char boundary.
fn truncate(text: &str, max_chars: usize) -> String {
    let mut chars = text.chars();
    let head: String = chars.by_ref().take(max_chars).collect();
    if chars.next().is_none() {
        // Nothing was cut off — return the input unchanged.
        text.to_string()
    } else {
        format!("{}...", head)
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for token estimation, compaction, and summary helpers.
    use super::*;
    #[test]
    fn test_estimate_tokens_empty() {
        assert_eq!(estimate_tokens(""), 0);
    }
    #[test]
    fn test_estimate_tokens_english() {
        // Only a sanity lower bound — exact weights are an implementation detail.
        let tokens = estimate_tokens("Hello world");
        assert!(tokens > 0);
    }
    #[test]
    fn test_estimate_tokens_cjk() {
        let tokens = estimate_tokens("你好世界");
        assert!(tokens > 3); // CJK chars are ~1.5 tokens each
    }
    #[test]
    fn test_estimate_messages_tokens() {
        // Includes the per-message framing overhead, so always > 0.
        let messages = vec![
            Message::user("Hello"),
            Message::assistant("Hi there"),
        ];
        let tokens = estimate_messages_tokens(&messages);
        assert!(tokens > 0);
    }
    #[test]
    fn test_compact_messages_under_threshold() {
        // len <= keep_recent: the list is returned untouched.
        let messages = vec![
            Message::user("Hello"),
            Message::assistant("Hi"),
        ];
        let (result, removed) = compact_messages(messages, 6);
        assert_eq!(removed, 0);
        assert_eq!(result.len(), 2);
    }
    #[test]
    fn test_compact_messages_over_threshold() {
        // 20 messages, keep 4 -> 16 summarized into one system message.
        let messages: Vec<Message> = (0..10)
            .flat_map(|i| {
                vec![
                    Message::user(format!("Question {}", i)),
                    Message::assistant(format!("Answer {}", i)),
                ]
            })
            .collect();
        let (result, removed) = compact_messages(messages, 4);
        assert!(removed > 0);
        // Should have: 1 summary + 4 recent messages
        assert_eq!(result.len(), 5);
        // First message should be a system summary
        assert!(matches!(&result[0], Message::System { .. }));
    }
    #[test]
    fn test_compact_preserves_leading_system() {
        // The leading system message lands on the summarized side of the
        // split; the result still starts with a (new) system summary.
        let messages = vec![
            Message::system("You are helpful"),
            Message::user("Q1"),
            Message::assistant("A1"),
            Message::user("Q2"),
            Message::assistant("A2"),
            Message::user("Q3"),
            Message::assistant("A3"),
        ];
        let (result, removed) = compact_messages(messages, 4);
        assert!(removed > 0);
        // Should start with compaction summary, then recent messages
        assert!(matches!(&result[0], Message::System { .. }));
    }
    #[test]
    fn test_maybe_compact_under_threshold() {
        // Tiny conversation, huge threshold: no compaction happens.
        let messages = vec![
            Message::user("Short message"),
            Message::assistant("Short reply"),
        ];
        let result = maybe_compact(messages, 100_000);
        assert_eq!(result.len(), 2);
    }
    #[test]
    fn test_extract_topic_sentence() {
        let topic = extract_topic("什么是Rust的所有权系统").unwrap();
        assert!(topic.contains("所有权"));
    }
    #[test]
    fn test_extract_topic_short() {
        // Messages <= 50 chars pass through unmodified.
        let topic = extract_topic("Hello").unwrap();
        assert_eq!(topic, "Hello");
    }
    #[test]
    fn test_extract_topic_long() {
        // Messages > 50 chars without sentence breaks get clipped + ellipsis.
        let long = "This is a very long message that exceeds fifty characters in total length";
        let topic = extract_topic(long).unwrap();
        assert!(topic.ends_with("..."));
    }
    #[test]
    fn test_generate_summary() {
        let messages = vec![
            Message::user("What is Rust?"),
            Message::assistant("Rust is a systems programming language"),
            Message::user("How does ownership work?"),
            Message::assistant("Ownership is Rust's memory management system"),
        ];
        let summary = generate_summary(&messages);
        assert!(summary.contains("摘要"));
        // "2" appears in the user/assistant counts line.
        assert!(summary.contains("2"));
    }
}