Compare commits

...

3 Commits

Author SHA1 Message Date
iven
b60b96225d docs(wiki): Hermes Phase 1-4 wiki 同步
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
- hands-skills: 新增 concurrency() 声明要求不变量
- log: 追加 Hermes Phase 1-4 变更记录
- index: 更新日期
2026-04-24 08:54:48 +08:00
iven
06e93a21af perf(compaction): Hermes Phase 4 — debounce + async cache + iterative summary
Step 4.1: Compaction debounce
- 30s cooldown between consecutive compactions
- Minimum 3 rounds (6 messages) since last compaction before re-triggering
- AtomicU64 lock-free state tracking

Step 4.2: Async compaction with cached fallback
- During cooldown, use cached result from previous compaction
- RwLock<Option<Vec<Message>>> for thread-safe cache access
- Cache updated after each successful compaction

Step 4.3: Iterative summary
- generate_summary/generate_llm_summary accept previous_summary parameter
- LLM prompt includes previous summary for cumulative context preservation
- Rule-based summary carries forward [上轮摘要保留] section
- previous_summary extracted from leading System messages in message history
2026-04-24 08:53:37 +08:00
iven
9060935401 perf(runtime): Hermes Phase 1-3 — prompt caching + parallel tools + smart retry
Phase 1: Anthropic prompt caching
- Add cache_control ephemeral on system prompt blocks
- Track cache_creation/cache_read tokens in CompletionResponse + StreamChunk

Phase 2A: Parallel tool execution
- Add ToolConcurrency enum (ReadOnly/Exclusive/Interactive)
- JoinSet + Semaphore(3) for bounded parallel tool calls
- 7 tools annotated with correct concurrency level
- AtomicU32 for lock-free failure tracking in ToolErrorMiddleware

Phase 2B: Tool output pruning
- prune_tool_outputs() trims old ToolResult > 2000 chars to 500 chars
- Integrated into CompactionMiddleware before token estimation

Phase 3: Error classification + smart retry
- LlmErrorKind + ClassifiedLlmError for structured error mapping
- RetryDriver decorator with jittered exponential backoff
- Kernel wraps all LLM calls with RetryDriver
- CONTEXT_OVERFLOW recovery triggers emergency compaction in loop_runner
2026-04-24 08:39:56 +08:00
28 changed files with 857 additions and 164 deletions

View File

@@ -117,7 +117,9 @@ impl Kernel {
}
}
use zclaw_runtime::{AgentLoop, tool::builtin::PathValidator};
use std::sync::Arc;
use zclaw_runtime::{AgentLoop, LlmDriver, tool::builtin::PathValidator};
use zclaw_runtime::driver::{RetryDriver, RetryConfig};
use super::Kernel;
use super::super::MessageResponse;
@@ -161,9 +163,12 @@ impl Kernel {
let subagent_enabled = chat_mode.as_ref().and_then(|m| m.subagent_enabled).unwrap_or(false);
let tools = self.create_tool_registry(subagent_enabled);
self.skill_executor.set_tool_registry(tools.clone());
let driver: Arc<dyn LlmDriver> = Arc::new(
RetryDriver::new(self.driver.clone(), RetryConfig::default())
);
let mut loop_runner = AgentLoop::new(
*agent_id,
self.driver.clone(),
driver,
tools,
self.memory.clone(),
)
@@ -275,9 +280,12 @@ impl Kernel {
let subagent_enabled = chat_mode.as_ref().and_then(|m| m.subagent_enabled).unwrap_or(false);
let tools = self.create_tool_registry(subagent_enabled);
self.skill_executor.set_tool_registry(tools.clone());
let driver: Arc<dyn LlmDriver> = Arc::new(
RetryDriver::new(self.driver.clone(), RetryConfig::default())
);
let mut loop_runner = AgentLoop::new(
*agent_id,
self.driver.clone(),
driver,
tools,
self.memory.clone(),
)

View File

@@ -31,6 +31,8 @@ async fn seam_hand_tool_routing() {
input_tokens: 10,
output_tokens: 20,
stop_reason: "tool_use".to_string(),
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
},
])
// Second stream: final text after tool executes
@@ -40,6 +42,8 @@ async fn seam_hand_tool_routing() {
input_tokens: 10,
output_tokens: 5,
stop_reason: "end_turn".to_string(),
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
},
]);
@@ -105,6 +109,8 @@ async fn seam_hand_execution_callback() {
input_tokens: 10,
output_tokens: 5,
stop_reason: "tool_use".to_string(),
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
},
])
.with_stream_chunks(vec![
@@ -113,6 +119,8 @@ async fn seam_hand_execution_callback() {
input_tokens: 5,
output_tokens: 1,
stop_reason: "end_turn".to_string(),
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
},
]);
@@ -173,6 +181,8 @@ async fn seam_generic_tool_routing() {
input_tokens: 10,
output_tokens: 5,
stop_reason: "tool_use".to_string(),
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
},
])
.with_stream_chunks(vec![
@@ -181,6 +191,8 @@ async fn seam_generic_tool_routing() {
input_tokens: 5,
output_tokens: 3,
stop_reason: "end_turn".to_string(),
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
},
]);

View File

@@ -27,6 +27,8 @@ async fn smoke_hands_full_lifecycle() {
input_tokens: 15,
output_tokens: 10,
stop_reason: "tool_use".to_string(),
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
},
])
// After hand_quiz returns, LLM generates final response
@@ -36,6 +38,8 @@ async fn smoke_hands_full_lifecycle() {
input_tokens: 20,
output_tokens: 5,
stop_reason: "end_turn".to_string(),
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
},
]);

View File

@@ -14,6 +14,7 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use serde_json::Value;
use zclaw_types::{AgentId, Message, SessionId};
use crate::driver::{CompletionRequest, ContentBlock, LlmDriver};
@@ -136,7 +137,7 @@ pub fn update_calibration(estimated: usize, actual: u32) {
}
/// Estimate total tokens for messages with calibration applied.
fn estimate_messages_tokens_calibrated(messages: &[Message]) -> usize {
pub fn estimate_messages_tokens_calibrated(messages: &[Message]) -> usize {
let raw = estimate_messages_tokens(messages);
let factor = get_calibration_factor();
if (factor - 1.0).abs() < f64::EPSILON {
@@ -178,7 +179,7 @@ pub fn compact_messages(messages: Vec<Message>, keep_recent: usize) -> (Vec<Mess
let old_messages = &messages[..split_index];
let recent_messages = &messages[split_index..];
let summary = generate_summary(old_messages);
let summary = generate_summary(old_messages, None);
let removed_count = old_messages.len();
let mut compacted = Vec::with_capacity(1 + recent_messages.len());
@@ -188,6 +189,38 @@ pub fn compact_messages(messages: Vec<Message>, keep_recent: usize) -> (Vec<Mess
(compacted, removed_count)
}
/// Prune old tool outputs to reduce token consumption. Runs before compaction.
/// Only prunes ToolResult messages older than PRUNE_AGE_THRESHOLD messages.
const PRUNE_AGE_THRESHOLD: usize = 8;
const PRUNE_MAX_CHARS: usize = 2000;
const PRUNE_KEEP_HEAD_CHARS: usize = 500;
pub fn prune_tool_outputs(messages: &mut [Message]) -> usize {
let total = messages.len();
let mut pruned_count = 0;
for i in 0..total.saturating_sub(PRUNE_AGE_THRESHOLD) {
if let Message::ToolResult { output, is_error, .. } = &mut messages[i] {
if *is_error { continue; }
let text = match output {
Value::String(ref s) => s.clone(),
ref other => other.to_string(),
};
if text.len() <= PRUNE_MAX_CHARS { continue; }
let end = text.floor_char_boundary(PRUNE_KEEP_HEAD_CHARS.min(text.len()));
*output = serde_json::json!({
"_pruned": true,
"_original_chars": text.len(),
"head": &text[..end],
});
pruned_count += 1;
}
}
pruned_count
}
/// Check if compaction should be triggered and perform it if needed.
///
/// Returns the (possibly compacted) message list.
@@ -315,6 +348,18 @@ pub async fn maybe_compact_with_config(
.iter()
.take_while(|m| matches!(m, Message::System { .. }))
.count();
// Extract previous summary from leading system messages for iterative summarization
let previous_summary = messages.iter()
.take(leading_system_count)
.filter_map(|m| match m {
Message::System { content } if content.starts_with("[以下是之前对话的摘要]") => {
Some(content.clone())
}
_ => None,
})
.next();
let keep_from_end = DEFAULT_KEEP_RECENT
.min(messages.len().saturating_sub(leading_system_count));
let split_index = messages.len().saturating_sub(keep_from_end);
@@ -333,14 +378,16 @@ pub async fn maybe_compact_with_config(
let recent_messages = &messages[split_index..];
let removed_count = old_messages.len();
// Step 3: Generate summary (LLM or rule-based)
// Step 3: Generate summary (LLM or rule-based), with iterative context
let prev_ref = previous_summary.as_deref();
let summary = if config.use_llm {
if let Some(driver) = driver {
match generate_llm_summary(driver, old_messages, config.summary_max_tokens).await {
match generate_llm_summary(driver, old_messages, prev_ref, config.summary_max_tokens).await {
Ok(llm_summary) => {
tracing::info!(
"[Compaction] Generated LLM summary ({} chars)",
llm_summary.len()
"[Compaction] Generated LLM summary ({} chars, iterative={})",
llm_summary.len(),
previous_summary.is_some()
);
llm_summary
}
@@ -350,7 +397,7 @@ pub async fn maybe_compact_with_config(
"[Compaction] LLM summary failed: {}, falling back to rules",
e
);
generate_summary(old_messages)
generate_summary(old_messages, prev_ref)
} else {
tracing::warn!(
"[Compaction] LLM summary failed: {}, returning original messages",
@@ -369,10 +416,10 @@ pub async fn maybe_compact_with_config(
tracing::warn!(
"[Compaction] LLM compaction requested but no driver available, using rules"
);
generate_summary(old_messages)
generate_summary(old_messages, prev_ref)
}
} else {
generate_summary(old_messages)
generate_summary(old_messages, prev_ref)
};
let used_llm = config.use_llm && driver.is_some();
@@ -398,9 +445,11 @@ pub async fn maybe_compact_with_config(
}
/// Generate a summary using an LLM driver.
/// If `previous_summary` is provided, builds on it iteratively.
async fn generate_llm_summary(
driver: &Arc<dyn LlmDriver>,
messages: &[Message],
previous_summary: Option<&str>,
max_tokens: u32,
) -> Result<String, String> {
let mut conversation_text = String::new();
@@ -437,11 +486,21 @@ async fn generate_llm_summary(
conversation_text.push_str("\n...(对话已截断)");
}
let prompt = format!(
"请用简洁的中文总结以下对话的关键信息。保留重要的讨论主题、决策、结论和待办事项。\
输出格式为段落式摘要不超过200字。\n\n{}",
conversation_text
);
let prompt = match previous_summary {
Some(prev) => format!(
"你是一个对话摘要助手。\n\n\
## 上一轮摘要\n{}\n\n\
## 新增对话内容\n{}\n\n\
请在上一轮摘要的基础上更新,保留所有关键决策、用户偏好和文件操作。\
输出200字以内的中文摘要。",
prev, conversation_text
),
None => format!(
"请用简洁的中文总结以下对话的关键信息。保留重要的讨论主题、决策、结论和待办事项。\
输出格式为段落式摘要不超过200字。\n\n{}",
conversation_text
),
};
let request = CompletionRequest {
model: String::new(),
@@ -484,13 +543,22 @@ async fn generate_llm_summary(
}
/// Generate a rule-based summary of old messages.
fn generate_summary(messages: &[Message]) -> String {
/// If `previous_summary` is provided, carries forward key info.
fn generate_summary(messages: &[Message], previous_summary: Option<&str>) -> String {
if messages.is_empty() {
return "[对话开始]".to_string();
}
let mut sections: Vec<String> = vec!["[以下是之前对话的摘要]".to_string()];
// Carry forward previous summary if available
if let Some(prev) = previous_summary {
// Strip the header line from previous summary for cleaner nesting
let prev_body = prev.strip_prefix("[以下是之前对话的摘要]\n")
.unwrap_or(prev);
sections.push(format!("[上轮摘要保留]: {}", truncate(prev_body, 200)));
}
let mut user_count = 0;
let mut assistant_count = 0;
let mut topics: Vec<String> = Vec::new();
@@ -696,8 +764,21 @@ mod tests {
Message::user("How does ownership work?"),
Message::assistant("Ownership is Rust's memory management system"),
];
let summary = generate_summary(&messages);
let summary = generate_summary(&messages, None);
assert!(summary.contains("摘要"));
assert!(summary.contains("2"));
}
#[test]
fn test_generate_summary_iterative() {
let messages = vec![
Message::user("What is async/await?"),
Message::assistant("Async/await is a concurrency model"),
];
let prev = "[以下是之前对话的摘要]\n讨论主题: Rust; 所有权\n(已压缩 4 条消息)";
let summary = generate_summary(&messages, Some(prev));
assert!(summary.contains("摘要"));
assert!(summary.contains("上轮摘要保留"));
assert!(summary.contains("所有权"));
}
}

View File

@@ -121,6 +121,8 @@ impl LlmDriver for AnthropicDriver {
let mut byte_stream = response.bytes_stream();
let mut current_tool_id: Option<String> = None;
let mut tool_input_buffer = String::new();
let mut cache_creation_input_tokens: Option<u32> = None;
let mut cache_read_input_tokens: Option<u32> = None;
while let Some(chunk_result) = byte_stream.next().await {
let chunk = match chunk_result {
@@ -141,6 +143,15 @@ impl LlmDriver for AnthropicDriver {
match serde_json::from_str::<AnthropicStreamEvent>(data) {
Ok(event) => {
match event.event_type.as_str() {
"message_start" => {
// Capture cache token info from message_start event
if let Some(msg) = event.message {
if let Some(usage) = msg.usage {
cache_creation_input_tokens = usage.cache_creation_input_tokens;
cache_read_input_tokens = usage.cache_read_input_tokens;
}
}
}
"content_block_delta" => {
if let Some(delta) = event.delta {
if let Some(text) = delta.text {
@@ -186,6 +197,8 @@ impl LlmDriver for AnthropicDriver {
input_tokens: msg.usage.as_ref().map(|u| u.input_tokens).unwrap_or(0),
output_tokens: msg.usage.as_ref().map(|u| u.output_tokens).unwrap_or(0),
stop_reason: msg.stop_reason.unwrap_or_else(|| "end_turn".to_string()),
cache_creation_input_tokens,
cache_read_input_tokens,
});
}
}
@@ -298,7 +311,15 @@ impl AnthropicDriver {
AnthropicRequest {
model: request.model.clone(),
max_tokens: effective_max,
system: request.system.clone(),
system: request.system.as_ref().map(|s| {
vec![SystemContentBlock {
r#type: "text".to_string(),
text: s.clone(),
cache_control: Some(CacheControl {
r#type: "ephemeral".to_string(),
}),
}]
}),
messages,
tools: if tools.is_empty() { None } else { Some(tools) },
temperature: request.temperature,
@@ -337,18 +358,35 @@ impl AnthropicDriver {
input_tokens: api_response.usage.input_tokens,
output_tokens: api_response.usage.output_tokens,
stop_reason,
cache_creation_input_tokens: api_response.usage.cache_creation_input_tokens,
cache_read_input_tokens: api_response.usage.cache_read_input_tokens,
}
}
}
// Anthropic API types
/// Anthropic cache_control 标记
#[derive(Serialize, Clone)]
struct CacheControl {
r#type: String, // "ephemeral"
}
/// Anthropic system prompt 内容块(支持 cache_control
#[derive(Serialize, Clone)]
struct SystemContentBlock {
r#type: String, // "text"
text: String,
#[serde(skip_serializing_if = "Option::is_none")]
cache_control: Option<CacheControl>,
}
#[derive(Serialize)]
struct AnthropicRequest {
model: String,
max_tokens: u32,
#[serde(skip_serializing_if = "Option::is_none")]
system: Option<String>,
system: Option<Vec<SystemContentBlock>>,
messages: Vec<AnthropicMessage>,
#[serde(skip_serializing_if = "Option::is_none")]
tools: Option<Vec<AnthropicTool>>,
@@ -404,6 +442,10 @@ struct AnthropicContentBlock {
struct AnthropicUsage {
input_tokens: u32,
output_tokens: u32,
#[serde(default)]
cache_creation_input_tokens: Option<u32>,
#[serde(default)]
cache_read_input_tokens: Option<u32>,
}
// Streaming types
@@ -458,4 +500,8 @@ struct AnthropicStreamUsage {
input_tokens: u32,
#[serde(default)]
output_tokens: u32,
#[serde(default)]
cache_creation_input_tokens: Option<u32>,
#[serde(default)]
cache_read_input_tokens: Option<u32>,
}

View File

@@ -0,0 +1,139 @@
//! LLM 错误分类器。将 HTTP 状态码 + 错误体映射为 LlmErrorKind。
use std::time::Duration;
use zclaw_types::{LlmErrorKind, ClassifiedLlmError};
/// 分类 LLM 错误
pub fn classify_llm_error(
provider: &str,
status: u16,
body: &str,
is_timeout: bool,
) -> ClassifiedLlmError {
let _ = provider; // reserved for per-provider overrides
if is_timeout {
return ClassifiedLlmError {
kind: LlmErrorKind::Timeout,
retryable: true,
should_compress: false,
should_rotate_credential: false,
retry_after: None,
message: "请求超时".to_string(),
};
}
match status {
401 | 403 => ClassifiedLlmError {
kind: LlmErrorKind::Auth,
retryable: false,
should_compress: false,
should_rotate_credential: true,
retry_after: None,
message: "认证失败,请检查 API Key".to_string(),
},
402 => {
let is_quota_transient = body.contains("retry")
|| body.contains("limit")
|| body.contains("usage");
ClassifiedLlmError {
kind: if is_quota_transient { LlmErrorKind::RateLimited } else { LlmErrorKind::BillingExhausted },
retryable: is_quota_transient,
should_compress: false,
should_rotate_credential: !is_quota_transient,
retry_after: if is_quota_transient { Some(Duration::from_secs(30)) } else { None },
message: if is_quota_transient { "使用限制,稍后重试".to_string() } else { "计费额度已耗尽".to_string() },
}
}
429 => ClassifiedLlmError {
kind: LlmErrorKind::RateLimited,
retryable: true,
should_compress: false,
should_rotate_credential: true,
retry_after: parse_retry_after(body),
message: "速率限制".to_string(),
},
529 => ClassifiedLlmError {
kind: LlmErrorKind::Overloaded,
retryable: true,
should_compress: false,
should_rotate_credential: false,
retry_after: Some(Duration::from_secs(5)),
message: "提供商过载".to_string(),
},
500 | 502 => ClassifiedLlmError {
kind: LlmErrorKind::ServerError,
retryable: true,
should_compress: false,
should_rotate_credential: false,
retry_after: None,
message: "服务端错误".to_string(),
},
503 => ClassifiedLlmError {
kind: LlmErrorKind::Overloaded,
retryable: true,
should_compress: false,
should_rotate_credential: false,
retry_after: Some(Duration::from_secs(3)),
message: "服务暂时不可用".to_string(),
},
400 => {
let is_context_overflow = body.contains("context_length")
|| body.contains("max_tokens")
|| body.contains("too many tokens")
|| body.contains("prompt is too long");
ClassifiedLlmError {
kind: if is_context_overflow { LlmErrorKind::ContextOverflow } else { LlmErrorKind::Unknown },
retryable: false,
should_compress: is_context_overflow,
should_rotate_credential: false,
retry_after: None,
message: if is_context_overflow {
"上下文过长,需要压缩".to_string()
} else {
format!("请求错误: {}", &body[..body.len().min(200)])
},
}
}
404 => ClassifiedLlmError {
kind: LlmErrorKind::ModelNotFound,
retryable: false,
should_compress: false,
should_rotate_credential: false,
retry_after: None,
message: "模型不存在".to_string(),
},
_ => ClassifiedLlmError {
kind: LlmErrorKind::Unknown,
retryable: true,
should_compress: false,
should_rotate_credential: false,
retry_after: None,
message: format!("未知错误 ({}) {}", status, &body[..body.len().min(200)]),
},
}
}
fn parse_retry_after(body: &str) -> Option<Duration> {
// Anthropic: "Please retry after X seconds"
// OpenAI: "Please retry after Xms"
if let Some(secs) = extract_retry_seconds(body) {
return Some(Duration::from_secs(secs));
}
if let Some(ms) = extract_retry_millis(body) {
return Some(Duration::from_millis(ms));
}
Some(Duration::from_secs(2))
}
fn extract_retry_seconds(body: &str) -> Option<u64> {
let re = regex::Regex::new(r"retry\s+(?:after\s+)?(\d+)\s*(?:s|sec|seconds?)").ok()?;
let caps = re.captures(body)?;
caps[1].parse().ok()
}
fn extract_retry_millis(body: &str) -> Option<u64> {
let re = regex::Regex::new(r"retry\s+(?:after\s+)?(\d+)\s*ms").ok()?;
let caps = re.captures(body)?;
caps[1].parse().ok()
}

View File

@@ -238,6 +238,8 @@ impl LlmDriver for GeminiDriver {
input_tokens,
output_tokens,
stop_reason: stop_reason.to_string(),
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
});
}
}
@@ -500,6 +502,8 @@ impl GeminiDriver {
input_tokens,
output_tokens,
stop_reason,
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
}
}
}

View File

@@ -238,6 +238,8 @@ impl LocalDriver {
input_tokens,
output_tokens,
stop_reason,
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
}
}
@@ -396,6 +398,8 @@ impl LlmDriver for LocalDriver {
input_tokens: 0,
output_tokens: 0,
stop_reason: "end_turn".to_string(),
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
});
continue;
}

View File

@@ -15,11 +15,14 @@ mod anthropic;
mod openai;
mod gemini;
mod local;
mod error_classifier;
mod retry_driver;
pub use anthropic::AnthropicDriver;
pub use openai::OpenAiDriver;
pub use gemini::GeminiDriver;
pub use local::LocalDriver;
pub use retry_driver::{RetryDriver, RetryConfig};
/// LLM Driver trait - unified interface for all providers
#[async_trait]
@@ -106,6 +109,12 @@ pub struct CompletionResponse {
pub output_tokens: u32,
/// Stop reason
pub stop_reason: StopReason,
/// Cache creation input tokens (Anthropic prompt caching)
#[serde(default)]
pub cache_creation_input_tokens: Option<u32>,
/// Cache read input tokens (Anthropic prompt caching)
#[serde(default)]
pub cache_read_input_tokens: Option<u32>,
}
/// LLM driver response content block (subset of canonical zclaw_types::ContentBlock).

View File

@@ -237,6 +237,8 @@ impl LlmDriver for OpenAiDriver {
input_tokens: 0,
output_tokens: 0,
stop_reason: "end_turn".to_string(),
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
});
continue;
}
@@ -638,6 +640,8 @@ impl OpenAiDriver {
input_tokens,
output_tokens,
stop_reason,
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
}
}
@@ -761,6 +765,8 @@ impl OpenAiDriver {
StopReason::StopSequence => "stop",
StopReason::Error => "error",
}.to_string(),
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
});
})
}

View File

@@ -0,0 +1,123 @@
//! RetryDriver: LlmDriver 的重试装饰器。
//! 仅在本地 Kernel 路径使用SaaS Relay 已有自己的重试逻辑。
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use futures::Stream;
use rand::Rng;
use zclaw_types::{Result, ZclawError};
use super::{LlmDriver, CompletionRequest, CompletionResponse, StreamChunk};
use super::error_classifier::classify_llm_error;
/// 重试配置
#[derive(Debug, Clone)]
pub struct RetryConfig {
pub max_attempts: u32,
pub base_delay_secs: f64,
pub max_delay_secs: f64,
pub jitter_ratio: f64,
}
impl Default for RetryConfig {
fn default() -> Self {
Self {
max_attempts: 3,
base_delay_secs: 1.0,
max_delay_secs: 8.0,
jitter_ratio: 0.5,
}
}
}
/// 重试装饰器
pub struct RetryDriver {
inner: Arc<dyn LlmDriver>,
config: RetryConfig,
}
impl RetryDriver {
pub fn new(inner: Arc<dyn LlmDriver>, config: RetryConfig) -> Self {
Self { inner, config }
}
fn jittered_backoff(&self, attempt: u32) -> Duration {
let base = self.config.base_delay_secs * 2_f64.powi(attempt as i32);
let capped = base.min(self.config.max_delay_secs);
let mut rng = rand::thread_rng();
let jitter = capped * self.config.jitter_ratio * rng.gen::<f64>();
Duration::from_secs_f64(capped + jitter)
}
}
#[async_trait]
impl LlmDriver for RetryDriver {
fn provider(&self) -> &str {
self.inner.provider()
}
async fn complete(&self, request: CompletionRequest) -> Result<CompletionResponse> {
let mut last_error: Option<ZclawError> = None;
for attempt in 0..self.config.max_attempts {
match self.inner.complete(request.clone()).await {
Ok(response) => return Ok(response),
Err(e) => {
let message = e.to_string();
let status = extract_status_from_error(&message);
let classified = classify_llm_error(
self.inner.provider(),
status,
&message,
message.contains("timeout") || message.contains("Timeout"),
);
if !classified.retryable {
return Err(e);
}
if classified.should_compress {
return Err(ZclawError::LlmError(
format!("[CONTEXT_OVERFLOW] {}", message)
));
}
last_error = Some(e);
if attempt + 1 < self.config.max_attempts {
let delay = classified.retry_after
.unwrap_or_else(|| self.jittered_backoff(attempt));
tracing::warn!(
"[RetryDriver] Attempt {}/{} failed ({}), retrying in {:.1}s",
attempt + 1, self.config.max_attempts, classified.message,
delay.as_secs_f64()
);
tokio::time::sleep(delay).await;
}
}
}
}
Err(last_error.unwrap_or_else(|| ZclawError::LlmError("重试耗尽".to_string())))
}
fn stream(
&self,
request: CompletionRequest,
) -> std::pin::Pin<Box<dyn Stream<Item = Result<StreamChunk>> + Send + '_>> {
// 流式路径不重试——部分 delta 已发送,重试会导致 UI 重复
self.inner.stream(request)
}
fn is_configured(&self) -> bool {
self.inner.is_configured()
}
}
fn extract_status_from_error(message: &str) -> u16 {
let re = regex::Regex::new(r"(?:error|status)[:\s]+(\d{3})").ok();
re.and_then(|re| re.captures(message))
.and_then(|caps| caps[1].parse().ok())
.unwrap_or(0)
}

View File

@@ -4,10 +4,11 @@ use std::sync::Arc;
use futures::StreamExt;
use tokio::sync::mpsc;
use zclaw_types::{AgentId, SessionId, Message, Result};
use serde_json::Value;
use crate::driver::{LlmDriver, CompletionRequest, ContentBlock};
use crate::stream::StreamChunk;
use crate::tool::{ToolRegistry, ToolContext, SkillExecutor, HandExecutor};
use crate::tool::{ToolRegistry, ToolContext, SkillExecutor, HandExecutor, ToolConcurrency};
use crate::tool::builtin::PathValidator;
use crate::growth::GrowthIntegration;
use crate::compaction::{self, CompactionConfig};
@@ -303,8 +304,28 @@ impl AgentLoop {
plan_mode: self.plan_mode,
};
// Call LLM
let response = self.driver.complete(request).await?;
// Call LLM with context-overflow recovery
let response = match self.driver.complete(request).await {
Ok(r) => r,
Err(e) => {
let err_str = e.to_string();
if err_str.contains("[CONTEXT_OVERFLOW]") && self.compaction_threshold > 0 {
tracing::warn!("[AgentLoop] Context overflow detected, triggering emergency compaction");
let pruned = compaction::prune_tool_outputs(&mut messages);
if pruned > 0 {
tracing::info!("[AgentLoop] Emergency pruning removed {} tool outputs", pruned);
}
let keep_recent = messages.len().saturating_sub(messages.len() / 3);
let (compacted, removed) = compaction::compact_messages(messages, keep_recent.max(4));
if removed > 0 {
tracing::info!("[AgentLoop] Emergency compaction removed {} messages", removed);
messages = compacted;
continue; // retry the iteration with compacted messages
}
}
return Err(e);
}
};
total_input_tokens += response.input_tokens;
total_output_tokens += response.output_tokens;
@@ -375,21 +396,22 @@ impl AgentLoop {
let tool_context = self.create_tool_context(session_id.clone());
let mut abort_result: Option<AgentLoopResult> = None;
let mut clarification_result: Option<AgentLoopResult> = None;
for (id, name, input) in tool_calls {
// Check if loop was already aborted
if abort_result.is_some() {
break;
}
// Phase 1: Pre-process inputs + middleware checks (serial)
struct ToolPlan {
idx: usize,
id: String,
name: String,
input: Value,
}
let mut plans: Vec<ToolPlan> = Vec::new();
for (idx, (id, name, input)) in tool_calls.into_iter().enumerate() {
if abort_result.is_some() { break; }
// GLM and other models sometimes send tool calls with empty arguments `{}`
// Inject the last user message as a fallback query so the tool can infer intent.
let input = if input.as_object().map_or(false, |obj| obj.is_empty()) {
if let Some(last_user_msg) = messages.iter().rev().find_map(|m| {
if let Message::User { content } = m {
Some(content.clone())
} else {
None
}
if let Message::User { content } = m { Some(content.clone()) } else { None }
}) {
tracing::info!("[AgentLoop] Tool '{}' received empty input, injecting user message as fallback query", name);
serde_json::json!({ "_fallback_query": last_user_msg })
@@ -400,101 +422,138 @@ impl AgentLoop {
input
};
// Check tool call safety — via middleware chain
{
let mw_ctx_ref = middleware::MiddlewareContext {
agent_id: self.agent_id.clone(),
session_id: session_id.clone(),
user_input: input.to_string(),
system_prompt: enhanced_prompt.clone(),
messages: messages.clone(),
response_content: Vec::new(),
input_tokens: total_input_tokens,
output_tokens: total_output_tokens,
};
match self.middleware_chain.run_before_tool_call(&mw_ctx_ref, &name, &input).await? {
middleware::ToolCallDecision::Allow => {}
middleware::ToolCallDecision::Block(msg) => {
tracing::warn!("[AgentLoop] Tool '{}' blocked by middleware: {}", name, msg);
let error_output = serde_json::json!({ "error": msg });
messages.push(Message::tool_result(id, zclaw_types::ToolId::new(&name), error_output, true));
continue;
}
middleware::ToolCallDecision::ReplaceInput(new_input) => {
// Execute with replaced input (with timeout)
let tool_result = match tokio::time::timeout(
std::time::Duration::from_secs(30),
self.execute_tool(&name, new_input, &tool_context),
).await {
Ok(Ok(result)) => result,
Ok(Err(e)) => serde_json::json!({ "error": e.to_string() }),
Err(_) => {
tracing::warn!("[AgentLoop] Tool '{}' (replaced input) timed out after 30s", name);
serde_json::json!({ "error": format!("工具 '{}' 执行超时30秒请重试", name) })
}
};
messages.push(Message::tool_result(id, zclaw_types::ToolId::new(&name), tool_result, false));
continue;
}
middleware::ToolCallDecision::AbortLoop(reason) => {
tracing::warn!("[AgentLoop] Loop aborted by middleware: {}", reason);
let msg = format!("{}\n已自动终止", reason);
self.memory.append_message(&session_id, &Message::assistant(&msg)).await?;
abort_result = Some(AgentLoopResult {
response: msg,
input_tokens: total_input_tokens,
output_tokens: total_output_tokens,
iterations,
});
}
}
}
let tool_result = match tokio::time::timeout(
std::time::Duration::from_secs(30),
self.execute_tool(&name, input, &tool_context),
).await {
Ok(Ok(result)) => result,
Ok(Err(e)) => serde_json::json!({ "error": e.to_string() }),
Err(_) => {
tracing::warn!("[AgentLoop] Tool '{}' timed out after 30s", name);
serde_json::json!({ "error": format!("工具 '{}' 执行超时30秒请重试", name) })
}
let mw_ctx = middleware::MiddlewareContext {
agent_id: self.agent_id.clone(),
session_id: session_id.clone(),
user_input: input.to_string(),
system_prompt: enhanced_prompt.clone(),
messages: messages.clone(),
response_content: Vec::new(),
input_tokens: total_input_tokens,
output_tokens: total_output_tokens,
};
match self.middleware_chain.run_before_tool_call(&mw_ctx, &name, &input).await? {
middleware::ToolCallDecision::Allow => {
plans.push(ToolPlan { idx, id, name, input });
}
middleware::ToolCallDecision::Block(msg) => {
tracing::warn!("[AgentLoop] Tool '{}' blocked by middleware: {}", name, msg);
messages.push(Message::tool_result(&id, zclaw_types::ToolId::new(&name), serde_json::json!({ "error": msg }), true));
}
middleware::ToolCallDecision::ReplaceInput(new_input) => {
plans.push(ToolPlan { idx, id, name, input: new_input });
}
middleware::ToolCallDecision::AbortLoop(reason) => {
tracing::warn!("[AgentLoop] Loop aborted by middleware: {}", reason);
let msg = format!("{}\n已自动终止", reason);
self.memory.append_message(&session_id, &Message::assistant(&msg)).await?;
abort_result = Some(AgentLoopResult {
response: msg,
input_tokens: total_input_tokens,
output_tokens: total_output_tokens,
iterations,
});
}
}
}
// Check if this is a clarification response — terminate loop immediately
// so the LLM waits for user input instead of continuing to generate.
if name == "ask_clarification"
&& tool_result.get("status").and_then(|v| v.as_str()) == Some("clarification_needed")
{
tracing::info!("[AgentLoop] Clarification requested, terminating loop");
let question = tool_result.get("question")
.and_then(|v| v.as_str())
.unwrap_or("需要更多信息")
.to_string();
messages.push(Message::tool_result(
id,
zclaw_types::ToolId::new(&name),
tool_result,
false,
));
self.memory.append_message(&session_id, &Message::assistant(&question)).await?;
clarification_result = Some(AgentLoopResult {
response: question,
input_tokens: total_input_tokens,
output_tokens: total_output_tokens,
iterations,
// Phase 2: Execute tools (parallel for ReadOnly, serial for others)
if abort_result.is_none() && !plans.is_empty() {
let (parallel_plans, sequential_plans): (Vec<_>, Vec<_>) = plans.iter()
.partition(|p| {
self.tools.get(&p.name)
.map(|t| t.concurrency())
.unwrap_or(ToolConcurrency::Exclusive) == ToolConcurrency::ReadOnly
});
break;
let mut results: std::collections::HashMap<usize, (String, String, serde_json::Value)> = std::collections::HashMap::new();
// Execute parallel (ReadOnly) tools with JoinSet (max 3 concurrent)
if !parallel_plans.is_empty() {
let semaphore = Arc::new(tokio::sync::Semaphore::new(3));
let mut join_set = tokio::task::JoinSet::new();
for plan in &parallel_plans {
let tool = self.tools.get(&plan.name).unwrap();
let ctx = tool_context.clone();
let input = plan.input.clone();
let idx = plan.idx;
let id = plan.id.clone();
let name = plan.name.clone();
let permit = semaphore.clone().acquire_owned().await.unwrap();
join_set.spawn(async move {
let result = tokio::time::timeout(
std::time::Duration::from_secs(30),
tool.execute(input, &ctx)
).await;
drop(permit);
(idx, id, name, result)
});
}
while let Some(res) = join_set.join_next().await {
match res {
Ok((idx, id, name, Ok(Ok(value)))) => {
results.insert(idx, (id, name, value));
}
Ok((idx, id, name, Ok(Err(e)))) => {
results.insert(idx, (id, name, serde_json::json!({ "error": e.to_string() })));
}
Ok((idx, id, name, Err(_))) => {
tracing::warn!("[AgentLoop] Tool '{}' timed out after 30s (parallel)", name);
results.insert(idx, (id, name.clone(), serde_json::json!({ "error": format!("工具 '{}' 执行超时30秒请重试", name) })));
}
Err(e) => {
tracing::warn!("[AgentLoop] JoinError in parallel tool execution: {}", e);
}
}
}
}
// Add tool result to messages
messages.push(Message::tool_result(
id,
zclaw_types::ToolId::new(&name),
tool_result,
false, // is_error - we include errors in the result itself
));
// Execute sequential (Exclusive/Interactive) tools
for plan in &sequential_plans {
let tool_result = match tokio::time::timeout(
std::time::Duration::from_secs(30),
self.execute_tool(&plan.name, plan.input.clone(), &tool_context),
).await {
Ok(Ok(result)) => result,
Ok(Err(e)) => serde_json::json!({ "error": e.to_string() }),
Err(_) => {
tracing::warn!("[AgentLoop] Tool '{}' timed out after 30s", plan.name);
serde_json::json!({ "error": format!("工具 '{}' 执行超时30秒请重试", plan.name) })
}
};
// Check if this is a clarification response
if plan.name == "ask_clarification"
&& tool_result.get("status").and_then(|v| v.as_str()) == Some("clarification_needed")
{
tracing::info!("[AgentLoop] Clarification requested, terminating loop");
let question = tool_result.get("question")
.and_then(|v| v.as_str())
.unwrap_or("需要更多信息")
.to_string();
results.insert(plan.idx, (plan.id.clone(), plan.name.clone(), tool_result));
self.memory.append_message(&session_id, &Message::assistant(&question)).await?;
clarification_result = Some(AgentLoopResult {
response: question,
input_tokens: total_input_tokens,
output_tokens: total_output_tokens,
iterations,
});
break;
}
results.insert(plan.idx, (plan.id.clone(), plan.name.clone(), tool_result));
}
// Push results in original tool_call order
let mut sorted_indices: Vec<usize> = results.keys().copied().collect();
sorted_indices.sort();
for idx in sorted_indices {
let (id, name, result) = results.remove(&idx).unwrap();
messages.push(Message::tool_result(&id, zclaw_types::ToolId::new(&name), result, false));
}
}
// Continue the loop - LLM will process tool results and generate final response

View File

@@ -1,21 +1,49 @@
//! Compaction middleware — wraps the existing compaction module.
//!
//! Supports debounce (cooldown + min-round checks), async LLM compression
//! with cached fallback, and iterative summaries that carry forward key info.
use async_trait::async_trait;
use zclaw_types::Result;
use crate::middleware::{AgentMiddleware, MiddlewareContext, MiddlewareDecision};
use crate::compaction::{self, CompactionConfig};
use crate::growth::GrowthIntegration;
use crate::driver::LlmDriver;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::RwLock;
use zclaw_types::{Message, Result};
use crate::compaction::{self, CompactionConfig};
use crate::driver::LlmDriver;
use crate::growth::GrowthIntegration;
use crate::middleware::{AgentMiddleware, MiddlewareContext, MiddlewareDecision};
/// Minimum seconds between consecutive compactions.
const COMPACTION_COOLDOWN_SECS: u64 = 30;
/// Minimum message pairs (user+assistant) since last compaction before triggering again.
const COMPACTION_MIN_ROUNDS: u64 = 3;
fn now_millis() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64
}
/// Shared compaction debounce state (lock-free).
struct CompactionState {
last_compaction_ms: AtomicU64,
last_compaction_msg_count: AtomicU64,
}
/// Cached result from a previous async LLM compaction.
struct AsyncCompactionCache {
last_result: RwLock<Option<Vec<Message>>>,
}
/// Middleware that compresses conversation history when it exceeds a token threshold.
pub struct CompactionMiddleware {
threshold: usize,
config: CompactionConfig,
/// Optional LLM driver for async compaction (LLM summarisation, memory flush).
driver: Option<Arc<dyn LlmDriver>>,
/// Optional growth integration for memory flushing during compaction.
growth: Option<GrowthIntegration>,
state: Arc<CompactionState>,
cache: Arc<AsyncCompactionCache>,
}
impl CompactionMiddleware {
@@ -25,7 +53,39 @@ impl CompactionMiddleware {
driver: Option<Arc<dyn LlmDriver>>,
growth: Option<GrowthIntegration>,
) -> Self {
Self { threshold, config, driver, growth }
Self {
threshold,
config,
driver,
growth,
state: Arc::new(CompactionState {
last_compaction_ms: AtomicU64::new(0),
last_compaction_msg_count: AtomicU64::new(0),
}),
cache: Arc::new(AsyncCompactionCache {
last_result: RwLock::new(None),
}),
}
}
fn should_compact(&self, msg_count: u64) -> bool {
let last_ms = self.state.last_compaction_ms.load(Ordering::Relaxed);
let last_count = self.state.last_compaction_msg_count.load(Ordering::Relaxed);
if now_millis().saturating_sub(last_ms) < COMPACTION_COOLDOWN_SECS * 1000 {
return false;
}
if msg_count.saturating_sub(last_count) < COMPACTION_MIN_ROUNDS * 2 {
return false;
}
true
}
fn record_compaction(&self, msg_count: u64) {
self.state.last_compaction_ms.store(now_millis(), Ordering::Relaxed);
self.state.last_compaction_msg_count.store(msg_count, Ordering::Relaxed);
}
}
@@ -39,6 +99,29 @@ impl AgentMiddleware for CompactionMiddleware {
return Ok(MiddlewareDecision::Continue);
}
// Step 1: Prune old tool outputs (cheap, no LLM needed)
let pruned = compaction::prune_tool_outputs(&mut ctx.messages);
if pruned > 0 {
tracing::info!("[CompactionMiddleware] Pruned {} old tool outputs", pruned);
}
// Step 2: Re-estimate tokens after pruning
let tokens = compaction::estimate_messages_tokens_calibrated(&ctx.messages);
if tokens < self.threshold {
return Ok(MiddlewareDecision::Continue);
}
// Step 3: Debounce check
if !self.should_compact(ctx.messages.len() as u64) {
// Still over threshold but within cooldown — use cached result if available
if let Some(cached) = self.cache.last_result.read().await.clone() {
tracing::debug!("[CompactionMiddleware] Cooldown active, using cached compaction result");
ctx.messages = cached;
}
return Ok(MiddlewareDecision::Continue);
}
// Step 4: Execute compaction
let needs_async = self.config.use_llm || self.config.memory_flush_enabled;
if needs_async {
let outcome = compaction::maybe_compact_with_config(
@@ -56,6 +139,14 @@ impl AgentMiddleware for CompactionMiddleware {
ctx.messages = compaction::maybe_compact(ctx.messages.clone(), self.threshold);
}
self.record_compaction(ctx.messages.len() as u64);
// Cache result for cooldown fallback
{
let mut cache = self.cache.last_result.write().await;
*cache = Some(ctx.messages.clone());
}
Ok(MiddlewareDecision::Continue)
}
}

View File

@@ -13,7 +13,7 @@ use serde_json::Value;
use zclaw_types::Result;
use crate::driver::ContentBlock;
use crate::middleware::{AgentMiddleware, MiddlewareContext, ToolCallDecision};
use std::sync::Mutex;
use std::sync::atomic::{AtomicU32, Ordering};
/// Middleware that intercepts tool call errors and formats recovery messages.
///
@@ -24,7 +24,7 @@ pub struct ToolErrorMiddleware {
/// Maximum consecutive failures before aborting the loop.
max_consecutive_failures: u32,
/// Tracks consecutive tool failures.
consecutive_failures: Mutex<u32>,
consecutive_failures: AtomicU32,
}
impl ToolErrorMiddleware {
@@ -32,7 +32,7 @@ impl ToolErrorMiddleware {
Self {
max_error_length: 500,
max_consecutive_failures: 3,
consecutive_failures: Mutex::new(0),
consecutive_failures: AtomicU32::new(0),
}
}
@@ -80,14 +80,14 @@ impl AgentMiddleware for ToolErrorMiddleware {
}
// Check consecutive failure count — abort if too many failures
let failures = self.consecutive_failures.lock().unwrap_or_else(|e| e.into_inner());
if *failures >= self.max_consecutive_failures {
let failures = self.consecutive_failures.load(Ordering::SeqCst);
if failures >= self.max_consecutive_failures {
tracing::warn!(
"[ToolErrorMiddleware] Aborting loop: {} consecutive tool failures",
*failures
failures
);
return Ok(ToolCallDecision::AbortLoop(
format!("连续 {} 次工具调用失败,已自动终止以避免无限重试", *failures)
format!("连续 {} 次工具调用失败,已自动终止以避免无限重试", failures)
));
}
@@ -100,11 +100,9 @@ impl AgentMiddleware for ToolErrorMiddleware {
tool_name: &str,
result: &Value,
) -> Result<()> {
let mut failures = self.consecutive_failures.lock().unwrap_or_else(|e| e.into_inner());
// Check if the tool result indicates an error.
if let Some(error) = result.get("error") {
*failures += 1;
let failures = self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1;
let error_msg = match error {
Value::String(s) => s.clone(),
other => other.to_string(),
@@ -118,7 +116,7 @@ impl AgentMiddleware for ToolErrorMiddleware {
tracing::warn!(
"[ToolErrorMiddleware] Tool '{}' failed ({}/{} consecutive): {}",
tool_name, *failures, self.max_consecutive_failures, truncated
tool_name, failures, self.max_consecutive_failures, truncated
);
let guided_message = self.format_tool_error(tool_name, &truncated);
@@ -127,7 +125,7 @@ impl AgentMiddleware for ToolErrorMiddleware {
});
} else {
// Success — reset consecutive failure counter
*failures = 0;
self.consecutive_failures.store(0, Ordering::SeqCst);
}
Ok(())

View File

@@ -24,6 +24,10 @@ pub enum StreamChunk {
input_tokens: u32,
output_tokens: u32,
stop_reason: String,
#[serde(default)]
cache_creation_input_tokens: Option<u32>,
#[serde(default)]
cache_read_input_tokens: Option<u32>,
},
/// Error occurred
Error { message: String },

View File

@@ -55,6 +55,8 @@ impl MockLlmDriver {
input_tokens: 10,
output_tokens: text.len() as u32 / 4,
stop_reason: StopReason::EndTurn,
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
});
self
}
@@ -74,6 +76,8 @@ impl MockLlmDriver {
input_tokens: 10,
output_tokens: 20,
stop_reason: StopReason::ToolUse,
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
});
self
}
@@ -86,6 +90,8 @@ impl MockLlmDriver {
input_tokens: 0,
output_tokens: 0,
stop_reason: StopReason::Error,
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
});
self
}
@@ -142,6 +148,8 @@ impl MockLlmDriver {
input_tokens: 0,
output_tokens: 0,
stop_reason: StopReason::EndTurn,
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
})
}
}
@@ -190,6 +198,8 @@ impl LlmDriver for MockLlmDriver {
input_tokens: 10,
output_tokens: 2,
stop_reason: "end_turn".to_string(),
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
},
]
})

View File

@@ -11,6 +11,17 @@ use crate::driver::ToolDefinition;
use crate::loop_runner::LoopEvent;
use crate::tool::builtin::PathValidator;
/// Tool concurrency safety level
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ToolConcurrency {
/// Read-only operations, always safe to parallelize (file_read, web_fetch, etc.)
ReadOnly,
/// Exclusive operations, must be serial (file_write, shell_exec, etc.)
Exclusive,
/// Interactive operations, never parallelize (ask_clarification, etc.)
Interactive,
}
/// Tool trait for implementing agent tools
#[async_trait]
pub trait Tool: Send + Sync {
@@ -25,6 +36,11 @@ pub trait Tool: Send + Sync {
/// Execute the tool
async fn execute(&self, input: Value, context: &ToolContext) -> Result<Value>;
/// Tool concurrency safety level. Default: ReadOnly.
fn concurrency(&self) -> ToolConcurrency {
ToolConcurrency::ReadOnly
}
}
/// Skill executor trait for runtime skill execution

View File

@@ -9,7 +9,7 @@ use async_trait::async_trait;
use serde_json::{json, Value};
use zclaw_types::{Result, ZclawError};
use crate::tool::{Tool, ToolContext};
use crate::tool::{Tool, ToolContext, ToolConcurrency};
/// Clarification type — categorizes the reason for asking.
#[derive(Debug, Clone, PartialEq)]
@@ -96,6 +96,10 @@ impl Tool for AskClarificationTool {
})
}
fn concurrency(&self) -> ToolConcurrency {
ToolConcurrency::Interactive
}
async fn execute(&self, input: Value, _context: &ToolContext) -> Result<Value> {
let question = input["question"].as_str()
.ok_or_else(|| ZclawError::InvalidInput("Missing 'question' parameter".into()))?;

View File

@@ -4,7 +4,7 @@ use async_trait::async_trait;
use serde_json::{json, Value};
use zclaw_types::{Result, ZclawError};
use crate::tool::{Tool, ToolContext};
use crate::tool::{Tool, ToolContext, ToolConcurrency};
pub struct ExecuteSkillTool;
@@ -42,6 +42,10 @@ impl Tool for ExecuteSkillTool {
})
}
fn concurrency(&self) -> ToolConcurrency {
ToolConcurrency::Exclusive
}
async fn execute(&self, input: Value, context: &ToolContext) -> Result<Value> {
let skill_id = input["skill_id"].as_str()
.ok_or_else(|| ZclawError::InvalidInput("Missing 'skill_id' parameter".into()))?;

View File

@@ -6,7 +6,7 @@ use zclaw_types::{Result, ZclawError};
use std::fs;
use std::io::Write;
use crate::tool::{Tool, ToolContext};
use crate::tool::{Tool, ToolContext, ToolConcurrency};
use super::path_validator::PathValidator;
pub struct FileWriteTool;
@@ -55,6 +55,10 @@ impl Tool for FileWriteTool {
})
}
fn concurrency(&self) -> ToolConcurrency {
ToolConcurrency::Exclusive
}
async fn execute(&self, input: Value, context: &ToolContext) -> Result<Value> {
let path = input["path"].as_str()
.ok_or_else(|| ZclawError::InvalidInput("Missing 'path' parameter".into()))?;

View File

@@ -8,7 +8,7 @@ use serde_json::Value;
use std::sync::Arc;
use zclaw_types::Result;
use crate::tool::{Tool, ToolContext};
use crate::tool::{Tool, ToolContext, ToolConcurrency};
/// Wraps an MCP tool adapter into the `Tool` trait.
///
@@ -42,6 +42,10 @@ impl Tool for McpToolWrapper {
self.adapter.input_schema().clone()
}
fn concurrency(&self) -> ToolConcurrency {
ToolConcurrency::Exclusive
}
async fn execute(&self, input: Value, _context: &ToolContext) -> Result<Value> {
self.adapter.execute(input).await
}

View File

@@ -8,7 +8,7 @@ use std::process::{Command, Stdio};
use std::time::{Duration, Instant};
use zclaw_types::{Result, ZclawError};
use crate::tool::{Tool, ToolContext};
use crate::tool::{Tool, ToolContext, ToolConcurrency};
/// Parse a command string into program and arguments using proper shell quoting
fn parse_command(command: &str) -> Result<(String, Vec<String>)> {
@@ -175,6 +175,10 @@ impl Tool for ShellExecTool {
})
}
fn concurrency(&self) -> ToolConcurrency {
ToolConcurrency::Exclusive
}
async fn execute(&self, input: Value, _context: &ToolContext) -> Result<Value> {
let command = input["command"].as_str()
.ok_or_else(|| ZclawError::InvalidInput("Missing 'command' parameter".into()))?;

View File

@@ -11,7 +11,7 @@ use zclaw_memory::MemoryStore;
use crate::driver::LlmDriver;
use crate::loop_runner::{AgentLoop, LoopEvent};
use crate::tool::{Tool, ToolContext, ToolRegistry};
use crate::tool::{Tool, ToolContext, ToolRegistry, ToolConcurrency};
use crate::tool::builtin::register_builtin_tools;
use std::sync::Arc;
@@ -91,6 +91,10 @@ impl Tool for TaskTool {
})
}
fn concurrency(&self) -> ToolConcurrency {
ToolConcurrency::Exclusive
}
async fn execute(&self, input: Value, context: &ToolContext) -> Result<Value> {
let description = input["description"].as_str()
.ok_or_else(|| ZclawError::InvalidInput("Missing 'description' parameter".into()))?;

View File

@@ -7,7 +7,7 @@ use async_trait::async_trait;
use serde_json::{json, Value};
use zclaw_types::Result;
use crate::tool::{Tool, ToolContext};
use crate::tool::{Tool, ToolContext, ToolConcurrency};
/// Wrapper that exposes a Hand as a Tool in the agent's tool registry.
///
@@ -78,6 +78,10 @@ impl Tool for HandTool {
self.input_schema.clone()
}
fn concurrency(&self) -> ToolConcurrency {
ToolConcurrency::Exclusive
}
async fn execute(&self, input: Value, context: &ToolContext) -> Result<Value> {
// Delegate to the HandExecutor (bridged from HandRegistry via kernel).
// If no hand_executor is available (e.g., standalone runtime without kernel),

View File

@@ -223,6 +223,33 @@ impl Serialize for ZclawError {
/// Result type alias for ZCLAW operations
pub type Result<T> = std::result::Result<T, ZclawError>;
/// LLM 调用错误的细粒度分类,指导重试和恢复策略
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LlmErrorKind {
Auth,
AuthPermanent,
BillingExhausted,
RateLimited,
Overloaded,
ServerError,
Timeout,
ContextOverflow,
ModelNotFound,
Unknown,
}
/// 分类后的 LLM 错误,附带恢复提示
#[derive(Debug, Clone)]
pub struct ClassifiedLlmError {
pub kind: LlmErrorKind,
pub retryable: bool,
pub should_compress: bool,
pub should_rotate_credential: bool,
pub retry_after: Option<std::time::Duration>,
pub message: String,
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -133,6 +133,18 @@ skills/ -> SkillRegistry 加载 -> SkillIndexMiddleware@200 注入系统提示
- MCP 限定名 `service_name.tool_name` 避免与内置工具冲突
- 已删除空壳 Hands (04-17): Whiteboard/Slideshow/Speech净减 ~5400 行
### ⚡ 新增工具/技能必须声明 concurrency 级别
`Tool` trait 的 `concurrency()` 方法决定并行执行策略 (04-24 Hermes Phase 2A):
| 级别 | 含义 | 适用场景 |
|------|------|---------|
| `ReadOnly` (默认) | 只读,始终可并行 | file_read, web_search, calculator |
| `Exclusive` | 有副作用,必须串行 | file_write, shell_exec, send_message, execute_skill, task |
| `Interactive` | 需要用户交互,永不并行 | ask_clarification |
**新增工具时**:在 `impl Tool for YourTool` 中覆盖 `concurrency()` 方法。默认 `ReadOnly`,如果有写操作/副作用必须返回 `ToolConcurrency::Exclusive`。未正确声明会导致并行执行时产生竞态条件。
## 4. 活跃问题 + 陷阱
### 活跃
@@ -155,6 +167,7 @@ skills/ -> SkillRegistry 加载 -> SkillIndexMiddleware@200 注入系统提示
| 日期 | 变更 | 关联 |
|------|------|------|
| 2026-04-24 | Hermes Phase 2A: ToolConcurrency 枚举 + 并行执行 + concurrency() 声明要求 | commit 9060935 |
| 2026-04-22 | Wiki 5-section 重构: 281->~195 行,语义路由细节引用 [[butler]] | wiki/ |
| 2026-04-22 | Researcher 搜索修复: schema 扁平化 + 空参数回退 + 排版修复 | commit 5816f56+81005c3 |
| 2026-04-17 | 空壳 Hand 清理: Whiteboard/Slideshow/Speech 删除,净减 ~5400 行 | Phase 5 清理 |

View File

@@ -1,6 +1,6 @@
---
title: ZCLAW 项目知识库
updated: 2026-04-22
updated: 2026-04-24
status: active
---
@@ -8,29 +8,29 @@ status: active
> 面向中文用户的 AI Agent 桌面客户端。管家模式 + 多模型 + 7 自主能力 + 75 技能。
> **使用方式**: 找到你要处理的模块,读对应页面,直接开始工作。
> **数据来源**: 2026-04-22 代码全量扫描验证,非文档推测。
> **数据来源**: 2026-04-23 代码全量扫描验证,非文档推测。
## 项目画像
| 维度 | 值 |
|------|-----|
| 定位 | AI Agent 桌面客户端 (Tauri 2.x) |
| 技术栈 | Rust 10 crates + src-tauri (~102K行, 357 .rs) + React 19 + TypeScript + PostgreSQL |
| 技术栈 | Rust 10 crates + src-tauri (~148K行, 384 .rs) + React 19 + TypeScript + PostgreSQL |
| 阶段 | 发布前稳定化,功能冻结中 |
## 关键数字2026-04-22 代码验证)
## 关键数字2026-04-23 代码验证)
| 指标 | 值 |
|------|-----|
| Rust Crates | 10 + src-tauri |
| Rust 代码 | 101,967 行 (357 .rs文件) |
| Rust 测试 | 987 定义 / 797 通过 |
| Tauri 命令 | 190 定义 / 97 @reserved / 104 invoke |
| Rust 代码 | 148,185 行 (384 .rs文件) |
| Rust 测试 | 997 定义 (619 #[test] + 378 #[tokio::test]) |
| Tauri 命令 | 193 定义 / 104 invoke |
| SaaS API | 137 .route() / 16 模块 / 38 SQL 迁移 / 42 表 |
| 中间件 | 14 层 runtime + 10 层 SaaS HTTP |
| SKILL / HAND | 75 技能目录 / 7 注册 Hand (6 TOML + _reminder) |
| Pipeline | 18 YAML 模板 (8 目录) |
| 前端 | 25 Store / 102 组件 / 75 lib / 17 Admin 页面 |
| 前端 | 25 Store / 103 组件 / 78 lib / 17 Admin 页面 |
| Intelligence | 16 .rs 文件 |
| 质量指标 | 0 cargo warnings / 2 TODO/FIXME / 0 dead_code |
@@ -38,13 +38,13 @@ status: active
| 类别 | 功能 | 入口 | Wiki |
|------|------|------|------|
| 对话 | 发消息、流式响应、多模型切换 | 聊天面板 | [[chat]] |
| 分身 | 创建/切换/配置 Agent | 侧边栏 Agent 列表 | [[chat]] |
| 对话 | 发消息、流式响应、多模型切换、LLM 动态建议 | 聊天面板 | [[chat]] |
| 分身 | 创建/切换/配置 Agent、跨会话身份记忆 (soul.md) | 侧边栏 Agent 列表 | [[chat]] |
| 自主 | 触发 Browser/Collector/Twitter 等 | 自动化面板 | [[hands-skills]] |
| 记忆 | 搜索历史、自动注入上下文 | 设置 > 语义记忆 | [[memory]] |
| 记忆 | 搜索历史、自动注入上下文、身份信号提取 | 设置 > 语义记忆 | [[memory]] |
| 配置 | 模型/API/工作区/安全存储 | 设置面板 (19 页) | [[development]] |
| SaaS | 登录注册、订阅计费、Admin 管理 | SaaS 平台 / Admin 后台 | [[saas]] |
| 管家 | 痛点积累、行业配置、简洁/专业模式 | 聊天面板 (默认模式) | [[butler]] |
| 管家 | 痛点积累、行业配置、简洁/专业模式、跨会话身份、动态建议 | 聊天面板 (默认模式) | [[butler]] |
| Pipeline | YAML 模板选择、配置、DAG 执行 | 工作流面板 | [[pipeline]] |
| 安全 | JWT 认证、TOTP 2FA、操作审计 | 设置 > 安全存储 | [[security]] |
| 数据 | PostgreSQL (42表) + SQLite/FTS5 (本地记忆) | — | [[data-model]] |
@@ -97,5 +97,7 @@ ZCLAW
| Agent 创建失败 | [[chat]] | [[saas]] | 权限或持久化问题 |
| Pipeline 执行卡住 | [[pipeline]] | [[middleware]] | DAG 循环 / 依赖缺失 |
| Admin 页面 403 | [[saas]] | [[security]] | JWT 过期 / admin_guard 拦截 |
| Agent 名字不记住 | [[butler]] | [[memory]] | soul.md 写入失败 / identity signal 未提取 |
| 建议不个性化 | [[chat]] | [[butler]] | 4路上下文超时 / ExperienceExtractor 未初始化 |
> 数字真相源: `docs/TRUTH.md` — 如有冲突以代码实际为准

View File

@@ -9,6 +9,15 @@ tags: [log, history]
> Append-only 操作记录。格式: `## [日期] 类型 | 描述`
## [2026-04-24] perf | Hermes 高价值设计实施 Phase 1-4
- **Phase 1**: Anthropic prompt caching — cache_control ephemeral + cache token tracking (CompletionResponse + StreamChunk)
- **Phase 2A**: 并行工具执行 — ToolConcurrency 枚举 (ReadOnly/Exclusive/Interactive) + JoinSet + Semaphore(3) + AtomicU32
- **Phase 2B**: 工具输出修剪 — prune_tool_outputs() (2000→500 chars) + 集成到 CompactionMiddleware
- **Phase 3**: 错误分类+智能重试 — LlmErrorKind + ClassifiedLlmError + RetryDriver (jittered backoff) + CONTEXT_OVERFLOW recovery
- **Phase 4**: 异步压缩+迭代摘要 — 30s 防抖 + cached fallback + previous_summary 迭代累积
- **新增文件**: error_classifier.rs, retry_driver.rs
- **验证**: 997 workspace tests PASS
## [2026-04-23] perf | 回复效率+建议生成并行化优化 (三部分)
- **perf(src-tauri)**: identity prompt 缓存 (`LazyLock<RwLock<HashMap>>`) + `pre_conversation_hook` 并行化 (`tokio::join!`)
- **perf(runtime)**: middleware `before_completion` 分波并行 — `parallel_safe()` trait + wave detection + `tokio::spawn`5 层 safe 中间件可并行