perf(compaction): Hermes Phase 4 — debounce + async cache + iterative summary
Step 4.1: Compaction debounce - 30s cooldown between consecutive compactions - Minimum 3 rounds (6 messages) since last compaction before re-triggering - AtomicU64 lock-free state tracking Step 4.2: Async compaction with cached fallback - During cooldown, use cached result from previous compaction - RwLock<Option<Vec<Message>>> for thread-safe cache access - Cache updated after each successful compaction Step 4.3: Iterative summary - generate_summary/generate_llm_summary accept previous_summary parameter - LLM prompt includes previous summary for cumulative context preservation - Rule-based summary carries forward [上轮摘要保留] section - previous_summary extracted from leading System messages in message history
This commit is contained in:
@@ -179,7 +179,7 @@ pub fn compact_messages(messages: Vec<Message>, keep_recent: usize) -> (Vec<Mess
|
||||
let old_messages = &messages[..split_index];
|
||||
let recent_messages = &messages[split_index..];
|
||||
|
||||
let summary = generate_summary(old_messages);
|
||||
let summary = generate_summary(old_messages, None);
|
||||
let removed_count = old_messages.len();
|
||||
|
||||
let mut compacted = Vec::with_capacity(1 + recent_messages.len());
|
||||
@@ -348,6 +348,18 @@ pub async fn maybe_compact_with_config(
|
||||
.iter()
|
||||
.take_while(|m| matches!(m, Message::System { .. }))
|
||||
.count();
|
||||
|
||||
// Extract previous summary from leading system messages for iterative summarization
|
||||
let previous_summary = messages.iter()
|
||||
.take(leading_system_count)
|
||||
.filter_map(|m| match m {
|
||||
Message::System { content } if content.starts_with("[以下是之前对话的摘要]") => {
|
||||
Some(content.clone())
|
||||
}
|
||||
_ => None,
|
||||
})
|
||||
.next();
|
||||
|
||||
let keep_from_end = DEFAULT_KEEP_RECENT
|
||||
.min(messages.len().saturating_sub(leading_system_count));
|
||||
let split_index = messages.len().saturating_sub(keep_from_end);
|
||||
@@ -366,14 +378,16 @@ pub async fn maybe_compact_with_config(
|
||||
let recent_messages = &messages[split_index..];
|
||||
let removed_count = old_messages.len();
|
||||
|
||||
// Step 3: Generate summary (LLM or rule-based)
|
||||
// Step 3: Generate summary (LLM or rule-based), with iterative context
|
||||
let prev_ref = previous_summary.as_deref();
|
||||
let summary = if config.use_llm {
|
||||
if let Some(driver) = driver {
|
||||
match generate_llm_summary(driver, old_messages, config.summary_max_tokens).await {
|
||||
match generate_llm_summary(driver, old_messages, prev_ref, config.summary_max_tokens).await {
|
||||
Ok(llm_summary) => {
|
||||
tracing::info!(
|
||||
"[Compaction] Generated LLM summary ({} chars)",
|
||||
llm_summary.len()
|
||||
"[Compaction] Generated LLM summary ({} chars, iterative={})",
|
||||
llm_summary.len(),
|
||||
previous_summary.is_some()
|
||||
);
|
||||
llm_summary
|
||||
}
|
||||
@@ -383,7 +397,7 @@ pub async fn maybe_compact_with_config(
|
||||
"[Compaction] LLM summary failed: {}, falling back to rules",
|
||||
e
|
||||
);
|
||||
generate_summary(old_messages)
|
||||
generate_summary(old_messages, prev_ref)
|
||||
} else {
|
||||
tracing::warn!(
|
||||
"[Compaction] LLM summary failed: {}, returning original messages",
|
||||
@@ -402,10 +416,10 @@ pub async fn maybe_compact_with_config(
|
||||
tracing::warn!(
|
||||
"[Compaction] LLM compaction requested but no driver available, using rules"
|
||||
);
|
||||
generate_summary(old_messages)
|
||||
generate_summary(old_messages, prev_ref)
|
||||
}
|
||||
} else {
|
||||
generate_summary(old_messages)
|
||||
generate_summary(old_messages, prev_ref)
|
||||
};
|
||||
|
||||
let used_llm = config.use_llm && driver.is_some();
|
||||
@@ -431,9 +445,11 @@ pub async fn maybe_compact_with_config(
|
||||
}
|
||||
|
||||
/// Generate a summary using an LLM driver.
|
||||
/// If `previous_summary` is provided, builds on it iteratively.
|
||||
async fn generate_llm_summary(
|
||||
driver: &Arc<dyn LlmDriver>,
|
||||
messages: &[Message],
|
||||
previous_summary: Option<&str>,
|
||||
max_tokens: u32,
|
||||
) -> Result<String, String> {
|
||||
let mut conversation_text = String::new();
|
||||
@@ -470,11 +486,21 @@ async fn generate_llm_summary(
|
||||
conversation_text.push_str("\n...(对话已截断)");
|
||||
}
|
||||
|
||||
let prompt = format!(
|
||||
"请用简洁的中文总结以下对话的关键信息。保留重要的讨论主题、决策、结论和待办事项。\
|
||||
输出格式为段落式摘要,不超过200字。\n\n{}",
|
||||
conversation_text
|
||||
);
|
||||
let prompt = match previous_summary {
|
||||
Some(prev) => format!(
|
||||
"你是一个对话摘要助手。\n\n\
|
||||
## 上一轮摘要\n{}\n\n\
|
||||
## 新增对话内容\n{}\n\n\
|
||||
请在上一轮摘要的基础上更新,保留所有关键决策、用户偏好和文件操作。\
|
||||
输出200字以内的中文摘要。",
|
||||
prev, conversation_text
|
||||
),
|
||||
None => format!(
|
||||
"请用简洁的中文总结以下对话的关键信息。保留重要的讨论主题、决策、结论和待办事项。\
|
||||
输出格式为段落式摘要,不超过200字。\n\n{}",
|
||||
conversation_text
|
||||
),
|
||||
};
|
||||
|
||||
let request = CompletionRequest {
|
||||
model: String::new(),
|
||||
@@ -517,13 +543,22 @@ async fn generate_llm_summary(
|
||||
}
|
||||
|
||||
/// Generate a rule-based summary of old messages.
|
||||
fn generate_summary(messages: &[Message]) -> String {
|
||||
/// If `previous_summary` is provided, carries forward key info.
|
||||
fn generate_summary(messages: &[Message], previous_summary: Option<&str>) -> String {
|
||||
if messages.is_empty() {
|
||||
return "[对话开始]".to_string();
|
||||
}
|
||||
|
||||
let mut sections: Vec<String> = vec!["[以下是之前对话的摘要]".to_string()];
|
||||
|
||||
// Carry forward previous summary if available
|
||||
if let Some(prev) = previous_summary {
|
||||
// Strip the header line from previous summary for cleaner nesting
|
||||
let prev_body = prev.strip_prefix("[以下是之前对话的摘要]\n")
|
||||
.unwrap_or(prev);
|
||||
sections.push(format!("[上轮摘要保留]: {}", truncate(prev_body, 200)));
|
||||
}
|
||||
|
||||
let mut user_count = 0;
|
||||
let mut assistant_count = 0;
|
||||
let mut topics: Vec<String> = Vec::new();
|
||||
@@ -729,8 +764,21 @@ mod tests {
|
||||
Message::user("How does ownership work?"),
|
||||
Message::assistant("Ownership is Rust's memory management system"),
|
||||
];
|
||||
let summary = generate_summary(&messages);
|
||||
let summary = generate_summary(&messages, None);
|
||||
assert!(summary.contains("摘要"));
|
||||
assert!(summary.contains("2"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_generate_summary_iterative() {
|
||||
let messages = vec![
|
||||
Message::user("What is async/await?"),
|
||||
Message::assistant("Async/await is a concurrency model"),
|
||||
];
|
||||
let prev = "[以下是之前对话的摘要]\n讨论主题: Rust; 所有权\n(已压缩 4 条消息)";
|
||||
let summary = generate_summary(&messages, Some(prev));
|
||||
assert!(summary.contains("摘要"));
|
||||
assert!(summary.contains("上轮摘要保留"));
|
||||
assert!(summary.contains("所有权"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,21 +1,49 @@
|
||||
//! Compaction middleware — wraps the existing compaction module.
|
||||
//!
|
||||
//! Supports debounce (cooldown + min-round checks), async LLM compression
|
||||
//! with cached fallback, and iterative summaries that carry forward key info.
|
||||
|
||||
use async_trait::async_trait;
|
||||
use zclaw_types::Result;
|
||||
use crate::middleware::{AgentMiddleware, MiddlewareContext, MiddlewareDecision};
|
||||
use crate::compaction::{self, CompactionConfig};
|
||||
use crate::growth::GrowthIntegration;
|
||||
use crate::driver::LlmDriver;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
use zclaw_types::{Message, Result};
|
||||
use crate::compaction::{self, CompactionConfig};
|
||||
use crate::driver::LlmDriver;
|
||||
use crate::growth::GrowthIntegration;
|
||||
use crate::middleware::{AgentMiddleware, MiddlewareContext, MiddlewareDecision};
|
||||
|
||||
/// Minimum seconds between consecutive compactions.
|
||||
const COMPACTION_COOLDOWN_SECS: u64 = 30;
|
||||
/// Minimum message pairs (user+assistant) since last compaction before triggering again.
|
||||
const COMPACTION_MIN_ROUNDS: u64 = 3;
|
||||
|
||||
fn now_millis() -> u64 {
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_millis() as u64
|
||||
}
|
||||
|
||||
/// Shared compaction debounce state (lock-free).
|
||||
struct CompactionState {
|
||||
last_compaction_ms: AtomicU64,
|
||||
last_compaction_msg_count: AtomicU64,
|
||||
}
|
||||
|
||||
/// Cached result from a previous async LLM compaction.
|
||||
struct AsyncCompactionCache {
|
||||
last_result: RwLock<Option<Vec<Message>>>,
|
||||
}
|
||||
|
||||
/// Middleware that compresses conversation history when it exceeds a token threshold.
|
||||
pub struct CompactionMiddleware {
|
||||
threshold: usize,
|
||||
config: CompactionConfig,
|
||||
/// Optional LLM driver for async compaction (LLM summarisation, memory flush).
|
||||
driver: Option<Arc<dyn LlmDriver>>,
|
||||
/// Optional growth integration for memory flushing during compaction.
|
||||
growth: Option<GrowthIntegration>,
|
||||
state: Arc<CompactionState>,
|
||||
cache: Arc<AsyncCompactionCache>,
|
||||
}
|
||||
|
||||
impl CompactionMiddleware {
|
||||
@@ -25,7 +53,39 @@ impl CompactionMiddleware {
|
||||
driver: Option<Arc<dyn LlmDriver>>,
|
||||
growth: Option<GrowthIntegration>,
|
||||
) -> Self {
|
||||
Self { threshold, config, driver, growth }
|
||||
Self {
|
||||
threshold,
|
||||
config,
|
||||
driver,
|
||||
growth,
|
||||
state: Arc::new(CompactionState {
|
||||
last_compaction_ms: AtomicU64::new(0),
|
||||
last_compaction_msg_count: AtomicU64::new(0),
|
||||
}),
|
||||
cache: Arc::new(AsyncCompactionCache {
|
||||
last_result: RwLock::new(None),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn should_compact(&self, msg_count: u64) -> bool {
|
||||
let last_ms = self.state.last_compaction_ms.load(Ordering::Relaxed);
|
||||
let last_count = self.state.last_compaction_msg_count.load(Ordering::Relaxed);
|
||||
|
||||
if now_millis().saturating_sub(last_ms) < COMPACTION_COOLDOWN_SECS * 1000 {
|
||||
return false;
|
||||
}
|
||||
|
||||
if msg_count.saturating_sub(last_count) < COMPACTION_MIN_ROUNDS * 2 {
|
||||
return false;
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
fn record_compaction(&self, msg_count: u64) {
|
||||
self.state.last_compaction_ms.store(now_millis(), Ordering::Relaxed);
|
||||
self.state.last_compaction_msg_count.store(msg_count, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,7 +111,17 @@ impl AgentMiddleware for CompactionMiddleware {
|
||||
return Ok(MiddlewareDecision::Continue);
|
||||
}
|
||||
|
||||
// Step 3: Still over threshold — compact
|
||||
// Step 3: Debounce check
|
||||
if !self.should_compact(ctx.messages.len() as u64) {
|
||||
// Still over threshold but within cooldown — use cached result if available
|
||||
if let Some(cached) = self.cache.last_result.read().await.clone() {
|
||||
tracing::debug!("[CompactionMiddleware] Cooldown active, using cached compaction result");
|
||||
ctx.messages = cached;
|
||||
}
|
||||
return Ok(MiddlewareDecision::Continue);
|
||||
}
|
||||
|
||||
// Step 4: Execute compaction
|
||||
let needs_async = self.config.use_llm || self.config.memory_flush_enabled;
|
||||
if needs_async {
|
||||
let outcome = compaction::maybe_compact_with_config(
|
||||
@@ -69,6 +139,14 @@ impl AgentMiddleware for CompactionMiddleware {
|
||||
ctx.messages = compaction::maybe_compact(ctx.messages.clone(), self.threshold);
|
||||
}
|
||||
|
||||
self.record_compaction(ctx.messages.len() as u64);
|
||||
|
||||
// Cache result for cooldown fallback
|
||||
{
|
||||
let mut cache = self.cache.last_result.write().await;
|
||||
*cache = Some(ctx.messages.clone());
|
||||
}
|
||||
|
||||
Ok(MiddlewareDecision::Continue)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user