Files
zclaw_openfang/crates/zclaw-kernel/src/kernel.rs
iven 6040d98b18 fix(kernel): message_count 始终为 0 的 bug
- AgentRegistry 新增 message_counts: DashMap<AgentId, u64> 跟踪字段
- 添加 increment_message_count() 方法
- Kernel.send_message() 和 send_message_stream() 中递增计数
- get_info() 返回实际计数值而非硬编码 0
2026-03-30 00:04:55 +08:00

1352 lines
51 KiB
Rust

//! Kernel - central coordinator
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc, Mutex};
use zclaw_types::{AgentConfig, AgentId, AgentInfo, Capability, Event, Result, HandRun, HandRunId, HandRunStatus, HandRunFilter, TriggerSource};
#[cfg(feature = "multi-agent")]
use zclaw_protocols::{A2aRouter, A2aAgentProfile, A2aCapability, A2aEnvelope, A2aMessageType, A2aRecipient};
use async_trait::async_trait;
use serde_json::Value;
use crate::registry::AgentRegistry;
use crate::capabilities::CapabilityManager;
use crate::events::EventBus;
use crate::config::KernelConfig;
use zclaw_memory::MemoryStore;
use zclaw_runtime::{AgentLoop, LlmDriver, ToolRegistry, tool::SkillExecutor, tool::builtin::PathValidator};
use zclaw_skills::SkillRegistry;
use zclaw_skills::LlmCompleter;
use zclaw_hands::{HandRegistry, HandContext, HandResult, hands::{BrowserHand, SlideshowHand, SpeechHand, QuizHand, WhiteboardHand, ResearcherHand, CollectorHand, ClipHand, TwitterHand, quiz::LlmQuizGenerator}};
/// Adapter that bridges `zclaw_runtime::LlmDriver` → `zclaw_skills::LlmCompleter`.
///
/// Holds a shared driver plus the sampling parameters that are attached to
/// every completion request issued through the adapter.
struct LlmDriverAdapter {
/// Driver that performs the actual completion call.
driver: Arc<dyn LlmDriver>,
/// Sent as `max_tokens` on every request.
max_tokens: u32,
/// Sent as `temperature` on every request.
temperature: f32,
}
impl zclaw_skills::LlmCompleter for LlmDriverAdapter {
fn complete(
&self,
prompt: &str,
) -> Pin<Box<dyn std::future::Future<Output = std::result::Result<String, String>> + Send + '_>> {
let driver = self.driver.clone();
let prompt = prompt.to_string();
Box::pin(async move {
let request = zclaw_runtime::CompletionRequest {
messages: vec![zclaw_types::Message::user(prompt)],
max_tokens: Some(self.max_tokens),
temperature: Some(self.temperature),
..Default::default()
};
let response = driver.complete(request).await
.map_err(|e| format!("LLM completion error: {}", e))?;
// Extract text from content blocks
let text: String = response.content.iter()
.filter_map(|block| match block {
zclaw_runtime::ContentBlock::Text { text } => Some(text.as_str()),
_ => None,
})
.collect::<Vec<_>>()
.join("");
Ok(text)
})
}
}
/// Skill executor implementation for Kernel.
///
/// Implements `SkillExecutor` on top of a shared `SkillRegistry`, handing each
/// executed skill an LLM completer through its `SkillContext`.
pub struct KernelSkillExecutor {
/// Registry used to look up and execute skills.
skills: Arc<SkillRegistry>,
/// Completer injected into every skill context created by this executor.
llm: Arc<dyn LlmCompleter>,
}
impl KernelSkillExecutor {
    /// Create an executor over `skills` whose completer wraps `driver`.
    ///
    /// The wrapped completer uses fixed sampling parameters
    /// (`max_tokens = 4096`, `temperature = 0.7`).
    pub fn new(skills: Arc<SkillRegistry>, driver: Arc<dyn LlmDriver>) -> Self {
        const DEFAULT_MAX_TOKENS: u32 = 4096;
        const DEFAULT_TEMPERATURE: f32 = 0.7;

        let adapter = LlmDriverAdapter {
            driver,
            max_tokens: DEFAULT_MAX_TOKENS,
            temperature: DEFAULT_TEMPERATURE,
        };
        let llm: Arc<dyn zclaw_skills::LlmCompleter> = Arc::new(adapter);
        Self { skills, llm }
    }
}
#[async_trait]
impl SkillExecutor for KernelSkillExecutor {
async fn execute_skill(
&self,
skill_id: &str,
agent_id: &str,
session_id: &str,
input: Value,
) -> Result<Value> {
let context = zclaw_skills::SkillContext {
agent_id: agent_id.to_string(),
session_id: session_id.to_string(),
llm: Some(self.llm.clone()),
..Default::default()
};
let result = self.skills.execute(&zclaw_types::SkillId::new(skill_id), &context, input).await?;
Ok(result.output)
}
fn get_skill_detail(&self, skill_id: &str) -> Option<zclaw_runtime::tool::SkillDetail> {
let manifests = self.skills.manifests_snapshot();
let manifest = manifests.get(&zclaw_types::SkillId::new(skill_id))?;
Some(zclaw_runtime::tool::SkillDetail {
id: manifest.id.as_str().to_string(),
name: manifest.name.clone(),
description: manifest.description.clone(),
category: manifest.category.clone(),
input_schema: manifest.input_schema.clone(),
triggers: manifest.triggers.clone(),
capabilities: manifest.capabilities.clone(),
})
}
fn list_skill_index(&self) -> Vec<zclaw_runtime::tool::SkillIndexEntry> {
let manifests = self.skills.manifests_snapshot();
manifests.values()
.filter(|m| m.enabled)
.map(|m| zclaw_runtime::tool::SkillIndexEntry {
id: m.id.as_str().to_string(),
description: m.description.clone(),
triggers: m.triggers.clone(),
})
.collect()
}
}
/// The ZCLAW Kernel.
///
/// Central coordinator that owns the agent registry, event bus, persistence,
/// LLM driver, skills, hands, triggers and approval state.
pub struct Kernel {
/// Configuration the kernel was booted with.
config: KernelConfig,
/// Registry of agents (populated on boot from persisted agents and on spawn).
registry: AgentRegistry,
/// Validates the capabilities requested by newly spawned agents.
capabilities: CapabilityManager,
/// Event bus used to publish agent/kernel lifecycle events.
events: EventBus,
/// Persistent store for agents, sessions and hand runs.
memory: Arc<MemoryStore>,
/// LLM driver created from the configuration.
driver: Arc<dyn LlmDriver>,
/// Completer injected into skill contexts that do not bring their own.
llm_completer: Arc<dyn zclaw_skills::LlmCompleter>,
/// Registry of discovered skills.
skills: Arc<SkillRegistry>,
/// Skill executor handed to each agent loop.
skill_executor: Arc<KernelSkillExecutor>,
/// Registry of hands (the built-in hands are registered during boot).
hands: Arc<HandRegistry>,
/// Manager that owns trigger definitions and their execution.
trigger_manager: crate::trigger_manager::TriggerManager,
/// Approval entries of any status (not only pending), guarded by an async mutex.
pending_approvals: Arc<Mutex<Vec<ApprovalEntry>>>,
/// Running hand runs that can be cancelled (run_id -> cancelled flag)
running_hand_runs: Arc<dashmap::DashMap<HandRunId, Arc<std::sync::atomic::AtomicBool>>>,
/// Shared memory storage backend for Growth system
viking: Arc<zclaw_runtime::VikingAdapter>,
/// A2A router for inter-agent messaging (gated by multi-agent feature)
#[cfg(feature = "multi-agent")]
a2a_router: Arc<A2aRouter>,
/// Per-agent A2A inbox receivers
#[cfg(feature = "multi-agent")]
a2a_inboxes: Arc<dashmap::DashMap<AgentId, Arc<Mutex<mpsc::Receiver<A2aEnvelope>>>>>,
}
impl Kernel {
/// Boot the kernel with the given configuration
pub async fn boot(config: KernelConfig) -> Result<Self> {
// Initialize memory store
let memory = Arc::new(MemoryStore::new(&config.database_url).await?);
// Initialize driver based on config
let driver = config.create_driver()?;
// Initialize subsystems
let registry = AgentRegistry::new();
let capabilities = CapabilityManager::new();
let events = EventBus::new();
// Initialize skill registry
let skills = Arc::new(SkillRegistry::new());
// Scan skills directory if configured
if let Some(ref skills_dir) = config.skills_dir {
if skills_dir.exists() {
skills.add_skill_dir(skills_dir.clone()).await?;
}
}
// Initialize hand registry with built-in hands
let hands = Arc::new(HandRegistry::new());
let quiz_model = config.model().to_string();
let quiz_generator = Arc::new(LlmQuizGenerator::new(driver.clone(), quiz_model));
hands.register(Arc::new(BrowserHand::new())).await;
hands.register(Arc::new(SlideshowHand::new())).await;
hands.register(Arc::new(SpeechHand::new())).await;
hands.register(Arc::new(QuizHand::with_generator(quiz_generator))).await;
hands.register(Arc::new(WhiteboardHand::new())).await;
hands.register(Arc::new(ResearcherHand::new())).await;
hands.register(Arc::new(CollectorHand::new())).await;
hands.register(Arc::new(ClipHand::new())).await;
hands.register(Arc::new(TwitterHand::new())).await;
// Create skill executor
let skill_executor = Arc::new(KernelSkillExecutor::new(skills.clone(), driver.clone()));
// Create LLM completer for skill system (shared with skill_executor)
let llm_completer: Arc<dyn zclaw_skills::LlmCompleter> =
Arc::new(LlmDriverAdapter {
driver: driver.clone(),
max_tokens: config.max_tokens(),
temperature: config.temperature(),
});
// Initialize trigger manager
let trigger_manager = crate::trigger_manager::TriggerManager::new(hands.clone());
// Initialize Growth system — shared VikingAdapter for memory storage
let viking = Arc::new(zclaw_runtime::VikingAdapter::in_memory());
// Restore persisted agents
let persisted = memory.list_agents().await?;
for agent in persisted {
registry.register(agent);
}
// Initialize A2A router for multi-agent support
#[cfg(feature = "multi-agent")]
let a2a_router = {
let kernel_agent_id = AgentId::new();
Arc::new(A2aRouter::new(kernel_agent_id))
};
Ok(Self {
config,
registry,
capabilities,
events,
memory,
driver,
llm_completer,
skills,
skill_executor,
hands,
trigger_manager,
pending_approvals: Arc::new(Mutex::new(Vec::new())),
running_hand_runs: Arc::new(dashmap::DashMap::new()),
viking,
#[cfg(feature = "multi-agent")]
a2a_router,
#[cfg(feature = "multi-agent")]
a2a_inboxes: Arc::new(dashmap::DashMap::new()),
})
}
/// Build a fresh `ToolRegistry` pre-populated with the runtime's built-in tools.
fn create_tool_registry(&self) -> ToolRegistry {
    let mut registry = ToolRegistry::new();
    zclaw_runtime::tool::builtin::register_builtin_tools(&mut registry);
    registry
}
/// Create the middleware chain for the agent loop.
///
/// When middleware is configured, cross-cutting concerns (compaction, loop guard,
/// token calibration, etc.) are delegated to the chain. When no middleware is
/// registered, the legacy inline path in `AgentLoop` is used instead.
fn create_middleware_chain(&self) -> Option<zclaw_runtime::middleware::MiddlewareChain> {
let mut chain = zclaw_runtime::middleware::MiddlewareChain::new();
// Growth integration — shared VikingAdapter for memory middleware & compaction
let growth = zclaw_runtime::GrowthIntegration::new(self.viking.clone());
// Compaction middleware — only register when threshold > 0
let threshold = self.config.compaction_threshold();
if threshold > 0 {
use std::sync::Arc;
let growth_for_compaction = zclaw_runtime::GrowthIntegration::new(self.viking.clone());
let mw = zclaw_runtime::middleware::compaction::CompactionMiddleware::new(
threshold,
zclaw_runtime::CompactionConfig::default(),
Some(self.driver.clone()),
Some(growth_for_compaction),
);
chain.register(Arc::new(mw));
}
// Memory middleware — auto-extract memories after conversations
{
use std::sync::Arc;
let mw = zclaw_runtime::middleware::memory::MemoryMiddleware::new(growth);
chain.register(Arc::new(mw));
}
// Loop guard middleware
{
use std::sync::Arc;
let mw = zclaw_runtime::middleware::loop_guard::LoopGuardMiddleware::with_defaults();
chain.register(Arc::new(mw));
}
// Token calibration middleware
{
use std::sync::Arc;
let mw = zclaw_runtime::middleware::token_calibration::TokenCalibrationMiddleware::new();
chain.register(Arc::new(mw));
}
// Skill index middleware — inject lightweight index instead of full descriptions
{
use std::sync::Arc;
let entries = self.skill_executor.list_skill_index();
if !entries.is_empty() {
let mw = zclaw_runtime::middleware::skill_index::SkillIndexMiddleware::new(entries);
chain.register(Arc::new(mw));
}
}
// Guardrail middleware — safety rules for tool calls
{
use std::sync::Arc;
let mw = zclaw_runtime::middleware::guardrail::GuardrailMiddleware::new(true)
.with_builtin_rules();
chain.register(Arc::new(mw));
}
// Only return Some if we actually registered middleware
if chain.is_empty() {
None
} else {
tracing::info!("[Kernel] Middleware chain created with {} middlewares", chain.len());
Some(chain)
}
}
/// Produce the system prompt for an agent, appending a skill catalogue.
///
/// Starts from `base_prompt` (or a generic assistant prompt when `None`). When
/// the registry lists at least one skill, appends an "Available Skills"
/// section grouped by category, followed by usage guidance and an example.
/// With no skills the base prompt is returned unchanged.
async fn build_system_prompt_with_skills(&self, base_prompt: Option<&String>) -> String {
    // Fetch the skill list first (async).
    let skills = self.skills.list().await;

    let mut prompt = base_prompt
        .cloned()
        .unwrap_or_else(|| "You are a helpful AI assistant.".to_string());

    if skills.is_empty() {
        return prompt;
    }

    prompt.push_str("\n\n## Available Skills\n\n");
    prompt.push_str("You have access to specialized skills. Analyze user intent and autonomously call `execute_skill` with the appropriate skill_id.\n\n");

    // One "### <category>" heading per group, one bullet per skill.
    for (category, members) in self.categorize_skills(&skills) {
        prompt.push_str(&format!("### {}\n", category));
        for skill in members {
            let bullet = format!(
                "- **{}**: {}",
                skill.id.as_str(),
                skill.description
            );
            prompt.push_str(&bullet);
            prompt.push('\n');
        }
        prompt.push('\n');
    }

    // Static guidance and example, appended in this exact order.
    let trailer = [
        "### When to use skills:\n",
        "- **IMPORTANT**: You should autonomously decide when to use skills based on your understanding of the user's intent.\n",
        "- Do not wait for explicit skill names - recognize the need and act.\n",
        "- Match user's request to the most appropriate skill's domain.\n",
        "- If multiple skills could apply, choose the most specialized one.\n\n",
        "### Example:\n",
        "User: \"分析腾讯财报\" → Intent: Financial analysis → Call: execute_skill(\"finance-tracker\", {...})\n",
    ];
    for line in trailer {
        prompt.push_str(line);
    }

    prompt
}
/// Categorize skills into logical groups.
///
/// Priority:
/// 1. Use the skill's explicit, non-empty `category` field.
/// 2. Fall back to substring matching of the skill id against a built-in
///    table (first matching category wins).
/// 3. Otherwise the skill lands in "其他".
///
/// The result is ordered by the predefined category order; categories that
/// are not in that list come last, sorted by name so the output is
/// deterministic across calls. Skills keep their input order within a group.
fn categorize_skills<'a>(&self, skills: &'a [zclaw_skills::SkillManifest]) -> Vec<(String, Vec<&'a zclaw_skills::SkillManifest>)> {
    /// Bucket for skills that match nothing else.
    const UNCATEGORIZED: &str = "其他";

    /// Display order of the predefined categories.
    const CATEGORY_ORDER: &[&str] = &[
        "开发工程", "测试质量", "安全合规", "数据分析", "项目管理", "设计UX",
        "内容营销", "社交平台", "运营支持", "XR/空间计算", "基础工具", "其他",
    ];

    /// Fallback category patterns for skills without an explicit category.
    const FALLBACK_PATTERNS: &[(&str, &[&str])] = &[
        ("开发工程", &["senior-developer", "frontend-developer", "backend-architect", "ai-engineer", "devops-automator", "rapid-prototyper", "lsp-index-engineer"]),
        ("测试质量", &["api-tester", "evidence-collector", "reality-checker", "performance-benchmarker", "test-results-analyzer", "accessibility-auditor", "code-review"]),
        ("安全合规", &["security-engineer", "legal-compliance-checker", "agentic-identity-trust"]),
        ("数据分析", &["analytics-reporter", "finance-tracker", "data-analysis", "sales-data-extraction-agent", "data-consolidation-agent", "report-distribution-agent"]),
        ("项目管理", &["senior-pm", "project-shepherd", "sprint-prioritizer", "experiment-tracker", "feedback-synthesizer", "trend-researcher", "agents-orchestrator"]),
        ("设计UX", &["ui-designer", "ux-architect", "ux-researcher", "visual-storyteller", "image-prompt-engineer", "whimsy-injector", "brand-guardian"]),
        ("内容营销", &["content-creator", "chinese-writing", "executive-summary-generator", "social-media-strategist"]),
        ("社交平台", &["twitter-engager", "instagram-curator", "tiktok-strategist", "reddit-community-builder", "zhihu-strategist", "xiaohongshu-specialist", "wechat-official-account", "growth-hacker", "app-store-optimizer"]),
        ("运营支持", &["studio-operations", "studio-producer", "support-responder", "workflow-optimizer", "infrastructure-maintainer", "tool-evaluator"]),
        ("XR/空间计算", &["visionos-spatial-engineer", "macos-spatial-metal-engineer", "xr-immersive-developer", "xr-interface-architect", "xr-cockpit-interaction-specialist", "terminal-integration-specialist"]),
        ("基础工具", &["web-search", "file-operations", "shell-command", "git", "translation", "feishu-docs"]),
    ];

    let mut categories: std::collections::HashMap<String, Vec<&'a zclaw_skills::SkillManifest>> =
        std::collections::HashMap::new();

    for skill in skills {
        // Priority 1: explicit, non-empty category from the manifest.
        if let Some(ref category) = skill.category {
            if !category.is_empty() {
                categories.entry(category.clone()).or_default().push(skill);
                continue;
            }
        }

        // Priority 2: substring match on the skill id (an exact match is a
        // substring match too); priority 3: the catch-all bucket.
        let skill_id = skill.id.as_str();
        let category = FALLBACK_PATTERNS
            .iter()
            .find(|(_, patterns)| patterns.iter().any(|p| skill_id.contains(*p)))
            .map(|(name, _)| *name)
            .unwrap_or(UNCATEGORIZED);
        categories.entry(category.to_string()).or_default().push(skill);
    }

    // Rank by the predefined order; unknown categories share the last rank and
    // are tie-broken by name, because HashMap iteration order is unspecified.
    let rank = |name: &str| {
        CATEGORY_ORDER
            .iter()
            .position(|known| *known == name)
            .unwrap_or(CATEGORY_ORDER.len())
    };
    let mut result: Vec<(String, Vec<_>)> = categories.into_iter().collect();
    result.sort_by(|a, b| {
        rank(&a.0)
            .cmp(&rank(&b.0))
            .then_with(|| a.0.cmp(&b.0))
    });
    result
}
/// Spawn a new agent.
///
/// Validates the requested capabilities, persists the agent configuration,
/// registers it in the in-memory registry (and, with the `multi-agent`
/// feature, with the A2A router plus an inbox), then publishes
/// `Event::AgentSpawned`.
///
/// # Errors
///
/// Fails when capability validation or persistence fails; in that case the
/// agent is not registered and no event is published.
pub async fn spawn_agent(&self, config: AgentConfig) -> Result<AgentId> {
    let id = config.id;

    // Validate capabilities
    self.capabilities.validate(&config.capabilities)?;

    // Persist the configuration
    self.memory.save_agent(&config).await?;

    // Derive the A2A profile while `config` is still borrowable, so no clone
    // of the whole configuration is needed (and nothing is left unused when
    // the feature is disabled).
    #[cfg(feature = "multi-agent")]
    let a2a_profile = Self::agent_config_to_a2a_profile(&config);

    // Register in registry
    self.registry.register(config);

    // Register with A2A router for multi-agent messaging
    #[cfg(feature = "multi-agent")]
    {
        let rx = self.a2a_router.register_agent(a2a_profile).await;
        self.a2a_inboxes.insert(id, Arc::new(Mutex::new(rx)));
    }

    // Emit event
    self.events.publish(Event::AgentSpawned {
        agent_id: id,
        name: self.registry.get(&id).map(|a| a.name.clone()).unwrap_or_default(),
    });

    Ok(id)
}
/// Kill an agent.
///
/// Deletes the agent from persistent storage first, then removes it from the
/// in-memory registry (and, with the `multi-agent` feature, from the A2A
/// router and inbox map), and finally publishes `Event::AgentTerminated`
/// with reason `"killed"`.
///
/// # Errors
///
/// Fails when the persistent delete fails. Because that is the first step,
/// a failure leaves the registry, router and store untouched and the call
/// can simply be retried.
pub async fn kill_agent(&self, id: &AgentId) -> Result<()> {
    // Do the only fallible step first so a failure cannot leave the agent
    // removed in memory but still persisted (and still routed).
    self.memory.delete_agent(id).await?;

    // Remove from registry
    self.registry.unregister(id);

    // Unregister from A2A router
    #[cfg(feature = "multi-agent")]
    {
        self.a2a_router.unregister_agent(id).await;
        self.a2a_inboxes.remove(id);
    }

    // Emit event
    self.events.publish(Event::AgentTerminated {
        agent_id: *id,
        reason: "killed".to_string(),
    });

    Ok(())
}
/// List all agents.
///
/// Returns whatever the agent registry currently reports via `list()`.
pub fn list_agents(&self) -> Vec<AgentInfo> {
self.registry.list()
}
/// Get agent info.
///
/// Delegates to the registry's `get_info`; `None` is whatever the registry
/// returns for an id it does not know.
pub fn get_agent(&self, id: &AgentId) -> Option<AgentInfo> {
self.registry.get_info(id)
}
/// Send a message to an agent and wait for the complete response.
///
/// A new session is created for every call. The agent loop always uses the
/// kernel's configured model; `max_tokens`, `temperature` and the compaction
/// threshold come from the agent configuration when set, otherwise from the
/// kernel configuration. The agent's message count is incremented only after
/// the loop finished successfully.
///
/// # Errors
///
/// Returns `ZclawError::NotFound` for an unknown agent, and propagates
/// session-creation and agent-loop errors.
pub async fn send_message(&self, agent_id: &AgentId, message: String) -> Result<MessageResponse> {
    let agent_config = match self.registry.get(agent_id) {
        Some(cfg) => cfg,
        None => {
            return Err(zclaw_types::ZclawError::NotFound(format!("Agent not found: {}", agent_id)));
        }
    };

    // Every call gets its own session.
    let session_id = self.memory.create_session(agent_id).await?;

    // Always use Kernel's current model configuration so the user's
    // model/API settings are respected.
    let model = self.config.model().to_string();

    // Per-agent overrides fall back to the kernel-wide defaults.
    let max_tokens = agent_config.max_tokens.unwrap_or_else(|| self.config.max_tokens());
    let temperature = agent_config.temperature.unwrap_or_else(|| self.config.temperature());
    let compaction_threshold = agent_config
        .compaction_threshold
        .map(|t| t as usize)
        .unwrap_or_else(|| self.config.compaction_threshold());

    let tools = self.create_tool_registry();
    let mut loop_runner = AgentLoop::new(*agent_id, self.driver.clone(), tools, self.memory.clone())
        .with_model(&model)
        .with_skill_executor(self.skill_executor.clone())
        .with_max_tokens(max_tokens)
        .with_temperature(temperature)
        .with_compaction_threshold(compaction_threshold);

    // Set path validator from agent's workspace directory (if configured)
    if let Some(workspace) = agent_config.workspace.as_ref() {
        let path_validator = PathValidator::new().with_workspace(workspace.clone());
        tracing::info!(
            "[Kernel] Setting path_validator with workspace: {} for agent {}",
            workspace.display(),
            agent_id
        );
        loop_runner = loop_runner.with_path_validator(path_validator);
    }

    // Inject middleware chain if available
    if let Some(chain) = self.create_middleware_chain() {
        loop_runner = loop_runner.with_middleware_chain(chain);
    }

    // System prompt with the skill catalogue injected.
    let system_prompt = self
        .build_system_prompt_with_skills(agent_config.system_prompt.as_ref())
        .await;
    let loop_runner = loop_runner.with_system_prompt(&system_prompt);

    // Run the loop to completion.
    let outcome = loop_runner.run(session_id, message).await?;

    // Count the message only once it was processed successfully.
    self.registry.increment_message_count(agent_id);

    Ok(MessageResponse {
        content: outcome.response,
        input_tokens: outcome.input_tokens,
        output_tokens: outcome.output_tokens,
    })
}
/// Send a message with streaming.
///
/// Convenience wrapper around `send_message_stream_with_prompt` with no
/// system-prompt override and no session to reuse.
pub async fn send_message_stream(
&self,
agent_id: &AgentId,
message: String,
) -> Result<mpsc::Receiver<zclaw_runtime::LoopEvent>> {
self.send_message_stream_with_prompt(agent_id, message, None, None).await
}
/// Send a message with streaming, optional system prompt, and optional session reuse.
///
/// * `system_prompt_override` — used verbatim when given; otherwise the prompt
///   is built from the agent's own prompt plus the skill catalogue.
/// * `session_id_override` — reused only when the session can be read and
///   already contains messages; otherwise a new session is created.
///
/// The agent's message count is incremented only after the stream was started
/// successfully, matching `send_message`, so a failed start is not counted.
///
/// # Errors
///
/// Returns `ZclawError::NotFound` for an unknown agent, and propagates
/// session-creation and stream-start errors.
pub async fn send_message_stream_with_prompt(
    &self,
    agent_id: &AgentId,
    message: String,
    system_prompt_override: Option<String>,
    session_id_override: Option<zclaw_types::SessionId>,
) -> Result<mpsc::Receiver<zclaw_runtime::LoopEvent>> {
    let agent_config = self.registry.get(agent_id)
        .ok_or_else(|| zclaw_types::ZclawError::NotFound(format!("Agent not found: {}", agent_id)))?;

    // Reuse existing session or create new one
    let session_id = match session_id_override {
        Some(id) => {
            // Verify the session exists; if not, create a new one
            let existing = self.memory.get_messages(&id).await;
            match existing {
                Ok(msgs) if !msgs.is_empty() => id,
                _ => {
                    tracing::debug!("Session {} not found or empty, creating new session", id);
                    self.memory.create_session(agent_id).await?
                }
            }
        }
        None => self.memory.create_session(agent_id).await?,
    };

    // Always use Kernel's current model configuration so the user's
    // model/API settings are respected.
    let model = self.config.model().to_string();

    // Create agent loop with model configuration
    let tools = self.create_tool_registry();
    let mut loop_runner = AgentLoop::new(
        *agent_id,
        self.driver.clone(),
        tools,
        self.memory.clone(),
    )
    .with_model(&model)
    .with_skill_executor(self.skill_executor.clone())
    .with_max_tokens(agent_config.max_tokens.unwrap_or_else(|| self.config.max_tokens()))
    .with_temperature(agent_config.temperature.unwrap_or_else(|| self.config.temperature()))
    .with_compaction_threshold(
        agent_config.compaction_threshold
            .map(|t| t as usize)
            .unwrap_or_else(|| self.config.compaction_threshold()),
    );

    // Set path validator from agent's workspace directory (if configured).
    // This enables file_read / file_write tools to access the workspace.
    if let Some(ref workspace) = agent_config.workspace {
        let path_validator = PathValidator::new().with_workspace(workspace.clone());
        tracing::info!(
            "[Kernel] Setting path_validator with workspace: {} for agent {}",
            workspace.display(),
            agent_id
        );
        loop_runner = loop_runner.with_path_validator(path_validator);
    }

    // Inject middleware chain if available
    if let Some(chain) = self.create_middleware_chain() {
        loop_runner = loop_runner.with_middleware_chain(chain);
    }

    // Use external prompt if provided, otherwise build default
    let system_prompt = match system_prompt_override {
        Some(prompt) => prompt,
        None => self.build_system_prompt_with_skills(agent_config.system_prompt.as_ref()).await,
    };
    let loop_runner = loop_runner.with_system_prompt(&system_prompt);

    // Start streaming; count the message only if the start succeeded.
    let events = loop_runner.run_streaming(session_id, message).await?;
    self.registry.increment_message_count(agent_id);
    Ok(events)
}
/// Subscribe to events.
///
/// Returns a new broadcast receiver obtained from the kernel's event bus.
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
self.events.subscribe()
}
/// Shutdown the kernel.
///
/// Publishes `Event::KernelShutdown` on the event bus and returns `Ok(())`;
/// this method itself performs no other teardown.
pub async fn shutdown(&self) -> Result<()> {
self.events.publish(Event::KernelShutdown);
Ok(())
}
/// Get the kernel configuration.
///
/// Borrows the configuration the kernel was booted with.
pub fn config(&self) -> &KernelConfig {
&self.config
}
/// Return a new shared handle to the kernel's LLM driver.
pub fn driver(&self) -> Arc<dyn LlmDriver> {
    Arc::clone(&self.driver)
}
/// Get the skills registry.
///
/// Borrows the shared handle; clone the `Arc` to keep it beyond the borrow.
pub fn skills(&self) -> &Arc<SkillRegistry> {
&self.skills
}
/// List all discovered skills.
///
/// Returns the manifests reported by the skill registry's `list()`.
pub async fn list_skills(&self) -> Vec<zclaw_skills::SkillManifest> {
self.skills.list().await
}
/// Refresh skills from a directory
pub async fn refresh_skills(&self, dir: Option<std::path::PathBuf>) -> Result<()> {
if let Some(path) = dir {
self.skills.add_skill_dir(path).await?;
} else if let Some(ref skills_dir) = self.config.skills_dir {
self.skills.add_skill_dir(skills_dir.clone()).await?;
}
Ok(())
}
/// Execute a skill with the given ID and input
pub async fn execute_skill(
&self,
id: &str,
context: zclaw_skills::SkillContext,
input: serde_json::Value,
) -> Result<zclaw_skills::SkillResult> {
// Inject LLM completer into context for PromptOnly skills
let mut ctx = context;
if ctx.llm.is_none() {
ctx.llm = Some(self.llm_completer.clone());
}
self.skills.execute(&zclaw_types::SkillId::new(id), &ctx, input).await
}
/// Get the hands registry.
///
/// Borrows the shared handle; clone the `Arc` to keep it beyond the borrow.
pub fn hands(&self) -> &Arc<HandRegistry> {
&self.hands
}
/// List all registered hands.
///
/// Returns the configurations reported by the hand registry's `list()`.
pub async fn list_hands(&self) -> Vec<zclaw_hands::HandConfig> {
self.hands.list().await
}
/// Execute a hand with the given input, tracking the run.
///
/// This is a manually triggered run: it is exactly
/// `execute_hand_with_source(hand_id, input, TriggerSource::Manual)`, so the
/// run-tracking, cancellation and persistence logic lives in one place only.
///
/// Returns the hand result together with the id of the recorded run.
///
/// # Errors
///
/// Propagates persistence errors, the hand's own execution error, and an
/// internal error when the run was cancelled while executing.
pub async fn execute_hand(
    &self,
    hand_id: &str,
    input: serde_json::Value,
) -> Result<(HandResult, HandRunId)> {
    self.execute_hand_with_source(hand_id, input, TriggerSource::Manual).await
}
/// Execute a hand on behalf of `trigger_source` (manual, scheduled, event, …)
/// and record the run.
///
/// Lifecycle of the persisted `HandRun`: saved as `Pending`, updated to
/// `Running`, then to `Cancelled`, `Completed` or `Failed`. While the hand
/// executes, a cancellation flag is registered under the run id; the flag is
/// only inspected after the hand has returned.
///
/// # Errors
///
/// Propagates persistence errors and the hand's own error; returns an
/// internal error ("Hand execution cancelled") when the flag was set.
pub async fn execute_hand_with_source(
    &self,
    hand_id: &str,
    input: serde_json::Value,
    trigger_source: TriggerSource,
) -> Result<(HandResult, HandRunId)> {
    use std::sync::atomic::{AtomicBool, Ordering};

    let run_id = HandRunId::new();
    let created_at = chrono::Utc::now().to_rfc3339();

    // Initial record: Pending.
    let mut run = HandRun {
        id: run_id,
        hand_name: hand_id.to_string(),
        trigger_source,
        params: input.clone(),
        status: HandRunStatus::Pending,
        result: None,
        error: None,
        duration_ms: None,
        created_at,
        started_at: None,
        completed_at: None,
    };
    self.memory.save_hand_run(&run).await?;

    // Transition: Running.
    run.status = HandRunStatus::Running;
    run.started_at = Some(chrono::Utc::now().to_rfc3339());
    self.memory.update_hand_run(&run).await?;

    // Make the run cancellable while it executes.
    let cancel_flag = Arc::new(AtomicBool::new(false));
    self.running_hand_runs.insert(run_id, Arc::clone(&cancel_flag));

    let context = HandContext::default();
    let started = std::time::Instant::now();
    let hand_result = self.hands.execute(hand_id, &context, input).await;
    let elapsed = started.elapsed();

    // Cancelled while executing: record that and discard the hand's result.
    let was_cancelled = cancel_flag.load(Ordering::Relaxed);
    if was_cancelled {
        run.status = HandRunStatus::Cancelled;
        run.completed_at = Some(chrono::Utc::now().to_rfc3339());
        run.duration_ms = Some(elapsed.as_millis() as u64);
        self.memory.update_hand_run(&run).await?;
        self.running_hand_runs.remove(&run_id);
        return Err(zclaw_types::ZclawError::Internal("Hand execution cancelled".to_string()));
    }

    self.running_hand_runs.remove(&run_id);

    // Final transition: Completed or Failed.
    let finished_at = chrono::Utc::now().to_rfc3339();
    match hand_result.as_ref() {
        Ok(res) => {
            run.status = HandRunStatus::Completed;
            run.result = Some(res.output.clone());
            run.error = res.error.clone();
        }
        Err(e) => {
            run.status = HandRunStatus::Failed;
            run.error = Some(e.to_string());
        }
    }
    run.duration_ms = Some(elapsed.as_millis() as u64);
    run.completed_at = Some(finished_at);
    self.memory.update_hand_run(&run).await?;

    match hand_result {
        Ok(res) => Ok((res, run_id)),
        Err(e) => Err(e),
    }
}
// ============================================================
// Hand Run Tracking
// ============================================================
/// Get a hand run by ID.
///
/// Reads the record from the memory store; `Ok(None)` is whatever the store
/// returns for an id it has no record of.
pub async fn get_hand_run(&self, id: &HandRunId) -> Result<Option<HandRun>> {
self.memory.get_hand_run(id).await
}
/// List hand runs with filter.
///
/// Delegates to the memory store; filter semantics are defined there.
pub async fn list_hand_runs(&self, filter: &HandRunFilter) -> Result<Vec<HandRun>> {
self.memory.list_hand_runs(filter).await
}
/// Count hand runs matching filter.
///
/// Delegates to the memory store; filter semantics are defined there.
pub async fn count_hand_runs(&self, filter: &HandRunFilter) -> Result<u32> {
self.memory.count_hand_runs(filter).await
}
/// Request cancellation of a hand run.
///
/// * Run currently executing (present in the running map): its entry is
///   removed and its cancellation flag set. The executing side is expected to
///   notice the flag once the hand returns and persist the final status.
/// * Run not executing but stored as `Pending`: it is persisted as
///   `Cancelled` with a completion timestamp.
///
/// # Errors
///
/// `ZclawError::InvalidInput` when the stored run is in any other status,
/// `ZclawError::NotFound` when no such run exists; persistence errors are
/// propagated.
pub async fn cancel_hand_run(&self, id: &HandRunId) -> Result<()> {
    // Fast path: the run is executing right now — just raise its flag.
    if let Some((_, flag)) = self.running_hand_runs.remove(id) {
        flag.store(true, std::sync::atomic::Ordering::Relaxed);
        return Ok(());
    }

    // Not currently running — consult the stored record.
    let mut stored = match self.memory.get_hand_run(id).await? {
        Some(run) => run,
        None => {
            return Err(zclaw_types::ZclawError::NotFound(
                format!("Hand run {} not found", id)
            ));
        }
    };

    if stored.status != HandRunStatus::Pending {
        return Err(zclaw_types::ZclawError::InvalidInput(
            format!("Cannot cancel hand run {} with status {}", id, stored.status)
        ));
    }

    stored.status = HandRunStatus::Cancelled;
    stored.completed_at = Some(chrono::Utc::now().to_rfc3339());
    self.memory.update_hand_run(&stored).await?;
    Ok(())
}
// ============================================================
// Trigger Management
// ============================================================
/// List all triggers.
///
/// Delegates to the trigger manager.
pub async fn list_triggers(&self) -> Vec<crate::trigger_manager::TriggerEntry> {
self.trigger_manager.list_triggers().await
}
/// Get a specific trigger.
///
/// Delegates to the trigger manager; `None` is whatever the manager returns
/// for an id it does not know.
pub async fn get_trigger(&self, id: &str) -> Option<crate::trigger_manager::TriggerEntry> {
self.trigger_manager.get_trigger(id).await
}
/// Create a new trigger.
///
/// Delegates to the trigger manager and returns the entry it created;
/// validation and error conditions are defined there.
pub async fn create_trigger(
&self,
config: zclaw_hands::TriggerConfig,
) -> Result<crate::trigger_manager::TriggerEntry> {
self.trigger_manager.create_trigger(config).await
}
/// Update a trigger.
///
/// Delegates to the trigger manager, applying `updates` to the trigger with
/// the given `id`, and returns the resulting entry.
pub async fn update_trigger(
&self,
id: &str,
updates: crate::trigger_manager::TriggerUpdateRequest,
) -> Result<crate::trigger_manager::TriggerEntry> {
self.trigger_manager.update_trigger(id, updates).await
}
/// Delete a trigger.
///
/// Delegates to the trigger manager; error conditions are defined there.
pub async fn delete_trigger(&self, id: &str) -> Result<()> {
self.trigger_manager.delete_trigger(id).await
}
/// Execute a trigger.
///
/// Delegates to the trigger manager, passing `input` through unchanged, and
/// returns the manager's result.
pub async fn execute_trigger(
&self,
id: &str,
input: serde_json::Value,
) -> Result<zclaw_hands::TriggerResult> {
self.trigger_manager.execute_trigger(id, input).await
}
// ============================================================
// Approval Management
// ============================================================
/// Return copies of all approval entries whose status is still `"pending"`.
pub async fn list_approvals(&self) -> Vec<ApprovalEntry> {
    let guard = self.pending_approvals.lock().await;
    let mut pending = Vec::new();
    for entry in guard.iter() {
        if entry.status == "pending" {
            pending.push(entry.clone());
        }
    }
    pending
}
/// Look up one approval entry by id, regardless of its status.
///
/// Returns a copy of the first entry with a matching id, or `None` when no
/// such entry exists.
pub async fn get_approval(&self, id: &str) -> Option<ApprovalEntry> {
    let guard = self.pending_approvals.lock().await;
    for entry in guard.iter() {
        if entry.id == id {
            return Some(entry.clone());
        }
    }
    None
}
/// Record a new approval request for `hand_id` (used when a hand that needs
/// approval is triggered) and return a copy of the stored entry.
///
/// The entry gets a fresh UUID, status `"pending"`, the current UTC time and
/// no rejection reason.
pub async fn create_approval(&self, hand_id: String, input: serde_json::Value) -> ApprovalEntry {
    let id = uuid::Uuid::new_v4().to_string();
    let entry = ApprovalEntry {
        id,
        hand_id,
        status: "pending".to_string(),
        created_at: chrono::Utc::now(),
        input,
        reject_reason: None,
    };
    self.pending_approvals.lock().await.push(entry.clone());
    entry
}
/// Respond to an approval
pub async fn respond_to_approval(
&self,
id: &str,
approved: bool,
reason: Option<String>,
) -> Result<()> {
let mut approvals = self.pending_approvals.lock().await;
let entry = approvals.iter_mut().find(|a| a.id == id && a.status == "pending")
.ok_or_else(|| zclaw_types::ZclawError::NotFound(format!("Approval not found: {}", id)))?;
entry.status = if approved { "approved".to_string() } else { "rejected".to_string() };
if let Some(r) = reason {
entry.reject_reason = Some(r);
}
if approved {
let hand_id = entry.hand_id.clone();
let input = entry.input.clone();
drop(approvals); // Release lock before async hand execution
// Execute the hand in background with HandRun tracking
let hands = self.hands.clone();
let approvals = self.pending_approvals.clone();
let memory = self.memory.clone();
let running_hand_runs = self.running_hand_runs.clone();
let id_owned = id.to_string();
tokio::spawn(async move {
// Create HandRun record for tracking
let run_id = HandRunId::new();
let now = chrono::Utc::now().to_rfc3339();
let mut run = HandRun {
id: run_id,
hand_name: hand_id.clone(),
trigger_source: TriggerSource::Manual,
params: input.clone(),
status: HandRunStatus::Pending,
result: None,
error: None,
duration_ms: None,
created_at: now.clone(),
started_at: None,
completed_at: None,
};
let _ = memory.save_hand_run(&run).await;
run.status = HandRunStatus::Running;
run.started_at = Some(chrono::Utc::now().to_rfc3339());
let _ = memory.update_hand_run(&run).await;
// Register cancellation flag
let cancel_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
running_hand_runs.insert(run.id, cancel_flag.clone());
let context = HandContext::default();
let start = std::time::Instant::now();
let result = hands.execute(&hand_id, &context, input).await;
let duration = start.elapsed();
// Remove from running map
running_hand_runs.remove(&run.id);
// Update HandRun with result
let completed_at = chrono::Utc::now().to_rfc3339();
match &result {
Ok(res) => {
run.status = HandRunStatus::Completed;
run.result = Some(res.output.clone());
run.error = res.error.clone();
}
Err(e) => {
run.status = HandRunStatus::Failed;
run.error = Some(e.to_string());
}
}
run.duration_ms = Some(duration.as_millis() as u64);
run.completed_at = Some(completed_at);
let _ = memory.update_hand_run(&run).await;
// Update approval status based on execution result
let mut approvals = approvals.lock().await;
if let Some(entry) = approvals.iter_mut().find(|a| a.id == id_owned) {
match result {
Ok(_) => entry.status = "completed".to_string(),
Err(e) => {
entry.status = "failed".to_string();
if let Some(obj) = entry.input.as_object_mut() {
obj.insert("error".to_string(), Value::String(format!("{}", e)));
}
}
}
}
});
}
Ok(())
}
/// Mark a pending approval as `"cancelled"`.
///
/// # Errors
///
/// Returns `ZclawError::NotFound` when no approval with the given ID is
/// currently in the `"pending"` state.
pub async fn cancel_approval(&self, id: &str) -> Result<()> {
    let mut guard = self.pending_approvals.lock().await;
    let target = guard
        .iter_mut()
        .find(|entry| entry.status == "pending" && entry.id == id);
    match target {
        Some(entry) => {
            entry.status = String::from("cancelled");
            Ok(())
        }
        None => Err(zclaw_types::ZclawError::NotFound(format!("Approval not found: {}", id))),
    }
}
// ============================================================
// A2A (Agent-to-Agent) Messaging
// ============================================================
/// Build the A2A profile for an agent from its `AgentConfig`.
///
/// Each configured tool name becomes one `A2aCapability` with fixed
/// placeholder metadata (version `"1.0.0"`, no schemas, no approval
/// requirement). The profile itself uses the fixed role `"worker"`,
/// priority `5`, the `"a2a"` protocol and a `last_seen` of `0`.
#[cfg(feature = "multi-agent")]
fn agent_config_to_a2a_profile(config: &AgentConfig) -> A2aAgentProfile {
    let mut capabilities: Vec<A2aCapability> = Vec::new();
    for tool in config.tools.iter() {
        capabilities.push(A2aCapability {
            name: tool.clone(),
            description: format!("Tool: {}", tool),
            input_schema: None,
            output_schema: None,
            requires_approval: false,
            version: "1.0.0".to_string(),
            tags: vec![],
        });
    }

    let description = config.description.clone().unwrap_or_default();
    A2aAgentProfile {
        id: config.id,
        name: config.name.clone(),
        description,
        capabilities,
        protocols: vec!["a2a".to_string()],
        role: "worker".to_string(),
        priority: 5,
        metadata: std::collections::HashMap::new(),
        groups: vec![],
        last_seen: 0,
    }
}
/// Check whether agent `from` is authorized to send A2A messages to `to`.
///
/// Authorization is granted when the sender holds at least one
/// `Capability::AgentMessage` whose pattern is `"*"` or is a prefix of the
/// target's string form. An agent without any registered capability set is
/// denied by default.
///
/// # Errors
///
/// Returns `ZclawError::PermissionDenied` when the sender has no registered
/// capabilities or none of them matches the target.
#[cfg(feature = "multi-agent")]
fn check_a2a_permission(&self, from: &AgentId, to: &AgentId) -> Result<()> {
    let cap_set = match self.capabilities.get(from) {
        Some(cap_set) => cap_set,
        None => {
            // No capabilities registered — deny by default
            return Err(zclaw_types::ZclawError::PermissionDenied(
                format!("Agent {} has no capabilities registered", from)
            ));
        }
    };

    // Render the target once instead of once per capability.
    let target = to.to_string();
    // NOTE(review): `starts_with` is true for an empty pattern, so an empty
    // `AgentMessage` pattern authorizes every target — confirm this is intended.
    let has_permission = cap_set.capabilities.iter().any(|cap| {
        matches!(
            cap,
            Capability::AgentMessage { pattern }
                if pattern == "*" || target.starts_with(pattern)
        )
    });

    if has_permission {
        Ok(())
    } else {
        Err(zclaw_types::ZclawError::PermissionDenied(
            format!("Agent {} does not have AgentMessage capability for {}", from, to)
        ))
    }
}
/// Send a direct A2A message from one agent to another.
///
/// Both agents must be present in the registry (only existence is checked)
/// and the sender must pass the `AgentMessage` capability check. When
/// `message_type` is `None` the envelope is sent as a
/// `A2aMessageType::Notification`. After routing, an
/// `Event::A2aMessageSent` is published.
///
/// # Errors
///
/// Returns `ZclawError::NotFound` for an unknown sender or target, the
/// permission error from the capability check, or any error reported by the
/// router.
#[cfg(feature = "multi-agent")]
pub async fn a2a_send(
    &self,
    from: &AgentId,
    to: &AgentId,
    payload: serde_json::Value,
    message_type: Option<A2aMessageType>,
) -> Result<()> {
    // The sender must be registered.
    if self.registry.get(from).is_none() {
        return Err(zclaw_types::ZclawError::NotFound(
            format!("Sender agent not found: {}", from)
        ));
    }
    // The receiver must be registered.
    if self.registry.get(to).is_none() {
        return Err(zclaw_types::ZclawError::NotFound(
            format!("Target agent not found: {}", to)
        ));
    }
    // The sender must hold a matching capability.
    self.check_a2a_permission(from, to)?;

    // Assemble the envelope and hand it to the router.
    let kind = message_type.unwrap_or(A2aMessageType::Notification);
    let recipient = A2aRecipient::Direct { agent_id: *to };
    let envelope = A2aEnvelope::new(*from, recipient, kind, payload);
    self.a2a_router.route(envelope).await?;

    // Announce the delivery on the event bus.
    self.events.publish(Event::A2aMessageSent {
        from: *from,
        to: to.to_string(),
        message_type: "direct".to_string(),
    });
    Ok(())
}
/// Broadcast a notification from one agent to all other agents.
///
/// The sender must be present in the registry; this method performs no
/// `AgentMessage` capability check. The envelope is routed with
/// `A2aRecipient::Broadcast` and an `Event::A2aMessageSent` is published
/// afterwards.
///
/// # Errors
///
/// Returns `ZclawError::NotFound` for an unknown sender, or any error
/// reported by the router.
#[cfg(feature = "multi-agent")]
pub async fn a2a_broadcast(
    &self,
    from: &AgentId,
    payload: serde_json::Value,
) -> Result<()> {
    // The sender must be registered.
    if self.registry.get(from).is_none() {
        return Err(zclaw_types::ZclawError::NotFound(
            format!("Sender agent not found: {}", from)
        ));
    }

    let broadcast = A2aEnvelope::new(
        *from,
        A2aRecipient::Broadcast,
        A2aMessageType::Notification,
        payload,
    );
    self.a2a_router.route(broadcast).await?;

    let sent = Event::A2aMessageSent {
        from: *from,
        to: "broadcast".to_string(),
        message_type: "broadcast".to_string(),
    };
    self.events.publish(sent);
    Ok(())
}
/// Discover agents that advertise a specific capability.
///
/// Queries the A2A router and publishes one `Event::A2aAgentDiscovered` per
/// returned profile, carrying that profile's agent ID and the queried
/// capability name. No event is published when nothing is found.
///
/// # Errors
///
/// Propagates any error reported by the router's discovery call.
#[cfg(feature = "multi-agent")]
pub async fn a2a_discover(&self, capability: &str) -> Result<Vec<A2aAgentProfile>> {
    let profiles = self.a2a_router.discover(capability).await?;
    // Report the agents that were actually found rather than a freshly
    // generated ID that matches no agent.
    for profile in &profiles {
        self.events.publish(Event::A2aAgentDiscovered {
            agent_id: profile.id,
            capabilities: vec![capability.to_string()],
        });
    }
    Ok(profiles)
}
/// Take the next queued A2A message for an agent without waiting.
///
/// Returns `Ok(Some(envelope))` when a message was available (after
/// publishing an `Event::A2aMessageReceived`), and `Ok(None)` whenever
/// `try_recv` reports an error of any kind.
///
/// # Errors
///
/// Returns `ZclawError::NotFound` when the agent has no A2A inbox.
#[cfg(feature = "multi-agent")]
pub async fn a2a_receive(&self, agent_id: &AgentId) -> Result<Option<A2aEnvelope>> {
    let inbox = match self.a2a_inboxes.get(agent_id) {
        Some(inbox) => inbox,
        None => {
            return Err(zclaw_types::ZclawError::NotFound(
                format!("No A2A inbox for agent: {}", agent_id)
            ));
        }
    };
    let mut rx = inbox.lock().await;

    // Any `try_recv` error is treated as "nothing to deliver".
    let received = rx.try_recv().ok();
    if let Some(envelope) = received.as_ref() {
        self.events.publish(Event::A2aMessageReceived {
            from: envelope.from,
            to: agent_id.to_string(),
            message_type: "direct".to_string(),
        });
    }
    Ok(received)
}
/// Delegate a task to another agent and wait for its response.
///
/// A `Task` envelope carrying a fresh task ID and the description is routed
/// to `to`; the sender's inbox is then drained until a `Response` whose
/// `reply_to` equals the envelope ID arrives, or until `timeout_ms`
/// milliseconds have elapsed. Every other message read from the inbox while
/// waiting is logged and dropped, not re-queued.
///
/// # Errors
///
/// Returns `ZclawError::NotFound` for an unknown sender or target or a
/// missing sender inbox, the permission error from the capability check, any
/// router error, `ZclawError::Internal` if the inbox channel closes, and
/// `ZclawError::Timeout` when no matching response arrives in time.
#[cfg(feature = "multi-agent")]
pub async fn a2a_delegate_task(
    &self,
    from: &AgentId,
    to: &AgentId,
    task_description: String,
    timeout_ms: u64,
) -> Result<serde_json::Value> {
    // Both endpoints must be registered.
    if self.registry.get(from).is_none() {
        return Err(zclaw_types::ZclawError::NotFound(
            format!("Sender agent not found: {}", from)
        ));
    }
    if self.registry.get(to).is_none() {
        return Err(zclaw_types::ZclawError::NotFound(
            format!("Target agent not found: {}", to)
        ));
    }
    // The sender must hold a matching capability.
    self.check_a2a_permission(from, to)?;

    // Build and route the task request.
    let task_id = uuid::Uuid::new_v4().to_string();
    let body = serde_json::json!({
        "task_id": task_id,
        "description": task_description,
    });
    let envelope = A2aEnvelope::new(
        *from,
        A2aRecipient::Direct { agent_id: *to },
        A2aMessageType::Task,
        body,
    ).with_conversation(task_id.clone());
    let envelope_id = envelope.id.clone();
    self.a2a_router.route(envelope).await?;
    self.events.publish(Event::A2aMessageSent {
        from: *from,
        to: to.to_string(),
        message_type: "task".to_string(),
    });

    // Drain the sender's inbox until the matching response shows up.
    let wait_for_reply = async {
        let inbox = self.a2a_inboxes.get(from)
            .ok_or_else(|| zclaw_types::ZclawError::NotFound(
                format!("No A2A inbox for agent: {}", from)
            ))?;
        let mut rx = inbox.lock().await;
        while let Some(msg) = rx.recv().await {
            // A response that replies to our envelope ends the wait.
            if msg.message_type == A2aMessageType::Response
                && msg.reply_to.as_deref() == Some(&envelope_id) {
                return Ok::<_, zclaw_types::ZclawError>(msg.payload);
            }
            // Anything else is dropped here; there is no re-queue mechanism.
            tracing::warn!("Received non-matching A2A response, discarding: {}", msg.id);
        }
        // `recv` returned `None`: the channel is closed.
        Err(zclaw_types::ZclawError::Internal(
            "A2A inbox channel closed".to_string()
        ))
    };

    let deadline = tokio::time::Duration::from_millis(timeout_ms);
    match tokio::time::timeout(deadline, wait_for_reply).await {
        Ok(outcome) => outcome,
        Err(_) => Err(zclaw_types::ZclawError::Timeout(
            format!("A2A task delegation timed out after {}ms", timeout_ms)
        )),
    }
}
/// Return the A2A profiles currently listed by the router.
///
/// This method itself never fails; the `Result` wrapper always holds `Ok`.
#[cfg(feature = "multi-agent")]
pub async fn a2a_get_online_agents(&self) -> Result<Vec<A2aAgentProfile>> {
    let profiles = self.a2a_router.list_profiles().await;
    Ok(profiles)
}
}
/// A request to run a hand that needs a decision before it may execute.
///
/// Entries are created in the `"pending"` state and are updated in place as
/// they are approved, rejected, cancelled, or as the approved hand finishes.
#[derive(Debug, Clone)]
pub struct ApprovalEntry {
    /// Unique identifier of the approval (a UUID v4 rendered as a string).
    pub id: String,
    /// Identifier of the hand that is executed once the approval is granted.
    pub hand_id: String,
    /// Lifecycle state. The kernel assigns `"pending"`, `"approved"`,
    /// `"rejected"`, `"cancelled"`, `"completed"` and `"failed"`.
    pub status: String,
    /// UTC time at which the approval was created.
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Input passed to the hand on approval. When execution fails and this is
    /// a JSON object, the failure message is stored under the `"error"` key.
    pub input: serde_json::Value,
    /// Reason supplied with the decision, if any. It is stored whenever a
    /// reason is given, not only for rejections.
    pub reject_reason: Option<String>,
}
/// Response from sending a message
#[derive(Debug, Clone)]
pub struct MessageResponse {
    /// Content of the response (presumably the reply text — confirm against the producer).
    pub content: String,
    /// Input token count (presumably tokens consumed by the request — TODO confirm).
    pub input_tokens: u32,
    /// Output token count (presumably tokens generated for the response — TODO confirm).
    pub output_tokens: u32,
}