Files
zclaw_openfang/crates/zclaw-kernel/src/kernel/a2a.rs
iven 2037809196 refactor(kernel): 移除 multi-agent feature gate — 33处 cfg 全部删除 (Phase 4A)
8 个文件移除 #[cfg(feature = "multi-agent")],zclaw-kernel default features
新增 multi-agent。A2A 路由、agents、adapters 现在始终编译。
2026-04-18 08:17:58 +08:00

263 lines
9.0 KiB
Rust

//! A2A (Agent-to-Agent) messaging
use zclaw_types::{AgentId, Capability, Event, Result};
use zclaw_protocols::{A2aAgentProfile, A2aCapability, A2aEnvelope, A2aMessageType, A2aRecipient};
use super::Kernel;
impl Kernel {
// ============================================================
// A2A (Agent-to-Agent) Messaging
// ============================================================
/// Derive an A2A agent profile from an AgentConfig
pub(super) fn agent_config_to_a2a_profile(config: &zclaw_types::AgentConfig) -> A2aAgentProfile {
let caps: Vec<A2aCapability> = config.tools.iter().map(|tool_name| {
A2aCapability {
name: tool_name.clone(),
description: format!("Tool: {}", tool_name),
input_schema: None,
output_schema: None,
requires_approval: false,
version: "1.0.0".to_string(),
tags: vec![],
}
}).collect();
A2aAgentProfile {
id: config.id,
name: config.name.clone(),
description: config.description.clone().unwrap_or_default(),
capabilities: caps,
protocols: vec!["a2a".to_string()],
role: "worker".to_string(),
priority: 5,
metadata: std::collections::HashMap::new(),
groups: vec![],
last_seen: 0,
}
}
/// Check if an agent is authorized to send messages to a target
pub(super) fn check_a2a_permission(&self, from: &AgentId, to: &AgentId) -> Result<()> {
let caps = self.capabilities.get(from);
match caps {
Some(cap_set) => {
let has_permission = cap_set.capabilities.iter().any(|cap| {
match cap {
Capability::AgentMessage { pattern } => {
pattern == "*" || to.to_string().starts_with(pattern)
}
_ => false,
}
});
if !has_permission {
return Err(zclaw_types::ZclawError::PermissionDenied(
format!("Agent {} does not have AgentMessage capability for {}", from, to)
));
}
Ok(())
}
None => {
// No capabilities registered — deny by default
Err(zclaw_types::ZclawError::PermissionDenied(
format!("Agent {} has no capabilities registered", from)
))
}
}
}
/// Send a direct A2A message from one agent to another
pub async fn a2a_send(
&self,
from: &AgentId,
to: &AgentId,
payload: serde_json::Value,
message_type: Option<A2aMessageType>,
) -> Result<()> {
// Validate sender exists
self.registry.get(from)
.ok_or_else(|| zclaw_types::ZclawError::NotFound(
format!("Sender agent not found: {}", from)
))?;
// Validate receiver exists and is running
self.registry.get(to)
.ok_or_else(|| zclaw_types::ZclawError::NotFound(
format!("Target agent not found: {}", to)
))?;
// Check capability permission
self.check_a2a_permission(from, to)?;
// Build and route envelope
let envelope = A2aEnvelope::new(
*from,
A2aRecipient::Direct { agent_id: *to },
message_type.unwrap_or(A2aMessageType::Notification),
payload,
);
self.a2a_router.route(envelope).await?;
// Emit event
self.events.publish(Event::A2aMessageSent {
from: *from,
to: format!("{}", to),
message_type: "direct".to_string(),
});
Ok(())
}
/// Broadcast a message from one agent to all other agents
pub async fn a2a_broadcast(
&self,
from: &AgentId,
payload: serde_json::Value,
) -> Result<()> {
// Validate sender exists
self.registry.get(from)
.ok_or_else(|| zclaw_types::ZclawError::NotFound(
format!("Sender agent not found: {}", from)
))?;
let envelope = A2aEnvelope::new(
*from,
A2aRecipient::Broadcast,
A2aMessageType::Notification,
payload,
);
self.a2a_router.route(envelope).await?;
self.events.publish(Event::A2aMessageSent {
from: *from,
to: "broadcast".to_string(),
message_type: "broadcast".to_string(),
});
Ok(())
}
/// Discover agents that have a specific capability
pub async fn a2a_discover(&self, capability: &str) -> Result<Vec<A2aAgentProfile>> {
let result = self.a2a_router.discover(capability).await?;
self.events.publish(Event::A2aAgentDiscovered {
agent_id: AgentId::new(),
capabilities: vec![capability.to_string()],
});
Ok(result)
}
/// Try to receive a pending A2A message for an agent (non-blocking)
pub async fn a2a_receive(&self, agent_id: &AgentId) -> Result<Option<A2aEnvelope>> {
let inbox = self.a2a_inboxes.get(agent_id)
.ok_or_else(|| zclaw_types::ZclawError::NotFound(
format!("No A2A inbox for agent: {}", agent_id)
))?;
let mut inbox = inbox.lock().await;
match inbox.try_recv() {
Ok(envelope) => {
self.events.publish(Event::A2aMessageReceived {
from: envelope.from,
to: format!("{}", agent_id),
message_type: "direct".to_string(),
});
Ok(Some(envelope))
}
Err(_) => Ok(None),
}
}
/// Delegate a task to another agent and wait for response with timeout
pub async fn a2a_delegate_task(
&self,
from: &AgentId,
to: &AgentId,
task_description: String,
timeout_ms: u64,
) -> Result<serde_json::Value> {
// Validate both agents exist
self.registry.get(from)
.ok_or_else(|| zclaw_types::ZclawError::NotFound(
format!("Sender agent not found: {}", from)
))?;
self.registry.get(to)
.ok_or_else(|| zclaw_types::ZclawError::NotFound(
format!("Target agent not found: {}", to)
))?;
// Check capability permission
self.check_a2a_permission(from, to)?;
// Send task request
let task_id = uuid::Uuid::new_v4().to_string();
let envelope = A2aEnvelope::new(
*from,
A2aRecipient::Direct { agent_id: *to },
A2aMessageType::Task,
serde_json::json!({
"task_id": task_id,
"description": task_description,
}),
).with_conversation(task_id.clone());
let envelope_id = envelope.id.clone();
self.a2a_router.route(envelope).await?;
self.events.publish(Event::A2aMessageSent {
from: *from,
to: format!("{}", to),
message_type: "task".to_string(),
});
// Wait for response with timeout
let timeout = tokio::time::Duration::from_millis(timeout_ms);
let result = tokio::time::timeout(timeout, async {
let inbox_entry = self.a2a_inboxes.get(from)
.ok_or_else(|| zclaw_types::ZclawError::NotFound(
format!("No A2A inbox for agent: {}", from)
))?;
let mut inbox = inbox_entry.lock().await;
// Poll for matching response
loop {
match inbox.recv().await {
Some(msg) => {
// Check if this is a response to our task
if msg.message_type == A2aMessageType::Response
&& msg.reply_to.as_deref() == Some(&envelope_id) {
return Ok::<_, zclaw_types::ZclawError>(msg.payload);
}
// Not our response — requeue it for later processing
tracing::debug!("Re-queuing non-matching A2A message: {}", msg.id);
inbox.requeue(msg);
}
None => {
return Err(zclaw_types::ZclawError::Internal(
"A2A inbox channel closed".to_string()
));
}
}
}
}).await;
match result {
Ok(Ok(payload)) => Ok(payload),
Ok(Err(e)) => Err(e),
Err(_) => Err(zclaw_types::ZclawError::Timeout(
format!("A2A task delegation timed out after {}ms", timeout_ms)
)),
}
}
/// Get all online agents via A2A profiles
pub async fn a2a_get_online_agents(&self) -> Result<Vec<A2aAgentProfile>> {
Ok(self.a2a_router.list_profiles().await)
}
}