refactor(runtime): loop_runner 双路径合并 — 统一走 middleware chain (Phase 3A)
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled

middleware_chain 从 Option<MiddlewareChain> 改为 MiddlewareChain:
- 移除 6 处 use_middleware 分支 + 2 处 legacy loop_guard inline path
- 移除 loop_guard field + Mutex import + circuit_breaker_triggered 变量
- 空 chain (Default) 行为等价于 middleware path 中的 no-op
- 1154行 → 1023行,净减 131 行
- cargo check --workspace ✓ | cargo test ✓ (排除 desktop 预存编译问题)
This commit is contained in:
iven
2026-04-17 21:54:12 +08:00
parent 21c3222ad5
commit 5687dc20e0
3 changed files with 48 additions and 185 deletions

View File

@@ -83,10 +83,8 @@ impl Kernel {
loop_runner = loop_runner.with_path_validator(path_validator);
}
// Inject middleware chain if available
if let Some(chain) = self.create_middleware_chain() {
loop_runner = loop_runner.with_middleware_chain(chain);
}
// Inject middleware chain
loop_runner = loop_runner.with_middleware_chain(self.create_middleware_chain());
// Apply chat mode configuration (thinking/reasoning/plan mode)
if let Some(ref mode) = chat_mode {
@@ -198,10 +196,8 @@ impl Kernel {
loop_runner = loop_runner.with_path_validator(path_validator);
}
// Inject middleware chain if available
if let Some(chain) = self.create_middleware_chain() {
loop_runner = loop_runner.with_middleware_chain(chain);
}
// Inject middleware chain
loop_runner = loop_runner.with_middleware_chain(self.create_middleware_chain());
// Apply chat mode configuration (thinking/reasoning/plan mode from frontend)
if let Some(ref mode) = chat_mode {

View File

@@ -201,7 +201,7 @@ impl Kernel {
/// When middleware is configured, cross-cutting concerns (compaction, loop guard,
/// token calibration, etc.) are delegated to the chain. When no middleware is
/// registered, the legacy inline path in `AgentLoop` is used instead.
pub(crate) fn create_middleware_chain(&self) -> Option<zclaw_runtime::middleware::MiddlewareChain> {
pub(crate) fn create_middleware_chain(&self) -> zclaw_runtime::middleware::MiddlewareChain {
let mut chain = zclaw_runtime::middleware::MiddlewareChain::new();
// Butler router — semantic skill routing context injection
@@ -359,13 +359,11 @@ impl Kernel {
chain.register(Arc::new(mw));
}
// Only return Some if we actually registered middleware
if chain.is_empty() {
None
} else {
// Always return the chain (empty chain is a no-op)
if !chain.is_empty() {
tracing::info!("[Kernel] Middleware chain created with {} middlewares", chain.len());
Some(chain)
}
chain
}
/// Subscribe to events

View File

@@ -1,7 +1,6 @@
//! Agent loop implementation
use std::sync::Arc;
use std::sync::Mutex;
use futures::StreamExt;
use tokio::sync::mpsc;
use zclaw_types::{AgentId, SessionId, Message, Result};
@@ -10,7 +9,6 @@ use crate::driver::{LlmDriver, CompletionRequest, ContentBlock};
use crate::stream::StreamChunk;
use crate::tool::{ToolRegistry, ToolContext, SkillExecutor};
use crate::tool::builtin::PathValidator;
use crate::loop_guard::{LoopGuard, LoopGuardResult};
use crate::growth::GrowthIntegration;
use crate::compaction::{self, CompactionConfig};
use crate::middleware::{self, MiddlewareChain};
@@ -23,7 +21,6 @@ pub struct AgentLoop {
driver: Arc<dyn LlmDriver>,
tools: ToolRegistry,
memory: Arc<MemoryStore>,
loop_guard: Mutex<LoopGuard>,
model: String,
system_prompt: Option<String>,
/// Custom agent personality for prompt assembly
@@ -38,10 +35,9 @@ pub struct AgentLoop {
compaction_threshold: usize,
/// Compaction behavior configuration
compaction_config: CompactionConfig,
/// Optional middleware chain — when `Some`, cross-cutting logic is
/// delegated to the chain instead of the inline code below.
/// When `None`, the legacy inline path is used (100% backward compatible).
middleware_chain: Option<MiddlewareChain>,
/// Middleware chain — cross-cutting concerns are delegated to the chain.
/// An empty chain (Default) is a no-op: all `run_*` methods return Continue/Allow.
middleware_chain: MiddlewareChain,
/// Chat mode: extended thinking enabled
thinking_enabled: bool,
/// Chat mode: reasoning effort level
@@ -62,7 +58,6 @@ impl AgentLoop {
driver,
tools,
memory,
loop_guard: Mutex::new(LoopGuard::default()),
model: String::new(), // Must be set via with_model()
system_prompt: None,
soul: None,
@@ -73,7 +68,7 @@ impl AgentLoop {
growth: None,
compaction_threshold: 0,
compaction_config: CompactionConfig::default(),
middleware_chain: None,
middleware_chain: MiddlewareChain::default(),
thinking_enabled: false,
reasoning_effort: None,
plan_mode: false,
@@ -167,11 +162,10 @@ impl AgentLoop {
self
}
/// Inject a middleware chain. When set, cross-cutting concerns (compaction,
/// loop guard, token calibration, etc.) are delegated to the chain instead
/// of the inline logic.
/// Inject a middleware chain. Cross-cutting concerns (compaction,
/// loop guard, token calibration, etc.) are delegated to the chain.
pub fn with_middleware_chain(mut self, chain: MiddlewareChain) -> Self {
self.middleware_chain = Some(chain);
self.middleware_chain = chain;
self
}
@@ -227,49 +221,19 @@ impl AgentLoop {
// Get all messages for context
let mut messages = self.memory.get_messages(&session_id).await?;
let use_middleware = self.middleware_chain.is_some();
// Apply compaction — skip inline path when middleware chain handles it
if !use_middleware && self.compaction_threshold > 0 {
let needs_async =
self.compaction_config.use_llm || self.compaction_config.memory_flush_enabled;
if needs_async {
let outcome = compaction::maybe_compact_with_config(
messages,
self.compaction_threshold,
&self.compaction_config,
&self.agent_id,
&session_id,
Some(&self.driver),
self.growth.as_ref(),
)
.await;
messages = outcome.messages;
} else {
messages = compaction::maybe_compact(messages, self.compaction_threshold);
}
}
// Enhance system prompt — skip when middleware chain handles it
let mut enhanced_prompt = if use_middleware {
let prompt_ctx = PromptContext {
base_prompt: self.system_prompt.clone(),
soul: self.soul.clone(),
thinking_enabled: self.thinking_enabled,
plan_mode: self.plan_mode,
tool_definitions: self.tools.definitions(),
agent_name: None,
};
PromptBuilder::new().build(&prompt_ctx)
} else if let Some(ref growth) = self.growth {
let base = self.system_prompt.as_deref().unwrap_or("");
growth.enhance_prompt(&self.agent_id, base, &input).await?
} else {
self.system_prompt.clone().unwrap_or_default()
// Enhance system prompt via PromptBuilder (middleware may further modify)
let prompt_ctx = PromptContext {
base_prompt: self.system_prompt.clone(),
soul: self.soul.clone(),
thinking_enabled: self.thinking_enabled,
plan_mode: self.plan_mode,
tool_definitions: self.tools.definitions(),
agent_name: None,
};
let mut enhanced_prompt = PromptBuilder::new().build(&prompt_ctx);
// Run middleware before_completion hooks (compaction, memory inject, etc.)
if let Some(ref chain) = self.middleware_chain {
{
let mut mw_ctx = middleware::MiddlewareContext {
agent_id: self.agent_id.clone(),
session_id: session_id.clone(),
@@ -280,7 +244,7 @@ impl AgentLoop {
input_tokens: 0,
output_tokens: 0,
};
match chain.run_before_completion(&mut mw_ctx).await? {
match self.middleware_chain.run_before_completion(&mut mw_ctx).await? {
middleware::MiddlewareDecision::Continue => {
messages = mw_ctx.messages;
enhanced_prompt = mw_ctx.system_prompt;
@@ -400,7 +364,6 @@ impl AgentLoop {
// Create tool context and execute all tools
let tool_context = self.create_tool_context(session_id.clone());
let mut circuit_breaker_triggered = false;
let mut abort_result: Option<AgentLoopResult> = None;
let mut clarification_result: Option<AgentLoopResult> = None;
for (id, name, input) in tool_calls {
@@ -408,8 +371,8 @@ impl AgentLoop {
if abort_result.is_some() {
break;
}
// Check tool call safety — via middleware chain or inline loop guard
if let Some(ref chain) = self.middleware_chain {
// Check tool call safety — via middleware chain
{
let mw_ctx_ref = middleware::MiddlewareContext {
agent_id: self.agent_id.clone(),
session_id: session_id.clone(),
@@ -420,7 +383,7 @@ impl AgentLoop {
input_tokens: total_input_tokens,
output_tokens: total_output_tokens,
};
match chain.run_before_tool_call(&mw_ctx_ref, &name, &input).await? {
match self.middleware_chain.run_before_tool_call(&mw_ctx_ref, &name, &input).await? {
middleware::ToolCallDecision::Allow => {}
middleware::ToolCallDecision::Block(msg) => {
tracing::warn!("[AgentLoop] Tool '{}' blocked by middleware: {}", name, msg);
@@ -456,26 +419,6 @@ impl AgentLoop {
});
}
}
} else {
// Legacy inline path
let guard_result = self.loop_guard.lock().unwrap_or_else(|e| e.into_inner()).check(&name, &input);
match guard_result {
LoopGuardResult::CircuitBreaker => {
tracing::warn!("[AgentLoop] Circuit breaker triggered by tool '{}'", name);
circuit_breaker_triggered = true;
break;
}
LoopGuardResult::Blocked => {
tracing::warn!("[AgentLoop] Tool '{}' blocked by loop guard", name);
let error_output = serde_json::json!({ "error": "工具调用被循环防护拦截" });
messages.push(Message::tool_result(id, zclaw_types::ToolId::new(&name), error_output, true));
continue;
}
LoopGuardResult::Warn => {
tracing::warn!("[AgentLoop] Tool '{}' triggered loop guard warning", name);
}
LoopGuardResult::Allowed => {}
}
}
let tool_result = match tokio::time::timeout(
@@ -537,21 +480,10 @@ impl AgentLoop {
break result;
}
// If circuit breaker was triggered, terminate immediately
if circuit_breaker_triggered {
let msg = "检测到工具调用循环,已自动终止";
self.memory.append_message(&session_id, &Message::assistant(msg)).await?;
break AgentLoopResult {
response: msg.to_string(),
input_tokens: total_input_tokens,
output_tokens: total_output_tokens,
iterations,
};
}
};
// Post-completion processing — middleware chain or inline growth
if let Some(ref chain) = self.middleware_chain {
// Post-completion processing — middleware chain
{
let mw_ctx = middleware::MiddlewareContext {
agent_id: self.agent_id.clone(),
session_id: session_id.clone(),
@@ -562,16 +494,9 @@ impl AgentLoop {
input_tokens: total_input_tokens,
output_tokens: total_output_tokens,
};
if let Err(e) = chain.run_after_completion(&mw_ctx).await {
if let Err(e) = self.middleware_chain.run_after_completion(&mw_ctx).await {
tracing::warn!("[AgentLoop] Middleware after_completion failed: {}", e);
}
} else if let Some(ref growth) = self.growth {
// Legacy inline path
if let Ok(all_messages) = self.memory.get_messages(&session_id).await {
if let Err(e) = growth.process_conversation(&self.agent_id, &all_messages, session_id.clone()).await {
tracing::warn!("[AgentLoop] Growth processing failed: {}", e);
}
}
}
Ok(result)
@@ -593,49 +518,19 @@ impl AgentLoop {
// Get all messages for context
let mut messages = self.memory.get_messages(&session_id).await?;
let use_middleware = self.middleware_chain.is_some();
// Apply compaction — skip inline path when middleware chain handles it
if !use_middleware && self.compaction_threshold > 0 {
let needs_async =
self.compaction_config.use_llm || self.compaction_config.memory_flush_enabled;
if needs_async {
let outcome = compaction::maybe_compact_with_config(
messages,
self.compaction_threshold,
&self.compaction_config,
&self.agent_id,
&session_id,
Some(&self.driver),
self.growth.as_ref(),
)
.await;
messages = outcome.messages;
} else {
messages = compaction::maybe_compact(messages, self.compaction_threshold);
}
}
// Enhance system prompt — skip when middleware chain handles it
let mut enhanced_prompt = if use_middleware {
let prompt_ctx = PromptContext {
base_prompt: self.system_prompt.clone(),
soul: self.soul.clone(),
thinking_enabled: self.thinking_enabled,
plan_mode: self.plan_mode,
tool_definitions: self.tools.definitions(),
agent_name: None,
};
PromptBuilder::new().build(&prompt_ctx)
} else if let Some(ref growth) = self.growth {
let base = self.system_prompt.as_deref().unwrap_or("");
growth.enhance_prompt(&self.agent_id, base, &input).await?
} else {
self.system_prompt.clone().unwrap_or_default()
// Enhance system prompt via PromptBuilder (middleware may further modify)
let prompt_ctx = PromptContext {
base_prompt: self.system_prompt.clone(),
soul: self.soul.clone(),
thinking_enabled: self.thinking_enabled,
plan_mode: self.plan_mode,
tool_definitions: self.tools.definitions(),
agent_name: None,
};
let mut enhanced_prompt = PromptBuilder::new().build(&prompt_ctx);
// Run middleware before_completion hooks (compaction, memory inject, etc.)
if let Some(ref chain) = self.middleware_chain {
{
let mut mw_ctx = middleware::MiddlewareContext {
agent_id: self.agent_id.clone(),
session_id: session_id.clone(),
@@ -646,7 +541,7 @@ impl AgentLoop {
input_tokens: 0,
output_tokens: 0,
};
match chain.run_before_completion(&mut mw_ctx).await? {
match self.middleware_chain.run_before_completion(&mut mw_ctx).await? {
middleware::MiddlewareDecision::Continue => {
messages = mw_ctx.messages;
enhanced_prompt = mw_ctx.system_prompt;
@@ -670,7 +565,6 @@ impl AgentLoop {
let memory = self.memory.clone();
let driver = self.driver.clone();
let tools = self.tools.clone();
let loop_guard_clone = self.loop_guard.lock().unwrap_or_else(|e| e.into_inner()).clone();
let middleware_chain = self.middleware_chain.clone();
let skill_executor = self.skill_executor.clone();
let path_validator = self.path_validator.clone();
@@ -684,7 +578,6 @@ impl AgentLoop {
tokio::spawn(async move {
let mut messages = messages;
let loop_guard_clone = Mutex::new(loop_guard_clone);
let max_iterations = 10;
let mut iteration = 0;
let mut total_input_tokens = 0u32;
@@ -868,7 +761,7 @@ impl AgentLoop {
}
// Post-completion: middleware after_completion (memory extraction, etc.)
if let Some(ref chain) = middleware_chain {
{
let mw_ctx = middleware::MiddlewareContext {
agent_id: agent_id.clone(),
session_id: session_id_clone.clone(),
@@ -879,7 +772,7 @@ impl AgentLoop {
input_tokens: total_input_tokens,
output_tokens: total_output_tokens,
};
if let Err(e) = chain.run_after_completion(&mw_ctx).await {
if let Err(e) = middleware_chain.run_after_completion(&mw_ctx).await {
tracing::warn!("[AgentLoop] Streaming middleware after_completion failed: {}", e);
}
}
@@ -911,8 +804,8 @@ impl AgentLoop {
for (id, name, input) in pending_tool_calls {
tracing::debug!("[AgentLoop] Executing tool: name={}, input={:?}", name, input);
// Check tool call safety — via middleware chain or inline loop guard
if let Some(ref chain) = middleware_chain {
// Check tool call safety — via middleware chain
{
let mw_ctx = middleware::MiddlewareContext {
agent_id: agent_id.clone(),
session_id: session_id_clone.clone(),
@@ -923,7 +816,7 @@ impl AgentLoop {
input_tokens: total_input_tokens,
output_tokens: total_output_tokens,
};
match chain.run_before_tool_call(&mw_ctx, &name, &input).await {
match middleware_chain.run_before_tool_call(&mw_ctx, &name, &input).await {
Ok(middleware::ToolCallDecision::Allow) => {}
Ok(middleware::ToolCallDecision::Block(msg)) => {
tracing::warn!("[AgentLoop] Tool '{}' blocked by middleware: {}", name, msg);
@@ -995,30 +888,6 @@ impl AgentLoop {
continue;
}
}
} else {
// Legacy inline loop guard path
let guard_result = loop_guard_clone.lock().unwrap_or_else(|e| e.into_inner()).check(&name, &input);
match guard_result {
LoopGuardResult::CircuitBreaker => {
if let Err(e) = tx.send(LoopEvent::Error("检测到工具调用循环,已自动终止".to_string())).await {
tracing::warn!("[AgentLoop] Failed to send Error event: {}", e);
}
break 'outer;
}
LoopGuardResult::Blocked => {
tracing::warn!("[AgentLoop] Tool '{}' blocked by loop guard", name);
let error_output = serde_json::json!({ "error": "工具调用被循环防护拦截" });
if let Err(e) = tx.send(LoopEvent::ToolEnd { name: name.clone(), output: error_output.clone() }).await {
tracing::warn!("[AgentLoop] Failed to send ToolEnd event: {}", e);
}
messages.push(Message::tool_result(id, zclaw_types::ToolId::new(&name), error_output, true));
continue;
}
LoopGuardResult::Warn => {
tracing::warn!("[AgentLoop] Tool '{}' triggered loop guard warning", name);
}
LoopGuardResult::Allowed => {}
}
}
// Use pre-resolved path_validator (already has default fallback from create_tool_context logic)
let pv = path_validator.clone().unwrap_or_else(|| {