Compare commits
3 Commits
6d6673bf5b
...
b60b96225d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b60b96225d | ||
|
|
06e93a21af | ||
|
|
9060935401 |
@@ -117,7 +117,9 @@ impl Kernel {
|
||||
}
|
||||
}
|
||||
|
||||
use zclaw_runtime::{AgentLoop, tool::builtin::PathValidator};
|
||||
use std::sync::Arc;
|
||||
use zclaw_runtime::{AgentLoop, LlmDriver, tool::builtin::PathValidator};
|
||||
use zclaw_runtime::driver::{RetryDriver, RetryConfig};
|
||||
|
||||
use super::Kernel;
|
||||
use super::super::MessageResponse;
|
||||
@@ -161,9 +163,12 @@ impl Kernel {
|
||||
let subagent_enabled = chat_mode.as_ref().and_then(|m| m.subagent_enabled).unwrap_or(false);
|
||||
let tools = self.create_tool_registry(subagent_enabled);
|
||||
self.skill_executor.set_tool_registry(tools.clone());
|
||||
let driver: Arc<dyn LlmDriver> = Arc::new(
|
||||
RetryDriver::new(self.driver.clone(), RetryConfig::default())
|
||||
);
|
||||
let mut loop_runner = AgentLoop::new(
|
||||
*agent_id,
|
||||
self.driver.clone(),
|
||||
driver,
|
||||
tools,
|
||||
self.memory.clone(),
|
||||
)
|
||||
@@ -275,9 +280,12 @@ impl Kernel {
|
||||
let subagent_enabled = chat_mode.as_ref().and_then(|m| m.subagent_enabled).unwrap_or(false);
|
||||
let tools = self.create_tool_registry(subagent_enabled);
|
||||
self.skill_executor.set_tool_registry(tools.clone());
|
||||
let driver: Arc<dyn LlmDriver> = Arc::new(
|
||||
RetryDriver::new(self.driver.clone(), RetryConfig::default())
|
||||
);
|
||||
let mut loop_runner = AgentLoop::new(
|
||||
*agent_id,
|
||||
self.driver.clone(),
|
||||
driver,
|
||||
tools,
|
||||
self.memory.clone(),
|
||||
)
|
||||
|
||||
@@ -31,6 +31,8 @@ async fn seam_hand_tool_routing() {
|
||||
input_tokens: 10,
|
||||
output_tokens: 20,
|
||||
stop_reason: "tool_use".to_string(),
|
||||
cache_creation_input_tokens: None,
|
||||
cache_read_input_tokens: None,
|
||||
},
|
||||
])
|
||||
// Second stream: final text after tool executes
|
||||
@@ -40,6 +42,8 @@ async fn seam_hand_tool_routing() {
|
||||
input_tokens: 10,
|
||||
output_tokens: 5,
|
||||
stop_reason: "end_turn".to_string(),
|
||||
cache_creation_input_tokens: None,
|
||||
cache_read_input_tokens: None,
|
||||
},
|
||||
]);
|
||||
|
||||
@@ -105,6 +109,8 @@ async fn seam_hand_execution_callback() {
|
||||
input_tokens: 10,
|
||||
output_tokens: 5,
|
||||
stop_reason: "tool_use".to_string(),
|
||||
cache_creation_input_tokens: None,
|
||||
cache_read_input_tokens: None,
|
||||
},
|
||||
])
|
||||
.with_stream_chunks(vec![
|
||||
@@ -113,6 +119,8 @@ async fn seam_hand_execution_callback() {
|
||||
input_tokens: 5,
|
||||
output_tokens: 1,
|
||||
stop_reason: "end_turn".to_string(),
|
||||
cache_creation_input_tokens: None,
|
||||
cache_read_input_tokens: None,
|
||||
},
|
||||
]);
|
||||
|
||||
@@ -173,6 +181,8 @@ async fn seam_generic_tool_routing() {
|
||||
input_tokens: 10,
|
||||
output_tokens: 5,
|
||||
stop_reason: "tool_use".to_string(),
|
||||
cache_creation_input_tokens: None,
|
||||
cache_read_input_tokens: None,
|
||||
},
|
||||
])
|
||||
.with_stream_chunks(vec![
|
||||
@@ -181,6 +191,8 @@ async fn seam_generic_tool_routing() {
|
||||
input_tokens: 5,
|
||||
output_tokens: 3,
|
||||
stop_reason: "end_turn".to_string(),
|
||||
cache_creation_input_tokens: None,
|
||||
cache_read_input_tokens: None,
|
||||
},
|
||||
]);
|
||||
|
||||
|
||||
@@ -27,6 +27,8 @@ async fn smoke_hands_full_lifecycle() {
|
||||
input_tokens: 15,
|
||||
output_tokens: 10,
|
||||
stop_reason: "tool_use".to_string(),
|
||||
cache_creation_input_tokens: None,
|
||||
cache_read_input_tokens: None,
|
||||
},
|
||||
])
|
||||
// After hand_quiz returns, LLM generates final response
|
||||
@@ -36,6 +38,8 @@ async fn smoke_hands_full_lifecycle() {
|
||||
input_tokens: 20,
|
||||
output_tokens: 5,
|
||||
stop_reason: "end_turn".to_string(),
|
||||
cache_creation_input_tokens: None,
|
||||
cache_read_input_tokens: None,
|
||||
},
|
||||
]);
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use serde_json::Value;
|
||||
use zclaw_types::{AgentId, Message, SessionId};
|
||||
|
||||
use crate::driver::{CompletionRequest, ContentBlock, LlmDriver};
|
||||
@@ -136,7 +137,7 @@ pub fn update_calibration(estimated: usize, actual: u32) {
|
||||
}
|
||||
|
||||
/// Estimate total tokens for messages with calibration applied.
|
||||
fn estimate_messages_tokens_calibrated(messages: &[Message]) -> usize {
|
||||
pub fn estimate_messages_tokens_calibrated(messages: &[Message]) -> usize {
|
||||
let raw = estimate_messages_tokens(messages);
|
||||
let factor = get_calibration_factor();
|
||||
if (factor - 1.0).abs() < f64::EPSILON {
|
||||
@@ -178,7 +179,7 @@ pub fn compact_messages(messages: Vec<Message>, keep_recent: usize) -> (Vec<Mess
|
||||
let old_messages = &messages[..split_index];
|
||||
let recent_messages = &messages[split_index..];
|
||||
|
||||
let summary = generate_summary(old_messages);
|
||||
let summary = generate_summary(old_messages, None);
|
||||
let removed_count = old_messages.len();
|
||||
|
||||
let mut compacted = Vec::with_capacity(1 + recent_messages.len());
|
||||
@@ -188,6 +189,38 @@ pub fn compact_messages(messages: Vec<Message>, keep_recent: usize) -> (Vec<Mess
|
||||
(compacted, removed_count)
|
||||
}
|
||||
|
||||
/// Prune old tool outputs to reduce token consumption. Runs before compaction.
|
||||
/// Only prunes ToolResult messages older than PRUNE_AGE_THRESHOLD messages.
|
||||
const PRUNE_AGE_THRESHOLD: usize = 8;
|
||||
const PRUNE_MAX_CHARS: usize = 2000;
|
||||
const PRUNE_KEEP_HEAD_CHARS: usize = 500;
|
||||
|
||||
pub fn prune_tool_outputs(messages: &mut [Message]) -> usize {
|
||||
let total = messages.len();
|
||||
let mut pruned_count = 0;
|
||||
|
||||
for i in 0..total.saturating_sub(PRUNE_AGE_THRESHOLD) {
|
||||
if let Message::ToolResult { output, is_error, .. } = &mut messages[i] {
|
||||
if *is_error { continue; }
|
||||
|
||||
let text = match output {
|
||||
Value::String(ref s) => s.clone(),
|
||||
ref other => other.to_string(),
|
||||
};
|
||||
if text.len() <= PRUNE_MAX_CHARS { continue; }
|
||||
|
||||
let end = text.floor_char_boundary(PRUNE_KEEP_HEAD_CHARS.min(text.len()));
|
||||
*output = serde_json::json!({
|
||||
"_pruned": true,
|
||||
"_original_chars": text.len(),
|
||||
"head": &text[..end],
|
||||
});
|
||||
pruned_count += 1;
|
||||
}
|
||||
}
|
||||
pruned_count
|
||||
}
|
||||
|
||||
/// Check if compaction should be triggered and perform it if needed.
|
||||
///
|
||||
/// Returns the (possibly compacted) message list.
|
||||
@@ -315,6 +348,18 @@ pub async fn maybe_compact_with_config(
|
||||
.iter()
|
||||
.take_while(|m| matches!(m, Message::System { .. }))
|
||||
.count();
|
||||
|
||||
// Extract previous summary from leading system messages for iterative summarization
|
||||
let previous_summary = messages.iter()
|
||||
.take(leading_system_count)
|
||||
.filter_map(|m| match m {
|
||||
Message::System { content } if content.starts_with("[以下是之前对话的摘要]") => {
|
||||
Some(content.clone())
|
||||
}
|
||||
_ => None,
|
||||
})
|
||||
.next();
|
||||
|
||||
let keep_from_end = DEFAULT_KEEP_RECENT
|
||||
.min(messages.len().saturating_sub(leading_system_count));
|
||||
let split_index = messages.len().saturating_sub(keep_from_end);
|
||||
@@ -333,14 +378,16 @@ pub async fn maybe_compact_with_config(
|
||||
let recent_messages = &messages[split_index..];
|
||||
let removed_count = old_messages.len();
|
||||
|
||||
// Step 3: Generate summary (LLM or rule-based)
|
||||
// Step 3: Generate summary (LLM or rule-based), with iterative context
|
||||
let prev_ref = previous_summary.as_deref();
|
||||
let summary = if config.use_llm {
|
||||
if let Some(driver) = driver {
|
||||
match generate_llm_summary(driver, old_messages, config.summary_max_tokens).await {
|
||||
match generate_llm_summary(driver, old_messages, prev_ref, config.summary_max_tokens).await {
|
||||
Ok(llm_summary) => {
|
||||
tracing::info!(
|
||||
"[Compaction] Generated LLM summary ({} chars)",
|
||||
llm_summary.len()
|
||||
"[Compaction] Generated LLM summary ({} chars, iterative={})",
|
||||
llm_summary.len(),
|
||||
previous_summary.is_some()
|
||||
);
|
||||
llm_summary
|
||||
}
|
||||
@@ -350,7 +397,7 @@ pub async fn maybe_compact_with_config(
|
||||
"[Compaction] LLM summary failed: {}, falling back to rules",
|
||||
e
|
||||
);
|
||||
generate_summary(old_messages)
|
||||
generate_summary(old_messages, prev_ref)
|
||||
} else {
|
||||
tracing::warn!(
|
||||
"[Compaction] LLM summary failed: {}, returning original messages",
|
||||
@@ -369,10 +416,10 @@ pub async fn maybe_compact_with_config(
|
||||
tracing::warn!(
|
||||
"[Compaction] LLM compaction requested but no driver available, using rules"
|
||||
);
|
||||
generate_summary(old_messages)
|
||||
generate_summary(old_messages, prev_ref)
|
||||
}
|
||||
} else {
|
||||
generate_summary(old_messages)
|
||||
generate_summary(old_messages, prev_ref)
|
||||
};
|
||||
|
||||
let used_llm = config.use_llm && driver.is_some();
|
||||
@@ -398,9 +445,11 @@ pub async fn maybe_compact_with_config(
|
||||
}
|
||||
|
||||
/// Generate a summary using an LLM driver.
|
||||
/// If `previous_summary` is provided, builds on it iteratively.
|
||||
async fn generate_llm_summary(
|
||||
driver: &Arc<dyn LlmDriver>,
|
||||
messages: &[Message],
|
||||
previous_summary: Option<&str>,
|
||||
max_tokens: u32,
|
||||
) -> Result<String, String> {
|
||||
let mut conversation_text = String::new();
|
||||
@@ -437,11 +486,21 @@ async fn generate_llm_summary(
|
||||
conversation_text.push_str("\n...(对话已截断)");
|
||||
}
|
||||
|
||||
let prompt = format!(
|
||||
"请用简洁的中文总结以下对话的关键信息。保留重要的讨论主题、决策、结论和待办事项。\
|
||||
输出格式为段落式摘要,不超过200字。\n\n{}",
|
||||
conversation_text
|
||||
);
|
||||
let prompt = match previous_summary {
|
||||
Some(prev) => format!(
|
||||
"你是一个对话摘要助手。\n\n\
|
||||
## 上一轮摘要\n{}\n\n\
|
||||
## 新增对话内容\n{}\n\n\
|
||||
请在上一轮摘要的基础上更新,保留所有关键决策、用户偏好和文件操作。\
|
||||
输出200字以内的中文摘要。",
|
||||
prev, conversation_text
|
||||
),
|
||||
None => format!(
|
||||
"请用简洁的中文总结以下对话的关键信息。保留重要的讨论主题、决策、结论和待办事项。\
|
||||
输出格式为段落式摘要,不超过200字。\n\n{}",
|
||||
conversation_text
|
||||
),
|
||||
};
|
||||
|
||||
let request = CompletionRequest {
|
||||
model: String::new(),
|
||||
@@ -484,13 +543,22 @@ async fn generate_llm_summary(
|
||||
}
|
||||
|
||||
/// Generate a rule-based summary of old messages.
|
||||
fn generate_summary(messages: &[Message]) -> String {
|
||||
/// If `previous_summary` is provided, carries forward key info.
|
||||
fn generate_summary(messages: &[Message], previous_summary: Option<&str>) -> String {
|
||||
if messages.is_empty() {
|
||||
return "[对话开始]".to_string();
|
||||
}
|
||||
|
||||
let mut sections: Vec<String> = vec!["[以下是之前对话的摘要]".to_string()];
|
||||
|
||||
// Carry forward previous summary if available
|
||||
if let Some(prev) = previous_summary {
|
||||
// Strip the header line from previous summary for cleaner nesting
|
||||
let prev_body = prev.strip_prefix("[以下是之前对话的摘要]\n")
|
||||
.unwrap_or(prev);
|
||||
sections.push(format!("[上轮摘要保留]: {}", truncate(prev_body, 200)));
|
||||
}
|
||||
|
||||
let mut user_count = 0;
|
||||
let mut assistant_count = 0;
|
||||
let mut topics: Vec<String> = Vec::new();
|
||||
@@ -696,8 +764,21 @@ mod tests {
|
||||
Message::user("How does ownership work?"),
|
||||
Message::assistant("Ownership is Rust's memory management system"),
|
||||
];
|
||||
let summary = generate_summary(&messages);
|
||||
let summary = generate_summary(&messages, None);
|
||||
assert!(summary.contains("摘要"));
|
||||
assert!(summary.contains("2"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_generate_summary_iterative() {
|
||||
let messages = vec![
|
||||
Message::user("What is async/await?"),
|
||||
Message::assistant("Async/await is a concurrency model"),
|
||||
];
|
||||
let prev = "[以下是之前对话的摘要]\n讨论主题: Rust; 所有权\n(已压缩 4 条消息)";
|
||||
let summary = generate_summary(&messages, Some(prev));
|
||||
assert!(summary.contains("摘要"));
|
||||
assert!(summary.contains("上轮摘要保留"));
|
||||
assert!(summary.contains("所有权"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -121,6 +121,8 @@ impl LlmDriver for AnthropicDriver {
|
||||
let mut byte_stream = response.bytes_stream();
|
||||
let mut current_tool_id: Option<String> = None;
|
||||
let mut tool_input_buffer = String::new();
|
||||
let mut cache_creation_input_tokens: Option<u32> = None;
|
||||
let mut cache_read_input_tokens: Option<u32> = None;
|
||||
|
||||
while let Some(chunk_result) = byte_stream.next().await {
|
||||
let chunk = match chunk_result {
|
||||
@@ -141,6 +143,15 @@ impl LlmDriver for AnthropicDriver {
|
||||
match serde_json::from_str::<AnthropicStreamEvent>(data) {
|
||||
Ok(event) => {
|
||||
match event.event_type.as_str() {
|
||||
"message_start" => {
|
||||
// Capture cache token info from message_start event
|
||||
if let Some(msg) = event.message {
|
||||
if let Some(usage) = msg.usage {
|
||||
cache_creation_input_tokens = usage.cache_creation_input_tokens;
|
||||
cache_read_input_tokens = usage.cache_read_input_tokens;
|
||||
}
|
||||
}
|
||||
}
|
||||
"content_block_delta" => {
|
||||
if let Some(delta) = event.delta {
|
||||
if let Some(text) = delta.text {
|
||||
@@ -186,6 +197,8 @@ impl LlmDriver for AnthropicDriver {
|
||||
input_tokens: msg.usage.as_ref().map(|u| u.input_tokens).unwrap_or(0),
|
||||
output_tokens: msg.usage.as_ref().map(|u| u.output_tokens).unwrap_or(0),
|
||||
stop_reason: msg.stop_reason.unwrap_or_else(|| "end_turn".to_string()),
|
||||
cache_creation_input_tokens,
|
||||
cache_read_input_tokens,
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -298,7 +311,15 @@ impl AnthropicDriver {
|
||||
AnthropicRequest {
|
||||
model: request.model.clone(),
|
||||
max_tokens: effective_max,
|
||||
system: request.system.clone(),
|
||||
system: request.system.as_ref().map(|s| {
|
||||
vec![SystemContentBlock {
|
||||
r#type: "text".to_string(),
|
||||
text: s.clone(),
|
||||
cache_control: Some(CacheControl {
|
||||
r#type: "ephemeral".to_string(),
|
||||
}),
|
||||
}]
|
||||
}),
|
||||
messages,
|
||||
tools: if tools.is_empty() { None } else { Some(tools) },
|
||||
temperature: request.temperature,
|
||||
@@ -337,18 +358,35 @@ impl AnthropicDriver {
|
||||
input_tokens: api_response.usage.input_tokens,
|
||||
output_tokens: api_response.usage.output_tokens,
|
||||
stop_reason,
|
||||
cache_creation_input_tokens: api_response.usage.cache_creation_input_tokens,
|
||||
cache_read_input_tokens: api_response.usage.cache_read_input_tokens,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Anthropic API types
|
||||
|
||||
/// Anthropic cache_control 标记
|
||||
#[derive(Serialize, Clone)]
|
||||
struct CacheControl {
|
||||
r#type: String, // "ephemeral"
|
||||
}
|
||||
|
||||
/// Anthropic system prompt 内容块(支持 cache_control)
|
||||
#[derive(Serialize, Clone)]
|
||||
struct SystemContentBlock {
|
||||
r#type: String, // "text"
|
||||
text: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
cache_control: Option<CacheControl>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct AnthropicRequest {
|
||||
model: String,
|
||||
max_tokens: u32,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
system: Option<String>,
|
||||
system: Option<Vec<SystemContentBlock>>,
|
||||
messages: Vec<AnthropicMessage>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
tools: Option<Vec<AnthropicTool>>,
|
||||
@@ -404,6 +442,10 @@ struct AnthropicContentBlock {
|
||||
struct AnthropicUsage {
|
||||
input_tokens: u32,
|
||||
output_tokens: u32,
|
||||
#[serde(default)]
|
||||
cache_creation_input_tokens: Option<u32>,
|
||||
#[serde(default)]
|
||||
cache_read_input_tokens: Option<u32>,
|
||||
}
|
||||
|
||||
// Streaming types
|
||||
@@ -458,4 +500,8 @@ struct AnthropicStreamUsage {
|
||||
input_tokens: u32,
|
||||
#[serde(default)]
|
||||
output_tokens: u32,
|
||||
#[serde(default)]
|
||||
cache_creation_input_tokens: Option<u32>,
|
||||
#[serde(default)]
|
||||
cache_read_input_tokens: Option<u32>,
|
||||
}
|
||||
|
||||
139
crates/zclaw-runtime/src/driver/error_classifier.rs
Normal file
139
crates/zclaw-runtime/src/driver/error_classifier.rs
Normal file
@@ -0,0 +1,139 @@
|
||||
//! LLM 错误分类器。将 HTTP 状态码 + 错误体映射为 LlmErrorKind。
|
||||
|
||||
use std::time::Duration;
|
||||
use zclaw_types::{LlmErrorKind, ClassifiedLlmError};
|
||||
|
||||
/// 分类 LLM 错误
|
||||
pub fn classify_llm_error(
|
||||
provider: &str,
|
||||
status: u16,
|
||||
body: &str,
|
||||
is_timeout: bool,
|
||||
) -> ClassifiedLlmError {
|
||||
let _ = provider; // reserved for per-provider overrides
|
||||
|
||||
if is_timeout {
|
||||
return ClassifiedLlmError {
|
||||
kind: LlmErrorKind::Timeout,
|
||||
retryable: true,
|
||||
should_compress: false,
|
||||
should_rotate_credential: false,
|
||||
retry_after: None,
|
||||
message: "请求超时".to_string(),
|
||||
};
|
||||
}
|
||||
|
||||
match status {
|
||||
401 | 403 => ClassifiedLlmError {
|
||||
kind: LlmErrorKind::Auth,
|
||||
retryable: false,
|
||||
should_compress: false,
|
||||
should_rotate_credential: true,
|
||||
retry_after: None,
|
||||
message: "认证失败,请检查 API Key".to_string(),
|
||||
},
|
||||
402 => {
|
||||
let is_quota_transient = body.contains("retry")
|
||||
|| body.contains("limit")
|
||||
|| body.contains("usage");
|
||||
ClassifiedLlmError {
|
||||
kind: if is_quota_transient { LlmErrorKind::RateLimited } else { LlmErrorKind::BillingExhausted },
|
||||
retryable: is_quota_transient,
|
||||
should_compress: false,
|
||||
should_rotate_credential: !is_quota_transient,
|
||||
retry_after: if is_quota_transient { Some(Duration::from_secs(30)) } else { None },
|
||||
message: if is_quota_transient { "使用限制,稍后重试".to_string() } else { "计费额度已耗尽".to_string() },
|
||||
}
|
||||
}
|
||||
429 => ClassifiedLlmError {
|
||||
kind: LlmErrorKind::RateLimited,
|
||||
retryable: true,
|
||||
should_compress: false,
|
||||
should_rotate_credential: true,
|
||||
retry_after: parse_retry_after(body),
|
||||
message: "速率限制".to_string(),
|
||||
},
|
||||
529 => ClassifiedLlmError {
|
||||
kind: LlmErrorKind::Overloaded,
|
||||
retryable: true,
|
||||
should_compress: false,
|
||||
should_rotate_credential: false,
|
||||
retry_after: Some(Duration::from_secs(5)),
|
||||
message: "提供商过载".to_string(),
|
||||
},
|
||||
500 | 502 => ClassifiedLlmError {
|
||||
kind: LlmErrorKind::ServerError,
|
||||
retryable: true,
|
||||
should_compress: false,
|
||||
should_rotate_credential: false,
|
||||
retry_after: None,
|
||||
message: "服务端错误".to_string(),
|
||||
},
|
||||
503 => ClassifiedLlmError {
|
||||
kind: LlmErrorKind::Overloaded,
|
||||
retryable: true,
|
||||
should_compress: false,
|
||||
should_rotate_credential: false,
|
||||
retry_after: Some(Duration::from_secs(3)),
|
||||
message: "服务暂时不可用".to_string(),
|
||||
},
|
||||
400 => {
|
||||
let is_context_overflow = body.contains("context_length")
|
||||
|| body.contains("max_tokens")
|
||||
|| body.contains("too many tokens")
|
||||
|| body.contains("prompt is too long");
|
||||
ClassifiedLlmError {
|
||||
kind: if is_context_overflow { LlmErrorKind::ContextOverflow } else { LlmErrorKind::Unknown },
|
||||
retryable: false,
|
||||
should_compress: is_context_overflow,
|
||||
should_rotate_credential: false,
|
||||
retry_after: None,
|
||||
message: if is_context_overflow {
|
||||
"上下文过长,需要压缩".to_string()
|
||||
} else {
|
||||
format!("请求错误: {}", &body[..body.len().min(200)])
|
||||
},
|
||||
}
|
||||
}
|
||||
404 => ClassifiedLlmError {
|
||||
kind: LlmErrorKind::ModelNotFound,
|
||||
retryable: false,
|
||||
should_compress: false,
|
||||
should_rotate_credential: false,
|
||||
retry_after: None,
|
||||
message: "模型不存在".to_string(),
|
||||
},
|
||||
_ => ClassifiedLlmError {
|
||||
kind: LlmErrorKind::Unknown,
|
||||
retryable: true,
|
||||
should_compress: false,
|
||||
should_rotate_credential: false,
|
||||
retry_after: None,
|
||||
message: format!("未知错误 ({}) {}", status, &body[..body.len().min(200)]),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_retry_after(body: &str) -> Option<Duration> {
|
||||
// Anthropic: "Please retry after X seconds"
|
||||
// OpenAI: "Please retry after Xms"
|
||||
if let Some(secs) = extract_retry_seconds(body) {
|
||||
return Some(Duration::from_secs(secs));
|
||||
}
|
||||
if let Some(ms) = extract_retry_millis(body) {
|
||||
return Some(Duration::from_millis(ms));
|
||||
}
|
||||
Some(Duration::from_secs(2))
|
||||
}
|
||||
|
||||
fn extract_retry_seconds(body: &str) -> Option<u64> {
|
||||
let re = regex::Regex::new(r"retry\s+(?:after\s+)?(\d+)\s*(?:s|sec|seconds?)").ok()?;
|
||||
let caps = re.captures(body)?;
|
||||
caps[1].parse().ok()
|
||||
}
|
||||
|
||||
fn extract_retry_millis(body: &str) -> Option<u64> {
|
||||
let re = regex::Regex::new(r"retry\s+(?:after\s+)?(\d+)\s*ms").ok()?;
|
||||
let caps = re.captures(body)?;
|
||||
caps[1].parse().ok()
|
||||
}
|
||||
@@ -238,6 +238,8 @@ impl LlmDriver for GeminiDriver {
|
||||
input_tokens,
|
||||
output_tokens,
|
||||
stop_reason: stop_reason.to_string(),
|
||||
cache_creation_input_tokens: None,
|
||||
cache_read_input_tokens: None,
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -500,6 +502,8 @@ impl GeminiDriver {
|
||||
input_tokens,
|
||||
output_tokens,
|
||||
stop_reason,
|
||||
cache_creation_input_tokens: None,
|
||||
cache_read_input_tokens: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -238,6 +238,8 @@ impl LocalDriver {
|
||||
input_tokens,
|
||||
output_tokens,
|
||||
stop_reason,
|
||||
cache_creation_input_tokens: None,
|
||||
cache_read_input_tokens: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -396,6 +398,8 @@ impl LlmDriver for LocalDriver {
|
||||
input_tokens: 0,
|
||||
output_tokens: 0,
|
||||
stop_reason: "end_turn".to_string(),
|
||||
cache_creation_input_tokens: None,
|
||||
cache_read_input_tokens: None,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -15,11 +15,14 @@ mod anthropic;
|
||||
mod openai;
|
||||
mod gemini;
|
||||
mod local;
|
||||
mod error_classifier;
|
||||
mod retry_driver;
|
||||
|
||||
pub use anthropic::AnthropicDriver;
|
||||
pub use openai::OpenAiDriver;
|
||||
pub use gemini::GeminiDriver;
|
||||
pub use local::LocalDriver;
|
||||
pub use retry_driver::{RetryDriver, RetryConfig};
|
||||
|
||||
/// LLM Driver trait - unified interface for all providers
|
||||
#[async_trait]
|
||||
@@ -106,6 +109,12 @@ pub struct CompletionResponse {
|
||||
pub output_tokens: u32,
|
||||
/// Stop reason
|
||||
pub stop_reason: StopReason,
|
||||
/// Cache creation input tokens (Anthropic prompt caching)
|
||||
#[serde(default)]
|
||||
pub cache_creation_input_tokens: Option<u32>,
|
||||
/// Cache read input tokens (Anthropic prompt caching)
|
||||
#[serde(default)]
|
||||
pub cache_read_input_tokens: Option<u32>,
|
||||
}
|
||||
|
||||
/// LLM driver response content block (subset of canonical zclaw_types::ContentBlock).
|
||||
|
||||
@@ -237,6 +237,8 @@ impl LlmDriver for OpenAiDriver {
|
||||
input_tokens: 0,
|
||||
output_tokens: 0,
|
||||
stop_reason: "end_turn".to_string(),
|
||||
cache_creation_input_tokens: None,
|
||||
cache_read_input_tokens: None,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
@@ -638,6 +640,8 @@ impl OpenAiDriver {
|
||||
input_tokens,
|
||||
output_tokens,
|
||||
stop_reason,
|
||||
cache_creation_input_tokens: None,
|
||||
cache_read_input_tokens: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -761,6 +765,8 @@ impl OpenAiDriver {
|
||||
StopReason::StopSequence => "stop",
|
||||
StopReason::Error => "error",
|
||||
}.to_string(),
|
||||
cache_creation_input_tokens: None,
|
||||
cache_read_input_tokens: None,
|
||||
});
|
||||
})
|
||||
}
|
||||
|
||||
123
crates/zclaw-runtime/src/driver/retry_driver.rs
Normal file
123
crates/zclaw-runtime/src/driver/retry_driver.rs
Normal file
@@ -0,0 +1,123 @@
|
||||
//! RetryDriver: LlmDriver 的重试装饰器。
|
||||
//! 仅在本地 Kernel 路径使用,SaaS Relay 已有自己的重试逻辑。
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use async_trait::async_trait;
|
||||
use futures::Stream;
|
||||
use rand::Rng;
|
||||
use zclaw_types::{Result, ZclawError};
|
||||
|
||||
use super::{LlmDriver, CompletionRequest, CompletionResponse, StreamChunk};
|
||||
use super::error_classifier::classify_llm_error;
|
||||
|
||||
/// 重试配置
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RetryConfig {
|
||||
pub max_attempts: u32,
|
||||
pub base_delay_secs: f64,
|
||||
pub max_delay_secs: f64,
|
||||
pub jitter_ratio: f64,
|
||||
}
|
||||
|
||||
impl Default for RetryConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
max_attempts: 3,
|
||||
base_delay_secs: 1.0,
|
||||
max_delay_secs: 8.0,
|
||||
jitter_ratio: 0.5,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// 重试装饰器
|
||||
pub struct RetryDriver {
|
||||
inner: Arc<dyn LlmDriver>,
|
||||
config: RetryConfig,
|
||||
}
|
||||
|
||||
impl RetryDriver {
|
||||
pub fn new(inner: Arc<dyn LlmDriver>, config: RetryConfig) -> Self {
|
||||
Self { inner, config }
|
||||
}
|
||||
|
||||
fn jittered_backoff(&self, attempt: u32) -> Duration {
|
||||
let base = self.config.base_delay_secs * 2_f64.powi(attempt as i32);
|
||||
let capped = base.min(self.config.max_delay_secs);
|
||||
let mut rng = rand::thread_rng();
|
||||
let jitter = capped * self.config.jitter_ratio * rng.gen::<f64>();
|
||||
Duration::from_secs_f64(capped + jitter)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl LlmDriver for RetryDriver {
|
||||
fn provider(&self) -> &str {
|
||||
self.inner.provider()
|
||||
}
|
||||
|
||||
async fn complete(&self, request: CompletionRequest) -> Result<CompletionResponse> {
|
||||
let mut last_error: Option<ZclawError> = None;
|
||||
|
||||
for attempt in 0..self.config.max_attempts {
|
||||
match self.inner.complete(request.clone()).await {
|
||||
Ok(response) => return Ok(response),
|
||||
Err(e) => {
|
||||
let message = e.to_string();
|
||||
let status = extract_status_from_error(&message);
|
||||
let classified = classify_llm_error(
|
||||
self.inner.provider(),
|
||||
status,
|
||||
&message,
|
||||
message.contains("timeout") || message.contains("Timeout"),
|
||||
);
|
||||
|
||||
if !classified.retryable {
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
if classified.should_compress {
|
||||
return Err(ZclawError::LlmError(
|
||||
format!("[CONTEXT_OVERFLOW] {}", message)
|
||||
));
|
||||
}
|
||||
|
||||
last_error = Some(e);
|
||||
|
||||
if attempt + 1 < self.config.max_attempts {
|
||||
let delay = classified.retry_after
|
||||
.unwrap_or_else(|| self.jittered_backoff(attempt));
|
||||
tracing::warn!(
|
||||
"[RetryDriver] Attempt {}/{} failed ({}), retrying in {:.1}s",
|
||||
attempt + 1, self.config.max_attempts, classified.message,
|
||||
delay.as_secs_f64()
|
||||
);
|
||||
tokio::time::sleep(delay).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Err(last_error.unwrap_or_else(|| ZclawError::LlmError("重试耗尽".to_string())))
|
||||
}
|
||||
|
||||
fn stream(
|
||||
&self,
|
||||
request: CompletionRequest,
|
||||
) -> std::pin::Pin<Box<dyn Stream<Item = Result<StreamChunk>> + Send + '_>> {
|
||||
// 流式路径不重试——部分 delta 已发送,重试会导致 UI 重复
|
||||
self.inner.stream(request)
|
||||
}
|
||||
|
||||
fn is_configured(&self) -> bool {
|
||||
self.inner.is_configured()
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_status_from_error(message: &str) -> u16 {
|
||||
let re = regex::Regex::new(r"(?:error|status)[:\s]+(\d{3})").ok();
|
||||
re.and_then(|re| re.captures(message))
|
||||
.and_then(|caps| caps[1].parse().ok())
|
||||
.unwrap_or(0)
|
||||
}
|
||||
@@ -4,10 +4,11 @@ use std::sync::Arc;
|
||||
use futures::StreamExt;
|
||||
use tokio::sync::mpsc;
|
||||
use zclaw_types::{AgentId, SessionId, Message, Result};
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::driver::{LlmDriver, CompletionRequest, ContentBlock};
|
||||
use crate::stream::StreamChunk;
|
||||
use crate::tool::{ToolRegistry, ToolContext, SkillExecutor, HandExecutor};
|
||||
use crate::tool::{ToolRegistry, ToolContext, SkillExecutor, HandExecutor, ToolConcurrency};
|
||||
use crate::tool::builtin::PathValidator;
|
||||
use crate::growth::GrowthIntegration;
|
||||
use crate::compaction::{self, CompactionConfig};
|
||||
@@ -303,8 +304,28 @@ impl AgentLoop {
|
||||
plan_mode: self.plan_mode,
|
||||
};
|
||||
|
||||
// Call LLM
|
||||
let response = self.driver.complete(request).await?;
|
||||
// Call LLM with context-overflow recovery
|
||||
let response = match self.driver.complete(request).await {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
let err_str = e.to_string();
|
||||
if err_str.contains("[CONTEXT_OVERFLOW]") && self.compaction_threshold > 0 {
|
||||
tracing::warn!("[AgentLoop] Context overflow detected, triggering emergency compaction");
|
||||
let pruned = compaction::prune_tool_outputs(&mut messages);
|
||||
if pruned > 0 {
|
||||
tracing::info!("[AgentLoop] Emergency pruning removed {} tool outputs", pruned);
|
||||
}
|
||||
let keep_recent = messages.len().saturating_sub(messages.len() / 3);
|
||||
let (compacted, removed) = compaction::compact_messages(messages, keep_recent.max(4));
|
||||
if removed > 0 {
|
||||
tracing::info!("[AgentLoop] Emergency compaction removed {} messages", removed);
|
||||
messages = compacted;
|
||||
continue; // retry the iteration with compacted messages
|
||||
}
|
||||
}
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
total_input_tokens += response.input_tokens;
|
||||
total_output_tokens += response.output_tokens;
|
||||
|
||||
@@ -375,21 +396,22 @@ impl AgentLoop {
|
||||
let tool_context = self.create_tool_context(session_id.clone());
|
||||
let mut abort_result: Option<AgentLoopResult> = None;
|
||||
let mut clarification_result: Option<AgentLoopResult> = None;
|
||||
for (id, name, input) in tool_calls {
|
||||
// Check if loop was already aborted
|
||||
if abort_result.is_some() {
|
||||
break;
|
||||
}
|
||||
|
||||
// Phase 1: Pre-process inputs + middleware checks (serial)
|
||||
struct ToolPlan {
|
||||
idx: usize,
|
||||
id: String,
|
||||
name: String,
|
||||
input: Value,
|
||||
}
|
||||
let mut plans: Vec<ToolPlan> = Vec::new();
|
||||
for (idx, (id, name, input)) in tool_calls.into_iter().enumerate() {
|
||||
if abort_result.is_some() { break; }
|
||||
|
||||
// GLM and other models sometimes send tool calls with empty arguments `{}`
|
||||
// Inject the last user message as a fallback query so the tool can infer intent.
|
||||
let input = if input.as_object().map_or(false, |obj| obj.is_empty()) {
|
||||
if let Some(last_user_msg) = messages.iter().rev().find_map(|m| {
|
||||
if let Message::User { content } = m {
|
||||
Some(content.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
if let Message::User { content } = m { Some(content.clone()) } else { None }
|
||||
}) {
|
||||
tracing::info!("[AgentLoop] Tool '{}' received empty input, injecting user message as fallback query", name);
|
||||
serde_json::json!({ "_fallback_query": last_user_msg })
|
||||
@@ -400,101 +422,138 @@ impl AgentLoop {
|
||||
input
|
||||
};
|
||||
|
||||
// Check tool call safety — via middleware chain
|
||||
{
|
||||
let mw_ctx_ref = middleware::MiddlewareContext {
|
||||
agent_id: self.agent_id.clone(),
|
||||
session_id: session_id.clone(),
|
||||
user_input: input.to_string(),
|
||||
system_prompt: enhanced_prompt.clone(),
|
||||
messages: messages.clone(),
|
||||
response_content: Vec::new(),
|
||||
input_tokens: total_input_tokens,
|
||||
output_tokens: total_output_tokens,
|
||||
};
|
||||
match self.middleware_chain.run_before_tool_call(&mw_ctx_ref, &name, &input).await? {
|
||||
middleware::ToolCallDecision::Allow => {}
|
||||
middleware::ToolCallDecision::Block(msg) => {
|
||||
tracing::warn!("[AgentLoop] Tool '{}' blocked by middleware: {}", name, msg);
|
||||
let error_output = serde_json::json!({ "error": msg });
|
||||
messages.push(Message::tool_result(id, zclaw_types::ToolId::new(&name), error_output, true));
|
||||
continue;
|
||||
}
|
||||
middleware::ToolCallDecision::ReplaceInput(new_input) => {
|
||||
// Execute with replaced input (with timeout)
|
||||
let tool_result = match tokio::time::timeout(
|
||||
std::time::Duration::from_secs(30),
|
||||
self.execute_tool(&name, new_input, &tool_context),
|
||||
).await {
|
||||
Ok(Ok(result)) => result,
|
||||
Ok(Err(e)) => serde_json::json!({ "error": e.to_string() }),
|
||||
Err(_) => {
|
||||
tracing::warn!("[AgentLoop] Tool '{}' (replaced input) timed out after 30s", name);
|
||||
serde_json::json!({ "error": format!("工具 '{}' 执行超时(30秒),请重试", name) })
|
||||
}
|
||||
};
|
||||
messages.push(Message::tool_result(id, zclaw_types::ToolId::new(&name), tool_result, false));
|
||||
continue;
|
||||
}
|
||||
middleware::ToolCallDecision::AbortLoop(reason) => {
|
||||
tracing::warn!("[AgentLoop] Loop aborted by middleware: {}", reason);
|
||||
let msg = format!("{}\n已自动终止", reason);
|
||||
self.memory.append_message(&session_id, &Message::assistant(&msg)).await?;
|
||||
abort_result = Some(AgentLoopResult {
|
||||
response: msg,
|
||||
input_tokens: total_input_tokens,
|
||||
output_tokens: total_output_tokens,
|
||||
iterations,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let tool_result = match tokio::time::timeout(
|
||||
std::time::Duration::from_secs(30),
|
||||
self.execute_tool(&name, input, &tool_context),
|
||||
).await {
|
||||
Ok(Ok(result)) => result,
|
||||
Ok(Err(e)) => serde_json::json!({ "error": e.to_string() }),
|
||||
Err(_) => {
|
||||
tracing::warn!("[AgentLoop] Tool '{}' timed out after 30s", name);
|
||||
serde_json::json!({ "error": format!("工具 '{}' 执行超时(30秒),请重试", name) })
|
||||
}
|
||||
let mw_ctx = middleware::MiddlewareContext {
|
||||
agent_id: self.agent_id.clone(),
|
||||
session_id: session_id.clone(),
|
||||
user_input: input.to_string(),
|
||||
system_prompt: enhanced_prompt.clone(),
|
||||
messages: messages.clone(),
|
||||
response_content: Vec::new(),
|
||||
input_tokens: total_input_tokens,
|
||||
output_tokens: total_output_tokens,
|
||||
};
|
||||
match self.middleware_chain.run_before_tool_call(&mw_ctx, &name, &input).await? {
|
||||
middleware::ToolCallDecision::Allow => {
|
||||
plans.push(ToolPlan { idx, id, name, input });
|
||||
}
|
||||
middleware::ToolCallDecision::Block(msg) => {
|
||||
tracing::warn!("[AgentLoop] Tool '{}' blocked by middleware: {}", name, msg);
|
||||
messages.push(Message::tool_result(&id, zclaw_types::ToolId::new(&name), serde_json::json!({ "error": msg }), true));
|
||||
}
|
||||
middleware::ToolCallDecision::ReplaceInput(new_input) => {
|
||||
plans.push(ToolPlan { idx, id, name, input: new_input });
|
||||
}
|
||||
middleware::ToolCallDecision::AbortLoop(reason) => {
|
||||
tracing::warn!("[AgentLoop] Loop aborted by middleware: {}", reason);
|
||||
let msg = format!("{}\n已自动终止", reason);
|
||||
self.memory.append_message(&session_id, &Message::assistant(&msg)).await?;
|
||||
abort_result = Some(AgentLoopResult {
|
||||
response: msg,
|
||||
input_tokens: total_input_tokens,
|
||||
output_tokens: total_output_tokens,
|
||||
iterations,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check if this is a clarification response — terminate loop immediately
|
||||
// so the LLM waits for user input instead of continuing to generate.
|
||||
if name == "ask_clarification"
|
||||
&& tool_result.get("status").and_then(|v| v.as_str()) == Some("clarification_needed")
|
||||
{
|
||||
tracing::info!("[AgentLoop] Clarification requested, terminating loop");
|
||||
let question = tool_result.get("question")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("需要更多信息")
|
||||
.to_string();
|
||||
messages.push(Message::tool_result(
|
||||
id,
|
||||
zclaw_types::ToolId::new(&name),
|
||||
tool_result,
|
||||
false,
|
||||
));
|
||||
self.memory.append_message(&session_id, &Message::assistant(&question)).await?;
|
||||
clarification_result = Some(AgentLoopResult {
|
||||
response: question,
|
||||
input_tokens: total_input_tokens,
|
||||
output_tokens: total_output_tokens,
|
||||
iterations,
|
||||
// Phase 2: Execute tools (parallel for ReadOnly, serial for others)
|
||||
if abort_result.is_none() && !plans.is_empty() {
|
||||
let (parallel_plans, sequential_plans): (Vec<_>, Vec<_>) = plans.iter()
|
||||
.partition(|p| {
|
||||
self.tools.get(&p.name)
|
||||
.map(|t| t.concurrency())
|
||||
.unwrap_or(ToolConcurrency::Exclusive) == ToolConcurrency::ReadOnly
|
||||
});
|
||||
break;
|
||||
|
||||
let mut results: std::collections::HashMap<usize, (String, String, serde_json::Value)> = std::collections::HashMap::new();
|
||||
|
||||
// Execute parallel (ReadOnly) tools with JoinSet (max 3 concurrent)
|
||||
if !parallel_plans.is_empty() {
|
||||
let semaphore = Arc::new(tokio::sync::Semaphore::new(3));
|
||||
let mut join_set = tokio::task::JoinSet::new();
|
||||
|
||||
for plan in ¶llel_plans {
|
||||
let tool = self.tools.get(&plan.name).unwrap();
|
||||
let ctx = tool_context.clone();
|
||||
let input = plan.input.clone();
|
||||
let idx = plan.idx;
|
||||
let id = plan.id.clone();
|
||||
let name = plan.name.clone();
|
||||
let permit = semaphore.clone().acquire_owned().await.unwrap();
|
||||
|
||||
join_set.spawn(async move {
|
||||
let result = tokio::time::timeout(
|
||||
std::time::Duration::from_secs(30),
|
||||
tool.execute(input, &ctx)
|
||||
).await;
|
||||
drop(permit);
|
||||
(idx, id, name, result)
|
||||
});
|
||||
}
|
||||
|
||||
while let Some(res) = join_set.join_next().await {
|
||||
match res {
|
||||
Ok((idx, id, name, Ok(Ok(value)))) => {
|
||||
results.insert(idx, (id, name, value));
|
||||
}
|
||||
Ok((idx, id, name, Ok(Err(e)))) => {
|
||||
results.insert(idx, (id, name, serde_json::json!({ "error": e.to_string() })));
|
||||
}
|
||||
Ok((idx, id, name, Err(_))) => {
|
||||
tracing::warn!("[AgentLoop] Tool '{}' timed out after 30s (parallel)", name);
|
||||
results.insert(idx, (id, name.clone(), serde_json::json!({ "error": format!("工具 '{}' 执行超时(30秒),请重试", name) })));
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("[AgentLoop] JoinError in parallel tool execution: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add tool result to messages
|
||||
messages.push(Message::tool_result(
|
||||
id,
|
||||
zclaw_types::ToolId::new(&name),
|
||||
tool_result,
|
||||
false, // is_error - we include errors in the result itself
|
||||
));
|
||||
// Execute sequential (Exclusive/Interactive) tools
|
||||
for plan in &sequential_plans {
|
||||
let tool_result = match tokio::time::timeout(
|
||||
std::time::Duration::from_secs(30),
|
||||
self.execute_tool(&plan.name, plan.input.clone(), &tool_context),
|
||||
).await {
|
||||
Ok(Ok(result)) => result,
|
||||
Ok(Err(e)) => serde_json::json!({ "error": e.to_string() }),
|
||||
Err(_) => {
|
||||
tracing::warn!("[AgentLoop] Tool '{}' timed out after 30s", plan.name);
|
||||
serde_json::json!({ "error": format!("工具 '{}' 执行超时(30秒),请重试", plan.name) })
|
||||
}
|
||||
};
|
||||
|
||||
// Check if this is a clarification response
|
||||
if plan.name == "ask_clarification"
|
||||
&& tool_result.get("status").and_then(|v| v.as_str()) == Some("clarification_needed")
|
||||
{
|
||||
tracing::info!("[AgentLoop] Clarification requested, terminating loop");
|
||||
let question = tool_result.get("question")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("需要更多信息")
|
||||
.to_string();
|
||||
results.insert(plan.idx, (plan.id.clone(), plan.name.clone(), tool_result));
|
||||
self.memory.append_message(&session_id, &Message::assistant(&question)).await?;
|
||||
clarification_result = Some(AgentLoopResult {
|
||||
response: question,
|
||||
input_tokens: total_input_tokens,
|
||||
output_tokens: total_output_tokens,
|
||||
iterations,
|
||||
});
|
||||
break;
|
||||
}
|
||||
results.insert(plan.idx, (plan.id.clone(), plan.name.clone(), tool_result));
|
||||
}
|
||||
|
||||
// Push results in original tool_call order
|
||||
let mut sorted_indices: Vec<usize> = results.keys().copied().collect();
|
||||
sorted_indices.sort();
|
||||
for idx in sorted_indices {
|
||||
let (id, name, result) = results.remove(&idx).unwrap();
|
||||
messages.push(Message::tool_result(&id, zclaw_types::ToolId::new(&name), result, false));
|
||||
}
|
||||
}
|
||||
|
||||
// Continue the loop - LLM will process tool results and generate final response
|
||||
|
||||
@@ -1,21 +1,49 @@
|
||||
//! Compaction middleware — wraps the existing compaction module.
|
||||
//!
|
||||
//! Supports debounce (cooldown + min-round checks), async LLM compression
|
||||
//! with cached fallback, and iterative summaries that carry forward key info.
|
||||
|
||||
use async_trait::async_trait;
|
||||
use zclaw_types::Result;
|
||||
use crate::middleware::{AgentMiddleware, MiddlewareContext, MiddlewareDecision};
|
||||
use crate::compaction::{self, CompactionConfig};
|
||||
use crate::growth::GrowthIntegration;
|
||||
use crate::driver::LlmDriver;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
use zclaw_types::{Message, Result};
|
||||
use crate::compaction::{self, CompactionConfig};
|
||||
use crate::driver::LlmDriver;
|
||||
use crate::growth::GrowthIntegration;
|
||||
use crate::middleware::{AgentMiddleware, MiddlewareContext, MiddlewareDecision};
|
||||
|
||||
/// Minimum seconds between consecutive compactions.
|
||||
const COMPACTION_COOLDOWN_SECS: u64 = 30;
|
||||
/// Minimum message pairs (user+assistant) since last compaction before triggering again.
|
||||
const COMPACTION_MIN_ROUNDS: u64 = 3;
|
||||
|
||||
fn now_millis() -> u64 {
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_millis() as u64
|
||||
}
|
||||
|
||||
/// Shared compaction debounce state (lock-free).
|
||||
struct CompactionState {
|
||||
last_compaction_ms: AtomicU64,
|
||||
last_compaction_msg_count: AtomicU64,
|
||||
}
|
||||
|
||||
/// Cached result from a previous async LLM compaction.
|
||||
struct AsyncCompactionCache {
|
||||
last_result: RwLock<Option<Vec<Message>>>,
|
||||
}
|
||||
|
||||
/// Middleware that compresses conversation history when it exceeds a token threshold.
|
||||
pub struct CompactionMiddleware {
|
||||
threshold: usize,
|
||||
config: CompactionConfig,
|
||||
/// Optional LLM driver for async compaction (LLM summarisation, memory flush).
|
||||
driver: Option<Arc<dyn LlmDriver>>,
|
||||
/// Optional growth integration for memory flushing during compaction.
|
||||
growth: Option<GrowthIntegration>,
|
||||
state: Arc<CompactionState>,
|
||||
cache: Arc<AsyncCompactionCache>,
|
||||
}
|
||||
|
||||
impl CompactionMiddleware {
|
||||
@@ -25,7 +53,39 @@ impl CompactionMiddleware {
|
||||
driver: Option<Arc<dyn LlmDriver>>,
|
||||
growth: Option<GrowthIntegration>,
|
||||
) -> Self {
|
||||
Self { threshold, config, driver, growth }
|
||||
Self {
|
||||
threshold,
|
||||
config,
|
||||
driver,
|
||||
growth,
|
||||
state: Arc::new(CompactionState {
|
||||
last_compaction_ms: AtomicU64::new(0),
|
||||
last_compaction_msg_count: AtomicU64::new(0),
|
||||
}),
|
||||
cache: Arc::new(AsyncCompactionCache {
|
||||
last_result: RwLock::new(None),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn should_compact(&self, msg_count: u64) -> bool {
|
||||
let last_ms = self.state.last_compaction_ms.load(Ordering::Relaxed);
|
||||
let last_count = self.state.last_compaction_msg_count.load(Ordering::Relaxed);
|
||||
|
||||
if now_millis().saturating_sub(last_ms) < COMPACTION_COOLDOWN_SECS * 1000 {
|
||||
return false;
|
||||
}
|
||||
|
||||
if msg_count.saturating_sub(last_count) < COMPACTION_MIN_ROUNDS * 2 {
|
||||
return false;
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
fn record_compaction(&self, msg_count: u64) {
|
||||
self.state.last_compaction_ms.store(now_millis(), Ordering::Relaxed);
|
||||
self.state.last_compaction_msg_count.store(msg_count, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,6 +99,29 @@ impl AgentMiddleware for CompactionMiddleware {
|
||||
return Ok(MiddlewareDecision::Continue);
|
||||
}
|
||||
|
||||
// Step 1: Prune old tool outputs (cheap, no LLM needed)
|
||||
let pruned = compaction::prune_tool_outputs(&mut ctx.messages);
|
||||
if pruned > 0 {
|
||||
tracing::info!("[CompactionMiddleware] Pruned {} old tool outputs", pruned);
|
||||
}
|
||||
|
||||
// Step 2: Re-estimate tokens after pruning
|
||||
let tokens = compaction::estimate_messages_tokens_calibrated(&ctx.messages);
|
||||
if tokens < self.threshold {
|
||||
return Ok(MiddlewareDecision::Continue);
|
||||
}
|
||||
|
||||
// Step 3: Debounce check
|
||||
if !self.should_compact(ctx.messages.len() as u64) {
|
||||
// Still over threshold but within cooldown — use cached result if available
|
||||
if let Some(cached) = self.cache.last_result.read().await.clone() {
|
||||
tracing::debug!("[CompactionMiddleware] Cooldown active, using cached compaction result");
|
||||
ctx.messages = cached;
|
||||
}
|
||||
return Ok(MiddlewareDecision::Continue);
|
||||
}
|
||||
|
||||
// Step 4: Execute compaction
|
||||
let needs_async = self.config.use_llm || self.config.memory_flush_enabled;
|
||||
if needs_async {
|
||||
let outcome = compaction::maybe_compact_with_config(
|
||||
@@ -56,6 +139,14 @@ impl AgentMiddleware for CompactionMiddleware {
|
||||
ctx.messages = compaction::maybe_compact(ctx.messages.clone(), self.threshold);
|
||||
}
|
||||
|
||||
self.record_compaction(ctx.messages.len() as u64);
|
||||
|
||||
// Cache result for cooldown fallback
|
||||
{
|
||||
let mut cache = self.cache.last_result.write().await;
|
||||
*cache = Some(ctx.messages.clone());
|
||||
}
|
||||
|
||||
Ok(MiddlewareDecision::Continue)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ use serde_json::Value;
|
||||
use zclaw_types::Result;
|
||||
use crate::driver::ContentBlock;
|
||||
use crate::middleware::{AgentMiddleware, MiddlewareContext, ToolCallDecision};
|
||||
use std::sync::Mutex;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
|
||||
/// Middleware that intercepts tool call errors and formats recovery messages.
|
||||
///
|
||||
@@ -24,7 +24,7 @@ pub struct ToolErrorMiddleware {
|
||||
/// Maximum consecutive failures before aborting the loop.
|
||||
max_consecutive_failures: u32,
|
||||
/// Tracks consecutive tool failures.
|
||||
consecutive_failures: Mutex<u32>,
|
||||
consecutive_failures: AtomicU32,
|
||||
}
|
||||
|
||||
impl ToolErrorMiddleware {
|
||||
@@ -32,7 +32,7 @@ impl ToolErrorMiddleware {
|
||||
Self {
|
||||
max_error_length: 500,
|
||||
max_consecutive_failures: 3,
|
||||
consecutive_failures: Mutex::new(0),
|
||||
consecutive_failures: AtomicU32::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,14 +80,14 @@ impl AgentMiddleware for ToolErrorMiddleware {
|
||||
}
|
||||
|
||||
// Check consecutive failure count — abort if too many failures
|
||||
let failures = self.consecutive_failures.lock().unwrap_or_else(|e| e.into_inner());
|
||||
if *failures >= self.max_consecutive_failures {
|
||||
let failures = self.consecutive_failures.load(Ordering::SeqCst);
|
||||
if failures >= self.max_consecutive_failures {
|
||||
tracing::warn!(
|
||||
"[ToolErrorMiddleware] Aborting loop: {} consecutive tool failures",
|
||||
*failures
|
||||
failures
|
||||
);
|
||||
return Ok(ToolCallDecision::AbortLoop(
|
||||
format!("连续 {} 次工具调用失败,已自动终止以避免无限重试", *failures)
|
||||
format!("连续 {} 次工具调用失败,已自动终止以避免无限重试", failures)
|
||||
));
|
||||
}
|
||||
|
||||
@@ -100,11 +100,9 @@ impl AgentMiddleware for ToolErrorMiddleware {
|
||||
tool_name: &str,
|
||||
result: &Value,
|
||||
) -> Result<()> {
|
||||
let mut failures = self.consecutive_failures.lock().unwrap_or_else(|e| e.into_inner());
|
||||
|
||||
// Check if the tool result indicates an error.
|
||||
if let Some(error) = result.get("error") {
|
||||
*failures += 1;
|
||||
let failures = self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1;
|
||||
let error_msg = match error {
|
||||
Value::String(s) => s.clone(),
|
||||
other => other.to_string(),
|
||||
@@ -118,7 +116,7 @@ impl AgentMiddleware for ToolErrorMiddleware {
|
||||
|
||||
tracing::warn!(
|
||||
"[ToolErrorMiddleware] Tool '{}' failed ({}/{} consecutive): {}",
|
||||
tool_name, *failures, self.max_consecutive_failures, truncated
|
||||
tool_name, failures, self.max_consecutive_failures, truncated
|
||||
);
|
||||
|
||||
let guided_message = self.format_tool_error(tool_name, &truncated);
|
||||
@@ -127,7 +125,7 @@ impl AgentMiddleware for ToolErrorMiddleware {
|
||||
});
|
||||
} else {
|
||||
// Success — reset consecutive failure counter
|
||||
*failures = 0;
|
||||
self.consecutive_failures.store(0, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -24,6 +24,10 @@ pub enum StreamChunk {
|
||||
input_tokens: u32,
|
||||
output_tokens: u32,
|
||||
stop_reason: String,
|
||||
#[serde(default)]
|
||||
cache_creation_input_tokens: Option<u32>,
|
||||
#[serde(default)]
|
||||
cache_read_input_tokens: Option<u32>,
|
||||
},
|
||||
/// Error occurred
|
||||
Error { message: String },
|
||||
|
||||
@@ -55,6 +55,8 @@ impl MockLlmDriver {
|
||||
input_tokens: 10,
|
||||
output_tokens: text.len() as u32 / 4,
|
||||
stop_reason: StopReason::EndTurn,
|
||||
cache_creation_input_tokens: None,
|
||||
cache_read_input_tokens: None,
|
||||
});
|
||||
self
|
||||
}
|
||||
@@ -74,6 +76,8 @@ impl MockLlmDriver {
|
||||
input_tokens: 10,
|
||||
output_tokens: 20,
|
||||
stop_reason: StopReason::ToolUse,
|
||||
cache_creation_input_tokens: None,
|
||||
cache_read_input_tokens: None,
|
||||
});
|
||||
self
|
||||
}
|
||||
@@ -86,6 +90,8 @@ impl MockLlmDriver {
|
||||
input_tokens: 0,
|
||||
output_tokens: 0,
|
||||
stop_reason: StopReason::Error,
|
||||
cache_creation_input_tokens: None,
|
||||
cache_read_input_tokens: None,
|
||||
});
|
||||
self
|
||||
}
|
||||
@@ -142,6 +148,8 @@ impl MockLlmDriver {
|
||||
input_tokens: 0,
|
||||
output_tokens: 0,
|
||||
stop_reason: StopReason::EndTurn,
|
||||
cache_creation_input_tokens: None,
|
||||
cache_read_input_tokens: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -190,6 +198,8 @@ impl LlmDriver for MockLlmDriver {
|
||||
input_tokens: 10,
|
||||
output_tokens: 2,
|
||||
stop_reason: "end_turn".to_string(),
|
||||
cache_creation_input_tokens: None,
|
||||
cache_read_input_tokens: None,
|
||||
},
|
||||
]
|
||||
})
|
||||
|
||||
@@ -11,6 +11,17 @@ use crate::driver::ToolDefinition;
|
||||
use crate::loop_runner::LoopEvent;
|
||||
use crate::tool::builtin::PathValidator;
|
||||
|
||||
/// Tool concurrency safety level
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum ToolConcurrency {
|
||||
/// Read-only operations, always safe to parallelize (file_read, web_fetch, etc.)
|
||||
ReadOnly,
|
||||
/// Exclusive operations, must be serial (file_write, shell_exec, etc.)
|
||||
Exclusive,
|
||||
/// Interactive operations, never parallelize (ask_clarification, etc.)
|
||||
Interactive,
|
||||
}
|
||||
|
||||
/// Tool trait for implementing agent tools
|
||||
#[async_trait]
|
||||
pub trait Tool: Send + Sync {
|
||||
@@ -25,6 +36,11 @@ pub trait Tool: Send + Sync {
|
||||
|
||||
/// Execute the tool
|
||||
async fn execute(&self, input: Value, context: &ToolContext) -> Result<Value>;
|
||||
|
||||
/// Tool concurrency safety level. Default: ReadOnly.
|
||||
fn concurrency(&self) -> ToolConcurrency {
|
||||
ToolConcurrency::ReadOnly
|
||||
}
|
||||
}
|
||||
|
||||
/// Skill executor trait for runtime skill execution
|
||||
|
||||
@@ -9,7 +9,7 @@ use async_trait::async_trait;
|
||||
use serde_json::{json, Value};
|
||||
use zclaw_types::{Result, ZclawError};
|
||||
|
||||
use crate::tool::{Tool, ToolContext};
|
||||
use crate::tool::{Tool, ToolContext, ToolConcurrency};
|
||||
|
||||
/// Clarification type — categorizes the reason for asking.
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
@@ -96,6 +96,10 @@ impl Tool for AskClarificationTool {
|
||||
})
|
||||
}
|
||||
|
||||
fn concurrency(&self) -> ToolConcurrency {
|
||||
ToolConcurrency::Interactive
|
||||
}
|
||||
|
||||
async fn execute(&self, input: Value, _context: &ToolContext) -> Result<Value> {
|
||||
let question = input["question"].as_str()
|
||||
.ok_or_else(|| ZclawError::InvalidInput("Missing 'question' parameter".into()))?;
|
||||
|
||||
@@ -4,7 +4,7 @@ use async_trait::async_trait;
|
||||
use serde_json::{json, Value};
|
||||
use zclaw_types::{Result, ZclawError};
|
||||
|
||||
use crate::tool::{Tool, ToolContext};
|
||||
use crate::tool::{Tool, ToolContext, ToolConcurrency};
|
||||
|
||||
pub struct ExecuteSkillTool;
|
||||
|
||||
@@ -42,6 +42,10 @@ impl Tool for ExecuteSkillTool {
|
||||
})
|
||||
}
|
||||
|
||||
fn concurrency(&self) -> ToolConcurrency {
|
||||
ToolConcurrency::Exclusive
|
||||
}
|
||||
|
||||
async fn execute(&self, input: Value, context: &ToolContext) -> Result<Value> {
|
||||
let skill_id = input["skill_id"].as_str()
|
||||
.ok_or_else(|| ZclawError::InvalidInput("Missing 'skill_id' parameter".into()))?;
|
||||
|
||||
@@ -6,7 +6,7 @@ use zclaw_types::{Result, ZclawError};
|
||||
use std::fs;
|
||||
use std::io::Write;
|
||||
|
||||
use crate::tool::{Tool, ToolContext};
|
||||
use crate::tool::{Tool, ToolContext, ToolConcurrency};
|
||||
use super::path_validator::PathValidator;
|
||||
|
||||
pub struct FileWriteTool;
|
||||
@@ -55,6 +55,10 @@ impl Tool for FileWriteTool {
|
||||
})
|
||||
}
|
||||
|
||||
fn concurrency(&self) -> ToolConcurrency {
|
||||
ToolConcurrency::Exclusive
|
||||
}
|
||||
|
||||
async fn execute(&self, input: Value, context: &ToolContext) -> Result<Value> {
|
||||
let path = input["path"].as_str()
|
||||
.ok_or_else(|| ZclawError::InvalidInput("Missing 'path' parameter".into()))?;
|
||||
|
||||
@@ -8,7 +8,7 @@ use serde_json::Value;
|
||||
use std::sync::Arc;
|
||||
use zclaw_types::Result;
|
||||
|
||||
use crate::tool::{Tool, ToolContext};
|
||||
use crate::tool::{Tool, ToolContext, ToolConcurrency};
|
||||
|
||||
/// Wraps an MCP tool adapter into the `Tool` trait.
|
||||
///
|
||||
@@ -42,6 +42,10 @@ impl Tool for McpToolWrapper {
|
||||
self.adapter.input_schema().clone()
|
||||
}
|
||||
|
||||
fn concurrency(&self) -> ToolConcurrency {
|
||||
ToolConcurrency::Exclusive
|
||||
}
|
||||
|
||||
async fn execute(&self, input: Value, _context: &ToolContext) -> Result<Value> {
|
||||
self.adapter.execute(input).await
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@ use std::process::{Command, Stdio};
|
||||
use std::time::{Duration, Instant};
|
||||
use zclaw_types::{Result, ZclawError};
|
||||
|
||||
use crate::tool::{Tool, ToolContext};
|
||||
use crate::tool::{Tool, ToolContext, ToolConcurrency};
|
||||
|
||||
/// Parse a command string into program and arguments using proper shell quoting
|
||||
fn parse_command(command: &str) -> Result<(String, Vec<String>)> {
|
||||
@@ -175,6 +175,10 @@ impl Tool for ShellExecTool {
|
||||
})
|
||||
}
|
||||
|
||||
fn concurrency(&self) -> ToolConcurrency {
|
||||
ToolConcurrency::Exclusive
|
||||
}
|
||||
|
||||
async fn execute(&self, input: Value, _context: &ToolContext) -> Result<Value> {
|
||||
let command = input["command"].as_str()
|
||||
.ok_or_else(|| ZclawError::InvalidInput("Missing 'command' parameter".into()))?;
|
||||
|
||||
@@ -11,7 +11,7 @@ use zclaw_memory::MemoryStore;
|
||||
|
||||
use crate::driver::LlmDriver;
|
||||
use crate::loop_runner::{AgentLoop, LoopEvent};
|
||||
use crate::tool::{Tool, ToolContext, ToolRegistry};
|
||||
use crate::tool::{Tool, ToolContext, ToolRegistry, ToolConcurrency};
|
||||
use crate::tool::builtin::register_builtin_tools;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -91,6 +91,10 @@ impl Tool for TaskTool {
|
||||
})
|
||||
}
|
||||
|
||||
fn concurrency(&self) -> ToolConcurrency {
|
||||
ToolConcurrency::Exclusive
|
||||
}
|
||||
|
||||
async fn execute(&self, input: Value, context: &ToolContext) -> Result<Value> {
|
||||
let description = input["description"].as_str()
|
||||
.ok_or_else(|| ZclawError::InvalidInput("Missing 'description' parameter".into()))?;
|
||||
|
||||
@@ -7,7 +7,7 @@ use async_trait::async_trait;
|
||||
use serde_json::{json, Value};
|
||||
use zclaw_types::Result;
|
||||
|
||||
use crate::tool::{Tool, ToolContext};
|
||||
use crate::tool::{Tool, ToolContext, ToolConcurrency};
|
||||
|
||||
/// Wrapper that exposes a Hand as a Tool in the agent's tool registry.
|
||||
///
|
||||
@@ -78,6 +78,10 @@ impl Tool for HandTool {
|
||||
self.input_schema.clone()
|
||||
}
|
||||
|
||||
fn concurrency(&self) -> ToolConcurrency {
|
||||
ToolConcurrency::Exclusive
|
||||
}
|
||||
|
||||
async fn execute(&self, input: Value, context: &ToolContext) -> Result<Value> {
|
||||
// Delegate to the HandExecutor (bridged from HandRegistry via kernel).
|
||||
// If no hand_executor is available (e.g., standalone runtime without kernel),
|
||||
|
||||
@@ -223,6 +223,33 @@ impl Serialize for ZclawError {
|
||||
/// Result type alias for ZCLAW operations
|
||||
pub type Result<T> = std::result::Result<T, ZclawError>;
|
||||
|
||||
/// LLM 调用错误的细粒度分类,指导重试和恢复策略
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum LlmErrorKind {
|
||||
Auth,
|
||||
AuthPermanent,
|
||||
BillingExhausted,
|
||||
RateLimited,
|
||||
Overloaded,
|
||||
ServerError,
|
||||
Timeout,
|
||||
ContextOverflow,
|
||||
ModelNotFound,
|
||||
Unknown,
|
||||
}
|
||||
|
||||
/// 分类后的 LLM 错误,附带恢复提示
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ClassifiedLlmError {
|
||||
pub kind: LlmErrorKind,
|
||||
pub retryable: bool,
|
||||
pub should_compress: bool,
|
||||
pub should_rotate_credential: bool,
|
||||
pub retry_after: Option<std::time::Duration>,
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -133,6 +133,18 @@ skills/ -> SkillRegistry 加载 -> SkillIndexMiddleware@200 注入系统提示
|
||||
- MCP 限定名 `service_name.tool_name` 避免与内置工具冲突
|
||||
- 已删除空壳 Hands (04-17): Whiteboard/Slideshow/Speech,净减 ~5400 行
|
||||
|
||||
### ⚡ 新增工具/技能必须声明 concurrency 级别
|
||||
|
||||
`Tool` trait 的 `concurrency()` 方法决定并行执行策略 (04-24 Hermes Phase 2A):
|
||||
|
||||
| 级别 | 含义 | 适用场景 |
|
||||
|------|------|---------|
|
||||
| `ReadOnly` (默认) | 只读,始终可并行 | file_read, web_search, calculator |
|
||||
| `Exclusive` | 有副作用,必须串行 | file_write, shell_exec, send_message, execute_skill, task |
|
||||
| `Interactive` | 需要用户交互,永不并行 | ask_clarification |
|
||||
|
||||
**新增工具时**:在 `impl Tool for YourTool` 中覆盖 `concurrency()` 方法。默认 `ReadOnly`,如果有写操作/副作用必须返回 `ToolConcurrency::Exclusive`。未正确声明会导致并行执行时产生竞态条件。
|
||||
|
||||
## 4. 活跃问题 + 陷阱
|
||||
|
||||
### 活跃
|
||||
@@ -155,6 +167,7 @@ skills/ -> SkillRegistry 加载 -> SkillIndexMiddleware@200 注入系统提示
|
||||
|
||||
| 日期 | 变更 | 关联 |
|
||||
|------|------|------|
|
||||
| 2026-04-24 | Hermes Phase 2A: ToolConcurrency 枚举 + 并行执行 + concurrency() 声明要求 | commit 9060935 |
|
||||
| 2026-04-22 | Wiki 5-section 重构: 281->~195 行,语义路由细节引用 [[butler]] | wiki/ |
|
||||
| 2026-04-22 | Researcher 搜索修复: schema 扁平化 + 空参数回退 + 排版修复 | commit 5816f56+81005c3 |
|
||||
| 2026-04-17 | 空壳 Hand 清理: Whiteboard/Slideshow/Speech 删除,净减 ~5400 行 | Phase 5 清理 |
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
---
|
||||
title: ZCLAW 项目知识库
|
||||
updated: 2026-04-22
|
||||
updated: 2026-04-24
|
||||
status: active
|
||||
---
|
||||
|
||||
@@ -8,29 +8,29 @@ status: active
|
||||
|
||||
> 面向中文用户的 AI Agent 桌面客户端。管家模式 + 多模型 + 7 自主能力 + 75 技能。
|
||||
> **使用方式**: 找到你要处理的模块,读对应页面,直接开始工作。
|
||||
> **数据来源**: 2026-04-22 代码全量扫描验证,非文档推测。
|
||||
> **数据来源**: 2026-04-23 代码全量扫描验证,非文档推测。
|
||||
|
||||
## 项目画像
|
||||
|
||||
| 维度 | 值 |
|
||||
|------|-----|
|
||||
| 定位 | AI Agent 桌面客户端 (Tauri 2.x) |
|
||||
| 技术栈 | Rust 10 crates + src-tauri (~102K行, 357 .rs) + React 19 + TypeScript + PostgreSQL |
|
||||
| 技术栈 | Rust 10 crates + src-tauri (~148K行, 384 .rs) + React 19 + TypeScript + PostgreSQL |
|
||||
| 阶段 | 发布前稳定化,功能冻结中 |
|
||||
|
||||
## 关键数字(2026-04-22 代码验证)
|
||||
## 关键数字(2026-04-23 代码验证)
|
||||
|
||||
| 指标 | 值 |
|
||||
|------|-----|
|
||||
| Rust Crates | 10 + src-tauri |
|
||||
| Rust 代码 | 101,967 行 (357 .rs文件) |
|
||||
| Rust 测试 | 987 定义 / 797 通过 |
|
||||
| Tauri 命令 | 190 定义 / 97 @reserved / 104 invoke |
|
||||
| Rust 代码 | 148,185 行 (384 .rs文件) |
|
||||
| Rust 测试 | 997 定义 (619 #[test] + 378 #[tokio::test]) |
|
||||
| Tauri 命令 | 193 定义 / 104 invoke |
|
||||
| SaaS API | 137 .route() / 16 模块 / 38 SQL 迁移 / 42 表 |
|
||||
| 中间件 | 14 层 runtime + 10 层 SaaS HTTP |
|
||||
| SKILL / HAND | 75 技能目录 / 7 注册 Hand (6 TOML + _reminder) |
|
||||
| Pipeline | 18 YAML 模板 (8 目录) |
|
||||
| 前端 | 25 Store / 102 组件 / 75 lib / 17 Admin 页面 |
|
||||
| 前端 | 25 Store / 103 组件 / 78 lib / 17 Admin 页面 |
|
||||
| Intelligence | 16 .rs 文件 |
|
||||
| 质量指标 | 0 cargo warnings / 2 TODO/FIXME / 0 dead_code |
|
||||
|
||||
@@ -38,13 +38,13 @@ status: active
|
||||
|
||||
| 类别 | 功能 | 入口 | Wiki |
|
||||
|------|------|------|------|
|
||||
| 对话 | 发消息、流式响应、多模型切换 | 聊天面板 | [[chat]] |
|
||||
| 分身 | 创建/切换/配置 Agent | 侧边栏 Agent 列表 | [[chat]] |
|
||||
| 对话 | 发消息、流式响应、多模型切换、LLM 动态建议 | 聊天面板 | [[chat]] |
|
||||
| 分身 | 创建/切换/配置 Agent、跨会话身份记忆 (soul.md) | 侧边栏 Agent 列表 | [[chat]] |
|
||||
| 自主 | 触发 Browser/Collector/Twitter 等 | 自动化面板 | [[hands-skills]] |
|
||||
| 记忆 | 搜索历史、自动注入上下文 | 设置 > 语义记忆 | [[memory]] |
|
||||
| 记忆 | 搜索历史、自动注入上下文、身份信号提取 | 设置 > 语义记忆 | [[memory]] |
|
||||
| 配置 | 模型/API/工作区/安全存储 | 设置面板 (19 页) | [[development]] |
|
||||
| SaaS | 登录注册、订阅计费、Admin 管理 | SaaS 平台 / Admin 后台 | [[saas]] |
|
||||
| 管家 | 痛点积累、行业配置、简洁/专业模式 | 聊天面板 (默认模式) | [[butler]] |
|
||||
| 管家 | 痛点积累、行业配置、简洁/专业模式、跨会话身份、动态建议 | 聊天面板 (默认模式) | [[butler]] |
|
||||
| Pipeline | YAML 模板选择、配置、DAG 执行 | 工作流面板 | [[pipeline]] |
|
||||
| 安全 | JWT 认证、TOTP 2FA、操作审计 | 设置 > 安全存储 | [[security]] |
|
||||
| 数据 | PostgreSQL (42表) + SQLite/FTS5 (本地记忆) | — | [[data-model]] |
|
||||
@@ -97,5 +97,7 @@ ZCLAW
|
||||
| Agent 创建失败 | [[chat]] | [[saas]] | 权限或持久化问题 |
|
||||
| Pipeline 执行卡住 | [[pipeline]] | [[middleware]] | DAG 循环 / 依赖缺失 |
|
||||
| Admin 页面 403 | [[saas]] | [[security]] | JWT 过期 / admin_guard 拦截 |
|
||||
| Agent 名字不记住 | [[butler]] | [[memory]] | soul.md 写入失败 / identity signal 未提取 |
|
||||
| 建议不个性化 | [[chat]] | [[butler]] | 4路上下文超时 / ExperienceExtractor 未初始化 |
|
||||
|
||||
> 数字真相源: `docs/TRUTH.md` — 如有冲突以代码实际为准
|
||||
|
||||
@@ -9,6 +9,15 @@ tags: [log, history]
|
||||
|
||||
> Append-only 操作记录。格式: `## [日期] 类型 | 描述`
|
||||
|
||||
## [2026-04-24] perf | Hermes 高价值设计实施 Phase 1-4
|
||||
- **Phase 1**: Anthropic prompt caching — cache_control ephemeral + cache token tracking (CompletionResponse + StreamChunk)
|
||||
- **Phase 2A**: 并行工具执行 — ToolConcurrency 枚举 (ReadOnly/Exclusive/Interactive) + JoinSet + Semaphore(3) + AtomicU32
|
||||
- **Phase 2B**: 工具输出修剪 — prune_tool_outputs() (2000→500 chars) + 集成到 CompactionMiddleware
|
||||
- **Phase 3**: 错误分类+智能重试 — LlmErrorKind + ClassifiedLlmError + RetryDriver (jittered backoff) + CONTEXT_OVERFLOW recovery
|
||||
- **Phase 4**: 异步压缩+迭代摘要 — 30s 防抖 + cached fallback + previous_summary 迭代累积
|
||||
- **新增文件**: error_classifier.rs, retry_driver.rs
|
||||
- **验证**: 997 workspace tests PASS
|
||||
|
||||
## [2026-04-23] perf | 回复效率+建议生成并行化优化 (三部分)
|
||||
- **perf(src-tauri)**: identity prompt 缓存 (`LazyLock<RwLock<HashMap>>`) + `pre_conversation_hook` 并行化 (`tokio::join!`)
|
||||
- **perf(runtime)**: middleware `before_completion` 分波并行 — `parallel_safe()` trait + wave detection + `tokio::spawn`,5 层 safe 中间件可并行
|
||||
|
||||
Reference in New Issue
Block a user