From 6529b673534f8b2d6f8487839712a673c8f535c2 Mon Sep 17 00:00:00 2001 From: iven Date: Mon, 30 Mar 2026 19:55:06 +0800 Subject: [PATCH] =?UTF-8?q?feat(a2a):=20=E6=B6=88=E6=81=AF=E9=87=8D?= =?UTF-8?q?=E5=85=A5=E9=98=9F=E5=88=97=20+=20=E5=B9=BF=E6=92=AD=E4=B8=A2?= =?UTF-8?q?=E5=BC=83=E4=BF=AE=E5=A4=8D=20+=20Router=20group=20=E7=AE=A1?= =?UTF-8?q?=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A2A 协议完善 (feature-gated by multi-agent): - AgentInbox wrapper: VecDeque 暂存非匹配消息,requeue 替代丢弃 - a2a_delegate_task: 非匹配消息安全重入队列,不再静默丢弃 - A2aRouter: 广播/组播改用 try_send + 日志,避免持有 RwLock 跨 await - 新增 group 管理方法: add_to_group/remove_from_group/list_groups/get_group_members - 修复 Capability import 在 multi-agent feature 下的编译问题 --- crates/zclaw-kernel/src/kernel.rs | 56 +++++++++++++++++++++++++------ crates/zclaw-protocols/src/a2a.rs | 49 +++++++++++++++++++++++++-- 2 files changed, 93 insertions(+), 12 deletions(-) diff --git a/crates/zclaw-kernel/src/kernel.rs b/crates/zclaw-kernel/src/kernel.rs index c2d80d9..763fd2a 100644 --- a/crates/zclaw-kernel/src/kernel.rs +++ b/crates/zclaw-kernel/src/kernel.rs @@ -5,6 +5,8 @@ use std::sync::Arc; use tokio::sync::{broadcast, mpsc, Mutex}; use zclaw_types::{AgentConfig, AgentId, AgentInfo, Event, Result, HandRun, HandRunId, HandRunStatus, HandRunFilter, TriggerSource}; #[cfg(feature = "multi-agent")] +use zclaw_types::Capability; +#[cfg(feature = "multi-agent")] use zclaw_protocols::{A2aRouter, A2aAgentProfile, A2aCapability, A2aEnvelope, A2aMessageType, A2aRecipient}; use async_trait::async_trait; use serde_json::Value; @@ -114,6 +116,39 @@ impl SkillExecutor for KernelSkillExecutor { } } +/// Inbox wrapper for A2A message receivers that supports re-queuing +/// non-matching messages instead of dropping them. +#[cfg(feature = "multi-agent")] +struct AgentInbox { + rx: tokio::sync::mpsc::Receiver, + pending: std::collections::VecDeque, +} + +#[cfg(feature = "multi-agent")] +impl AgentInbox { + fn new(rx: tokio::sync::mpsc::Receiver) -> Self { + Self { rx, pending: std::collections::VecDeque::new() } + } + + fn try_recv(&mut self) -> std::result::Result { + if let Some(msg) = self.pending.pop_front() { + return Ok(msg); + } + self.rx.try_recv() + } + + async fn recv(&mut self) -> Option { + if let Some(msg) = self.pending.pop_front() { + return Some(msg); + } + self.rx.recv().await + } + + fn requeue(&mut self, envelope: A2aEnvelope) { + self.pending.push_back(envelope); + } +} + /// The ZCLAW Kernel pub struct Kernel { config: KernelConfig, @@ -137,9 +172,9 @@ pub struct Kernel { /// A2A router for inter-agent messaging (gated by multi-agent feature) #[cfg(feature = "multi-agent")] a2a_router: Arc, - /// Per-agent A2A inbox receivers + /// Per-agent A2A inbox receivers (supports re-queuing non-matching messages) #[cfg(feature = "multi-agent")] - a2a_inboxes: Arc>>>>, + a2a_inboxes: Arc>>>, } impl Kernel { @@ -440,7 +475,7 @@ impl Kernel { { let profile = Self::agent_config_to_a2a_profile(&config); let rx = self.a2a_router.register_agent(profile).await; - self.a2a_inboxes.insert(id, Arc::new(Mutex::new(rx))); + self.a2a_inboxes.insert(id, Arc::new(Mutex::new(AgentInbox::new(rx)))); } // Register in registry (consumes config) @@ -1333,8 +1368,8 @@ impl Kernel { format!("No A2A inbox for agent: {}", agent_id) ))?; - let mut rx = inbox.lock().await; - match rx.try_recv() { + let mut inbox = inbox.lock().await; + match inbox.try_recv() { Ok(envelope) => { self.events.publish(Event::A2aMessageReceived { from: envelope.from, @@ -1393,23 +1428,24 @@ impl Kernel { // Wait for response with timeout let timeout = tokio::time::Duration::from_millis(timeout_ms); let result = tokio::time::timeout(timeout, async { - let inbox = self.a2a_inboxes.get(from) + let inbox_entry = self.a2a_inboxes.get(from) .ok_or_else(|| zclaw_types::ZclawError::NotFound( format!("No A2A inbox for agent: {}", from) ))?; - let mut rx = inbox.lock().await; + let mut inbox = inbox_entry.lock().await; // Poll for matching response loop { - match rx.recv().await { + match inbox.recv().await { Some(msg) => { // Check if this is a response to our task if msg.message_type == A2aMessageType::Response && msg.reply_to.as_deref() == Some(&envelope_id) { return Ok::<_, zclaw_types::ZclawError>(msg.payload); } - // Not our response — put it back by logging it (would need a re-queue mechanism for production) - tracing::warn!("Received non-matching A2A response, discarding: {}", msg.id); + // Not our response — requeue it for later processing + tracing::debug!("Re-queuing non-matching A2A message: {}", msg.id); + inbox.requeue(msg); } None => { return Err(zclaw_types::ZclawError::Internal( diff --git a/crates/zclaw-protocols/src/a2a.rs b/crates/zclaw-protocols/src/a2a.rs index 7792ca4..9e74404 100644 --- a/crates/zclaw-protocols/src/a2a.rs +++ b/crates/zclaw-protocols/src/a2a.rs @@ -405,7 +405,15 @@ impl A2aRouter { if let Some(members) = groups.get(group_id) { for agent_id in members { if let Some(tx) = queues.get(agent_id) { - let _ = tx.send(envelope.clone()).await; + match tx.try_send(envelope.clone()) { + Ok(()) => {}, + Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { + tracing::warn!("A2A delivery to agent {} dropped: channel full", agent_id); + } + Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { + tracing::warn!("A2A delivery to agent {} dropped: channel closed", agent_id); + } + } } } } @@ -414,7 +422,15 @@ impl A2aRouter { // Broadcast to all registered agents for (agent_id, tx) in queues.iter() { if agent_id != &envelope.from { - let _ = tx.send(envelope.clone()).await; + match tx.try_send(envelope.clone()) { + Ok(()) => {}, + Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { + tracing::warn!("A2A delivery to agent {} dropped: channel full", agent_id); + } + Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { + tracing::warn!("A2A delivery to agent {} dropped: channel closed", agent_id); + } + } } } } @@ -444,6 +460,35 @@ impl A2aRouter { } } + /// Add agent to a group (creates group if not exists) + pub async fn add_to_group(&self, group_id: &str, agent_id: AgentId) { + let mut groups = self.groups.write().await; + let members = groups.entry(group_id.to_string()).or_insert_with(Vec::new); + if !members.contains(&agent_id) { + members.push(agent_id); + } + } + + /// Remove agent from a group + pub async fn remove_from_group(&self, group_id: &str, agent_id: &AgentId) { + let mut groups = self.groups.write().await; + if let Some(members) = groups.get_mut(group_id) { + members.retain(|id| id != agent_id); + } + } + + /// List all groups + pub async fn list_groups(&self) -> Vec { + let groups = self.groups.read().await; + groups.keys().cloned().collect() + } + + /// Get members of a group + pub async fn get_group_members(&self, group_id: &str) -> Vec { + let groups = self.groups.read().await; + groups.get(group_id).cloned().unwrap_or_default() + } + /// Get all registered agent profiles pub async fn list_profiles(&self) -> Vec { let profiles = self.profiles.read().await;