Files
zclaw_openfang/crates/zclaw-runtime/src/loop_runner.rs
iven 5687dc20e0
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
refactor(runtime): loop_runner 双路径合并 — 统一走 middleware chain (Phase 3A)
middleware_chain 从 Option<MiddlewareChain> 改为 MiddlewareChain:
- 移除 6 处 use_middleware 分支 + 2 处 legacy loop_guard inline path
- 移除 loop_guard field + Mutex import + circuit_breaker_triggered 变量
- 空 chain (Default) 行为等价于 middleware path 中的 no-op
- 1154行 → 1023行,净减 131 行
- cargo check --workspace ✓ | cargo test ✓ (排除 desktop 预存编译问题)
2026-04-17 21:56:10 +08:00

1024 lines
49 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! Agent loop implementation
use std::sync::Arc;
use futures::StreamExt;
use tokio::sync::mpsc;
use zclaw_types::{AgentId, SessionId, Message, Result};
use crate::driver::{LlmDriver, CompletionRequest, ContentBlock};
use crate::stream::StreamChunk;
use crate::tool::{ToolRegistry, ToolContext, SkillExecutor};
use crate::tool::builtin::PathValidator;
use crate::growth::GrowthIntegration;
use crate::compaction::{self, CompactionConfig};
use crate::middleware::{self, MiddlewareChain};
use crate::prompt::{PromptBuilder, PromptContext};
use zclaw_memory::MemoryStore;
/// Agent loop runner
pub struct AgentLoop {
agent_id: AgentId,
driver: Arc<dyn LlmDriver>,
tools: ToolRegistry,
memory: Arc<MemoryStore>,
model: String,
system_prompt: Option<String>,
/// Custom agent personality for prompt assembly
soul: Option<String>,
max_tokens: u32,
temperature: f32,
skill_executor: Option<Arc<dyn SkillExecutor>>,
path_validator: Option<PathValidator>,
/// Growth system integration (optional)
growth: Option<GrowthIntegration>,
/// Compaction threshold in tokens (0 = disabled)
compaction_threshold: usize,
/// Compaction behavior configuration
compaction_config: CompactionConfig,
/// Middleware chain — cross-cutting concerns are delegated to the chain.
/// An empty chain (Default) is a no-op: all `run_*` methods return Continue/Allow.
middleware_chain: MiddlewareChain,
/// Chat mode: extended thinking enabled
thinking_enabled: bool,
/// Chat mode: reasoning effort level
reasoning_effort: Option<String>,
/// Chat mode: plan mode
plan_mode: bool,
}
impl AgentLoop {
pub fn new(
agent_id: AgentId,
driver: Arc<dyn LlmDriver>,
tools: ToolRegistry,
memory: Arc<MemoryStore>,
) -> Self {
Self {
agent_id,
driver,
tools,
memory,
model: String::new(), // Must be set via with_model()
system_prompt: None,
soul: None,
max_tokens: 16384,
temperature: 0.7,
skill_executor: None,
path_validator: None,
growth: None,
compaction_threshold: 0,
compaction_config: CompactionConfig::default(),
middleware_chain: MiddlewareChain::default(),
thinking_enabled: false,
reasoning_effort: None,
plan_mode: false,
}
}
/// Set the skill executor for tool execution
pub fn with_skill_executor(mut self, executor: Arc<dyn SkillExecutor>) -> Self {
self.skill_executor = Some(executor);
self
}
/// Set the path validator for file system operations
pub fn with_path_validator(mut self, validator: PathValidator) -> Self {
self.path_validator = Some(validator);
self
}
/// Set the model to use
pub fn with_model(mut self, model: impl Into<String>) -> Self {
self.model = model.into();
self
}
/// Set the system prompt
pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
self.system_prompt = Some(prompt.into());
self
}
/// Set the agent personality (SOUL.md equivalent)
pub fn with_soul(mut self, soul: impl Into<String>) -> Self {
self.soul = Some(soul.into());
self
}
/// Enable extended thinking/reasoning mode
pub fn with_thinking_enabled(mut self, enabled: bool) -> Self {
self.thinking_enabled = enabled;
self
}
/// Set reasoning effort level (low/medium/high)
pub fn with_reasoning_effort(mut self, effort: impl Into<String>) -> Self {
self.reasoning_effort = Some(effort.into());
self
}
/// Enable plan mode
pub fn with_plan_mode(mut self, enabled: bool) -> Self {
self.plan_mode = enabled;
self
}
/// Set max tokens
pub fn with_max_tokens(mut self, max_tokens: u32) -> Self {
self.max_tokens = max_tokens;
self
}
/// Set temperature
pub fn with_temperature(mut self, temperature: f32) -> Self {
self.temperature = temperature;
self
}
/// Enable growth system integration
pub fn with_growth(mut self, growth: GrowthIntegration) -> Self {
self.growth = Some(growth);
self
}
/// Set growth system (mutable)
pub fn set_growth(&mut self, growth: GrowthIntegration) {
self.growth = Some(growth);
}
/// Set compaction threshold in tokens (0 = disabled)
///
/// When the estimated token count of conversation history exceeds this
/// threshold, older messages are summarized into a single system message
/// and only recent messages are sent to the LLM.
pub fn with_compaction_threshold(mut self, threshold: usize) -> Self {
self.compaction_threshold = threshold;
self
}
/// Set compaction configuration (LLM mode, memory flushing, etc.)
pub fn with_compaction_config(mut self, config: CompactionConfig) -> Self {
self.compaction_config = config;
self
}
/// Inject a middleware chain. Cross-cutting concerns (compaction,
/// loop guard, token calibration, etc.) are delegated to the chain.
pub fn with_middleware_chain(mut self, chain: MiddlewareChain) -> Self {
self.middleware_chain = chain;
self
}
/// Get growth integration reference
pub fn growth(&self) -> Option<&GrowthIntegration> {
self.growth.as_ref()
}
/// Create tool context for tool execution
fn create_tool_context(&self, session_id: SessionId) -> ToolContext {
// If no path_validator is configured, create a default one with user home as workspace.
// This allows file_read/file_write tools to work without explicit workspace config,
// while still restricting access to the user's home directory for security.
let path_validator = self.path_validator.clone().unwrap_or_else(|| {
let home = std::env::var("USERPROFILE")
.or_else(|_| std::env::var("HOME"))
.unwrap_or_else(|_| ".".to_string());
let home_path = std::path::PathBuf::from(&home);
tracing::info!(
"[AgentLoop] No path_validator configured, using user home as workspace: {}",
home_path.display()
);
PathValidator::new().with_workspace(home_path)
});
let working_dir = path_validator.workspace_root()
.map(|p| p.to_string_lossy().to_string());
ToolContext {
agent_id: self.agent_id.clone(),
working_directory: working_dir,
session_id: Some(session_id.to_string()),
skill_executor: self.skill_executor.clone(),
path_validator: Some(path_validator),
event_sender: None,
}
}
/// Execute a tool with the given input
async fn execute_tool(&self, tool_name: &str, input: serde_json::Value, context: &ToolContext) -> Result<serde_json::Value> {
let tool = self.tools.get(tool_name)
.ok_or_else(|| zclaw_types::ZclawError::ToolError(format!("Unknown tool: {}", tool_name)))?;
tool.execute(input, context).await
}
/// Run the agent loop with a single message
/// Implements complete agent loop: LLM → Tool Call → Tool Result → LLM → Final Response
pub async fn run(&self, session_id: SessionId, input: String) -> Result<AgentLoopResult> {
// Add user message to session
let user_message = Message::user(input.clone());
self.memory.append_message(&session_id, &user_message).await?;
// Get all messages for context
let mut messages = self.memory.get_messages(&session_id).await?;
// Enhance system prompt via PromptBuilder (middleware may further modify)
let prompt_ctx = PromptContext {
base_prompt: self.system_prompt.clone(),
soul: self.soul.clone(),
thinking_enabled: self.thinking_enabled,
plan_mode: self.plan_mode,
tool_definitions: self.tools.definitions(),
agent_name: None,
};
let mut enhanced_prompt = PromptBuilder::new().build(&prompt_ctx);
// Run middleware before_completion hooks (compaction, memory inject, etc.)
{
let mut mw_ctx = middleware::MiddlewareContext {
agent_id: self.agent_id.clone(),
session_id: session_id.clone(),
user_input: input.clone(),
system_prompt: enhanced_prompt.clone(),
messages,
response_content: Vec::new(),
input_tokens: 0,
output_tokens: 0,
};
match self.middleware_chain.run_before_completion(&mut mw_ctx).await? {
middleware::MiddlewareDecision::Continue => {
messages = mw_ctx.messages;
enhanced_prompt = mw_ctx.system_prompt;
}
middleware::MiddlewareDecision::Stop(reason) => {
return Ok(AgentLoopResult {
response: reason,
input_tokens: 0,
output_tokens: 0,
iterations: 1,
});
}
}
}
let max_iterations = 10;
let mut iterations = 0;
let mut total_input_tokens = 0u32;
let mut total_output_tokens = 0u32;
let result = loop {
iterations += 1;
if iterations > max_iterations {
// Save the state before returning
let error_msg = "达到最大迭代次数,请简化请求";
self.memory.append_message(&session_id, &Message::assistant(error_msg)).await?;
break AgentLoopResult {
response: error_msg.to_string(),
input_tokens: total_input_tokens,
output_tokens: total_output_tokens,
iterations,
};
}
// Build completion request
let request = CompletionRequest {
model: self.model.clone(),
system: Some(enhanced_prompt.clone()),
messages: messages.clone(),
tools: self.tools.definitions(),
max_tokens: Some(self.max_tokens),
temperature: Some(self.temperature),
stop: Vec::new(),
stream: false,
thinking_enabled: self.thinking_enabled,
reasoning_effort: self.reasoning_effort.clone(),
plan_mode: self.plan_mode,
};
// Call LLM
let response = self.driver.complete(request).await?;
total_input_tokens += response.input_tokens;
total_output_tokens += response.output_tokens;
// Calibrate token estimation on first iteration
if iterations == 1 {
compaction::update_calibration(
compaction::estimate_messages_tokens(&messages),
response.input_tokens,
);
}
// Extract tool calls from response
let tool_calls: Vec<(String, String, serde_json::Value)> = response.content.iter()
.filter_map(|block| match block {
ContentBlock::ToolUse { id, name, input } => Some((id.clone(), name.clone(), input.clone())),
_ => None,
})
.collect();
// Extract text and thinking separately
let text_parts: Vec<String> = response.content.iter()
.filter_map(|block| match block {
ContentBlock::Text { text } => Some(text.clone()),
_ => None,
})
.collect();
let thinking_parts: Vec<String> = response.content.iter()
.filter_map(|block| match block {
ContentBlock::Thinking { thinking } => Some(thinking.clone()),
_ => None,
})
.collect();
let text_content = text_parts.join("\n");
let thinking_content = if thinking_parts.is_empty() { None } else { Some(thinking_parts.join("")) };
// If no tool calls, we have the final response
if tool_calls.is_empty() {
// Save final assistant message with thinking
let msg = if let Some(thinking) = &thinking_content {
Message::assistant_with_thinking(&text_content, thinking)
} else {
Message::assistant(&text_content)
};
self.memory.append_message(&session_id, &msg).await?;
break AgentLoopResult {
response: text_content,
input_tokens: total_input_tokens,
output_tokens: total_output_tokens,
iterations,
};
}
// There are tool calls - push assistant message with thinking before tool calls
// (required by Kimi and other thinking-enabled APIs)
let assistant_msg = if let Some(thinking) = &thinking_content {
Message::assistant_with_thinking(&text_content, thinking)
} else {
Message::assistant(&text_content)
};
messages.push(assistant_msg);
for (id, name, input) in &tool_calls {
messages.push(Message::tool_use(id, zclaw_types::ToolId::new(name), input.clone()));
}
// Create tool context and execute all tools
let tool_context = self.create_tool_context(session_id.clone());
let mut abort_result: Option<AgentLoopResult> = None;
let mut clarification_result: Option<AgentLoopResult> = None;
for (id, name, input) in tool_calls {
// Check if loop was already aborted
if abort_result.is_some() {
break;
}
// Check tool call safety — via middleware chain
{
let mw_ctx_ref = middleware::MiddlewareContext {
agent_id: self.agent_id.clone(),
session_id: session_id.clone(),
user_input: input.to_string(),
system_prompt: enhanced_prompt.clone(),
messages: messages.clone(),
response_content: Vec::new(),
input_tokens: total_input_tokens,
output_tokens: total_output_tokens,
};
match self.middleware_chain.run_before_tool_call(&mw_ctx_ref, &name, &input).await? {
middleware::ToolCallDecision::Allow => {}
middleware::ToolCallDecision::Block(msg) => {
tracing::warn!("[AgentLoop] Tool '{}' blocked by middleware: {}", name, msg);
let error_output = serde_json::json!({ "error": msg });
messages.push(Message::tool_result(id, zclaw_types::ToolId::new(&name), error_output, true));
continue;
}
middleware::ToolCallDecision::ReplaceInput(new_input) => {
// Execute with replaced input (with timeout)
let tool_result = match tokio::time::timeout(
std::time::Duration::from_secs(30),
self.execute_tool(&name, new_input, &tool_context),
).await {
Ok(Ok(result)) => result,
Ok(Err(e)) => serde_json::json!({ "error": e.to_string() }),
Err(_) => {
tracing::warn!("[AgentLoop] Tool '{}' (replaced input) timed out after 30s", name);
serde_json::json!({ "error": format!("工具 '{}' 执行超时30秒请重试", name) })
}
};
messages.push(Message::tool_result(id, zclaw_types::ToolId::new(&name), tool_result, false));
continue;
}
middleware::ToolCallDecision::AbortLoop(reason) => {
tracing::warn!("[AgentLoop] Loop aborted by middleware: {}", reason);
let msg = format!("{}\n已自动终止", reason);
self.memory.append_message(&session_id, &Message::assistant(&msg)).await?;
abort_result = Some(AgentLoopResult {
response: msg,
input_tokens: total_input_tokens,
output_tokens: total_output_tokens,
iterations,
});
}
}
}
let tool_result = match tokio::time::timeout(
std::time::Duration::from_secs(30),
self.execute_tool(&name, input, &tool_context),
).await {
Ok(Ok(result)) => result,
Ok(Err(e)) => serde_json::json!({ "error": e.to_string() }),
Err(_) => {
tracing::warn!("[AgentLoop] Tool '{}' timed out after 30s", name);
serde_json::json!({ "error": format!("工具 '{}' 执行超时30秒请重试", name) })
}
};
// Check if this is a clarification response — terminate loop immediately
// so the LLM waits for user input instead of continuing to generate.
if name == "ask_clarification"
&& tool_result.get("status").and_then(|v| v.as_str()) == Some("clarification_needed")
{
tracing::info!("[AgentLoop] Clarification requested, terminating loop");
let question = tool_result.get("question")
.and_then(|v| v.as_str())
.unwrap_or("需要更多信息")
.to_string();
messages.push(Message::tool_result(
id,
zclaw_types::ToolId::new(&name),
tool_result,
false,
));
self.memory.append_message(&session_id, &Message::assistant(&question)).await?;
clarification_result = Some(AgentLoopResult {
response: question,
input_tokens: total_input_tokens,
output_tokens: total_output_tokens,
iterations,
});
break;
}
// Add tool result to messages
messages.push(Message::tool_result(
id,
zclaw_types::ToolId::new(&name),
tool_result,
false, // is_error - we include errors in the result itself
));
}
// Continue the loop - LLM will process tool results and generate final response
// If middleware aborted the loop, return immediately
if let Some(result) = abort_result {
break result;
}
// If clarification was requested, return immediately
if let Some(result) = clarification_result {
break result;
}
};
// Post-completion processing — middleware chain
{
let mw_ctx = middleware::MiddlewareContext {
agent_id: self.agent_id.clone(),
session_id: session_id.clone(),
user_input: input.clone(),
system_prompt: enhanced_prompt.clone(),
messages: self.memory.get_messages(&session_id).await.unwrap_or_default(),
response_content: Vec::new(),
input_tokens: total_input_tokens,
output_tokens: total_output_tokens,
};
if let Err(e) = self.middleware_chain.run_after_completion(&mw_ctx).await {
tracing::warn!("[AgentLoop] Middleware after_completion failed: {}", e);
}
}
Ok(result)
}
/// Run the agent loop with streaming
/// Implements complete agent loop with multi-turn tool calling support
pub async fn run_streaming(
&self,
session_id: SessionId,
input: String,
) -> Result<mpsc::Receiver<LoopEvent>> {
let (tx, rx) = mpsc::channel(100);
// Add user message to session
let user_message = Message::user(input.clone());
self.memory.append_message(&session_id, &user_message).await?;
// Get all messages for context
let mut messages = self.memory.get_messages(&session_id).await?;
// Enhance system prompt via PromptBuilder (middleware may further modify)
let prompt_ctx = PromptContext {
base_prompt: self.system_prompt.clone(),
soul: self.soul.clone(),
thinking_enabled: self.thinking_enabled,
plan_mode: self.plan_mode,
tool_definitions: self.tools.definitions(),
agent_name: None,
};
let mut enhanced_prompt = PromptBuilder::new().build(&prompt_ctx);
// Run middleware before_completion hooks (compaction, memory inject, etc.)
{
let mut mw_ctx = middleware::MiddlewareContext {
agent_id: self.agent_id.clone(),
session_id: session_id.clone(),
user_input: input.clone(),
system_prompt: enhanced_prompt.clone(),
messages,
response_content: Vec::new(),
input_tokens: 0,
output_tokens: 0,
};
match self.middleware_chain.run_before_completion(&mut mw_ctx).await? {
middleware::MiddlewareDecision::Continue => {
messages = mw_ctx.messages;
enhanced_prompt = mw_ctx.system_prompt;
}
middleware::MiddlewareDecision::Stop(reason) => {
if let Err(e) = tx.send(LoopEvent::Complete(AgentLoopResult {
response: reason,
input_tokens: 0,
output_tokens: 0,
iterations: 1,
})).await {
tracing::warn!("[AgentLoop] Failed to send Complete event: {}", e);
}
return Ok(rx);
}
}
}
// Clone necessary data for the async task
let session_id_clone = session_id.clone();
let memory = self.memory.clone();
let driver = self.driver.clone();
let tools = self.tools.clone();
let middleware_chain = self.middleware_chain.clone();
let skill_executor = self.skill_executor.clone();
let path_validator = self.path_validator.clone();
let agent_id = self.agent_id.clone();
let model = self.model.clone();
let max_tokens = self.max_tokens;
let temperature = self.temperature;
let thinking_enabled = self.thinking_enabled;
let reasoning_effort = self.reasoning_effort.clone();
let plan_mode = self.plan_mode;
tokio::spawn(async move {
let mut messages = messages;
let max_iterations = 10;
let mut iteration = 0;
let mut total_input_tokens = 0u32;
let mut total_output_tokens = 0u32;
'outer: loop {
iteration += 1;
if iteration > max_iterations {
if let Err(e) = tx.send(LoopEvent::Error("达到最大迭代次数".to_string())).await {
tracing::warn!("[AgentLoop] Failed to send Error event: {}", e);
}
break;
}
// Notify iteration start
if let Err(e) = tx.send(LoopEvent::IterationStart {
iteration,
max_iterations,
}).await {
tracing::warn!("[AgentLoop] Failed to send IterationStart event: {}", e);
}
// Build completion request
let request = CompletionRequest {
model: model.clone(),
system: Some(enhanced_prompt.clone()),
messages: messages.clone(),
tools: tools.definitions(),
max_tokens: Some(max_tokens),
temperature: Some(temperature),
stop: Vec::new(),
stream: true,
thinking_enabled,
reasoning_effort: reasoning_effort.clone(),
plan_mode,
};
let mut stream = driver.stream(request);
let mut pending_tool_calls: Vec<(String, String, serde_json::Value)> = Vec::new();
let mut iteration_text = String::new();
let mut reasoning_text = String::new(); // Track reasoning separately for API requirement
// Process stream chunks
tracing::debug!("[AgentLoop] Starting to process stream chunks");
let mut chunk_count: usize = 0;
let mut text_delta_count: usize = 0;
let mut thinking_delta_count: usize = 0;
let mut stream_errored = false;
// 180s per-chunk timeout — thinking models (Kimi, DeepSeek R1) can have
// long gaps between reasoning_content and content phases (observed: ~60s).
// The SaaS relay sends SSE heartbeat comments during idle periods, but these
// are filtered out by the OpenAI driver and don't yield StreamChunks.
let chunk_timeout = std::time::Duration::from_secs(180);
loop {
match tokio::time::timeout(chunk_timeout, stream.next()).await {
Ok(Some(Ok(chunk))) => {
chunk_count += 1;
match &chunk {
StreamChunk::TextDelta { delta } => {
text_delta_count += 1;
tracing::debug!("[AgentLoop] TextDelta #{}: {} chars", text_delta_count, delta.len());
iteration_text.push_str(delta);
if let Err(e) = tx.send(LoopEvent::Delta(delta.clone())).await {
tracing::warn!("[AgentLoop] Failed to send Delta event: {}", e);
}
}
StreamChunk::ThinkingDelta { delta } => {
thinking_delta_count += 1;
tracing::debug!("[AgentLoop] ThinkingDelta #{}: {} chars", thinking_delta_count, delta.len());
reasoning_text.push_str(delta);
if let Err(e) = tx.send(LoopEvent::ThinkingDelta(delta.clone())).await {
tracing::warn!("[AgentLoop] Failed to send ThinkingDelta event: {}", e);
}
}
StreamChunk::ToolUseStart { id, name } => {
tracing::debug!("[AgentLoop] ToolUseStart: id={}, name={}", id, name);
pending_tool_calls.push((id.clone(), name.clone(), serde_json::Value::Null));
}
StreamChunk::ToolUseDelta { id, delta } => {
// Accumulate tool input delta (internal processing, not sent to user)
if let Some(tool) = pending_tool_calls.iter_mut().find(|(tid, _, _)| tid == id) {
// Try to accumulate JSON string
match &mut tool.2 {
serde_json::Value::String(s) => s.push_str(delta),
serde_json::Value::Null => tool.2 = serde_json::Value::String(delta.clone()),
_ => {}
}
}
}
StreamChunk::ToolUseEnd { id, input } => {
tracing::debug!("[AgentLoop] ToolUseEnd: id={}, input={:?}", id, input);
// Update with final parsed input and emit ToolStart event
if let Some(tool) = pending_tool_calls.iter_mut().find(|(tid, _, _)| tid == id) {
tool.2 = input.clone();
if let Err(e) = tx.send(LoopEvent::ToolStart { name: tool.1.clone(), input: input.clone() }).await {
tracing::warn!("[AgentLoop] Failed to send ToolStart event: {}", e);
}
}
}
StreamChunk::Complete { input_tokens: it, output_tokens: ot, .. } => {
tracing::debug!("[AgentLoop] Stream complete: input_tokens={}, output_tokens={}", it, ot);
total_input_tokens += *it;
total_output_tokens += *ot;
// Calibrate token estimation on first iteration
if iteration == 1 {
compaction::update_calibration(
compaction::estimate_messages_tokens(&messages),
*it,
);
}
}
StreamChunk::Error { message } => {
tracing::error!("[AgentLoop] Stream error: {}", message);
if let Err(e) = tx.send(LoopEvent::Error(message.clone())).await {
tracing::warn!("[AgentLoop] Failed to send Error event: {}", e);
}
stream_errored = true;
}
}
}
Ok(Some(Err(e))) => {
tracing::error!("[AgentLoop] Chunk error: {}", e);
if let Err(e) = tx.send(LoopEvent::Error(format!("LLM 响应错误: {}", e.to_string()))).await {
tracing::warn!("[AgentLoop] Failed to send Error event: {}", e);
}
stream_errored = true;
}
Ok(None) => break, // Stream ended normally
Err(_) => {
tracing::error!("[AgentLoop] Stream chunk timeout ({}s)", chunk_timeout.as_secs());
if let Err(e) = tx.send(LoopEvent::Error("LLM 响应超时,请重试".to_string())).await {
tracing::warn!("[AgentLoop] Failed to send Error event: {}", e);
}
stream_errored = true;
}
}
if stream_errored {
break;
}
}
tracing::info!("[AgentLoop] Stream ended: {} total chunks (text={}, thinking={}, tools={}), iteration_text={} chars",
chunk_count, text_delta_count, thinking_delta_count, pending_tool_calls.len(),
iteration_text.len());
// Fallback: if model generated reasoning but no text content,
// use reasoning as text response. This happens with some thinking models
// (DeepSeek R1, QWQ) that put the answer in reasoning_content instead of content.
// Safe now because: (1) context is clean (no stale user_profile/memory injection),
// (2) max_tokens=16384 prevents truncation, (3) reasoning is about the correct topic.
if iteration_text.is_empty() && !reasoning_text.is_empty() {
tracing::info!("[AgentLoop] Model generated {} chars of reasoning but no text — using reasoning as response",
reasoning_text.len());
if let Err(e) = tx.send(LoopEvent::Delta(reasoning_text.clone())).await {
tracing::warn!("[AgentLoop] Failed to send Delta event: {}", e);
}
iteration_text = reasoning_text.clone();
} else if iteration_text.is_empty() {
tracing::warn!("[AgentLoop] No text content after {} chunks (thinking_delta={})",
chunk_count, thinking_delta_count);
}
// If no tool calls, we have the final response
if pending_tool_calls.is_empty() {
tracing::info!("[AgentLoop] No tool calls, returning final response: {} chars (reasoning: {} chars)", iteration_text.len(), reasoning_text.len());
// Save final assistant message with reasoning
if let Err(e) = memory.append_message(&session_id_clone, &Message::assistant_with_thinking(
&iteration_text,
&reasoning_text,
)).await {
tracing::warn!("[AgentLoop] Failed to save final assistant message: {}", e);
}
if let Err(e) = tx.send(LoopEvent::Complete(AgentLoopResult {
response: iteration_text.clone(),
input_tokens: total_input_tokens,
output_tokens: total_output_tokens,
iterations: iteration,
})).await {
tracing::warn!("[AgentLoop] Failed to send Complete event: {}", e);
}
// Post-completion: middleware after_completion (memory extraction, etc.)
{
let mw_ctx = middleware::MiddlewareContext {
agent_id: agent_id.clone(),
session_id: session_id_clone.clone(),
user_input: String::new(),
system_prompt: enhanced_prompt.clone(),
messages: memory.get_messages(&session_id_clone).await.unwrap_or_default(),
response_content: Vec::new(),
input_tokens: total_input_tokens,
output_tokens: total_output_tokens,
};
if let Err(e) = middleware_chain.run_after_completion(&mw_ctx).await {
tracing::warn!("[AgentLoop] Streaming middleware after_completion failed: {}", e);
}
}
break 'outer;
}
// Skip tool processing if stream errored or timed out
if stream_errored {
tracing::debug!("[AgentLoop] Stream errored, skipping tool processing and breaking");
break 'outer;
}
tracing::debug!("[AgentLoop] Processing {} tool calls (reasoning: {} chars)", pending_tool_calls.len(), reasoning_text.len());
// Push assistant message with reasoning before tool calls (required by Kimi and other thinking-enabled APIs)
messages.push(Message::assistant_with_thinking(
&iteration_text,
&reasoning_text,
));
// There are tool calls - add to message history
for (id, name, input) in &pending_tool_calls {
tracing::debug!("[AgentLoop] Adding tool_use to history: id={}, name={}, input={:?}", id, name, input);
messages.push(Message::tool_use(id, zclaw_types::ToolId::new(name), input.clone()));
}
// Execute tools
for (id, name, input) in pending_tool_calls {
tracing::debug!("[AgentLoop] Executing tool: name={}, input={:?}", name, input);
// Check tool call safety — via middleware chain
{
let mw_ctx = middleware::MiddlewareContext {
agent_id: agent_id.clone(),
session_id: session_id_clone.clone(),
user_input: input.to_string(),
system_prompt: enhanced_prompt.clone(),
messages: messages.clone(),
response_content: Vec::new(),
input_tokens: total_input_tokens,
output_tokens: total_output_tokens,
};
match middleware_chain.run_before_tool_call(&mw_ctx, &name, &input).await {
Ok(middleware::ToolCallDecision::Allow) => {}
Ok(middleware::ToolCallDecision::Block(msg)) => {
tracing::warn!("[AgentLoop] Tool '{}' blocked by middleware: {}", name, msg);
let error_output = serde_json::json!({ "error": msg });
if let Err(e) = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: error_output.clone() }).await {
tracing::warn!("[AgentLoop] Failed to send ToolEnd event: {}", e);
}
messages.push(Message::tool_result(id, zclaw_types::ToolId::new(&name), error_output, true));
continue;
}
Ok(middleware::ToolCallDecision::AbortLoop(reason)) => {
tracing::warn!("[AgentLoop] Loop aborted by middleware: {}", reason);
if let Err(e) = tx.send(LoopEvent::Error(reason)).await {
tracing::warn!("[AgentLoop] Failed to send Error event: {}", e);
}
break 'outer;
}
Ok(middleware::ToolCallDecision::ReplaceInput(new_input)) => {
// Execute with replaced input (same path_validator logic below)
let pv = path_validator.clone().unwrap_or_else(|| {
let home = std::env::var("USERPROFILE")
.or_else(|_| std::env::var("HOME"))
.unwrap_or_else(|_| ".".to_string());
PathValidator::new().with_workspace(std::path::PathBuf::from(&home))
});
let working_dir = pv.workspace_root()
.map(|p| p.to_string_lossy().to_string());
let tool_context = ToolContext {
agent_id: agent_id.clone(),
working_directory: working_dir,
session_id: Some(session_id_clone.to_string()),
skill_executor: skill_executor.clone(),
path_validator: Some(pv),
event_sender: Some(tx.clone()),
};
let (result, is_error) = if let Some(tool) = tools.get(&name) {
match tool.execute(new_input, &tool_context).await {
Ok(output) => {
if let Err(e) = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: output.clone() }).await {
tracing::warn!("[AgentLoop] Failed to send ToolEnd event: {}", e);
}
(output, false)
}
Err(e) => {
let error_output = serde_json::json!({ "error": e.to_string() });
if let Err(e) = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: error_output.clone() }).await {
tracing::warn!("[AgentLoop] Failed to send ToolEnd event: {}", e);
}
(error_output, true)
}
}
} else {
let error_output = serde_json::json!({ "error": format!("Unknown tool: {}", name) });
if let Err(e) = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: error_output.clone() }).await {
tracing::warn!("[AgentLoop] Failed to send ToolEnd event: {}", e);
}
(error_output, true)
};
messages.push(Message::tool_result(id, zclaw_types::ToolId::new(&name), result, is_error));
continue;
}
Err(e) => {
tracing::error!("[AgentLoop] Middleware error for tool '{}': {}", name, e);
let error_output = serde_json::json!({ "error": e.to_string() });
if let Err(e) = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: error_output.clone() }).await {
tracing::warn!("[AgentLoop] Failed to send ToolEnd event: {}", e);
}
messages.push(Message::tool_result(id, zclaw_types::ToolId::new(&name), error_output, true));
continue;
}
}
}
// Use pre-resolved path_validator (already has default fallback from create_tool_context logic)
let pv = path_validator.clone().unwrap_or_else(|| {
let home = std::env::var("USERPROFILE")
.or_else(|_| std::env::var("HOME"))
.unwrap_or_else(|_| ".".to_string());
PathValidator::new().with_workspace(std::path::PathBuf::from(&home))
});
let working_dir = pv.workspace_root()
.map(|p| p.to_string_lossy().to_string());
let tool_context = ToolContext {
agent_id: agent_id.clone(),
working_directory: working_dir,
session_id: Some(session_id_clone.to_string()),
skill_executor: skill_executor.clone(),
path_validator: Some(pv),
event_sender: Some(tx.clone()),
};
let (result, is_error) = if let Some(tool) = tools.get(&name) {
tracing::debug!("[AgentLoop] Tool '{}' found, executing...", name);
match tool.execute(input.clone(), &tool_context).await {
Ok(output) => {
tracing::debug!("[AgentLoop] Tool '{}' executed successfully: {:?}", name, output);
if let Err(e) = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: output.clone() }).await {
tracing::warn!("[AgentLoop] Failed to send ToolEnd event: {}", e);
}
(output, false)
}
Err(e) => {
tracing::error!("[AgentLoop] Tool '{}' execution failed: {}", name, e);
let error_output = serde_json::json!({ "error": e.to_string() });
if let Err(e) = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: error_output.clone() }).await {
tracing::warn!("[AgentLoop] Failed to send ToolEnd event: {}", e);
}
(error_output, true)
}
}
} else {
tracing::error!("[AgentLoop] Tool '{}' not found in registry", name);
let error_output = serde_json::json!({ "error": format!("Unknown tool: {}", name) });
if let Err(e) = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: error_output.clone() }).await {
tracing::warn!("[AgentLoop] Failed to send ToolEnd event: {}", e);
}
(error_output, true)
};
// Check if this is a clarification response — break outer loop
if name == "ask_clarification"
&& result.get("status").and_then(|v| v.as_str()) == Some("clarification_needed")
{
tracing::info!("[AgentLoop] Streaming: Clarification requested, terminating loop");
let question = result.get("question")
.and_then(|v| v.as_str())
.unwrap_or("需要更多信息")
.to_string();
messages.push(Message::tool_result(
id,
zclaw_types::ToolId::new(&name),
result,
is_error,
));
// Send the question as final delta so the user sees it
if let Err(e) = tx.send(LoopEvent::Delta(question.clone())).await {
tracing::warn!("[AgentLoop] Failed to send Delta event: {}", e);
}
if let Err(e) = tx.send(LoopEvent::Complete(AgentLoopResult {
response: question.clone(),
input_tokens: total_input_tokens,
output_tokens: total_output_tokens,
iterations: iteration,
})).await {
tracing::warn!("[AgentLoop] Failed to send Complete event: {}", e);
}
if let Err(e) = memory.append_message(&session_id_clone, &Message::assistant(&question)).await {
tracing::warn!("[AgentLoop] Failed to save clarification message: {}", e);
}
break 'outer;
}
// Add tool result to message history
tracing::debug!("[AgentLoop] Adding tool_result to history: id={}, name={}, is_error={}", id, name, is_error);
messages.push(Message::tool_result(
id,
zclaw_types::ToolId::new(&name),
result,
is_error,
));
}
tracing::debug!("[AgentLoop] Continuing to next iteration for LLM to process tool results");
// Continue loop - next iteration will call LLM with tool results
}
});
Ok(rx)
}
}
/// Result of an agent loop execution
#[derive(Debug, Clone)]
pub struct AgentLoopResult {
pub response: String,
pub input_tokens: u32,
pub output_tokens: u32,
pub iterations: usize,
}
/// Events emitted during streaming
#[derive(Debug, Clone)]
pub enum LoopEvent {
/// Text delta from LLM
Delta(String),
/// Thinking/reasoning delta from LLM (extended thinking)
ThinkingDelta(String),
/// Tool execution started
ToolStart { name: String, input: serde_json::Value },
/// Tool execution completed
ToolEnd { name: String, output: serde_json::Value },
/// Sub-agent task status update (started/running/completed/failed)
SubtaskStatus {
task_id: String,
description: String,
status: String,
detail: Option<String>,
},
/// New iteration started (multi-turn tool calling)
IterationStart { iteration: usize, max_iterations: usize },
/// Loop completed with final result
Complete(AgentLoopResult),
/// Error occurred
Error(String),
}