//! Agent loop implementation use std::sync::Arc; use std::sync::Mutex; use futures::StreamExt; use tokio::sync::mpsc; use zclaw_types::{AgentId, SessionId, Message, Result}; use crate::driver::{LlmDriver, CompletionRequest, ContentBlock}; use crate::stream::StreamChunk; use crate::tool::{ToolRegistry, ToolContext, SkillExecutor}; use crate::tool::builtin::PathValidator; use crate::loop_guard::{LoopGuard, LoopGuardResult}; use crate::growth::GrowthIntegration; use crate::compaction::{self, CompactionConfig}; use zclaw_memory::MemoryStore; /// Agent loop runner pub struct AgentLoop { agent_id: AgentId, driver: Arc, tools: ToolRegistry, memory: Arc, loop_guard: Mutex, model: String, system_prompt: Option, max_tokens: u32, temperature: f32, skill_executor: Option>, path_validator: Option, /// Growth system integration (optional) growth: Option, /// Compaction threshold in tokens (0 = disabled) compaction_threshold: usize, /// Compaction behavior configuration compaction_config: CompactionConfig, } impl AgentLoop { pub fn new( agent_id: AgentId, driver: Arc, tools: ToolRegistry, memory: Arc, ) -> Self { Self { agent_id, driver, tools, memory, loop_guard: Mutex::new(LoopGuard::default()), model: String::new(), // Must be set via with_model() system_prompt: None, max_tokens: 4096, temperature: 0.7, skill_executor: None, path_validator: None, growth: None, compaction_threshold: 0, compaction_config: CompactionConfig::default(), } } /// Set the skill executor for tool execution pub fn with_skill_executor(mut self, executor: Arc) -> Self { self.skill_executor = Some(executor); self } /// Set the path validator for file system operations pub fn with_path_validator(mut self, validator: PathValidator) -> Self { self.path_validator = Some(validator); self } /// Set the model to use pub fn with_model(mut self, model: impl Into) -> Self { self.model = model.into(); self } /// Set the system prompt pub fn with_system_prompt(mut self, prompt: impl Into) -> Self { self.system_prompt = Some(prompt.into()); self } /// Set max tokens pub fn with_max_tokens(mut self, max_tokens: u32) -> Self { self.max_tokens = max_tokens; self } /// Set temperature pub fn with_temperature(mut self, temperature: f32) -> Self { self.temperature = temperature; self } /// Enable growth system integration pub fn with_growth(mut self, growth: GrowthIntegration) -> Self { self.growth = Some(growth); self } /// Set growth system (mutable) pub fn set_growth(&mut self, growth: GrowthIntegration) { self.growth = Some(growth); } /// Set compaction threshold in tokens (0 = disabled) /// /// When the estimated token count of conversation history exceeds this /// threshold, older messages are summarized into a single system message /// and only recent messages are sent to the LLM. pub fn with_compaction_threshold(mut self, threshold: usize) -> Self { self.compaction_threshold = threshold; self } /// Set compaction configuration (LLM mode, memory flushing, etc.) pub fn with_compaction_config(mut self, config: CompactionConfig) -> Self { self.compaction_config = config; self } /// Get growth integration reference pub fn growth(&self) -> Option<&GrowthIntegration> { self.growth.as_ref() } /// Create tool context for tool execution fn create_tool_context(&self, session_id: SessionId) -> ToolContext { ToolContext { agent_id: self.agent_id.clone(), working_directory: None, session_id: Some(session_id.to_string()), skill_executor: self.skill_executor.clone(), path_validator: self.path_validator.clone(), } } /// Execute a tool with the given input async fn execute_tool(&self, tool_name: &str, input: serde_json::Value, context: &ToolContext) -> Result { let tool = self.tools.get(tool_name) .ok_or_else(|| zclaw_types::ZclawError::ToolError(format!("Unknown tool: {}", tool_name)))?; tool.execute(input, context).await } /// Run the agent loop with a single message /// Implements complete agent loop: LLM → Tool Call → Tool Result → LLM → Final Response pub async fn run(&self, session_id: SessionId, input: String) -> Result { // Add user message to session let user_message = Message::user(input.clone()); self.memory.append_message(&session_id, &user_message).await?; // Get all messages for context let mut messages = self.memory.get_messages(&session_id).await?; // Apply compaction if threshold is configured if self.compaction_threshold > 0 { let needs_async = self.compaction_config.use_llm || self.compaction_config.memory_flush_enabled; if needs_async { let outcome = compaction::maybe_compact_with_config( messages, self.compaction_threshold, &self.compaction_config, &self.agent_id, &session_id, Some(&self.driver), self.growth.as_ref(), ) .await; messages = outcome.messages; } else { messages = compaction::maybe_compact(messages, self.compaction_threshold); } } // Enhance system prompt with growth memories let enhanced_prompt = if let Some(ref growth) = self.growth { let base = self.system_prompt.as_deref().unwrap_or(""); growth.enhance_prompt(&self.agent_id, base, &input).await? } else { self.system_prompt.clone().unwrap_or_default() }; let max_iterations = 10; let mut iterations = 0; let mut total_input_tokens = 0u32; let mut total_output_tokens = 0u32; let result = loop { iterations += 1; if iterations > max_iterations { // Save the state before returning let error_msg = "达到最大迭代次数,请简化请求"; self.memory.append_message(&session_id, &Message::assistant(error_msg)).await?; break AgentLoopResult { response: error_msg.to_string(), input_tokens: total_input_tokens, output_tokens: total_output_tokens, iterations, }; } // Build completion request let request = CompletionRequest { model: self.model.clone(), system: Some(enhanced_prompt.clone()), messages: messages.clone(), tools: self.tools.definitions(), max_tokens: Some(self.max_tokens), temperature: Some(self.temperature), stop: Vec::new(), stream: false, }; // Call LLM let response = self.driver.complete(request).await?; total_input_tokens += response.input_tokens; total_output_tokens += response.output_tokens; // Extract tool calls from response let tool_calls: Vec<(String, String, serde_json::Value)> = response.content.iter() .filter_map(|block| match block { ContentBlock::ToolUse { id, name, input } => Some((id.clone(), name.clone(), input.clone())), _ => None, }) .collect(); // If no tool calls, we have the final response if tool_calls.is_empty() { // Extract text content let text = response.content.iter() .filter_map(|block| match block { ContentBlock::Text { text } => Some(text.clone()), ContentBlock::Thinking { thinking } => Some(format!("[思考] {}", thinking)), _ => None, }) .collect::>() .join("\n"); // Save final assistant message self.memory.append_message(&session_id, &Message::assistant(&text)).await?; break AgentLoopResult { response: text, input_tokens: total_input_tokens, output_tokens: total_output_tokens, iterations, }; } // There are tool calls - add assistant message with tool calls to history for (id, name, input) in &tool_calls { messages.push(Message::tool_use(id, zclaw_types::ToolId::new(name), input.clone())); } // Create tool context and execute all tools let tool_context = self.create_tool_context(session_id.clone()); let mut circuit_breaker_triggered = false; for (id, name, input) in tool_calls { // Check loop guard before executing tool let guard_result = self.loop_guard.lock().unwrap().check(&name, &input); match guard_result { LoopGuardResult::CircuitBreaker => { tracing::warn!("[AgentLoop] Circuit breaker triggered by tool '{}'", name); circuit_breaker_triggered = true; break; } LoopGuardResult::Blocked => { tracing::warn!("[AgentLoop] Tool '{}' blocked by loop guard", name); let error_output = serde_json::json!({ "error": "工具调用被循环防护拦截" }); messages.push(Message::tool_result(id, zclaw_types::ToolId::new(&name), error_output, true)); continue; } LoopGuardResult::Warn => { tracing::warn!("[AgentLoop] Tool '{}' triggered loop guard warning", name); } LoopGuardResult::Allowed => {} } let tool_result = match self.execute_tool(&name, input, &tool_context).await { Ok(result) => result, Err(e) => serde_json::json!({ "error": e.to_string() }), }; // Add tool result to messages messages.push(Message::tool_result( id, zclaw_types::ToolId::new(&name), tool_result, false, // is_error - we include errors in the result itself )); } // Continue the loop - LLM will process tool results and generate final response // If circuit breaker was triggered, terminate immediately if circuit_breaker_triggered { let msg = "检测到工具调用循环,已自动终止"; self.memory.append_message(&session_id, &Message::assistant(msg)).await?; break AgentLoopResult { response: msg.to_string(), input_tokens: total_input_tokens, output_tokens: total_output_tokens, iterations, }; } }; // Process conversation for memory extraction (post-conversation) if let Some(ref growth) = self.growth { if let Ok(all_messages) = self.memory.get_messages(&session_id).await { if let Err(e) = growth.process_conversation(&self.agent_id, &all_messages, session_id.clone()).await { tracing::warn!("[AgentLoop] Growth processing failed: {}", e); } } } Ok(result) } /// Run the agent loop with streaming /// Implements complete agent loop with multi-turn tool calling support pub async fn run_streaming( &self, session_id: SessionId, input: String, ) -> Result> { let (tx, rx) = mpsc::channel(100); // Add user message to session let user_message = Message::user(input.clone()); self.memory.append_message(&session_id, &user_message).await?; // Get all messages for context let mut messages = self.memory.get_messages(&session_id).await?; // Apply compaction if threshold is configured if self.compaction_threshold > 0 { let needs_async = self.compaction_config.use_llm || self.compaction_config.memory_flush_enabled; if needs_async { let outcome = compaction::maybe_compact_with_config( messages, self.compaction_threshold, &self.compaction_config, &self.agent_id, &session_id, Some(&self.driver), self.growth.as_ref(), ) .await; messages = outcome.messages; } else { messages = compaction::maybe_compact(messages, self.compaction_threshold); } } // Enhance system prompt with growth memories let enhanced_prompt = if let Some(ref growth) = self.growth { let base = self.system_prompt.as_deref().unwrap_or(""); growth.enhance_prompt(&self.agent_id, base, &input).await? } else { self.system_prompt.clone().unwrap_or_default() }; // Clone necessary data for the async task let session_id_clone = session_id.clone(); let memory = self.memory.clone(); let driver = self.driver.clone(); let tools = self.tools.clone(); let loop_guard_clone = self.loop_guard.lock().unwrap().clone(); let skill_executor = self.skill_executor.clone(); let path_validator = self.path_validator.clone(); let agent_id = self.agent_id.clone(); let model = self.model.clone(); let max_tokens = self.max_tokens; let temperature = self.temperature; tokio::spawn(async move { let mut messages = messages; let loop_guard_clone = Mutex::new(loop_guard_clone); let max_iterations = 10; let mut iteration = 0; let mut total_input_tokens = 0u32; let mut total_output_tokens = 0u32; 'outer: loop { iteration += 1; if iteration > max_iterations { let _ = tx.send(LoopEvent::Error("达到最大迭代次数".to_string())).await; break; } // Notify iteration start let _ = tx.send(LoopEvent::IterationStart { iteration, max_iterations, }).await; // Build completion request let request = CompletionRequest { model: model.clone(), system: Some(enhanced_prompt.clone()), messages: messages.clone(), tools: tools.definitions(), max_tokens: Some(max_tokens), temperature: Some(temperature), stop: Vec::new(), stream: true, }; let mut stream = driver.stream(request); let mut pending_tool_calls: Vec<(String, String, serde_json::Value)> = Vec::new(); let mut iteration_text = String::new(); // Process stream chunks tracing::debug!("[AgentLoop] Starting to process stream chunks"); while let Some(chunk_result) = stream.next().await { match chunk_result { Ok(chunk) => { match &chunk { StreamChunk::TextDelta { delta } => { iteration_text.push_str(delta); let _ = tx.send(LoopEvent::Delta(delta.clone())).await; } StreamChunk::ThinkingDelta { delta } => { let _ = tx.send(LoopEvent::Delta(format!("[思考] {}", delta))).await; } StreamChunk::ToolUseStart { id, name } => { tracing::debug!("[AgentLoop] ToolUseStart: id={}, name={}", id, name); pending_tool_calls.push((id.clone(), name.clone(), serde_json::Value::Null)); } StreamChunk::ToolUseDelta { id, delta } => { // Accumulate tool input delta (internal processing, not sent to user) if let Some(tool) = pending_tool_calls.iter_mut().find(|(tid, _, _)| tid == id) { // Try to accumulate JSON string match &mut tool.2 { serde_json::Value::String(s) => s.push_str(delta), serde_json::Value::Null => tool.2 = serde_json::Value::String(delta.clone()), _ => {} } } } StreamChunk::ToolUseEnd { id, input } => { tracing::debug!("[AgentLoop] ToolUseEnd: id={}, input={:?}", id, input); // Update with final parsed input and emit ToolStart event if let Some(tool) = pending_tool_calls.iter_mut().find(|(tid, _, _)| tid == id) { tool.2 = input.clone(); let _ = tx.send(LoopEvent::ToolStart { name: tool.1.clone(), input: input.clone() }).await; } } StreamChunk::Complete { input_tokens: it, output_tokens: ot, .. } => { tracing::debug!("[AgentLoop] Stream complete: input_tokens={}, output_tokens={}", it, ot); total_input_tokens += *it; total_output_tokens += *ot; } StreamChunk::Error { message } => { tracing::error!("[AgentLoop] Stream error: {}", message); let _ = tx.send(LoopEvent::Error(message.clone())).await; } } } Err(e) => { tracing::error!("[AgentLoop] Chunk error: {}", e); let _ = tx.send(LoopEvent::Error(e.to_string())).await; } } } tracing::debug!("[AgentLoop] Stream ended, pending_tool_calls count: {}", pending_tool_calls.len()); // If no tool calls, we have the final response if pending_tool_calls.is_empty() { tracing::debug!("[AgentLoop] No tool calls, returning final response"); // Save final assistant message let _ = memory.append_message(&session_id_clone, &Message::assistant(&iteration_text)).await; let _ = tx.send(LoopEvent::Complete(AgentLoopResult { response: iteration_text, input_tokens: total_input_tokens, output_tokens: total_output_tokens, iterations: iteration, })).await; break 'outer; } tracing::debug!("[AgentLoop] Processing {} tool calls", pending_tool_calls.len()); // There are tool calls - add to message history for (id, name, input) in &pending_tool_calls { tracing::debug!("[AgentLoop] Adding tool_use to history: id={}, name={}, input={:?}", id, name, input); messages.push(Message::tool_use(id, zclaw_types::ToolId::new(name), input.clone())); } // Execute tools for (id, name, input) in pending_tool_calls { tracing::debug!("[AgentLoop] Executing tool: name={}, input={:?}", name, input); // Check loop guard before executing tool let guard_result = loop_guard_clone.lock().unwrap().check(&name, &input); match guard_result { LoopGuardResult::CircuitBreaker => { let _ = tx.send(LoopEvent::Error("检测到工具调用循环,已自动终止".to_string())).await; break 'outer; } LoopGuardResult::Blocked => { tracing::warn!("[AgentLoop] Tool '{}' blocked by loop guard", name); let error_output = serde_json::json!({ "error": "工具调用被循环防护拦截" }); let _ = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: error_output.clone() }).await; messages.push(Message::tool_result(id, zclaw_types::ToolId::new(&name), error_output, true)); continue; } LoopGuardResult::Warn => { tracing::warn!("[AgentLoop] Tool '{}' triggered loop guard warning", name); } LoopGuardResult::Allowed => {} } let tool_context = ToolContext { agent_id: agent_id.clone(), working_directory: None, session_id: Some(session_id_clone.to_string()), skill_executor: skill_executor.clone(), path_validator: path_validator.clone(), }; let (result, is_error) = if let Some(tool) = tools.get(&name) { tracing::debug!("[AgentLoop] Tool '{}' found, executing...", name); match tool.execute(input.clone(), &tool_context).await { Ok(output) => { tracing::debug!("[AgentLoop] Tool '{}' executed successfully: {:?}", name, output); let _ = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: output.clone() }).await; (output, false) } Err(e) => { tracing::error!("[AgentLoop] Tool '{}' execution failed: {}", name, e); let error_output = serde_json::json!({ "error": e.to_string() }); let _ = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: error_output.clone() }).await; (error_output, true) } } } else { tracing::error!("[AgentLoop] Tool '{}' not found in registry", name); let error_output = serde_json::json!({ "error": format!("Unknown tool: {}", name) }); let _ = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: error_output.clone() }).await; (error_output, true) }; // Add tool result to message history tracing::debug!("[AgentLoop] Adding tool_result to history: id={}, name={}, is_error={}", id, name, is_error); messages.push(Message::tool_result( id, zclaw_types::ToolId::new(&name), result, is_error, )); } tracing::debug!("[AgentLoop] Continuing to next iteration for LLM to process tool results"); // Continue loop - next iteration will call LLM with tool results } }); Ok(rx) } } /// Result of an agent loop execution #[derive(Debug, Clone)] pub struct AgentLoopResult { pub response: String, pub input_tokens: u32, pub output_tokens: u32, pub iterations: usize, } /// Events emitted during streaming #[derive(Debug, Clone)] pub enum LoopEvent { /// Text delta from LLM Delta(String), /// Tool execution started ToolStart { name: String, input: serde_json::Value }, /// Tool execution completed ToolEnd { name: String, output: serde_json::Value }, /// New iteration started (multi-turn tool calling) IterationStart { iteration: usize, max_iterations: usize }, /// Loop completed with final result Complete(AgentLoopResult), /// Error occurred Error(String), }