//! A2A (Agent-to-Agent) messaging
//!
//! All items in this module are gated by the `multi-agent` feature flag.

#[cfg(feature = "multi-agent")]
use zclaw_types::{AgentId, Capability, Event, Result};
#[cfg(feature = "multi-agent")]
use zclaw_protocols::{A2aAgentProfile, A2aCapability, A2aEnvelope, A2aMessageType, A2aRecipient};

#[cfg(feature = "multi-agent")]
use super::Kernel;

#[cfg(feature = "multi-agent")]
impl Kernel {
    // ============================================================
    // A2A (Agent-to-Agent) Messaging
    // ============================================================

    /// Derive an A2A agent profile from an `AgentConfig`.
    ///
    /// Every entry of `config.tools` becomes one capability named after the
    /// tool, with a generated description, no schemas, no approval requirement
    /// and the fixed version `"1.0.0"`. The remaining profile fields are filled
    /// with fixed defaults (`"a2a"` protocol, `"worker"` role, priority `5`,
    /// empty metadata and groups, `last_seen` of `0`).
    pub(super) fn agent_config_to_a2a_profile(config: &zclaw_types::AgentConfig) -> A2aAgentProfile {
        let capabilities: Vec<A2aCapability> = config
            .tools
            .iter()
            .map(|tool_name| A2aCapability {
                name: tool_name.clone(),
                description: format!("Tool: {}", tool_name),
                input_schema: None,
                output_schema: None,
                requires_approval: false,
                version: "1.0.0".to_string(),
                tags: vec![],
            })
            .collect();

        A2aAgentProfile {
            id: config.id,
            name: config.name.clone(),
            description: config.description.clone().unwrap_or_default(),
            capabilities,
            protocols: vec!["a2a".to_string()],
            role: "worker".to_string(),
            priority: 5,
            metadata: std::collections::HashMap::new(),
            groups: vec![],
            last_seen: 0,
        }
    }

    /// Return `Ok(())` when `agent_id` is present in the registry.
    ///
    /// `role` is the leading word of the error message (`"Sender"` or
    /// `"Target"`), so the resulting text is `"<role> agent not found: <id>"`.
    ///
    /// # Errors
    ///
    /// `ZclawError::NotFound` when the registry has no entry for `agent_id`.
    fn a2a_ensure_registered(&self, agent_id: &AgentId, role: &str) -> Result<()> {
        self.registry
            .get(agent_id)
            // Only presence matters; drop the looked-up value right away.
            .map(|_| ())
            .ok_or_else(|| {
                zclaw_types::ZclawError::NotFound(
                    format!("{} agent not found: {}", role, agent_id)
                )
            })
    }

    /// Check if an agent is authorized to send messages to a target.
    ///
    /// The sender is authorized when at least one of its registered
    /// capabilities is `Capability::AgentMessage` whose `pattern` is either
    /// `"*"` or a prefix of the target id's string form.
    ///
    /// # Errors
    ///
    /// `ZclawError::PermissionDenied` when the sender has no capability set
    /// registered at all (deny by default), or when none of its capabilities
    /// matches the target.
    pub(super) fn check_a2a_permission(&self, from: &AgentId, to: &AgentId) -> Result<()> {
        // No capabilities registered — deny by default
        let cap_set = self.capabilities.get(from).ok_or_else(|| {
            zclaw_types::ZclawError::PermissionDenied(
                format!("Agent {} has no capabilities registered", from)
            )
        })?;

        // Render the target id once instead of once per capability.
        let target = to.to_string();
        let has_permission = cap_set.capabilities.iter().any(|cap| {
            matches!(
                cap,
                Capability::AgentMessage { pattern }
                    if pattern == "*" || target.starts_with(pattern)
            )
        });

        if has_permission {
            Ok(())
        } else {
            Err(zclaw_types::ZclawError::PermissionDenied(
                format!("Agent {} does not have AgentMessage capability for {}", from, to)
            ))
        }
    }

    /// Send a direct A2A message from one agent to another.
    ///
    /// When `message_type` is `None` the envelope is sent as
    /// `A2aMessageType::Notification`. After the router accepts the envelope an
    /// `Event::A2aMessageSent` is published; its `message_type` field is always
    /// the string `"direct"`, whatever the envelope's message type is.
    ///
    /// # Errors
    ///
    /// * `ZclawError::NotFound` when either agent is missing from the registry.
    /// * `ZclawError::PermissionDenied` when `from` may not message `to`.
    /// * Any error returned by the router.
    pub async fn a2a_send(
        &self,
        from: &AgentId,
        to: &AgentId,
        payload: serde_json::Value,
        message_type: Option<A2aMessageType>,
    ) -> Result<()> {
        // Validate sender exists
        self.a2a_ensure_registered(from, "Sender")?;

        // Validate receiver exists (only registry presence is checked here;
        // the receiver's run state is not inspected)
        self.a2a_ensure_registered(to, "Target")?;

        // Check capability permission
        self.check_a2a_permission(from, to)?;

        // Build and route envelope
        let envelope = A2aEnvelope::new(
            *from,
            A2aRecipient::Direct { agent_id: *to },
            message_type.unwrap_or(A2aMessageType::Notification),
            payload,
        );
        self.a2a_router.route(envelope).await?;

        // Emit event
        self.events.publish(Event::A2aMessageSent {
            from: *from,
            to: to.to_string(),
            message_type: "direct".to_string(),
        });

        Ok(())
    }

    /// Broadcast a message from one agent to all other agents.
    ///
    /// The envelope is routed with `A2aRecipient::Broadcast` and
    /// `A2aMessageType::Notification`, then an `Event::A2aMessageSent` with
    /// `to` and `message_type` both set to `"broadcast"` is published.
    ///
    /// # Errors
    ///
    /// `ZclawError::NotFound` when the sender is missing from the registry, or
    /// any error returned by the router.
    pub async fn a2a_broadcast(
        &self,
        from: &AgentId,
        payload: serde_json::Value,
    ) -> Result<()> {
        // Validate sender exists
        self.a2a_ensure_registered(from, "Sender")?;

        // NOTE(review): unlike `a2a_send`, no `AgentMessage` capability check
        // is performed before broadcasting — confirm this is intentional.
        let envelope = A2aEnvelope::new(
            *from,
            A2aRecipient::Broadcast,
            A2aMessageType::Notification,
            payload,
        );
        self.a2a_router.route(envelope).await?;

        self.events.publish(Event::A2aMessageSent {
            from: *from,
            to: "broadcast".to_string(),
            message_type: "broadcast".to_string(),
        });

        Ok(())
    }

    /// Discover agents that have a specific capability.
    ///
    /// Delegates the lookup to the A2A router and publishes one
    /// `Event::A2aAgentDiscovered` carrying the queried capability name.
    ///
    /// # Errors
    ///
    /// Any error returned by the router's `discover`.
    pub async fn a2a_discover(&self, capability: &str) -> Result<Vec<A2aAgentProfile>> {
        let result = self.a2a_router.discover(capability).await?;

        // NOTE(review): the event's `agent_id` comes from `AgentId::new()`
        // rather than from any entry of `result` — presumably a placeholder;
        // confirm what event consumers expect here.
        self.events.publish(Event::A2aAgentDiscovered {
            agent_id: AgentId::new(),
            capabilities: vec![capability.to_string()],
        });

        Ok(result)
    }

    /// Try to receive a pending A2A message for an agent (non-blocking).
    ///
    /// Returns `Ok(Some(envelope))` when a message was waiting and `Ok(None)`
    /// otherwise. On success an `Event::A2aMessageReceived` is published with
    /// `message_type` set to `"direct"`.
    ///
    /// # Errors
    ///
    /// `ZclawError::NotFound` when no inbox exists for `agent_id`.
    pub async fn a2a_receive(&self, agent_id: &AgentId) -> Result<Option<A2aEnvelope>> {
        let inbox = self.a2a_inboxes.get(agent_id)
            .ok_or_else(|| zclaw_types::ZclawError::NotFound(
                format!("No A2A inbox for agent: {}", agent_id)
            ))?;

        let mut inbox = inbox.lock().await;
        match inbox.try_recv() {
            Ok(envelope) => {
                self.events.publish(Event::A2aMessageReceived {
                    from: envelope.from,
                    to: agent_id.to_string(),
                    message_type: "direct".to_string(),
                });
                Ok(Some(envelope))
            }
            // Every `try_recv` failure is reported as "nothing pending".
            Err(_) => Ok(None),
        }
    }

    /// Delegate a task to another agent and wait for response with timeout.
    ///
    /// A `Task` envelope containing a freshly generated `task_id` and the
    /// `task_description` is routed to `to`. The call then waits up to
    /// `timeout_ms` milliseconds on the sender's inbox for a `Response`
    /// message whose `reply_to` equals the sent envelope's id, and returns that
    /// message's payload. Other messages received while waiting are handed
    /// back to the inbox via `requeue`.
    ///
    /// # Errors
    ///
    /// * `ZclawError::NotFound` when either agent is missing from the registry
    ///   or the sender has no inbox (checked before anything is sent).
    /// * `ZclawError::PermissionDenied` when `from` may not message `to`.
    /// * `ZclawError::Internal` when the inbox channel is closed.
    /// * `ZclawError::Timeout` when no matching response arrives in time.
    /// * Any error returned by the router.
    pub async fn a2a_delegate_task(
        &self,
        from: &AgentId,
        to: &AgentId,
        task_description: String,
        timeout_ms: u64,
    ) -> Result<serde_json::Value> {
        // Validate both agents exist
        self.a2a_ensure_registered(from, "Sender")?;
        self.a2a_ensure_registered(to, "Target")?;

        // Check capability permission
        self.check_a2a_permission(from, to)?;

        // Fail fast: without an inbox the reply could never be collected, so
        // the task must not be dispatched at all. The lookup result is a
        // temporary of the condition and is released before `route` is awaited.
        if self.a2a_inboxes.get(from).is_none() {
            return Err(zclaw_types::ZclawError::NotFound(
                format!("No A2A inbox for agent: {}", from)
            ));
        }

        // Send task request
        let task_id = uuid::Uuid::new_v4().to_string();
        let envelope = A2aEnvelope::new(
            *from,
            A2aRecipient::Direct { agent_id: *to },
            A2aMessageType::Task,
            serde_json::json!({
                "task_id": task_id,
                "description": task_description,
            }),
        ).with_conversation(task_id.clone());

        let envelope_id = envelope.id.clone();
        self.a2a_router.route(envelope).await?;

        self.events.publish(Event::A2aMessageSent {
            from: *from,
            to: to.to_string(),
            message_type: "task".to_string(),
        });

        // Wait for response with timeout
        let timeout = tokio::time::Duration::from_millis(timeout_ms);
        let result = tokio::time::timeout(timeout, async {
            // Looked up again: the inbox may have been removed since the
            // pre-send check.
            let inbox_entry = self.a2a_inboxes.get(from)
                .ok_or_else(|| zclaw_types::ZclawError::NotFound(
                    format!("No A2A inbox for agent: {}", from)
                ))?;
            // The inbox lock is held for the whole wait.
            let mut inbox = inbox_entry.lock().await;

            // Poll for matching response
            loop {
                match inbox.recv().await {
                    Some(msg) => {
                        // Check if this is a response to our task
                        if msg.message_type == A2aMessageType::Response
                            && msg.reply_to.as_deref() == Some(&envelope_id) {
                            return Ok::<_, zclaw_types::ZclawError>(msg.payload);
                        }
                        // Not our response — requeue it for later processing
                        // NOTE(review): if `requeue` makes the message the
                        // next one returned by `recv`, this loop would spin on
                        // it until the timeout — confirm `requeue` semantics.
                        tracing::debug!("Re-queuing non-matching A2A message: {}", msg.id);
                        inbox.requeue(msg);
                    }
                    None => {
                        return Err(zclaw_types::ZclawError::Internal(
                            "A2A inbox channel closed".to_string()
                        ));
                    }
                }
            }
        }).await;

        match result {
            Ok(Ok(payload)) => Ok(payload),
            Ok(Err(e)) => Err(e),
            Err(_) => Err(zclaw_types::ZclawError::Timeout(
                format!("A2A task delegation timed out after {}ms", timeout_ms)
            )),
        }
    }

    /// Get all online agents via A2A profiles.
    ///
    /// Returns whatever the A2A router's `list_profiles` yields, wrapped in
    /// `Ok`; this method itself never produces an error.
    pub async fn a2a_get_online_agents(&self) -> Result<Vec<A2aAgentProfile>> {
        Ok(self.a2a_router.list_profiles().await)
    }
}