feat(a2a): 消息重入队列 + 广播丢弃修复 + Router group 管理
A2A 协议完善 (feature-gated by multi-agent): - AgentInbox wrapper: VecDeque 暂存非匹配消息,requeue 替代丢弃 - a2a_delegate_task: 非匹配消息安全重入队列,不再静默丢弃 - A2aRouter: 广播/组播改用 try_send + 日志,避免持有 RwLock 跨 await - 新增 group 管理方法: add_to_group/remove_from_group/list_groups/get_group_members - 修复 Capability import 在 multi-agent feature 下的编译问题
This commit is contained in:
@@ -5,6 +5,8 @@ use std::sync::Arc;
|
|||||||
use tokio::sync::{broadcast, mpsc, Mutex};
|
use tokio::sync::{broadcast, mpsc, Mutex};
|
||||||
use zclaw_types::{AgentConfig, AgentId, AgentInfo, Event, Result, HandRun, HandRunId, HandRunStatus, HandRunFilter, TriggerSource};
|
use zclaw_types::{AgentConfig, AgentId, AgentInfo, Event, Result, HandRun, HandRunId, HandRunStatus, HandRunFilter, TriggerSource};
|
||||||
#[cfg(feature = "multi-agent")]
|
#[cfg(feature = "multi-agent")]
|
||||||
|
use zclaw_types::Capability;
|
||||||
|
#[cfg(feature = "multi-agent")]
|
||||||
use zclaw_protocols::{A2aRouter, A2aAgentProfile, A2aCapability, A2aEnvelope, A2aMessageType, A2aRecipient};
|
use zclaw_protocols::{A2aRouter, A2aAgentProfile, A2aCapability, A2aEnvelope, A2aMessageType, A2aRecipient};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
@@ -114,6 +116,39 @@ impl SkillExecutor for KernelSkillExecutor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Inbox wrapper for A2A message receivers that supports re-queuing
|
||||||
|
/// non-matching messages instead of dropping them.
|
||||||
|
#[cfg(feature = "multi-agent")]
|
||||||
|
struct AgentInbox {
|
||||||
|
rx: tokio::sync::mpsc::Receiver<A2aEnvelope>,
|
||||||
|
pending: std::collections::VecDeque<A2aEnvelope>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "multi-agent")]
|
||||||
|
impl AgentInbox {
|
||||||
|
fn new(rx: tokio::sync::mpsc::Receiver<A2aEnvelope>) -> Self {
|
||||||
|
Self { rx, pending: std::collections::VecDeque::new() }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_recv(&mut self) -> std::result::Result<A2aEnvelope, tokio::sync::mpsc::error::TryRecvError> {
|
||||||
|
if let Some(msg) = self.pending.pop_front() {
|
||||||
|
return Ok(msg);
|
||||||
|
}
|
||||||
|
self.rx.try_recv()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn recv(&mut self) -> Option<A2aEnvelope> {
|
||||||
|
if let Some(msg) = self.pending.pop_front() {
|
||||||
|
return Some(msg);
|
||||||
|
}
|
||||||
|
self.rx.recv().await
|
||||||
|
}
|
||||||
|
|
||||||
|
fn requeue(&mut self, envelope: A2aEnvelope) {
|
||||||
|
self.pending.push_back(envelope);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// The ZCLAW Kernel
|
/// The ZCLAW Kernel
|
||||||
pub struct Kernel {
|
pub struct Kernel {
|
||||||
config: KernelConfig,
|
config: KernelConfig,
|
||||||
@@ -137,9 +172,9 @@ pub struct Kernel {
|
|||||||
/// A2A router for inter-agent messaging (gated by multi-agent feature)
|
/// A2A router for inter-agent messaging (gated by multi-agent feature)
|
||||||
#[cfg(feature = "multi-agent")]
|
#[cfg(feature = "multi-agent")]
|
||||||
a2a_router: Arc<A2aRouter>,
|
a2a_router: Arc<A2aRouter>,
|
||||||
/// Per-agent A2A inbox receivers
|
/// Per-agent A2A inbox receivers (supports re-queuing non-matching messages)
|
||||||
#[cfg(feature = "multi-agent")]
|
#[cfg(feature = "multi-agent")]
|
||||||
a2a_inboxes: Arc<dashmap::DashMap<AgentId, Arc<Mutex<mpsc::Receiver<A2aEnvelope>>>>>,
|
a2a_inboxes: Arc<dashmap::DashMap<AgentId, Arc<Mutex<AgentInbox>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Kernel {
|
impl Kernel {
|
||||||
@@ -440,7 +475,7 @@ impl Kernel {
|
|||||||
{
|
{
|
||||||
let profile = Self::agent_config_to_a2a_profile(&config);
|
let profile = Self::agent_config_to_a2a_profile(&config);
|
||||||
let rx = self.a2a_router.register_agent(profile).await;
|
let rx = self.a2a_router.register_agent(profile).await;
|
||||||
self.a2a_inboxes.insert(id, Arc::new(Mutex::new(rx)));
|
self.a2a_inboxes.insert(id, Arc::new(Mutex::new(AgentInbox::new(rx))));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register in registry (consumes config)
|
// Register in registry (consumes config)
|
||||||
@@ -1333,8 +1368,8 @@ impl Kernel {
|
|||||||
format!("No A2A inbox for agent: {}", agent_id)
|
format!("No A2A inbox for agent: {}", agent_id)
|
||||||
))?;
|
))?;
|
||||||
|
|
||||||
let mut rx = inbox.lock().await;
|
let mut inbox = inbox.lock().await;
|
||||||
match rx.try_recv() {
|
match inbox.try_recv() {
|
||||||
Ok(envelope) => {
|
Ok(envelope) => {
|
||||||
self.events.publish(Event::A2aMessageReceived {
|
self.events.publish(Event::A2aMessageReceived {
|
||||||
from: envelope.from,
|
from: envelope.from,
|
||||||
@@ -1393,23 +1428,24 @@ impl Kernel {
|
|||||||
// Wait for response with timeout
|
// Wait for response with timeout
|
||||||
let timeout = tokio::time::Duration::from_millis(timeout_ms);
|
let timeout = tokio::time::Duration::from_millis(timeout_ms);
|
||||||
let result = tokio::time::timeout(timeout, async {
|
let result = tokio::time::timeout(timeout, async {
|
||||||
let inbox = self.a2a_inboxes.get(from)
|
let inbox_entry = self.a2a_inboxes.get(from)
|
||||||
.ok_or_else(|| zclaw_types::ZclawError::NotFound(
|
.ok_or_else(|| zclaw_types::ZclawError::NotFound(
|
||||||
format!("No A2A inbox for agent: {}", from)
|
format!("No A2A inbox for agent: {}", from)
|
||||||
))?;
|
))?;
|
||||||
let mut rx = inbox.lock().await;
|
let mut inbox = inbox_entry.lock().await;
|
||||||
|
|
||||||
// Poll for matching response
|
// Poll for matching response
|
||||||
loop {
|
loop {
|
||||||
match rx.recv().await {
|
match inbox.recv().await {
|
||||||
Some(msg) => {
|
Some(msg) => {
|
||||||
// Check if this is a response to our task
|
// Check if this is a response to our task
|
||||||
if msg.message_type == A2aMessageType::Response
|
if msg.message_type == A2aMessageType::Response
|
||||||
&& msg.reply_to.as_deref() == Some(&envelope_id) {
|
&& msg.reply_to.as_deref() == Some(&envelope_id) {
|
||||||
return Ok::<_, zclaw_types::ZclawError>(msg.payload);
|
return Ok::<_, zclaw_types::ZclawError>(msg.payload);
|
||||||
}
|
}
|
||||||
// Not our response — put it back by logging it (would need a re-queue mechanism for production)
|
// Not our response — requeue it for later processing
|
||||||
tracing::warn!("Received non-matching A2A response, discarding: {}", msg.id);
|
tracing::debug!("Re-queuing non-matching A2A message: {}", msg.id);
|
||||||
|
inbox.requeue(msg);
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
return Err(zclaw_types::ZclawError::Internal(
|
return Err(zclaw_types::ZclawError::Internal(
|
||||||
|
|||||||
@@ -405,7 +405,15 @@ impl A2aRouter {
|
|||||||
if let Some(members) = groups.get(group_id) {
|
if let Some(members) = groups.get(group_id) {
|
||||||
for agent_id in members {
|
for agent_id in members {
|
||||||
if let Some(tx) = queues.get(agent_id) {
|
if let Some(tx) = queues.get(agent_id) {
|
||||||
let _ = tx.send(envelope.clone()).await;
|
match tx.try_send(envelope.clone()) {
|
||||||
|
Ok(()) => {},
|
||||||
|
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
|
||||||
|
tracing::warn!("A2A delivery to agent {} dropped: channel full", agent_id);
|
||||||
|
}
|
||||||
|
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
|
||||||
|
tracing::warn!("A2A delivery to agent {} dropped: channel closed", agent_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -414,7 +422,15 @@ impl A2aRouter {
|
|||||||
// Broadcast to all registered agents
|
// Broadcast to all registered agents
|
||||||
for (agent_id, tx) in queues.iter() {
|
for (agent_id, tx) in queues.iter() {
|
||||||
if agent_id != &envelope.from {
|
if agent_id != &envelope.from {
|
||||||
let _ = tx.send(envelope.clone()).await;
|
match tx.try_send(envelope.clone()) {
|
||||||
|
Ok(()) => {},
|
||||||
|
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
|
||||||
|
tracing::warn!("A2A delivery to agent {} dropped: channel full", agent_id);
|
||||||
|
}
|
||||||
|
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
|
||||||
|
tracing::warn!("A2A delivery to agent {} dropped: channel closed", agent_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -444,6 +460,35 @@ impl A2aRouter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Add agent to a group (creates group if not exists)
|
||||||
|
pub async fn add_to_group(&self, group_id: &str, agent_id: AgentId) {
|
||||||
|
let mut groups = self.groups.write().await;
|
||||||
|
let members = groups.entry(group_id.to_string()).or_insert_with(Vec::new);
|
||||||
|
if !members.contains(&agent_id) {
|
||||||
|
members.push(agent_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove agent from a group
|
||||||
|
pub async fn remove_from_group(&self, group_id: &str, agent_id: &AgentId) {
|
||||||
|
let mut groups = self.groups.write().await;
|
||||||
|
if let Some(members) = groups.get_mut(group_id) {
|
||||||
|
members.retain(|id| id != agent_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// List all groups
|
||||||
|
pub async fn list_groups(&self) -> Vec<String> {
|
||||||
|
let groups = self.groups.read().await;
|
||||||
|
groups.keys().cloned().collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get members of a group
|
||||||
|
pub async fn get_group_members(&self, group_id: &str) -> Vec<AgentId> {
|
||||||
|
let groups = self.groups.read().await;
|
||||||
|
groups.get(group_id).cloned().unwrap_or_default()
|
||||||
|
}
|
||||||
|
|
||||||
/// Get all registered agent profiles
|
/// Get all registered agent profiles
|
||||||
pub async fn list_profiles(&self) -> Vec<A2aAgentProfile> {
|
pub async fn list_profiles(&self) -> Vec<A2aAgentProfile> {
|
||||||
let profiles = self.profiles.read().await;
|
let profiles = self.profiles.read().await;
|
||||||
|
|||||||
Reference in New Issue
Block a user