//! Agent loop implementation use std::sync::Arc; use std::sync::Mutex; use futures::StreamExt; use tokio::sync::mpsc; use zclaw_types::{AgentId, SessionId, Message, Result}; use crate::driver::{LlmDriver, CompletionRequest, ContentBlock}; use crate::stream::StreamChunk; use crate::tool::{ToolRegistry, ToolContext, SkillExecutor}; use crate::tool::builtin::PathValidator; use crate::loop_guard::{LoopGuard, LoopGuardResult}; use crate::growth::GrowthIntegration; use crate::compaction::{self, CompactionConfig}; use crate::middleware::{self, MiddlewareChain}; use crate::prompt::{PromptBuilder, PromptContext}; use zclaw_memory::MemoryStore; /// Agent loop runner pub struct AgentLoop { agent_id: AgentId, driver: Arc, tools: ToolRegistry, memory: Arc, loop_guard: Mutex, model: String, system_prompt: Option, /// Custom agent personality for prompt assembly soul: Option, max_tokens: u32, temperature: f32, skill_executor: Option>, path_validator: Option, /// Growth system integration (optional) growth: Option, /// Compaction threshold in tokens (0 = disabled) compaction_threshold: usize, /// Compaction behavior configuration compaction_config: CompactionConfig, /// Optional middleware chain — when `Some`, cross-cutting logic is /// delegated to the chain instead of the inline code below. /// When `None`, the legacy inline path is used (100% backward compatible). middleware_chain: Option, /// Chat mode: extended thinking enabled thinking_enabled: bool, /// Chat mode: reasoning effort level reasoning_effort: Option, /// Chat mode: plan mode plan_mode: bool, } impl AgentLoop { pub fn new( agent_id: AgentId, driver: Arc, tools: ToolRegistry, memory: Arc, ) -> Self { Self { agent_id, driver, tools, memory, loop_guard: Mutex::new(LoopGuard::default()), model: String::new(), // Must be set via with_model() system_prompt: None, soul: None, max_tokens: 16384, temperature: 0.7, skill_executor: None, path_validator: None, growth: None, compaction_threshold: 0, compaction_config: CompactionConfig::default(), middleware_chain: None, thinking_enabled: false, reasoning_effort: None, plan_mode: false, } } /// Set the skill executor for tool execution pub fn with_skill_executor(mut self, executor: Arc) -> Self { self.skill_executor = Some(executor); self } /// Set the path validator for file system operations pub fn with_path_validator(mut self, validator: PathValidator) -> Self { self.path_validator = Some(validator); self } /// Set the model to use pub fn with_model(mut self, model: impl Into) -> Self { self.model = model.into(); self } /// Set the system prompt pub fn with_system_prompt(mut self, prompt: impl Into) -> Self { self.system_prompt = Some(prompt.into()); self } /// Set the agent personality (SOUL.md equivalent) pub fn with_soul(mut self, soul: impl Into) -> Self { self.soul = Some(soul.into()); self } /// Enable extended thinking/reasoning mode pub fn with_thinking_enabled(mut self, enabled: bool) -> Self { self.thinking_enabled = enabled; self } /// Set reasoning effort level (low/medium/high) pub fn with_reasoning_effort(mut self, effort: impl Into) -> Self { self.reasoning_effort = Some(effort.into()); self } /// Enable plan mode pub fn with_plan_mode(mut self, enabled: bool) -> Self { self.plan_mode = enabled; self } /// Set max tokens pub fn with_max_tokens(mut self, max_tokens: u32) -> Self { self.max_tokens = max_tokens; self } /// Set temperature pub fn with_temperature(mut self, temperature: f32) -> Self { self.temperature = temperature; self } /// Enable growth system integration pub fn with_growth(mut self, growth: GrowthIntegration) -> Self { self.growth = Some(growth); self } /// Set growth system (mutable) pub fn set_growth(&mut self, growth: GrowthIntegration) { self.growth = Some(growth); } /// Set compaction threshold in tokens (0 = disabled) /// /// When the estimated token count of conversation history exceeds this /// threshold, older messages are summarized into a single system message /// and only recent messages are sent to the LLM. pub fn with_compaction_threshold(mut self, threshold: usize) -> Self { self.compaction_threshold = threshold; self } /// Set compaction configuration (LLM mode, memory flushing, etc.) pub fn with_compaction_config(mut self, config: CompactionConfig) -> Self { self.compaction_config = config; self } /// Inject a middleware chain. When set, cross-cutting concerns (compaction, /// loop guard, token calibration, etc.) are delegated to the chain instead /// of the inline logic. pub fn with_middleware_chain(mut self, chain: MiddlewareChain) -> Self { self.middleware_chain = Some(chain); self } /// Get growth integration reference pub fn growth(&self) -> Option<&GrowthIntegration> { self.growth.as_ref() } /// Create tool context for tool execution fn create_tool_context(&self, session_id: SessionId) -> ToolContext { // If no path_validator is configured, create a default one with user home as workspace. // This allows file_read/file_write tools to work without explicit workspace config, // while still restricting access to the user's home directory for security. let path_validator = self.path_validator.clone().unwrap_or_else(|| { let home = std::env::var("USERPROFILE") .or_else(|_| std::env::var("HOME")) .unwrap_or_else(|_| ".".to_string()); let home_path = std::path::PathBuf::from(&home); tracing::info!( "[AgentLoop] No path_validator configured, using user home as workspace: {}", home_path.display() ); PathValidator::new().with_workspace(home_path) }); let working_dir = path_validator.workspace_root() .map(|p| p.to_string_lossy().to_string()); ToolContext { agent_id: self.agent_id.clone(), working_directory: working_dir, session_id: Some(session_id.to_string()), skill_executor: self.skill_executor.clone(), path_validator: Some(path_validator), event_sender: None, } } /// Execute a tool with the given input async fn execute_tool(&self, tool_name: &str, input: serde_json::Value, context: &ToolContext) -> Result { let tool = self.tools.get(tool_name) .ok_or_else(|| zclaw_types::ZclawError::ToolError(format!("Unknown tool: {}", tool_name)))?; tool.execute(input, context).await } /// Run the agent loop with a single message /// Implements complete agent loop: LLM → Tool Call → Tool Result → LLM → Final Response pub async fn run(&self, session_id: SessionId, input: String) -> Result { // Add user message to session let user_message = Message::user(input.clone()); self.memory.append_message(&session_id, &user_message).await?; // Get all messages for context let mut messages = self.memory.get_messages(&session_id).await?; let use_middleware = self.middleware_chain.is_some(); // Apply compaction — skip inline path when middleware chain handles it if !use_middleware && self.compaction_threshold > 0 { let needs_async = self.compaction_config.use_llm || self.compaction_config.memory_flush_enabled; if needs_async { let outcome = compaction::maybe_compact_with_config( messages, self.compaction_threshold, &self.compaction_config, &self.agent_id, &session_id, Some(&self.driver), self.growth.as_ref(), ) .await; messages = outcome.messages; } else { messages = compaction::maybe_compact(messages, self.compaction_threshold); } } // Enhance system prompt — skip when middleware chain handles it let mut enhanced_prompt = if use_middleware { let prompt_ctx = PromptContext { base_prompt: self.system_prompt.clone(), soul: self.soul.clone(), thinking_enabled: self.thinking_enabled, plan_mode: self.plan_mode, tool_definitions: self.tools.definitions(), agent_name: None, }; PromptBuilder::new().build(&prompt_ctx) } else if let Some(ref growth) = self.growth { let base = self.system_prompt.as_deref().unwrap_or(""); growth.enhance_prompt(&self.agent_id, base, &input).await? } else { self.system_prompt.clone().unwrap_or_default() }; // Run middleware before_completion hooks (compaction, memory inject, etc.) if let Some(ref chain) = self.middleware_chain { let mut mw_ctx = middleware::MiddlewareContext { agent_id: self.agent_id.clone(), session_id: session_id.clone(), user_input: input.clone(), system_prompt: enhanced_prompt.clone(), messages, response_content: Vec::new(), input_tokens: 0, output_tokens: 0, }; match chain.run_before_completion(&mut mw_ctx).await? { middleware::MiddlewareDecision::Continue => { messages = mw_ctx.messages; enhanced_prompt = mw_ctx.system_prompt; } middleware::MiddlewareDecision::Stop(reason) => { return Ok(AgentLoopResult { response: reason, input_tokens: 0, output_tokens: 0, iterations: 1, }); } } } let max_iterations = 10; let mut iterations = 0; let mut total_input_tokens = 0u32; let mut total_output_tokens = 0u32; let result = loop { iterations += 1; if iterations > max_iterations { // Save the state before returning let error_msg = "达到最大迭代次数,请简化请求"; self.memory.append_message(&session_id, &Message::assistant(error_msg)).await?; break AgentLoopResult { response: error_msg.to_string(), input_tokens: total_input_tokens, output_tokens: total_output_tokens, iterations, }; } // Build completion request let request = CompletionRequest { model: self.model.clone(), system: Some(enhanced_prompt.clone()), messages: messages.clone(), tools: self.tools.definitions(), max_tokens: Some(self.max_tokens), temperature: Some(self.temperature), stop: Vec::new(), stream: false, thinking_enabled: self.thinking_enabled, reasoning_effort: self.reasoning_effort.clone(), plan_mode: self.plan_mode, }; // Call LLM let response = self.driver.complete(request).await?; total_input_tokens += response.input_tokens; total_output_tokens += response.output_tokens; // Calibrate token estimation on first iteration if iterations == 1 { compaction::update_calibration( compaction::estimate_messages_tokens(&messages), response.input_tokens, ); } // Extract tool calls from response let tool_calls: Vec<(String, String, serde_json::Value)> = response.content.iter() .filter_map(|block| match block { ContentBlock::ToolUse { id, name, input } => Some((id.clone(), name.clone(), input.clone())), _ => None, }) .collect(); // Extract text and thinking separately let text_parts: Vec = response.content.iter() .filter_map(|block| match block { ContentBlock::Text { text } => Some(text.clone()), _ => None, }) .collect(); let thinking_parts: Vec = response.content.iter() .filter_map(|block| match block { ContentBlock::Thinking { thinking } => Some(thinking.clone()), _ => None, }) .collect(); let text_content = text_parts.join("\n"); let thinking_content = if thinking_parts.is_empty() { None } else { Some(thinking_parts.join("")) }; // If no tool calls, we have the final response if tool_calls.is_empty() { // Save final assistant message with thinking let msg = if let Some(thinking) = &thinking_content { Message::assistant_with_thinking(&text_content, thinking) } else { Message::assistant(&text_content) }; self.memory.append_message(&session_id, &msg).await?; break AgentLoopResult { response: text_content, input_tokens: total_input_tokens, output_tokens: total_output_tokens, iterations, }; } // There are tool calls - push assistant message with thinking before tool calls // (required by Kimi and other thinking-enabled APIs) let assistant_msg = if let Some(thinking) = &thinking_content { Message::assistant_with_thinking(&text_content, thinking) } else { Message::assistant(&text_content) }; messages.push(assistant_msg); for (id, name, input) in &tool_calls { messages.push(Message::tool_use(id, zclaw_types::ToolId::new(name), input.clone())); } // Create tool context and execute all tools let tool_context = self.create_tool_context(session_id.clone()); let mut circuit_breaker_triggered = false; let mut abort_result: Option = None; let mut clarification_result: Option = None; for (id, name, input) in tool_calls { // Check if loop was already aborted if abort_result.is_some() { break; } // Check tool call safety — via middleware chain or inline loop guard if let Some(ref chain) = self.middleware_chain { let mw_ctx_ref = middleware::MiddlewareContext { agent_id: self.agent_id.clone(), session_id: session_id.clone(), user_input: input.to_string(), system_prompt: enhanced_prompt.clone(), messages: messages.clone(), response_content: Vec::new(), input_tokens: total_input_tokens, output_tokens: total_output_tokens, }; match chain.run_before_tool_call(&mw_ctx_ref, &name, &input).await? { middleware::ToolCallDecision::Allow => {} middleware::ToolCallDecision::Block(msg) => { tracing::warn!("[AgentLoop] Tool '{}' blocked by middleware: {}", name, msg); let error_output = serde_json::json!({ "error": msg }); messages.push(Message::tool_result(id, zclaw_types::ToolId::new(&name), error_output, true)); continue; } middleware::ToolCallDecision::ReplaceInput(new_input) => { // Execute with replaced input let tool_result = match self.execute_tool(&name, new_input, &tool_context).await { Ok(result) => result, Err(e) => serde_json::json!({ "error": e.to_string() }), }; messages.push(Message::tool_result(id, zclaw_types::ToolId::new(&name), tool_result, false)); continue; } middleware::ToolCallDecision::AbortLoop(reason) => { tracing::warn!("[AgentLoop] Loop aborted by middleware: {}", reason); let msg = format!("{}\n已自动终止", reason); self.memory.append_message(&session_id, &Message::assistant(&msg)).await?; abort_result = Some(AgentLoopResult { response: msg, input_tokens: total_input_tokens, output_tokens: total_output_tokens, iterations, }); } } } else { // Legacy inline path let guard_result = self.loop_guard.lock().unwrap_or_else(|e| e.into_inner()).check(&name, &input); match guard_result { LoopGuardResult::CircuitBreaker => { tracing::warn!("[AgentLoop] Circuit breaker triggered by tool '{}'", name); circuit_breaker_triggered = true; break; } LoopGuardResult::Blocked => { tracing::warn!("[AgentLoop] Tool '{}' blocked by loop guard", name); let error_output = serde_json::json!({ "error": "工具调用被循环防护拦截" }); messages.push(Message::tool_result(id, zclaw_types::ToolId::new(&name), error_output, true)); continue; } LoopGuardResult::Warn => { tracing::warn!("[AgentLoop] Tool '{}' triggered loop guard warning", name); } LoopGuardResult::Allowed => {} } } let tool_result = match self.execute_tool(&name, input, &tool_context).await { Ok(result) => result, Err(e) => serde_json::json!({ "error": e.to_string() }), }; // Check if this is a clarification response — terminate loop immediately // so the LLM waits for user input instead of continuing to generate. if name == "ask_clarification" && tool_result.get("status").and_then(|v| v.as_str()) == Some("clarification_needed") { tracing::info!("[AgentLoop] Clarification requested, terminating loop"); let question = tool_result.get("question") .and_then(|v| v.as_str()) .unwrap_or("需要更多信息") .to_string(); messages.push(Message::tool_result( id, zclaw_types::ToolId::new(&name), tool_result, false, )); self.memory.append_message(&session_id, &Message::assistant(&question)).await?; clarification_result = Some(AgentLoopResult { response: question, input_tokens: total_input_tokens, output_tokens: total_output_tokens, iterations, }); break; } // Add tool result to messages messages.push(Message::tool_result( id, zclaw_types::ToolId::new(&name), tool_result, false, // is_error - we include errors in the result itself )); } // Continue the loop - LLM will process tool results and generate final response // If middleware aborted the loop, return immediately if let Some(result) = abort_result { break result; } // If clarification was requested, return immediately if let Some(result) = clarification_result { break result; } // If circuit breaker was triggered, terminate immediately if circuit_breaker_triggered { let msg = "检测到工具调用循环,已自动终止"; self.memory.append_message(&session_id, &Message::assistant(msg)).await?; break AgentLoopResult { response: msg.to_string(), input_tokens: total_input_tokens, output_tokens: total_output_tokens, iterations, }; } }; // Post-completion processing — middleware chain or inline growth if let Some(ref chain) = self.middleware_chain { let mw_ctx = middleware::MiddlewareContext { agent_id: self.agent_id.clone(), session_id: session_id.clone(), user_input: input.clone(), system_prompt: enhanced_prompt.clone(), messages: self.memory.get_messages(&session_id).await.unwrap_or_default(), response_content: Vec::new(), input_tokens: total_input_tokens, output_tokens: total_output_tokens, }; if let Err(e) = chain.run_after_completion(&mw_ctx).await { tracing::warn!("[AgentLoop] Middleware after_completion failed: {}", e); } } else if let Some(ref growth) = self.growth { // Legacy inline path if let Ok(all_messages) = self.memory.get_messages(&session_id).await { if let Err(e) = growth.process_conversation(&self.agent_id, &all_messages, session_id.clone()).await { tracing::warn!("[AgentLoop] Growth processing failed: {}", e); } } } Ok(result) } /// Run the agent loop with streaming /// Implements complete agent loop with multi-turn tool calling support pub async fn run_streaming( &self, session_id: SessionId, input: String, ) -> Result> { let (tx, rx) = mpsc::channel(100); // Add user message to session let user_message = Message::user(input.clone()); self.memory.append_message(&session_id, &user_message).await?; // Get all messages for context let mut messages = self.memory.get_messages(&session_id).await?; let use_middleware = self.middleware_chain.is_some(); // Apply compaction — skip inline path when middleware chain handles it if !use_middleware && self.compaction_threshold > 0 { let needs_async = self.compaction_config.use_llm || self.compaction_config.memory_flush_enabled; if needs_async { let outcome = compaction::maybe_compact_with_config( messages, self.compaction_threshold, &self.compaction_config, &self.agent_id, &session_id, Some(&self.driver), self.growth.as_ref(), ) .await; messages = outcome.messages; } else { messages = compaction::maybe_compact(messages, self.compaction_threshold); } } // Enhance system prompt — skip when middleware chain handles it let mut enhanced_prompt = if use_middleware { let prompt_ctx = PromptContext { base_prompt: self.system_prompt.clone(), soul: self.soul.clone(), thinking_enabled: self.thinking_enabled, plan_mode: self.plan_mode, tool_definitions: self.tools.definitions(), agent_name: None, }; PromptBuilder::new().build(&prompt_ctx) } else if let Some(ref growth) = self.growth { let base = self.system_prompt.as_deref().unwrap_or(""); growth.enhance_prompt(&self.agent_id, base, &input).await? } else { self.system_prompt.clone().unwrap_or_default() }; // Run middleware before_completion hooks (compaction, memory inject, etc.) if let Some(ref chain) = self.middleware_chain { let mut mw_ctx = middleware::MiddlewareContext { agent_id: self.agent_id.clone(), session_id: session_id.clone(), user_input: input.clone(), system_prompt: enhanced_prompt.clone(), messages, response_content: Vec::new(), input_tokens: 0, output_tokens: 0, }; match chain.run_before_completion(&mut mw_ctx).await? { middleware::MiddlewareDecision::Continue => { messages = mw_ctx.messages; enhanced_prompt = mw_ctx.system_prompt; } middleware::MiddlewareDecision::Stop(reason) => { let _ = tx.send(LoopEvent::Complete(AgentLoopResult { response: reason, input_tokens: 0, output_tokens: 0, iterations: 1, })).await; return Ok(rx); } } } // Clone necessary data for the async task let session_id_clone = session_id.clone(); let memory = self.memory.clone(); let driver = self.driver.clone(); let tools = self.tools.clone(); let loop_guard_clone = self.loop_guard.lock().unwrap_or_else(|e| e.into_inner()).clone(); let middleware_chain = self.middleware_chain.clone(); let skill_executor = self.skill_executor.clone(); let path_validator = self.path_validator.clone(); let agent_id = self.agent_id.clone(); let model = self.model.clone(); let max_tokens = self.max_tokens; let temperature = self.temperature; let thinking_enabled = self.thinking_enabled; let reasoning_effort = self.reasoning_effort.clone(); let plan_mode = self.plan_mode; tokio::spawn(async move { let mut messages = messages; let loop_guard_clone = Mutex::new(loop_guard_clone); let max_iterations = 10; let mut iteration = 0; let mut total_input_tokens = 0u32; let mut total_output_tokens = 0u32; 'outer: loop { iteration += 1; if iteration > max_iterations { let _ = tx.send(LoopEvent::Error("达到最大迭代次数".to_string())).await; break; } // Notify iteration start let _ = tx.send(LoopEvent::IterationStart { iteration, max_iterations, }).await; // Build completion request let request = CompletionRequest { model: model.clone(), system: Some(enhanced_prompt.clone()), messages: messages.clone(), tools: tools.definitions(), max_tokens: Some(max_tokens), temperature: Some(temperature), stop: Vec::new(), stream: true, thinking_enabled, reasoning_effort: reasoning_effort.clone(), plan_mode, }; let mut stream = driver.stream(request); let mut pending_tool_calls: Vec<(String, String, serde_json::Value)> = Vec::new(); let mut iteration_text = String::new(); let mut reasoning_text = String::new(); // Track reasoning separately for API requirement // Process stream chunks tracing::debug!("[AgentLoop] Starting to process stream chunks"); let mut chunk_count: usize = 0; let mut text_delta_count: usize = 0; let mut thinking_delta_count: usize = 0; let mut stream_errored = false; let chunk_timeout = std::time::Duration::from_secs(60); loop { match tokio::time::timeout(chunk_timeout, stream.next()).await { Ok(Some(Ok(chunk))) => { chunk_count += 1; match &chunk { StreamChunk::TextDelta { delta } => { text_delta_count += 1; tracing::debug!("[AgentLoop] TextDelta #{}: {} chars", text_delta_count, delta.len()); iteration_text.push_str(delta); let _ = tx.send(LoopEvent::Delta(delta.clone())).await; } StreamChunk::ThinkingDelta { delta } => { thinking_delta_count += 1; tracing::debug!("[AgentLoop] ThinkingDelta #{}: {} chars", thinking_delta_count, delta.len()); reasoning_text.push_str(delta); let _ = tx.send(LoopEvent::ThinkingDelta(delta.clone())).await; } StreamChunk::ToolUseStart { id, name } => { tracing::debug!("[AgentLoop] ToolUseStart: id={}, name={}", id, name); pending_tool_calls.push((id.clone(), name.clone(), serde_json::Value::Null)); } StreamChunk::ToolUseDelta { id, delta } => { // Accumulate tool input delta (internal processing, not sent to user) if let Some(tool) = pending_tool_calls.iter_mut().find(|(tid, _, _)| tid == id) { // Try to accumulate JSON string match &mut tool.2 { serde_json::Value::String(s) => s.push_str(delta), serde_json::Value::Null => tool.2 = serde_json::Value::String(delta.clone()), _ => {} } } } StreamChunk::ToolUseEnd { id, input } => { tracing::debug!("[AgentLoop] ToolUseEnd: id={}, input={:?}", id, input); // Update with final parsed input and emit ToolStart event if let Some(tool) = pending_tool_calls.iter_mut().find(|(tid, _, _)| tid == id) { tool.2 = input.clone(); let _ = tx.send(LoopEvent::ToolStart { name: tool.1.clone(), input: input.clone() }).await; } } StreamChunk::Complete { input_tokens: it, output_tokens: ot, .. } => { tracing::debug!("[AgentLoop] Stream complete: input_tokens={}, output_tokens={}", it, ot); total_input_tokens += *it; total_output_tokens += *ot; // Calibrate token estimation on first iteration if iteration == 1 { compaction::update_calibration( compaction::estimate_messages_tokens(&messages), *it, ); } } StreamChunk::Error { message } => { tracing::error!("[AgentLoop] Stream error: {}", message); let _ = tx.send(LoopEvent::Error(message.clone())).await; stream_errored = true; } } } Ok(Some(Err(e))) => { tracing::error!("[AgentLoop] Chunk error: {}", e); let _ = tx.send(LoopEvent::Error(format!("LLM 响应错误: {}", e.to_string()))).await; stream_errored = true; } Ok(None) => break, // Stream ended normally Err(_) => { tracing::error!("[AgentLoop] Stream chunk timeout ({}s)", chunk_timeout.as_secs()); let _ = tx.send(LoopEvent::Error("LLM 响应超时,请重试".to_string())).await; stream_errored = true; } } if stream_errored { break; } } tracing::info!("[AgentLoop] Stream ended: {} total chunks (text={}, thinking={}, tools={}), iteration_text={} chars", chunk_count, text_delta_count, thinking_delta_count, pending_tool_calls.len(), iteration_text.len()); // Fallback: if model generated reasoning but no text content, // use reasoning as text response. This happens with some thinking models // (DeepSeek R1, QWQ) that put the answer in reasoning_content instead of content. // Safe now because: (1) context is clean (no stale user_profile/memory injection), // (2) max_tokens=16384 prevents truncation, (3) reasoning is about the correct topic. if iteration_text.is_empty() && !reasoning_text.is_empty() { tracing::info!("[AgentLoop] Model generated {} chars of reasoning but no text — using reasoning as response", reasoning_text.len()); let _ = tx.send(LoopEvent::Delta(reasoning_text.clone())).await; iteration_text = reasoning_text.clone(); } else if iteration_text.is_empty() { tracing::warn!("[AgentLoop] No text content after {} chunks (thinking_delta={})", chunk_count, thinking_delta_count); } // If no tool calls, we have the final response if pending_tool_calls.is_empty() { tracing::info!("[AgentLoop] No tool calls, returning final response: {} chars (reasoning: {} chars)", iteration_text.len(), reasoning_text.len()); // Save final assistant message with reasoning if let Err(e) = memory.append_message(&session_id_clone, &Message::assistant_with_thinking( &iteration_text, &reasoning_text, )).await { tracing::warn!("[AgentLoop] Failed to save final assistant message: {}", e); } let _ = tx.send(LoopEvent::Complete(AgentLoopResult { response: iteration_text.clone(), input_tokens: total_input_tokens, output_tokens: total_output_tokens, iterations: iteration, })).await; // Post-completion: middleware after_completion (memory extraction, etc.) if let Some(ref chain) = middleware_chain { let mw_ctx = middleware::MiddlewareContext { agent_id: agent_id.clone(), session_id: session_id_clone.clone(), user_input: String::new(), system_prompt: enhanced_prompt.clone(), messages: memory.get_messages(&session_id_clone).await.unwrap_or_default(), response_content: Vec::new(), input_tokens: total_input_tokens, output_tokens: total_output_tokens, }; if let Err(e) = chain.run_after_completion(&mw_ctx).await { tracing::warn!("[AgentLoop] Streaming middleware after_completion failed: {}", e); } } break 'outer; } // Skip tool processing if stream errored or timed out if stream_errored { tracing::debug!("[AgentLoop] Stream errored, skipping tool processing and breaking"); break 'outer; } tracing::debug!("[AgentLoop] Processing {} tool calls (reasoning: {} chars)", pending_tool_calls.len(), reasoning_text.len()); // Push assistant message with reasoning before tool calls (required by Kimi and other thinking-enabled APIs) messages.push(Message::assistant_with_thinking( &iteration_text, &reasoning_text, )); // There are tool calls - add to message history for (id, name, input) in &pending_tool_calls { tracing::debug!("[AgentLoop] Adding tool_use to history: id={}, name={}, input={:?}", id, name, input); messages.push(Message::tool_use(id, zclaw_types::ToolId::new(name), input.clone())); } // Execute tools for (id, name, input) in pending_tool_calls { tracing::debug!("[AgentLoop] Executing tool: name={}, input={:?}", name, input); // Check tool call safety — via middleware chain or inline loop guard if let Some(ref chain) = middleware_chain { let mw_ctx = middleware::MiddlewareContext { agent_id: agent_id.clone(), session_id: session_id_clone.clone(), user_input: input.to_string(), system_prompt: enhanced_prompt.clone(), messages: messages.clone(), response_content: Vec::new(), input_tokens: total_input_tokens, output_tokens: total_output_tokens, }; match chain.run_before_tool_call(&mw_ctx, &name, &input).await { Ok(middleware::ToolCallDecision::Allow) => {} Ok(middleware::ToolCallDecision::Block(msg)) => { tracing::warn!("[AgentLoop] Tool '{}' blocked by middleware: {}", name, msg); let error_output = serde_json::json!({ "error": msg }); let _ = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: error_output.clone() }).await; messages.push(Message::tool_result(id, zclaw_types::ToolId::new(&name), error_output, true)); continue; } Ok(middleware::ToolCallDecision::AbortLoop(reason)) => { tracing::warn!("[AgentLoop] Loop aborted by middleware: {}", reason); let _ = tx.send(LoopEvent::Error(reason)).await; break 'outer; } Ok(middleware::ToolCallDecision::ReplaceInput(new_input)) => { // Execute with replaced input (same path_validator logic below) let pv = path_validator.clone().unwrap_or_else(|| { let home = std::env::var("USERPROFILE") .or_else(|_| std::env::var("HOME")) .unwrap_or_else(|_| ".".to_string()); PathValidator::new().with_workspace(std::path::PathBuf::from(&home)) }); let working_dir = pv.workspace_root() .map(|p| p.to_string_lossy().to_string()); let tool_context = ToolContext { agent_id: agent_id.clone(), working_directory: working_dir, session_id: Some(session_id_clone.to_string()), skill_executor: skill_executor.clone(), path_validator: Some(pv), event_sender: Some(tx.clone()), }; let (result, is_error) = if let Some(tool) = tools.get(&name) { match tool.execute(new_input, &tool_context).await { Ok(output) => { let _ = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: output.clone() }).await; (output, false) } Err(e) => { let error_output = serde_json::json!({ "error": e.to_string() }); let _ = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: error_output.clone() }).await; (error_output, true) } } } else { let error_output = serde_json::json!({ "error": format!("Unknown tool: {}", name) }); let _ = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: error_output.clone() }).await; (error_output, true) }; messages.push(Message::tool_result(id, zclaw_types::ToolId::new(&name), result, is_error)); continue; } Err(e) => { tracing::error!("[AgentLoop] Middleware error for tool '{}': {}", name, e); let error_output = serde_json::json!({ "error": e.to_string() }); let _ = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: error_output.clone() }).await; messages.push(Message::tool_result(id, zclaw_types::ToolId::new(&name), error_output, true)); continue; } } } else { // Legacy inline loop guard path let guard_result = loop_guard_clone.lock().unwrap_or_else(|e| e.into_inner()).check(&name, &input); match guard_result { LoopGuardResult::CircuitBreaker => { let _ = tx.send(LoopEvent::Error("检测到工具调用循环,已自动终止".to_string())).await; break 'outer; } LoopGuardResult::Blocked => { tracing::warn!("[AgentLoop] Tool '{}' blocked by loop guard", name); let error_output = serde_json::json!({ "error": "工具调用被循环防护拦截" }); let _ = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: error_output.clone() }).await; messages.push(Message::tool_result(id, zclaw_types::ToolId::new(&name), error_output, true)); continue; } LoopGuardResult::Warn => { tracing::warn!("[AgentLoop] Tool '{}' triggered loop guard warning", name); } LoopGuardResult::Allowed => {} } } // Use pre-resolved path_validator (already has default fallback from create_tool_context logic) let pv = path_validator.clone().unwrap_or_else(|| { let home = std::env::var("USERPROFILE") .or_else(|_| std::env::var("HOME")) .unwrap_or_else(|_| ".".to_string()); PathValidator::new().with_workspace(std::path::PathBuf::from(&home)) }); let working_dir = pv.workspace_root() .map(|p| p.to_string_lossy().to_string()); let tool_context = ToolContext { agent_id: agent_id.clone(), working_directory: working_dir, session_id: Some(session_id_clone.to_string()), skill_executor: skill_executor.clone(), path_validator: Some(pv), event_sender: Some(tx.clone()), }; let (result, is_error) = if let Some(tool) = tools.get(&name) { tracing::debug!("[AgentLoop] Tool '{}' found, executing...", name); match tool.execute(input.clone(), &tool_context).await { Ok(output) => { tracing::debug!("[AgentLoop] Tool '{}' executed successfully: {:?}", name, output); let _ = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: output.clone() }).await; (output, false) } Err(e) => { tracing::error!("[AgentLoop] Tool '{}' execution failed: {}", name, e); let error_output = serde_json::json!({ "error": e.to_string() }); let _ = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: error_output.clone() }).await; (error_output, true) } } } else { tracing::error!("[AgentLoop] Tool '{}' not found in registry", name); let error_output = serde_json::json!({ "error": format!("Unknown tool: {}", name) }); let _ = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: error_output.clone() }).await; (error_output, true) }; // Check if this is a clarification response — break outer loop if name == "ask_clarification" && result.get("status").and_then(|v| v.as_str()) == Some("clarification_needed") { tracing::info!("[AgentLoop] Streaming: Clarification requested, terminating loop"); let question = result.get("question") .and_then(|v| v.as_str()) .unwrap_or("需要更多信息") .to_string(); messages.push(Message::tool_result( id, zclaw_types::ToolId::new(&name), result, is_error, )); // Send the question as final delta so the user sees it let _ = tx.send(LoopEvent::Delta(question.clone())).await; let _ = tx.send(LoopEvent::Complete(AgentLoopResult { response: question.clone(), input_tokens: total_input_tokens, output_tokens: total_output_tokens, iterations: iteration, })).await; if let Err(e) = memory.append_message(&session_id_clone, &Message::assistant(&question)).await { tracing::warn!("[AgentLoop] Failed to save clarification message: {}", e); } break 'outer; } // Add tool result to message history tracing::debug!("[AgentLoop] Adding tool_result to history: id={}, name={}, is_error={}", id, name, is_error); messages.push(Message::tool_result( id, zclaw_types::ToolId::new(&name), result, is_error, )); } tracing::debug!("[AgentLoop] Continuing to next iteration for LLM to process tool results"); // Continue loop - next iteration will call LLM with tool results } }); Ok(rx) } } /// Result of an agent loop execution #[derive(Debug, Clone)] pub struct AgentLoopResult { pub response: String, pub input_tokens: u32, pub output_tokens: u32, pub iterations: usize, } /// Events emitted during streaming #[derive(Debug, Clone)] pub enum LoopEvent { /// Text delta from LLM Delta(String), /// Thinking/reasoning delta from LLM (extended thinking) ThinkingDelta(String), /// Tool execution started ToolStart { name: String, input: serde_json::Value }, /// Tool execution completed ToolEnd { name: String, output: serde_json::Value }, /// Sub-agent task status update (started/running/completed/failed) SubtaskStatus { description: String, status: String, detail: Option, }, /// New iteration started (multi-turn tool calling) IterationStart { iteration: usize, max_iterations: usize }, /// Loop completed with final result Complete(AgentLoopResult), /// Error occurred Error(String), }