diff --git a/crates/zclaw-runtime/src/compaction.rs b/crates/zclaw-runtime/src/compaction.rs index 9e2331f..9dfe5c5 100644 --- a/crates/zclaw-runtime/src/compaction.rs +++ b/crates/zclaw-runtime/src/compaction.rs @@ -179,7 +179,7 @@ pub fn compact_messages(messages: Vec, keep_recent: usize) -> (Vec { + Some(content.clone()) + } + _ => None, + }) + .next(); + let keep_from_end = DEFAULT_KEEP_RECENT .min(messages.len().saturating_sub(leading_system_count)); let split_index = messages.len().saturating_sub(keep_from_end); @@ -366,14 +378,16 @@ pub async fn maybe_compact_with_config( let recent_messages = &messages[split_index..]; let removed_count = old_messages.len(); - // Step 3: Generate summary (LLM or rule-based) + // Step 3: Generate summary (LLM or rule-based), with iterative context + let prev_ref = previous_summary.as_deref(); let summary = if config.use_llm { if let Some(driver) = driver { - match generate_llm_summary(driver, old_messages, config.summary_max_tokens).await { + match generate_llm_summary(driver, old_messages, prev_ref, config.summary_max_tokens).await { Ok(llm_summary) => { tracing::info!( - "[Compaction] Generated LLM summary ({} chars)", - llm_summary.len() + "[Compaction] Generated LLM summary ({} chars, iterative={})", + llm_summary.len(), + previous_summary.is_some() ); llm_summary } @@ -383,7 +397,7 @@ pub async fn maybe_compact_with_config( "[Compaction] LLM summary failed: {}, falling back to rules", e ); - generate_summary(old_messages) + generate_summary(old_messages, prev_ref) } else { tracing::warn!( "[Compaction] LLM summary failed: {}, returning original messages", @@ -402,10 +416,10 @@ pub async fn maybe_compact_with_config( tracing::warn!( "[Compaction] LLM compaction requested but no driver available, using rules" ); - generate_summary(old_messages) + generate_summary(old_messages, prev_ref) } } else { - generate_summary(old_messages) + generate_summary(old_messages, prev_ref) }; let used_llm = config.use_llm && driver.is_some(); @@ -431,9 +445,11 @@ pub async fn maybe_compact_with_config( } /// Generate a summary using an LLM driver. +/// If `previous_summary` is provided, builds on it iteratively. async fn generate_llm_summary( driver: &Arc, messages: &[Message], + previous_summary: Option<&str>, max_tokens: u32, ) -> Result { let mut conversation_text = String::new(); @@ -470,11 +486,21 @@ async fn generate_llm_summary( conversation_text.push_str("\n...(对话已截断)"); } - let prompt = format!( - "请用简洁的中文总结以下对话的关键信息。保留重要的讨论主题、决策、结论和待办事项。\ - 输出格式为段落式摘要,不超过200字。\n\n{}", - conversation_text - ); + let prompt = match previous_summary { + Some(prev) => format!( + "你是一个对话摘要助手。\n\n\ + ## 上一轮摘要\n{}\n\n\ + ## 新增对话内容\n{}\n\n\ + 请在上一轮摘要的基础上更新,保留所有关键决策、用户偏好和文件操作。\ + 输出200字以内的中文摘要。", + prev, conversation_text + ), + None => format!( + "请用简洁的中文总结以下对话的关键信息。保留重要的讨论主题、决策、结论和待办事项。\ + 输出格式为段落式摘要,不超过200字。\n\n{}", + conversation_text + ), + }; let request = CompletionRequest { model: String::new(), @@ -517,13 +543,22 @@ async fn generate_llm_summary( } /// Generate a rule-based summary of old messages. -fn generate_summary(messages: &[Message]) -> String { +/// If `previous_summary` is provided, carries forward key info. +fn generate_summary(messages: &[Message], previous_summary: Option<&str>) -> String { if messages.is_empty() { return "[对话开始]".to_string(); } let mut sections: Vec = vec!["[以下是之前对话的摘要]".to_string()]; + // Carry forward previous summary if available + if let Some(prev) = previous_summary { + // Strip the header line from previous summary for cleaner nesting + let prev_body = prev.strip_prefix("[以下是之前对话的摘要]\n") + .unwrap_or(prev); + sections.push(format!("[上轮摘要保留]: {}", truncate(prev_body, 200))); + } + let mut user_count = 0; let mut assistant_count = 0; let mut topics: Vec = Vec::new(); @@ -729,8 +764,21 @@ mod tests { Message::user("How does ownership work?"), Message::assistant("Ownership is Rust's memory management system"), ]; - let summary = generate_summary(&messages); + let summary = generate_summary(&messages, None); assert!(summary.contains("摘要")); assert!(summary.contains("2")); } + + #[test] + fn test_generate_summary_iterative() { + let messages = vec![ + Message::user("What is async/await?"), + Message::assistant("Async/await is a concurrency model"), + ]; + let prev = "[以下是之前对话的摘要]\n讨论主题: Rust; 所有权\n(已压缩 4 条消息)"; + let summary = generate_summary(&messages, Some(prev)); + assert!(summary.contains("摘要")); + assert!(summary.contains("上轮摘要保留")); + assert!(summary.contains("所有权")); + } } diff --git a/crates/zclaw-runtime/src/middleware/compaction.rs b/crates/zclaw-runtime/src/middleware/compaction.rs index c94e897..2b0d65a 100644 --- a/crates/zclaw-runtime/src/middleware/compaction.rs +++ b/crates/zclaw-runtime/src/middleware/compaction.rs @@ -1,21 +1,49 @@ //! Compaction middleware — wraps the existing compaction module. +//! +//! Supports debounce (cooldown + min-round checks), async LLM compression +//! with cached fallback, and iterative summaries that carry forward key info. use async_trait::async_trait; -use zclaw_types::Result; -use crate::middleware::{AgentMiddleware, MiddlewareContext, MiddlewareDecision}; -use crate::compaction::{self, CompactionConfig}; -use crate::growth::GrowthIntegration; -use crate::driver::LlmDriver; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use tokio::sync::RwLock; +use zclaw_types::{Message, Result}; +use crate::compaction::{self, CompactionConfig}; +use crate::driver::LlmDriver; +use crate::growth::GrowthIntegration; +use crate::middleware::{AgentMiddleware, MiddlewareContext, MiddlewareDecision}; + +/// Minimum seconds between consecutive compactions. +const COMPACTION_COOLDOWN_SECS: u64 = 30; +/// Minimum message pairs (user+assistant) since last compaction before triggering again. +const COMPACTION_MIN_ROUNDS: u64 = 3; + +fn now_millis() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64 +} + +/// Shared compaction debounce state (lock-free). +struct CompactionState { + last_compaction_ms: AtomicU64, + last_compaction_msg_count: AtomicU64, +} + +/// Cached result from a previous async LLM compaction. +struct AsyncCompactionCache { + last_result: RwLock>>, +} /// Middleware that compresses conversation history when it exceeds a token threshold. pub struct CompactionMiddleware { threshold: usize, config: CompactionConfig, - /// Optional LLM driver for async compaction (LLM summarisation, memory flush). driver: Option>, - /// Optional growth integration for memory flushing during compaction. growth: Option, + state: Arc, + cache: Arc, } impl CompactionMiddleware { @@ -25,7 +53,39 @@ impl CompactionMiddleware { driver: Option>, growth: Option, ) -> Self { - Self { threshold, config, driver, growth } + Self { + threshold, + config, + driver, + growth, + state: Arc::new(CompactionState { + last_compaction_ms: AtomicU64::new(0), + last_compaction_msg_count: AtomicU64::new(0), + }), + cache: Arc::new(AsyncCompactionCache { + last_result: RwLock::new(None), + }), + } + } + + fn should_compact(&self, msg_count: u64) -> bool { + let last_ms = self.state.last_compaction_ms.load(Ordering::Relaxed); + let last_count = self.state.last_compaction_msg_count.load(Ordering::Relaxed); + + if now_millis().saturating_sub(last_ms) < COMPACTION_COOLDOWN_SECS * 1000 { + return false; + } + + if msg_count.saturating_sub(last_count) < COMPACTION_MIN_ROUNDS * 2 { + return false; + } + + true + } + + fn record_compaction(&self, msg_count: u64) { + self.state.last_compaction_ms.store(now_millis(), Ordering::Relaxed); + self.state.last_compaction_msg_count.store(msg_count, Ordering::Relaxed); } } @@ -51,7 +111,17 @@ impl AgentMiddleware for CompactionMiddleware { return Ok(MiddlewareDecision::Continue); } - // Step 3: Still over threshold — compact + // Step 3: Debounce check + if !self.should_compact(ctx.messages.len() as u64) { + // Still over threshold but within cooldown — use cached result if available + if let Some(cached) = self.cache.last_result.read().await.clone() { + tracing::debug!("[CompactionMiddleware] Cooldown active, using cached compaction result"); + ctx.messages = cached; + } + return Ok(MiddlewareDecision::Continue); + } + + // Step 4: Execute compaction let needs_async = self.config.use_llm || self.config.memory_flush_enabled; if needs_async { let outcome = compaction::maybe_compact_with_config( @@ -69,6 +139,14 @@ impl AgentMiddleware for CompactionMiddleware { ctx.messages = compaction::maybe_compact(ctx.messages.clone(), self.threshold); } + self.record_compaction(ctx.messages.len() as u64); + + // Cache result for cooldown fallback + { + let mut cache = self.cache.last_result.write().await; + *cache = Some(ctx.messages.clone()); + } + Ok(MiddlewareDecision::Continue) } }