Files
zclaw_openfang/crates/zclaw-protocols/src/a2a.rs
iven 9af7b0dd46
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
fix(kernel): enable multi-agent compilation + A2A routing tests
- director.rs: add missing CompletionRequest fields (thinking_enabled,
  reasoning_effort, plan_mode) for multi-agent feature gate
- agents.rs: remove unused AgentState import behind multi-agent feature
- lib.rs: replace ambiguous glob re-export with explicit director types,
  resolving AgentRole conflict between director and generation modules
- a2a.rs: add 5 integration tests covering direct message delivery,
  broadcast routing, group messaging, agent unregistration, and
  expired message rejection (10 total A2A tests, all passing)
- Verified: 537 workspace tests pass with multi-agent feature enabled

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-04 09:41:24 +08:00

938 lines
31 KiB
Rust

//! A2A (Agent-to-Agent) protocol support
//!
//! Implements communication between AI agents with support for:
//! - Direct messaging (point-to-point)
//! - Group messaging (multicast)
//! - Broadcast messaging (all agents)
//! - Capability discovery and advertisement
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use uuid::Uuid;
use zclaw_types::{AgentId, Result, ZclawError};
/// Default channel buffer size
const DEFAULT_CHANNEL_SIZE: usize = 256;
/// A2A message envelope
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct A2aEnvelope {
/// Message ID (UUID recommended)
pub id: String,
/// Sender agent ID
pub from: AgentId,
/// Recipient specification
pub to: A2aRecipient,
/// Message type
pub message_type: A2aMessageType,
/// Message payload (JSON)
pub payload: serde_json::Value,
/// Timestamp (Unix epoch milliseconds)
pub timestamp: i64,
/// Conversation/thread ID for grouping related messages
pub conversation_id: Option<String>,
/// Reply-to message ID for threading
pub reply_to: Option<String>,
/// Priority (0 = normal, higher = more urgent)
#[serde(default)]
pub priority: u8,
/// Time-to-live in seconds (0 = no expiry)
#[serde(default)]
pub ttl: u32,
}
impl A2aEnvelope {
/// Create a new envelope with auto-generated ID and timestamp
pub fn new(from: AgentId, to: A2aRecipient, message_type: A2aMessageType, payload: serde_json::Value) -> Self {
Self {
id: uuid_v4(),
from,
to,
message_type,
payload,
timestamp: current_timestamp(),
conversation_id: None,
reply_to: None,
priority: 0,
ttl: 0,
}
}
/// Set conversation ID
pub fn with_conversation(mut self, conversation_id: impl Into<String>) -> Self {
self.conversation_id = Some(conversation_id.into());
self
}
/// Set reply-to message ID
pub fn with_reply_to(mut self, reply_to: impl Into<String>) -> Self {
self.reply_to = Some(reply_to.into());
self
}
/// Set priority
pub fn with_priority(mut self, priority: u8) -> Self {
self.priority = priority;
self
}
/// Check if message has expired
pub fn is_expired(&self) -> bool {
if self.ttl == 0 {
return false;
}
let now = current_timestamp();
let expiry = self.timestamp + (self.ttl as i64 * 1000);
now > expiry
}
}
/// Recipient specification
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum A2aRecipient {
/// Direct message to specific agent
Direct { agent_id: AgentId },
/// Message to all agents in a group
Group { group_id: String },
/// Broadcast to all agents
Broadcast,
}
impl std::fmt::Display for A2aRecipient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
A2aRecipient::Direct { agent_id } => write!(f, "direct:{}", agent_id),
A2aRecipient::Group { group_id } => write!(f, "group:{}", group_id),
A2aRecipient::Broadcast => write!(f, "broadcast"),
}
}
}
/// A2A message types
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum A2aMessageType {
/// Request for information or action (expects response)
Request,
/// Response to a request
Response,
/// Notification (no response expected)
Notification,
/// Error message
Error,
/// Heartbeat/ping
Heartbeat,
/// Capability advertisement
Capability,
/// Task delegation
Task,
/// Task status update
TaskStatus,
}
/// Agent capability advertisement
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct A2aCapability {
/// Capability name (e.g., "code-generation", "web-search")
pub name: String,
/// Human-readable description
pub description: String,
/// JSON Schema for input validation
pub input_schema: Option<serde_json::Value>,
/// JSON Schema for output validation
pub output_schema: Option<serde_json::Value>,
/// Whether this capability requires human approval
pub requires_approval: bool,
/// Capability version
#[serde(default)]
pub version: String,
/// Tags for categorization
#[serde(default)]
pub tags: Vec<String>,
}
/// Agent profile for A2A
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct A2aAgentProfile {
/// Agent ID
pub id: AgentId,
/// Display name
pub name: String,
/// Agent description
pub description: String,
/// Advertised capabilities
pub capabilities: Vec<A2aCapability>,
/// Supported protocols
pub protocols: Vec<String>,
/// Agent role (e.g., "teacher", "assistant", "worker")
#[serde(default)]
pub role: String,
/// Priority for task assignment (higher = more priority)
#[serde(default)]
pub priority: u8,
/// Additional metadata
#[serde(default)]
pub metadata: HashMap<String, String>,
/// Groups this agent belongs to
#[serde(default)]
pub groups: Vec<String>,
/// Last seen timestamp
#[serde(default)]
pub last_seen: i64,
}
impl A2aAgentProfile {
/// Check if agent has a specific capability
pub fn has_capability(&self, name: &str) -> bool {
self.capabilities.iter().any(|c| c.name == name)
}
/// Get capability by name
pub fn get_capability(&self, name: &str) -> Option<&A2aCapability> {
self.capabilities.iter().find(|c| c.name == name)
}
}
/// A2A client trait
#[async_trait]
pub trait A2aClient: Send + Sync {
/// Send a message to another agent
async fn send(&self, envelope: A2aEnvelope) -> Result<()>;
/// Receive the next message (blocking)
async fn recv(&self) -> Option<A2aEnvelope>;
/// Try to receive a message without blocking
fn try_recv(&self) -> Result<A2aEnvelope>;
/// Get agent profile by ID
async fn get_profile(&self, agent_id: &AgentId) -> Result<Option<A2aAgentProfile>>;
/// Discover agents with specific capability
async fn discover(&self, capability: &str) -> Result<Vec<A2aAgentProfile>>;
/// Advertise own capabilities
async fn advertise(&self, profile: A2aAgentProfile) -> Result<()>;
/// Join a group
async fn join_group(&self, group_id: &str) -> Result<()>;
/// Leave a group
async fn leave_group(&self, group_id: &str) -> Result<()>;
/// Get all agents in a group
async fn get_group_members(&self, group_id: &str) -> Result<Vec<AgentId>>;
/// Get all online agents
async fn get_online_agents(&self) -> Result<Vec<A2aAgentProfile>>;
}
/// A2A Router - manages message routing between agents
pub struct A2aRouter {
/// Agent ID for this router instance
agent_id: AgentId,
/// Agent profiles registry
profiles: Arc<RwLock<HashMap<AgentId, A2aAgentProfile>>>,
/// Agent message queues (inbox for each agent) - using broadcast for multiple subscribers
queues: Arc<RwLock<HashMap<AgentId, mpsc::Sender<A2aEnvelope>>>>,
/// Group membership mapping (group_id -> agent_ids)
groups: Arc<RwLock<HashMap<String, Vec<AgentId>>>>,
/// Capability index (capability_name -> agent_ids)
capability_index: Arc<RwLock<HashMap<String, Vec<AgentId>>>>,
/// Channel size for message queues
channel_size: usize,
}
/// Handle for receiving A2A messages
///
/// This struct provides a way to receive messages from the A2A router.
/// It stores the receiver internally and provides methods to receive messages.
pub struct A2aReceiver {
receiver: Option<mpsc::Receiver<A2aEnvelope>>,
}
impl A2aReceiver {
#[allow(dead_code)] // Will be used when A2A message channels are activated
fn new(rx: mpsc::Receiver<A2aEnvelope>) -> Self {
Self { receiver: Some(rx) }
}
/// Receive the next message (async)
pub async fn recv(&mut self) -> Option<A2aEnvelope> {
if let Some(ref mut rx) = self.receiver {
rx.recv().await
} else {
None
}
}
/// Try to receive a message without blocking
pub fn try_recv(&mut self) -> Result<A2aEnvelope> {
if let Some(ref mut rx) = self.receiver {
rx.try_recv()
.map_err(|e| ZclawError::Internal(format!("Receive error: {}", e)))
} else {
Err(ZclawError::Internal("No receiver available".into()))
}
}
/// Check if receiver is still active
pub fn is_active(&self) -> bool {
self.receiver.is_some()
}
}
impl A2aRouter {
/// Create a new A2A router
pub fn new(agent_id: AgentId) -> Self {
Self {
agent_id,
profiles: Arc::new(RwLock::new(HashMap::new())),
queues: Arc::new(RwLock::new(HashMap::new())),
groups: Arc::new(RwLock::new(HashMap::new())),
capability_index: Arc::new(RwLock::new(HashMap::new())),
channel_size: DEFAULT_CHANNEL_SIZE,
}
}
/// Create router with custom channel size
pub fn with_channel_size(agent_id: AgentId, channel_size: usize) -> Self {
Self {
agent_id,
profiles: Arc::new(RwLock::new(HashMap::new())),
queues: Arc::new(RwLock::new(HashMap::new())),
groups: Arc::new(RwLock::new(HashMap::new())),
capability_index: Arc::new(RwLock::new(HashMap::new())),
channel_size,
}
}
/// Register an agent with the router
pub async fn register_agent(&self, profile: A2aAgentProfile) -> mpsc::Receiver<A2aEnvelope> {
let agent_id = profile.id.clone();
// Create inbox for this agent
let (tx, rx) = mpsc::channel(self.channel_size);
// Update capability index
{
let mut cap_index = self.capability_index.write().await;
for cap in &profile.capabilities {
cap_index
.entry(cap.name.clone())
.or_insert_with(Vec::new)
.push(agent_id.clone());
}
}
// Update last seen
let mut profile = profile;
profile.last_seen = current_timestamp();
// Store profile and queue
{
let mut profiles = self.profiles.write().await;
profiles.insert(agent_id.clone(), profile);
}
{
let mut queues = self.queues.write().await;
queues.insert(agent_id, tx);
}
rx
}
/// Unregister an agent
pub async fn unregister_agent(&self, agent_id: &AgentId) {
// Remove from profiles
let profile = {
let mut profiles = self.profiles.write().await;
profiles.remove(agent_id)
};
// Remove from capability index
if let Some(profile) = profile {
let mut cap_index = self.capability_index.write().await;
for cap in &profile.capabilities {
if let Some(agents) = cap_index.get_mut(&cap.name) {
agents.retain(|id| id != agent_id);
}
}
}
// Remove from all groups
{
let mut groups = self.groups.write().await;
for members in groups.values_mut() {
members.retain(|id| id != agent_id);
}
}
// Remove queue
{
let mut queues = self.queues.write().await;
queues.remove(agent_id);
}
}
/// Route a message to recipient(s)
pub async fn route(&self, envelope: A2aEnvelope) -> Result<()> {
// Check if message has expired
if envelope.is_expired() {
return Err(ZclawError::InvalidInput("Message has expired".into()));
}
let queues = self.queues.read().await;
match &envelope.to {
A2aRecipient::Direct { agent_id } => {
// Direct message to single agent
if let Some(tx) = queues.get(agent_id) {
tx.send(envelope.clone())
.await
.map_err(|e| ZclawError::Internal(format!("Failed to send message: {}", e)))?;
} else {
tracing::warn!("Agent {} not found for direct message", agent_id);
}
}
A2aRecipient::Group { group_id } => {
// Message to all agents in group
let groups = self.groups.read().await;
if let Some(members) = groups.get(group_id) {
for agent_id in members {
if let Some(tx) = queues.get(agent_id) {
match tx.try_send(envelope.clone()) {
Ok(()) => {},
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
tracing::warn!("A2A delivery to agent {} dropped: channel full", agent_id);
}
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
tracing::warn!("A2A delivery to agent {} dropped: channel closed", agent_id);
}
}
}
}
}
}
A2aRecipient::Broadcast => {
// Broadcast to all registered agents
for (agent_id, tx) in queues.iter() {
if agent_id != &envelope.from {
match tx.try_send(envelope.clone()) {
Ok(()) => {},
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
tracing::warn!("A2A delivery to agent {} dropped: channel full", agent_id);
}
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
tracing::warn!("A2A delivery to agent {} dropped: channel closed", agent_id);
}
}
}
}
}
}
Ok(())
}
/// Get router's agent ID
pub fn agent_id(&self) -> &AgentId {
&self.agent_id
}
/// Discover agents that have a specific capability
pub async fn discover(&self, capability: &str) -> Result<Vec<A2aAgentProfile>> {
let cap_index = self.capability_index.read().await;
let profiles = self.profiles.read().await;
match cap_index.get(capability) {
Some(agent_ids) => {
let result: Vec<A2aAgentProfile> = agent_ids.iter()
.filter_map(|id| profiles.get(id).cloned())
.collect();
Ok(result)
}
None => Ok(Vec::new()),
}
}
/// Add agent to a group (creates group if not exists)
pub async fn add_to_group(&self, group_id: &str, agent_id: AgentId) {
let mut groups = self.groups.write().await;
let members = groups.entry(group_id.to_string()).or_insert_with(Vec::new);
if !members.contains(&agent_id) {
members.push(agent_id);
}
}
/// Remove agent from a group
pub async fn remove_from_group(&self, group_id: &str, agent_id: &AgentId) {
let mut groups = self.groups.write().await;
if let Some(members) = groups.get_mut(group_id) {
members.retain(|id| id != agent_id);
}
}
/// List all groups
pub async fn list_groups(&self) -> Vec<String> {
let groups = self.groups.read().await;
groups.keys().cloned().collect()
}
/// Get members of a group
pub async fn get_group_members(&self, group_id: &str) -> Vec<AgentId> {
let groups = self.groups.read().await;
groups.get(group_id).cloned().unwrap_or_default()
}
/// Get all registered agent profiles
pub async fn list_profiles(&self) -> Vec<A2aAgentProfile> {
let profiles = self.profiles.read().await;
profiles.values().cloned().collect()
}
}
/// Basic A2A client implementation
pub struct BasicA2aClient {
/// Agent ID
agent_id: AgentId,
/// Shared router reference
router: Arc<A2aRouter>,
/// Receiver for incoming messages
receiver: Arc<tokio::sync::Mutex<Option<mpsc::Receiver<A2aEnvelope>>>>,
}
impl BasicA2aClient {
/// Create a new A2A client with shared router
pub fn new(agent_id: AgentId, router: Arc<A2aRouter>) -> Self {
Self {
agent_id,
router,
receiver: Arc::new(tokio::sync::Mutex::new(None)),
}
}
/// Initialize the client (register with router)
pub async fn initialize(&self, profile: A2aAgentProfile) -> Result<()> {
let rx = self.router.register_agent(profile).await;
let mut receiver = self.receiver.lock().await;
*receiver = Some(rx);
Ok(())
}
/// Shutdown the client
pub async fn shutdown(&self) {
self.router.unregister_agent(&self.agent_id).await;
}
}
#[async_trait]
impl A2aClient for BasicA2aClient {
async fn send(&self, envelope: A2aEnvelope) -> Result<()> {
tracing::debug!(
from = %envelope.from,
to = %envelope.to,
type = ?envelope.message_type,
"A2A send"
);
self.router.route(envelope).await
}
async fn recv(&self) -> Option<A2aEnvelope> {
let mut receiver = self.receiver.lock().await;
if let Some(ref mut rx) = *receiver {
rx.recv().await
} else {
// Wait a bit and return None if no receiver
None
}
}
fn try_recv(&self) -> Result<A2aEnvelope> {
// Use blocking lock for try_recv
let mut receiver = self.receiver
.try_lock()
.map_err(|_| ZclawError::Internal("Receiver locked".into()))?;
if let Some(ref mut rx) = *receiver {
rx.try_recv()
.map_err(|e| ZclawError::Internal(format!("Receive error: {}", e)))
} else {
Err(ZclawError::Internal("No receiver available".into()))
}
}
async fn get_profile(&self, agent_id: &AgentId) -> Result<Option<A2aAgentProfile>> {
let profiles = self.router.profiles.read().await;
Ok(profiles.get(agent_id).cloned())
}
async fn discover(&self, capability: &str) -> Result<Vec<A2aAgentProfile>> {
let cap_index = self.router.capability_index.read().await;
let profiles = self.router.profiles.read().await;
if let Some(agent_ids) = cap_index.get(capability) {
let result: Vec<A2aAgentProfile> = agent_ids
.iter()
.filter_map(|id| profiles.get(id).cloned())
.collect();
Ok(result)
} else {
Ok(Vec::new())
}
}
async fn advertise(&self, profile: A2aAgentProfile) -> Result<()> {
tracing::info!(agent_id = %profile.id, capabilities = ?profile.capabilities.len(), "A2A advertise");
self.router.register_agent(profile).await;
Ok(())
}
async fn join_group(&self, group_id: &str) -> Result<()> {
let mut groups = self.router.groups.write().await;
groups
.entry(group_id.to_string())
.or_insert_with(Vec::new)
.push(self.agent_id.clone());
tracing::info!(agent_id = %self.agent_id, group = %group_id, "A2A join group");
Ok(())
}
async fn leave_group(&self, group_id: &str) -> Result<()> {
let mut groups = self.router.groups.write().await;
if let Some(members) = groups.get_mut(group_id) {
members.retain(|id| id != &self.agent_id);
}
tracing::info!(agent_id = %self.agent_id, group = %group_id, "A2A leave group");
Ok(())
}
async fn get_group_members(&self, group_id: &str) -> Result<Vec<AgentId>> {
let groups = self.router.groups.read().await;
Ok(groups.get(group_id).cloned().unwrap_or_default())
}
async fn get_online_agents(&self) -> Result<Vec<A2aAgentProfile>> {
let profiles = self.router.profiles.read().await;
Ok(profiles.values().cloned().collect())
}
}
// Helper functions
/// Generate a UUID v4 string using cryptographically secure random
fn uuid_v4() -> String {
Uuid::new_v4().to_string()
}
/// Get current timestamp in milliseconds
fn current_timestamp() -> i64 {
use std::time::{SystemTime, UNIX_EPOCH};
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as i64
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_envelope_creation() {
let from = AgentId::new();
let to = A2aRecipient::Direct { agent_id: AgentId::new() };
let envelope = A2aEnvelope::new(
from,
to,
A2aMessageType::Request,
serde_json::json!({"action": "test"}),
);
assert!(!envelope.id.is_empty());
assert!(envelope.timestamp > 0);
assert!(envelope.conversation_id.is_none());
}
#[test]
fn test_envelope_expiry() {
let from = AgentId::new();
let to = A2aRecipient::Broadcast;
let mut envelope = A2aEnvelope::new(
from,
to,
A2aMessageType::Notification,
serde_json::json!({}),
);
envelope.ttl = 1; // 1 second
assert!(!envelope.is_expired());
// After TTL should be expired (in practice, this test might be flaky)
// We just verify the logic exists
}
#[test]
fn test_recipient_display() {
let agent_id = AgentId::new();
let direct = A2aRecipient::Direct { agent_id };
assert!(format!("{}", direct).starts_with("direct:"));
let group = A2aRecipient::Group { group_id: "teachers".to_string() };
assert_eq!(format!("{}", group), "group:teachers");
let broadcast = A2aRecipient::Broadcast;
assert_eq!(format!("{}", broadcast), "broadcast");
}
#[tokio::test]
async fn test_router_registration() {
let router = A2aRouter::new(AgentId::new());
let agent_id = AgentId::new();
let profile = A2aAgentProfile {
id: agent_id,
name: "Test Agent".to_string(),
description: "A test agent".to_string(),
capabilities: vec![A2aCapability {
name: "test".to_string(),
description: "Test capability".to_string(),
input_schema: None,
output_schema: None,
requires_approval: false,
version: "1.0.0".to_string(),
tags: vec![],
}],
protocols: vec!["a2a".to_string()],
role: "worker".to_string(),
priority: 5,
metadata: HashMap::new(),
groups: vec![],
last_seen: 0,
};
let _rx = router.register_agent(profile.clone()).await;
// Verify registration
let profiles = router.profiles.read().await;
assert!(profiles.contains_key(&agent_id));
}
#[tokio::test]
async fn test_capability_discovery() {
let router = A2aRouter::new(AgentId::new());
let agent_id = AgentId::new();
let profile = A2aAgentProfile {
id: agent_id,
name: "Test Agent".to_string(),
description: "A test agent".to_string(),
capabilities: vec![A2aCapability {
name: "code-generation".to_string(),
description: "Generate code".to_string(),
input_schema: None,
output_schema: None,
requires_approval: false,
version: "1.0.0".to_string(),
tags: vec!["coding".to_string()],
}],
protocols: vec!["a2a".to_string()],
role: "worker".to_string(),
priority: 5,
metadata: HashMap::new(),
groups: vec![],
last_seen: 0,
};
router.register_agent(profile).await;
// Check capability index
let cap_index = router.capability_index.read().await;
assert!(cap_index.contains_key("code-generation"));
}
#[tokio::test]
async fn test_direct_message_delivery() {
let router = A2aRouter::new(AgentId::new());
// Register two agents
let alice_id = AgentId::new();
let bob_id = AgentId::new();
let alice_profile = A2aAgentProfile {
id: alice_id, name: "Alice".into(), description: String::new(),
capabilities: vec![], protocols: vec!["a2a".into()], role: "worker".into(),
priority: 5, metadata: HashMap::new(), groups: vec![], last_seen: 0,
};
let bob_profile = A2aAgentProfile {
id: bob_id, name: "Bob".into(), description: String::new(),
capabilities: vec![], protocols: vec!["a2a".into()], role: "worker".into(),
priority: 5, metadata: HashMap::new(), groups: vec![], last_seen: 0,
};
let mut alice_rx = router.register_agent(alice_profile).await;
let mut bob_rx = router.register_agent(bob_profile).await;
// Alice sends direct message to Bob
let envelope = A2aEnvelope::new(
alice_id,
A2aRecipient::Direct { agent_id: bob_id },
A2aMessageType::Notification,
serde_json::json!({"msg": "hello bob"}),
);
router.route(envelope).await.unwrap();
// Bob should receive it
let received = bob_rx.recv().await.unwrap();
assert_eq!(received.from, alice_id);
assert_eq!(received.payload["msg"], "hello bob");
// Alice should NOT receive it
assert!(alice_rx.try_recv().is_err());
}
#[tokio::test]
async fn test_broadcast_delivery() {
let router = A2aRouter::new(AgentId::new());
let alice_id = AgentId::new();
let bob_id = AgentId::new();
let carol_id = AgentId::new();
let make_profile = |id: AgentId, name: &str| A2aAgentProfile {
id, name: name.into(), description: String::new(),
capabilities: vec![], protocols: vec!["a2a".into()], role: "worker".into(),
priority: 5, metadata: HashMap::new(), groups: vec![], last_seen: 0,
};
let mut alice_rx = router.register_agent(make_profile(alice_id, "Alice")).await;
let mut bob_rx = router.register_agent(make_profile(bob_id, "Bob")).await;
let mut carol_rx = router.register_agent(make_profile(carol_id, "Carol")).await;
// Alice broadcasts
let envelope = A2aEnvelope::new(
alice_id,
A2aRecipient::Broadcast,
A2aMessageType::Notification,
serde_json::json!({"announcement": "standup in 5"}),
);
router.route(envelope).await.unwrap();
// Bob and Carol should receive, Alice should NOT (sender excluded)
let bob_msg = bob_rx.recv().await.unwrap();
assert_eq!(bob_msg.payload["announcement"], "standup in 5");
let carol_msg = carol_rx.recv().await.unwrap();
assert_eq!(carol_msg.payload["announcement"], "standup in 5");
assert!(alice_rx.try_recv().is_err());
}
#[tokio::test]
async fn test_group_message_delivery() {
let router = A2aRouter::new(AgentId::new());
let alice_id = AgentId::new();
let bob_id = AgentId::new();
let carol_id = AgentId::new();
let make_profile = |id: AgentId, name: &str| A2aAgentProfile {
id, name: name.into(), description: String::new(),
capabilities: vec![], protocols: vec!["a2a".into()], role: "worker".into(),
priority: 5, metadata: HashMap::new(), groups: vec![], last_seen: 0,
};
let mut alice_rx = router.register_agent(make_profile(alice_id, "Alice")).await;
let mut bob_rx = router.register_agent(make_profile(bob_id, "Bob")).await;
let mut carol_rx = router.register_agent(make_profile(carol_id, "Carol")).await;
// Add Bob and Carol to "dev-team" group
router.add_to_group("dev-team", bob_id).await;
router.add_to_group("dev-team", carol_id).await;
// Alice sends to group
let envelope = A2aEnvelope::new(
alice_id,
A2aRecipient::Group { group_id: "dev-team".into() },
A2aMessageType::Notification,
serde_json::json!({"sprint": "review"}),
);
router.route(envelope).await.unwrap();
// Bob and Carol should receive
let bob_msg = bob_rx.recv().await.unwrap();
assert_eq!(bob_msg.payload["sprint"], "review");
let carol_msg = carol_rx.recv().await.unwrap();
assert_eq!(carol_msg.payload["sprint"], "review");
// Alice not in group, should NOT receive
assert!(alice_rx.try_recv().is_err());
}
#[tokio::test]
async fn test_unregister_agent() {
let router = A2aRouter::new(AgentId::new());
let agent_id = AgentId::new();
let profile = A2aAgentProfile {
id: agent_id, name: "Temp".into(), description: String::new(),
capabilities: vec![A2aCapability {
name: "temp-work".into(), description: "temp".into(),
input_schema: None, output_schema: None, requires_approval: false,
version: "1.0.0".into(), tags: vec![],
}],
protocols: vec!["a2a".into()], role: "worker".into(),
priority: 5, metadata: HashMap::new(), groups: vec![], last_seen: 0,
};
router.register_agent(profile).await;
assert_eq!(router.list_profiles().await.len(), 1);
// Discover should find it
let found = router.discover("temp-work").await.unwrap();
assert_eq!(found.len(), 1);
// Unregister
router.unregister_agent(&agent_id).await;
assert_eq!(router.list_profiles().await.len(), 0);
// Capability index should be cleaned
let found_after = router.discover("temp-work").await.unwrap();
assert!(found_after.is_empty());
}
#[tokio::test]
async fn test_expired_message_rejected() {
let router = A2aRouter::new(AgentId::new());
let alice_id = AgentId::new();
let bob_id = AgentId::new();
let profile = |id: AgentId| A2aAgentProfile {
id, name: "Agent".into(), description: String::new(),
capabilities: vec![], protocols: vec!["a2a".into()], role: "worker".into(),
priority: 5, metadata: HashMap::new(), groups: vec![], last_seen: 0,
};
let _rx = router.register_agent(profile(alice_id)).await;
let _rx = router.register_agent(profile(bob_id)).await;
// Create envelope with already-expired TTL
let mut envelope = A2aEnvelope::new(
alice_id,
A2aRecipient::Direct { agent_id: bob_id },
A2aMessageType::Notification,
serde_json::json!({}),
);
envelope.ttl = 1; // 1 second
envelope.timestamp = 0; // Far in the past — definitely expired
let result = router.route(envelope).await;
assert!(result.is_err());
}
}