fix(runtime): P0 tool-call fixes (after_tool_call wiring + stream_errored tool salvage)

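P0-1: after_tool_call middleware was never invoked
- Added middleware_chain.run_after_tool_call() to both streaming mode (run_streaming) and non-streaming mode (run)
- ToolErrorMiddleware's error-count recovery logic now takes effect
- ToolOutputGuardMiddleware's sensitive-information detection now takes effect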
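
The effect of P0-1 can be illustrated with a minimal, synchronous sketch. Everything here is hypothetical and simplified: the `AfterToolCall` trait, `ErrorCounter`, `OutputGuard`, the `api_key` marker, and the free function `run_after_tool_call` are stand-ins invented for illustration, not the actual zclaw middleware types or signatures. The point is only that hooks do nothing until the loop calls the chain after each tool result, and that a hook failure is collected as a warning rather than aborting the loop (as the diff does with `tracing::warn!`).

```rust
use std::collections::HashMap;

/// Hypothetical, simplified hook; the real middleware trait is async
/// and takes a MiddlewareContext, which this sketch omits.
trait AfterToolCall {
    fn after_tool_call(&mut self, tool: &str, output: &str, is_error: bool) -> Result<(), String>;
}

/// Counts consecutive failures per tool and clears the count on success.
#[derive(Default)]
struct ErrorCounter {
    consecutive_errors: HashMap<String, u32>,
}

impl AfterToolCall for ErrorCounter {
    fn after_tool_call(&mut self, tool: &str, _output: &str, is_error: bool) -> Result<(), String> {
        if is_error {
            *self.consecutive_errors.entry(tool.to_string()).or_insert(0) += 1;
        } else {
            // Recovery: a successful call resets the counter for that tool.
            self.consecutive_errors.remove(tool);
        }
        Ok(())
    }
}

/// Flags outputs containing an assumed sensitive marker ("api_key").
#[derive(Default)]
struct OutputGuard {
    flagged: Vec<String>,
}

impl AfterToolCall for OutputGuard {
    fn after_tool_call(&mut self, tool: &str, output: &str, _is_error: bool) -> Result<(), String> {
        if output.contains("api_key") {
            self.flagged.push(tool.to_string());
            return Err(format!("sensitive data in output of '{}'", tool));
        }
        Ok(())
    }
}

/// Runs every hook in order; failures become warnings instead of aborting.
fn run_after_tool_call(
    chain: &mut [&mut dyn AfterToolCall],
    tool: &str,
    output: &str,
    is_error: bool,
) -> Vec<String> {
    let mut warnings = Vec::new();
    for mw in chain.iter_mut() {
        if let Err(e) = mw.after_tool_call(tool, output, is_error) {
            warnings.push(e);
        }
    }
    warnings
}
```

In this sketch, two failed `web_search` calls leave the counter at 2, and a later successful call removes the entry. Before the fix, the equivalent of `run_after_tool_call` was never reached, so neither hook could observe any result.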

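P0-2: stream_errored skipped all tool execution
- Added completed_tool_ids to track which tools have received a complete ToolUseEnd
- On a stream error, complete and incomplete tool calls are now distinguished
- Complete tools execute as usual (artifact creation etc. is unaffected)
- Incomplete tools get a cancellation ToolEnd event (the frontend no longer hangs on "running")
- If stream_errored is set after tool execution, break 'outer to prevent a pointless LLM loop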
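
The partition step described in P0-2 can be sketched in isolation. This is a simplified illustration, not the code from the commit: `PendingCall` uses a `String` in place of `serde_json::Value`, and `salvage_completed` is a hypothetical helper name. It returns the names of the incomplete calls (the ones that would receive a cancellation `ToolEnd`) and retains only the completed calls for execution.

```rust
use std::collections::HashSet;

/// (tool call id, tool name, raw input). Mirrors the tuple shape used in the
/// diff, with String standing in for serde_json::Value.
type PendingCall = (String, String, String);

/// After a stream error, splits pending calls into cancelled and runnable.
/// Returns the names of cancelled (incomplete) calls; `pending` keeps only
/// calls whose ids are in `completed_ids`.
fn salvage_completed(
    pending: &mut Vec<PendingCall>,
    completed_ids: &HashSet<String>,
) -> Vec<String> {
    // Incomplete: the call started streaming but never finished.
    let cancelled: Vec<String> = pending
        .iter()
        .filter(|(id, _, _)| !completed_ids.contains(id))
        .map(|(_, name, _)| name.clone())
        .collect();

    // Complete calls have fully parsed input and are safe to execute.
    pending.retain(|(id, _, _)| completed_ids.contains(id));
    cancelled
}
```

If `pending` is empty after this call, there is nothing to salvage and the loop can stop immediately. Otherwise the remaining calls run once and the loop stops afterwards.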

Reference docs:
- docs/references/zclaw-toolcall-issues.md (analysis of 10 issues)
- docs/references/deerflow-toolcall-reference.md (complete DeerFlow tool-call reference)
iven
2026-04-24 12:20:14 +08:00
parent 4c31471cd6
commit c12b64150b
5 changed files with 412 additions and 4 deletions

@@ -552,6 +552,20 @@ impl AgentLoop {
         sorted_indices.sort();
         for idx in sorted_indices {
             let (id, name, result) = results.remove(&idx).unwrap();
+            // Run after_tool_call middleware (error counting, output guard, etc.)
+            let mut mw_ctx = middleware::MiddlewareContext {
+                agent_id: self.agent_id.clone(),
+                session_id: session_id.clone(),
+                user_input: String::new(),
+                system_prompt: enhanced_prompt.clone(),
+                messages: messages.clone(),
+                response_content: Vec::new(),
+                input_tokens: total_input_tokens,
+                output_tokens: total_output_tokens,
+            };
+            if let Err(e) = self.middleware_chain.run_after_tool_call(&mut mw_ctx, &name, &result).await {
+                tracing::warn!("[AgentLoop] after_tool_call middleware failed for '{}': {}", name, e);
+            }
             messages.push(Message::tool_result(&id, zclaw_types::ToolId::new(&name), result, false));
         }
     }
@@ -706,6 +720,7 @@ impl AgentLoop {
 let mut stream = driver.stream(request);
 let mut pending_tool_calls: Vec<(String, String, serde_json::Value)> = Vec::new();
+let mut completed_tool_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
 let mut iteration_text = String::new();
 let mut reasoning_text = String::new(); // Track reasoning separately for API requirement
@@ -762,6 +777,7 @@ impl AgentLoop {
 // Update with final parsed input and emit ToolStart event
 if let Some(tool) = pending_tool_calls.iter_mut().find(|(tid, _, _)| tid == id) {
     tool.2 = input.clone();
+    completed_tool_ids.insert(id.clone());
     if let Err(e) = tx.send(LoopEvent::ToolStart { name: tool.1.clone(), input: input.clone() }).await {
         tracing::warn!("[AgentLoop] Failed to send ToolStart event: {}", e);
     }
@@ -869,10 +885,26 @@ impl AgentLoop {
         break 'outer;
     }
-    // Skip tool processing if stream errored or timed out
+    // Handle stream errors — execute complete tool calls, cancel incomplete ones
     if stream_errored {
-        tracing::debug!("[AgentLoop] Stream errored, skipping tool processing and breaking");
-        break 'outer;
+        // Cancel incomplete tools (ToolStart sent but ToolUseEnd not received)
+        let incomplete: Vec<_> = pending_tool_calls.iter()
+            .filter(|(id, _, _)| !completed_tool_ids.contains(id))
+            .collect();
+        for (_, name, _) in &incomplete {
+            tracing::warn!("[AgentLoop] Cancelling incomplete tool '{}' due to stream error", name);
+            let error_output = serde_json::json!({ "error": "流式响应中断,工具调用未完成" });
+            if let Err(e) = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: error_output }).await {
+                tracing::warn!("[AgentLoop] Failed to send cancellation ToolEnd event: {}", e);
+            }
+        }
+        // Retain only complete tools for execution
+        pending_tool_calls.retain(|(id, _, _)| completed_tool_ids.contains(id));
+        if pending_tool_calls.is_empty() {
+            tracing::debug!("[AgentLoop] Stream errored with no complete tool calls, breaking");
+            break 'outer;
+        }
+        tracing::info!("[AgentLoop] Stream errored but executing {} complete tool calls", pending_tool_calls.len());
     }
     tracing::debug!("[AgentLoop] Processing {} tool calls (reasoning: {} chars)", pending_tool_calls.len(), reasoning_text.len());
@@ -1059,6 +1091,23 @@ impl AgentLoop {
         break 'outer;
     }
+    // Run after_tool_call middleware chain (error counting, output guard, etc.)
+    {
+        let mut mw_ctx = middleware::MiddlewareContext {
+            agent_id: agent_id.clone(),
+            session_id: session_id_clone.clone(),
+            user_input: String::new(),
+            system_prompt: enhanced_prompt.clone(),
+            messages: messages.clone(),
+            response_content: Vec::new(),
+            input_tokens: total_input_tokens,
+            output_tokens: total_output_tokens,
+        };
+        if let Err(e) = middleware_chain.run_after_tool_call(&mut mw_ctx, &name, &result).await {
+            tracing::warn!("[AgentLoop] after_tool_call middleware failed for '{}': {}", name, e);
+        }
+    }
     // Add tool result to message history
     tracing::debug!("[AgentLoop] Adding tool_result to history: id={}, name={}, is_error={}", id, name, is_error);
     messages.push(Message::tool_result(
@@ -1070,6 +1119,11 @@ impl AgentLoop {
         }
         tracing::debug!("[AgentLoop] Continuing to next iteration for LLM to process tool results");
+        // If stream errored, we executed complete tools but cannot continue the LLM loop
+        if stream_errored {
+            tracing::info!("[AgentLoop] Stream was errored — executed salvageable tools, now breaking");
+            break 'outer;
+        }
         // Continue loop - next iteration will call LLM with tool results
     }
 });
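
The control flow introduced by the last two hunks (execute what can be salvaged, then leave the loop instead of calling the LLM again) can be sketched with a labeled break. This is an illustrative model only: `Turn` and `run_loop` are hypothetical names, each turn is pre-scripted rather than streamed from a driver, and tool execution is reduced to recording the tool name.

```rust
/// Outcome of one simulated LLM turn (hypothetical type for this sketch).
struct Turn {
    /// Tools whose calls were fully received during this turn.
    completed_tools: Vec<&'static str>,
    /// Whether the stream failed partway through this turn.
    stream_errored: bool,
}

/// Runs scripted turns in order. Returns the tools executed and the number
/// of turns that were started.
fn run_loop(turns: Vec<Turn>) -> (Vec<&'static str>, usize) {
    let mut executed = Vec::new();
    let mut started = 0;
    let mut turns = turns.into_iter();

    'outer: loop {
        let turn = match turns.next() {
            Some(t) => t,
            None => break 'outer,
        };
        started += 1;

        // No complete tool calls: either a normal final answer or an
        // errored stream with nothing to salvage. Stop in both cases.
        if turn.completed_tools.is_empty() {
            break 'outer;
        }

        // Execute every complete tool call, even if the stream errored.
        for tool in &turn.completed_tools {
            executed.push(*tool);
        }

        // After salvaging, do not start another LLM iteration.
        if turn.stream_errored {
            break 'outer;
        }
    }
    (executed, started)
}
```

With three scripted turns where the second one errors after completing one tool call, the sketch executes the tools from the first two turns and never starts the third.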