From 5687dc20e07c83c11330fadfba7fd2e2b2896232 Mon Sep 17 00:00:00 2001 From: iven Date: Fri, 17 Apr 2026 21:54:12 +0800 Subject: [PATCH] =?UTF-8?q?refactor(runtime):=20loop=5Frunner=20=E5=8F=8C?= =?UTF-8?q?=E8=B7=AF=E5=BE=84=E5=90=88=E5=B9=B6=20=E2=80=94=20=E7=BB=9F?= =?UTF-8?q?=E4=B8=80=E8=B5=B0=20middleware=20chain=20(Phase=203A)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit middleware_chain 从 Option 改为 MiddlewareChain: - 移除 6 处 use_middleware 分支 + 2 处 legacy loop_guard inline path - 移除 loop_guard field + Mutex import + circuit_breaker_triggered 变量 - 空 chain (Default) 行为等价于 middleware path 中的 no-op - 1154行 → 1023行,净减 131 行 - cargo check --workspace ✓ | cargo test ✓ (排除 desktop 预存编译问题) --- crates/zclaw-kernel/src/kernel/messaging.rs | 12 +- crates/zclaw-kernel/src/kernel/mod.rs | 10 +- crates/zclaw-runtime/src/loop_runner.rs | 211 ++++---------------- 3 files changed, 48 insertions(+), 185 deletions(-) diff --git a/crates/zclaw-kernel/src/kernel/messaging.rs b/crates/zclaw-kernel/src/kernel/messaging.rs index 8617929..8c5dce9 100644 --- a/crates/zclaw-kernel/src/kernel/messaging.rs +++ b/crates/zclaw-kernel/src/kernel/messaging.rs @@ -83,10 +83,8 @@ impl Kernel { loop_runner = loop_runner.with_path_validator(path_validator); } - // Inject middleware chain if available - if let Some(chain) = self.create_middleware_chain() { - loop_runner = loop_runner.with_middleware_chain(chain); - } + // Inject middleware chain + loop_runner = loop_runner.with_middleware_chain(self.create_middleware_chain()); // Apply chat mode configuration (thinking/reasoning/plan mode) if let Some(ref mode) = chat_mode { @@ -198,10 +196,8 @@ impl Kernel { loop_runner = loop_runner.with_path_validator(path_validator); } - // Inject middleware chain if available - if let Some(chain) = self.create_middleware_chain() { - loop_runner = loop_runner.with_middleware_chain(chain); - } + // Inject middleware chain + loop_runner = loop_runner.with_middleware_chain(self.create_middleware_chain()); // Apply chat mode configuration (thinking/reasoning/plan mode from frontend) if let Some(ref mode) = chat_mode { diff --git a/crates/zclaw-kernel/src/kernel/mod.rs b/crates/zclaw-kernel/src/kernel/mod.rs index d9308ee..b804f12 100644 --- a/crates/zclaw-kernel/src/kernel/mod.rs +++ b/crates/zclaw-kernel/src/kernel/mod.rs @@ -201,7 +201,7 @@ impl Kernel { /// When middleware is configured, cross-cutting concerns (compaction, loop guard, /// token calibration, etc.) are delegated to the chain. When no middleware is /// registered, the legacy inline path in `AgentLoop` is used instead. - pub(crate) fn create_middleware_chain(&self) -> Option { + pub(crate) fn create_middleware_chain(&self) -> zclaw_runtime::middleware::MiddlewareChain { let mut chain = zclaw_runtime::middleware::MiddlewareChain::new(); // Butler router — semantic skill routing context injection @@ -359,13 +359,11 @@ impl Kernel { chain.register(Arc::new(mw)); } - // Only return Some if we actually registered middleware - if chain.is_empty() { - None - } else { + // Always return the chain (empty chain is a no-op) + if !chain.is_empty() { tracing::info!("[Kernel] Middleware chain created with {} middlewares", chain.len()); - Some(chain) } + chain } /// Subscribe to events diff --git a/crates/zclaw-runtime/src/loop_runner.rs b/crates/zclaw-runtime/src/loop_runner.rs index 361a440..f614cc8 100644 --- a/crates/zclaw-runtime/src/loop_runner.rs +++ b/crates/zclaw-runtime/src/loop_runner.rs @@ -1,7 +1,6 @@ //! Agent loop implementation use std::sync::Arc; -use std::sync::Mutex; use futures::StreamExt; use tokio::sync::mpsc; use zclaw_types::{AgentId, SessionId, Message, Result}; @@ -10,7 +9,6 @@ use crate::driver::{LlmDriver, CompletionRequest, ContentBlock}; use crate::stream::StreamChunk; use crate::tool::{ToolRegistry, ToolContext, SkillExecutor}; use crate::tool::builtin::PathValidator; -use crate::loop_guard::{LoopGuard, LoopGuardResult}; use crate::growth::GrowthIntegration; use crate::compaction::{self, CompactionConfig}; use crate::middleware::{self, MiddlewareChain}; @@ -23,7 +21,6 @@ pub struct AgentLoop { driver: Arc, tools: ToolRegistry, memory: Arc, - loop_guard: Mutex, model: String, system_prompt: Option, /// Custom agent personality for prompt assembly @@ -38,10 +35,9 @@ pub struct AgentLoop { compaction_threshold: usize, /// Compaction behavior configuration compaction_config: CompactionConfig, - /// Optional middleware chain — when `Some`, cross-cutting logic is - /// delegated to the chain instead of the inline code below. - /// When `None`, the legacy inline path is used (100% backward compatible). - middleware_chain: Option, + /// Middleware chain — cross-cutting concerns are delegated to the chain. + /// An empty chain (Default) is a no-op: all `run_*` methods return Continue/Allow. + middleware_chain: MiddlewareChain, /// Chat mode: extended thinking enabled thinking_enabled: bool, /// Chat mode: reasoning effort level @@ -62,7 +58,6 @@ impl AgentLoop { driver, tools, memory, - loop_guard: Mutex::new(LoopGuard::default()), model: String::new(), // Must be set via with_model() system_prompt: None, soul: None, @@ -73,7 +68,7 @@ impl AgentLoop { growth: None, compaction_threshold: 0, compaction_config: CompactionConfig::default(), - middleware_chain: None, + middleware_chain: MiddlewareChain::default(), thinking_enabled: false, reasoning_effort: None, plan_mode: false, @@ -167,11 +162,10 @@ impl AgentLoop { self } - /// Inject a middleware chain. When set, cross-cutting concerns (compaction, - /// loop guard, token calibration, etc.) are delegated to the chain instead - /// of the inline logic. + /// Inject a middleware chain. Cross-cutting concerns (compaction, + /// loop guard, token calibration, etc.) are delegated to the chain. pub fn with_middleware_chain(mut self, chain: MiddlewareChain) -> Self { - self.middleware_chain = Some(chain); + self.middleware_chain = chain; self } @@ -227,49 +221,19 @@ impl AgentLoop { // Get all messages for context let mut messages = self.memory.get_messages(&session_id).await?; - let use_middleware = self.middleware_chain.is_some(); - - // Apply compaction — skip inline path when middleware chain handles it - if !use_middleware && self.compaction_threshold > 0 { - let needs_async = - self.compaction_config.use_llm || self.compaction_config.memory_flush_enabled; - if needs_async { - let outcome = compaction::maybe_compact_with_config( - messages, - self.compaction_threshold, - &self.compaction_config, - &self.agent_id, - &session_id, - Some(&self.driver), - self.growth.as_ref(), - ) - .await; - messages = outcome.messages; - } else { - messages = compaction::maybe_compact(messages, self.compaction_threshold); - } - } - - // Enhance system prompt — skip when middleware chain handles it - let mut enhanced_prompt = if use_middleware { - let prompt_ctx = PromptContext { - base_prompt: self.system_prompt.clone(), - soul: self.soul.clone(), - thinking_enabled: self.thinking_enabled, - plan_mode: self.plan_mode, - tool_definitions: self.tools.definitions(), - agent_name: None, - }; - PromptBuilder::new().build(&prompt_ctx) - } else if let Some(ref growth) = self.growth { - let base = self.system_prompt.as_deref().unwrap_or(""); - growth.enhance_prompt(&self.agent_id, base, &input).await? - } else { - self.system_prompt.clone().unwrap_or_default() + // Enhance system prompt via PromptBuilder (middleware may further modify) + let prompt_ctx = PromptContext { + base_prompt: self.system_prompt.clone(), + soul: self.soul.clone(), + thinking_enabled: self.thinking_enabled, + plan_mode: self.plan_mode, + tool_definitions: self.tools.definitions(), + agent_name: None, }; + let mut enhanced_prompt = PromptBuilder::new().build(&prompt_ctx); // Run middleware before_completion hooks (compaction, memory inject, etc.) - if let Some(ref chain) = self.middleware_chain { + { let mut mw_ctx = middleware::MiddlewareContext { agent_id: self.agent_id.clone(), session_id: session_id.clone(), @@ -280,7 +244,7 @@ impl AgentLoop { input_tokens: 0, output_tokens: 0, }; - match chain.run_before_completion(&mut mw_ctx).await? { + match self.middleware_chain.run_before_completion(&mut mw_ctx).await? { middleware::MiddlewareDecision::Continue => { messages = mw_ctx.messages; enhanced_prompt = mw_ctx.system_prompt; @@ -400,7 +364,6 @@ impl AgentLoop { // Create tool context and execute all tools let tool_context = self.create_tool_context(session_id.clone()); - let mut circuit_breaker_triggered = false; let mut abort_result: Option = None; let mut clarification_result: Option = None; for (id, name, input) in tool_calls { @@ -408,8 +371,8 @@ impl AgentLoop { if abort_result.is_some() { break; } - // Check tool call safety — via middleware chain or inline loop guard - if let Some(ref chain) = self.middleware_chain { + // Check tool call safety — via middleware chain + { let mw_ctx_ref = middleware::MiddlewareContext { agent_id: self.agent_id.clone(), session_id: session_id.clone(), @@ -420,7 +383,7 @@ impl AgentLoop { input_tokens: total_input_tokens, output_tokens: total_output_tokens, }; - match chain.run_before_tool_call(&mw_ctx_ref, &name, &input).await? { + match self.middleware_chain.run_before_tool_call(&mw_ctx_ref, &name, &input).await? { middleware::ToolCallDecision::Allow => {} middleware::ToolCallDecision::Block(msg) => { tracing::warn!("[AgentLoop] Tool '{}' blocked by middleware: {}", name, msg); @@ -456,26 +419,6 @@ impl AgentLoop { }); } } - } else { - // Legacy inline path - let guard_result = self.loop_guard.lock().unwrap_or_else(|e| e.into_inner()).check(&name, &input); - match guard_result { - LoopGuardResult::CircuitBreaker => { - tracing::warn!("[AgentLoop] Circuit breaker triggered by tool '{}'", name); - circuit_breaker_triggered = true; - break; - } - LoopGuardResult::Blocked => { - tracing::warn!("[AgentLoop] Tool '{}' blocked by loop guard", name); - let error_output = serde_json::json!({ "error": "工具调用被循环防护拦截" }); - messages.push(Message::tool_result(id, zclaw_types::ToolId::new(&name), error_output, true)); - continue; - } - LoopGuardResult::Warn => { - tracing::warn!("[AgentLoop] Tool '{}' triggered loop guard warning", name); - } - LoopGuardResult::Allowed => {} - } } let tool_result = match tokio::time::timeout( @@ -537,21 +480,10 @@ impl AgentLoop { break result; } - // If circuit breaker was triggered, terminate immediately - if circuit_breaker_triggered { - let msg = "检测到工具调用循环,已自动终止"; - self.memory.append_message(&session_id, &Message::assistant(msg)).await?; - break AgentLoopResult { - response: msg.to_string(), - input_tokens: total_input_tokens, - output_tokens: total_output_tokens, - iterations, - }; - } }; - // Post-completion processing — middleware chain or inline growth - if let Some(ref chain) = self.middleware_chain { + // Post-completion processing — middleware chain + { let mw_ctx = middleware::MiddlewareContext { agent_id: self.agent_id.clone(), session_id: session_id.clone(), @@ -562,16 +494,9 @@ impl AgentLoop { input_tokens: total_input_tokens, output_tokens: total_output_tokens, }; - if let Err(e) = chain.run_after_completion(&mw_ctx).await { + if let Err(e) = self.middleware_chain.run_after_completion(&mw_ctx).await { tracing::warn!("[AgentLoop] Middleware after_completion failed: {}", e); } - } else if let Some(ref growth) = self.growth { - // Legacy inline path - if let Ok(all_messages) = self.memory.get_messages(&session_id).await { - if let Err(e) = growth.process_conversation(&self.agent_id, &all_messages, session_id.clone()).await { - tracing::warn!("[AgentLoop] Growth processing failed: {}", e); - } - } } Ok(result) @@ -593,49 +518,19 @@ impl AgentLoop { // Get all messages for context let mut messages = self.memory.get_messages(&session_id).await?; - let use_middleware = self.middleware_chain.is_some(); - - // Apply compaction — skip inline path when middleware chain handles it - if !use_middleware && self.compaction_threshold > 0 { - let needs_async = - self.compaction_config.use_llm || self.compaction_config.memory_flush_enabled; - if needs_async { - let outcome = compaction::maybe_compact_with_config( - messages, - self.compaction_threshold, - &self.compaction_config, - &self.agent_id, - &session_id, - Some(&self.driver), - self.growth.as_ref(), - ) - .await; - messages = outcome.messages; - } else { - messages = compaction::maybe_compact(messages, self.compaction_threshold); - } - } - - // Enhance system prompt — skip when middleware chain handles it - let mut enhanced_prompt = if use_middleware { - let prompt_ctx = PromptContext { - base_prompt: self.system_prompt.clone(), - soul: self.soul.clone(), - thinking_enabled: self.thinking_enabled, - plan_mode: self.plan_mode, - tool_definitions: self.tools.definitions(), - agent_name: None, - }; - PromptBuilder::new().build(&prompt_ctx) - } else if let Some(ref growth) = self.growth { - let base = self.system_prompt.as_deref().unwrap_or(""); - growth.enhance_prompt(&self.agent_id, base, &input).await? - } else { - self.system_prompt.clone().unwrap_or_default() + // Enhance system prompt via PromptBuilder (middleware may further modify) + let prompt_ctx = PromptContext { + base_prompt: self.system_prompt.clone(), + soul: self.soul.clone(), + thinking_enabled: self.thinking_enabled, + plan_mode: self.plan_mode, + tool_definitions: self.tools.definitions(), + agent_name: None, }; + let mut enhanced_prompt = PromptBuilder::new().build(&prompt_ctx); // Run middleware before_completion hooks (compaction, memory inject, etc.) - if let Some(ref chain) = self.middleware_chain { + { let mut mw_ctx = middleware::MiddlewareContext { agent_id: self.agent_id.clone(), session_id: session_id.clone(), @@ -646,7 +541,7 @@ impl AgentLoop { input_tokens: 0, output_tokens: 0, }; - match chain.run_before_completion(&mut mw_ctx).await? { + match self.middleware_chain.run_before_completion(&mut mw_ctx).await? { middleware::MiddlewareDecision::Continue => { messages = mw_ctx.messages; enhanced_prompt = mw_ctx.system_prompt; @@ -670,7 +565,6 @@ impl AgentLoop { let memory = self.memory.clone(); let driver = self.driver.clone(); let tools = self.tools.clone(); - let loop_guard_clone = self.loop_guard.lock().unwrap_or_else(|e| e.into_inner()).clone(); let middleware_chain = self.middleware_chain.clone(); let skill_executor = self.skill_executor.clone(); let path_validator = self.path_validator.clone(); @@ -684,7 +578,6 @@ impl AgentLoop { tokio::spawn(async move { let mut messages = messages; - let loop_guard_clone = Mutex::new(loop_guard_clone); let max_iterations = 10; let mut iteration = 0; let mut total_input_tokens = 0u32; @@ -868,7 +761,7 @@ impl AgentLoop { } // Post-completion: middleware after_completion (memory extraction, etc.) - if let Some(ref chain) = middleware_chain { + { let mw_ctx = middleware::MiddlewareContext { agent_id: agent_id.clone(), session_id: session_id_clone.clone(), @@ -879,7 +772,7 @@ impl AgentLoop { input_tokens: total_input_tokens, output_tokens: total_output_tokens, }; - if let Err(e) = chain.run_after_completion(&mw_ctx).await { + if let Err(e) = middleware_chain.run_after_completion(&mw_ctx).await { tracing::warn!("[AgentLoop] Streaming middleware after_completion failed: {}", e); } } @@ -911,8 +804,8 @@ impl AgentLoop { for (id, name, input) in pending_tool_calls { tracing::debug!("[AgentLoop] Executing tool: name={}, input={:?}", name, input); - // Check tool call safety — via middleware chain or inline loop guard - if let Some(ref chain) = middleware_chain { + // Check tool call safety — via middleware chain + { let mw_ctx = middleware::MiddlewareContext { agent_id: agent_id.clone(), session_id: session_id_clone.clone(), @@ -923,7 +816,7 @@ impl AgentLoop { input_tokens: total_input_tokens, output_tokens: total_output_tokens, }; - match chain.run_before_tool_call(&mw_ctx, &name, &input).await { + match middleware_chain.run_before_tool_call(&mw_ctx, &name, &input).await { Ok(middleware::ToolCallDecision::Allow) => {} Ok(middleware::ToolCallDecision::Block(msg)) => { tracing::warn!("[AgentLoop] Tool '{}' blocked by middleware: {}", name, msg); @@ -995,30 +888,6 @@ impl AgentLoop { continue; } } - } else { - // Legacy inline loop guard path - let guard_result = loop_guard_clone.lock().unwrap_or_else(|e| e.into_inner()).check(&name, &input); - match guard_result { - LoopGuardResult::CircuitBreaker => { - if let Err(e) = tx.send(LoopEvent::Error("检测到工具调用循环,已自动终止".to_string())).await { - tracing::warn!("[AgentLoop] Failed to send Error event: {}", e); - } - break 'outer; - } - LoopGuardResult::Blocked => { - tracing::warn!("[AgentLoop] Tool '{}' blocked by loop guard", name); - let error_output = serde_json::json!({ "error": "工具调用被循环防护拦截" }); - if let Err(e) = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: error_output.clone() }).await { - tracing::warn!("[AgentLoop] Failed to send ToolEnd event: {}", e); - } - messages.push(Message::tool_result(id, zclaw_types::ToolId::new(&name), error_output, true)); - continue; - } - LoopGuardResult::Warn => { - tracing::warn!("[AgentLoop] Tool '{}' triggered loop guard warning", name); - } - LoopGuardResult::Allowed => {} - } } // Use pre-resolved path_validator (already has default fallback from create_tool_context logic) let pv = path_validator.clone().unwrap_or_else(|| {