//! Kernel - central coordinator use std::pin::Pin; use std::sync::Arc; use tokio::sync::{broadcast, mpsc, Mutex}; use zclaw_types::{AgentConfig, AgentId, AgentInfo, Capability, Event, Result, HandRun, HandRunId, HandRunStatus, HandRunFilter, TriggerSource}; #[cfg(feature = "multi-agent")] use zclaw_protocols::{A2aRouter, A2aAgentProfile, A2aCapability, A2aEnvelope, A2aMessageType, A2aRecipient}; use async_trait::async_trait; use serde_json::Value; use crate::registry::AgentRegistry; use crate::capabilities::CapabilityManager; use crate::events::EventBus; use crate::config::KernelConfig; use zclaw_memory::MemoryStore; use zclaw_runtime::{AgentLoop, LlmDriver, ToolRegistry, tool::SkillExecutor, tool::builtin::PathValidator}; use zclaw_skills::SkillRegistry; use zclaw_skills::LlmCompleter; use zclaw_hands::{HandRegistry, HandContext, HandResult, hands::{BrowserHand, SlideshowHand, SpeechHand, QuizHand, WhiteboardHand, ResearcherHand, CollectorHand, ClipHand, TwitterHand, quiz::LlmQuizGenerator}}; /// Adapter that bridges `zclaw_runtime::LlmDriver` → `zclaw_skills::LlmCompleter` struct LlmDriverAdapter { driver: Arc, max_tokens: u32, temperature: f32, } impl zclaw_skills::LlmCompleter for LlmDriverAdapter { fn complete( &self, prompt: &str, ) -> Pin> + Send + '_>> { let driver = self.driver.clone(); let prompt = prompt.to_string(); Box::pin(async move { let request = zclaw_runtime::CompletionRequest { messages: vec![zclaw_types::Message::user(prompt)], max_tokens: Some(self.max_tokens), temperature: Some(self.temperature), ..Default::default() }; let response = driver.complete(request).await .map_err(|e| format!("LLM completion error: {}", e))?; // Extract text from content blocks let text: String = response.content.iter() .filter_map(|block| match block { zclaw_runtime::ContentBlock::Text { text } => Some(text.as_str()), _ => None, }) .collect::>() .join(""); Ok(text) }) } } /// Skill executor implementation for Kernel pub struct KernelSkillExecutor { skills: Arc, llm: Arc, } impl KernelSkillExecutor { pub fn new(skills: Arc, driver: Arc) -> Self { let llm: Arc = Arc::new(LlmDriverAdapter { driver, max_tokens: 4096, temperature: 0.7 }); Self { skills, llm } } } #[async_trait] impl SkillExecutor for KernelSkillExecutor { async fn execute_skill( &self, skill_id: &str, agent_id: &str, session_id: &str, input: Value, ) -> Result { let context = zclaw_skills::SkillContext { agent_id: agent_id.to_string(), session_id: session_id.to_string(), llm: Some(self.llm.clone()), ..Default::default() }; let result = self.skills.execute(&zclaw_types::SkillId::new(skill_id), &context, input).await?; Ok(result.output) } fn get_skill_detail(&self, skill_id: &str) -> Option { let manifests = self.skills.manifests_snapshot(); let manifest = manifests.get(&zclaw_types::SkillId::new(skill_id))?; Some(zclaw_runtime::tool::SkillDetail { id: manifest.id.as_str().to_string(), name: manifest.name.clone(), description: manifest.description.clone(), category: manifest.category.clone(), input_schema: manifest.input_schema.clone(), triggers: manifest.triggers.clone(), capabilities: manifest.capabilities.clone(), }) } fn list_skill_index(&self) -> Vec { let manifests = self.skills.manifests_snapshot(); manifests.values() .filter(|m| m.enabled) .map(|m| zclaw_runtime::tool::SkillIndexEntry { id: m.id.as_str().to_string(), description: m.description.clone(), triggers: m.triggers.clone(), }) .collect() } } /// The ZCLAW Kernel pub struct Kernel { config: KernelConfig, registry: AgentRegistry, capabilities: CapabilityManager, events: EventBus, memory: Arc, driver: Arc, llm_completer: Arc, skills: Arc, skill_executor: Arc, hands: Arc, trigger_manager: crate::trigger_manager::TriggerManager, pending_approvals: Arc>>, /// Running hand runs that can be cancelled (run_id -> cancelled flag) running_hand_runs: Arc>>, /// Shared memory storage backend for Growth system viking: Arc, /// A2A router for inter-agent messaging (gated by multi-agent feature) #[cfg(feature = "multi-agent")] a2a_router: Arc, /// Per-agent A2A inbox receivers #[cfg(feature = "multi-agent")] a2a_inboxes: Arc>>>>, } impl Kernel { /// Boot the kernel with the given configuration pub async fn boot(config: KernelConfig) -> Result { // Initialize memory store let memory = Arc::new(MemoryStore::new(&config.database_url).await?); // Initialize driver based on config let driver = config.create_driver()?; // Initialize subsystems let registry = AgentRegistry::new(); let capabilities = CapabilityManager::new(); let events = EventBus::new(); // Initialize skill registry let skills = Arc::new(SkillRegistry::new()); // Scan skills directory if configured if let Some(ref skills_dir) = config.skills_dir { if skills_dir.exists() { skills.add_skill_dir(skills_dir.clone()).await?; } } // Initialize hand registry with built-in hands let hands = Arc::new(HandRegistry::new()); let quiz_model = config.model().to_string(); let quiz_generator = Arc::new(LlmQuizGenerator::new(driver.clone(), quiz_model)); hands.register(Arc::new(BrowserHand::new())).await; hands.register(Arc::new(SlideshowHand::new())).await; hands.register(Arc::new(SpeechHand::new())).await; hands.register(Arc::new(QuizHand::with_generator(quiz_generator))).await; hands.register(Arc::new(WhiteboardHand::new())).await; hands.register(Arc::new(ResearcherHand::new())).await; hands.register(Arc::new(CollectorHand::new())).await; hands.register(Arc::new(ClipHand::new())).await; hands.register(Arc::new(TwitterHand::new())).await; // Create skill executor let skill_executor = Arc::new(KernelSkillExecutor::new(skills.clone(), driver.clone())); // Create LLM completer for skill system (shared with skill_executor) let llm_completer: Arc = Arc::new(LlmDriverAdapter { driver: driver.clone(), max_tokens: config.max_tokens(), temperature: config.temperature(), }); // Initialize trigger manager let trigger_manager = crate::trigger_manager::TriggerManager::new(hands.clone()); // Initialize Growth system — shared VikingAdapter for memory storage let viking = Arc::new(zclaw_runtime::VikingAdapter::in_memory()); // Restore persisted agents let persisted = memory.list_agents().await?; for agent in persisted { registry.register(agent); } // Initialize A2A router for multi-agent support #[cfg(feature = "multi-agent")] let a2a_router = { let kernel_agent_id = AgentId::new(); Arc::new(A2aRouter::new(kernel_agent_id)) }; Ok(Self { config, registry, capabilities, events, memory, driver, llm_completer, skills, skill_executor, hands, trigger_manager, pending_approvals: Arc::new(Mutex::new(Vec::new())), running_hand_runs: Arc::new(dashmap::DashMap::new()), viking, #[cfg(feature = "multi-agent")] a2a_router, #[cfg(feature = "multi-agent")] a2a_inboxes: Arc::new(dashmap::DashMap::new()), }) } /// Create a tool registry with built-in tools fn create_tool_registry(&self) -> ToolRegistry { let mut tools = ToolRegistry::new(); zclaw_runtime::tool::builtin::register_builtin_tools(&mut tools); tools } /// Create the middleware chain for the agent loop. /// /// When middleware is configured, cross-cutting concerns (compaction, loop guard, /// token calibration, etc.) are delegated to the chain. When no middleware is /// registered, the legacy inline path in `AgentLoop` is used instead. fn create_middleware_chain(&self) -> Option { let mut chain = zclaw_runtime::middleware::MiddlewareChain::new(); // Growth integration — shared VikingAdapter for memory middleware & compaction let growth = zclaw_runtime::GrowthIntegration::new(self.viking.clone()); // Compaction middleware — only register when threshold > 0 let threshold = self.config.compaction_threshold(); if threshold > 0 { use std::sync::Arc; let growth_for_compaction = zclaw_runtime::GrowthIntegration::new(self.viking.clone()); let mw = zclaw_runtime::middleware::compaction::CompactionMiddleware::new( threshold, zclaw_runtime::CompactionConfig::default(), Some(self.driver.clone()), Some(growth_for_compaction), ); chain.register(Arc::new(mw)); } // Memory middleware — auto-extract memories after conversations { use std::sync::Arc; let mw = zclaw_runtime::middleware::memory::MemoryMiddleware::new(growth); chain.register(Arc::new(mw)); } // Loop guard middleware { use std::sync::Arc; let mw = zclaw_runtime::middleware::loop_guard::LoopGuardMiddleware::with_defaults(); chain.register(Arc::new(mw)); } // Token calibration middleware { use std::sync::Arc; let mw = zclaw_runtime::middleware::token_calibration::TokenCalibrationMiddleware::new(); chain.register(Arc::new(mw)); } // Skill index middleware — inject lightweight index instead of full descriptions { use std::sync::Arc; let entries = self.skill_executor.list_skill_index(); if !entries.is_empty() { let mw = zclaw_runtime::middleware::skill_index::SkillIndexMiddleware::new(entries); chain.register(Arc::new(mw)); } } // Guardrail middleware — safety rules for tool calls { use std::sync::Arc; let mw = zclaw_runtime::middleware::guardrail::GuardrailMiddleware::new(true) .with_builtin_rules(); chain.register(Arc::new(mw)); } // Only return Some if we actually registered middleware if chain.is_empty() { None } else { tracing::info!("[Kernel] Middleware chain created with {} middlewares", chain.len()); Some(chain) } } /// Build a system prompt with skill information injected async fn build_system_prompt_with_skills(&self, base_prompt: Option<&String>) -> String { // Get skill list asynchronously let skills = self.skills.list().await; let mut prompt = base_prompt .map(|p| p.clone()) .unwrap_or_else(|| "You are a helpful AI assistant.".to_string()); // Inject skill information with categories if !skills.is_empty() { prompt.push_str("\n\n## Available Skills\n\n"); prompt.push_str("You have access to specialized skills. Analyze user intent and autonomously call `execute_skill` with the appropriate skill_id.\n\n"); // Group skills by category based on their ID patterns let categories = self.categorize_skills(&skills); for (category, category_skills) in categories { prompt.push_str(&format!("### {}\n", category)); for skill in category_skills { prompt.push_str(&format!( "- **{}**: {}", skill.id.as_str(), skill.description )); prompt.push('\n'); } prompt.push('\n'); } prompt.push_str("### When to use skills:\n"); prompt.push_str("- **IMPORTANT**: You should autonomously decide when to use skills based on your understanding of the user's intent.\n"); prompt.push_str("- Do not wait for explicit skill names - recognize the need and act.\n"); prompt.push_str("- Match user's request to the most appropriate skill's domain.\n"); prompt.push_str("- If multiple skills could apply, choose the most specialized one.\n\n"); prompt.push_str("### Example:\n"); prompt.push_str("User: \"分析腾讯财报\" → Intent: Financial analysis → Call: execute_skill(\"finance-tracker\", {...})\n"); } prompt } /// Categorize skills into logical groups /// /// Priority: /// 1. Use skill's `category` field if defined in SKILL.md /// 2. Fall back to pattern matching for backward compatibility fn categorize_skills<'a>(&self, skills: &'a [zclaw_skills::SkillManifest]) -> Vec<(String, Vec<&'a zclaw_skills::SkillManifest>)> { let mut categories: std::collections::HashMap> = std::collections::HashMap::new(); // Fallback category patterns for skills without explicit category let fallback_patterns = [ ("开发工程", vec!["senior-developer", "frontend-developer", "backend-architect", "ai-engineer", "devops-automator", "rapid-prototyper", "lsp-index-engineer"]), ("测试质量", vec!["api-tester", "evidence-collector", "reality-checker", "performance-benchmarker", "test-results-analyzer", "accessibility-auditor", "code-review"]), ("安全合规", vec!["security-engineer", "legal-compliance-checker", "agentic-identity-trust"]), ("数据分析", vec!["analytics-reporter", "finance-tracker", "data-analysis", "sales-data-extraction-agent", "data-consolidation-agent", "report-distribution-agent"]), ("项目管理", vec!["senior-pm", "project-shepherd", "sprint-prioritizer", "experiment-tracker", "feedback-synthesizer", "trend-researcher", "agents-orchestrator"]), ("设计UX", vec!["ui-designer", "ux-architect", "ux-researcher", "visual-storyteller", "image-prompt-engineer", "whimsy-injector", "brand-guardian"]), ("内容营销", vec!["content-creator", "chinese-writing", "executive-summary-generator", "social-media-strategist"]), ("社交平台", vec!["twitter-engager", "instagram-curator", "tiktok-strategist", "reddit-community-builder", "zhihu-strategist", "xiaohongshu-specialist", "wechat-official-account", "growth-hacker", "app-store-optimizer"]), ("运营支持", vec!["studio-operations", "studio-producer", "support-responder", "workflow-optimizer", "infrastructure-maintainer", "tool-evaluator"]), ("XR/空间计算", vec!["visionos-spatial-engineer", "macos-spatial-metal-engineer", "xr-immersive-developer", "xr-interface-architect", "xr-cockpit-interaction-specialist", "terminal-integration-specialist"]), ("基础工具", vec!["web-search", "file-operations", "shell-command", "git", "translation", "feishu-docs"]), ]; // Categorize each skill for skill in skills { // Priority 1: Use skill's explicit category if let Some(ref category) = skill.category { if !category.is_empty() { categories.entry(category.clone()).or_default().push(skill); continue; } } // Priority 2: Fallback to pattern matching let skill_id = skill.id.as_str(); let mut categorized = false; for (category, patterns) in &fallback_patterns { if patterns.iter().any(|p| skill_id.contains(p) || *p == skill_id) { categories.entry(category.to_string()).or_default().push(skill); categorized = true; break; } } // Put uncategorized skills in "其他" if !categorized { categories.entry("其他".to_string()).or_default().push(skill); } } // Convert to ordered vector let mut result: Vec<(String, Vec<_>)> = categories.into_iter().collect(); result.sort_by(|a, b| { // Sort by predefined order let order = ["开发工程", "测试质量", "安全合规", "数据分析", "项目管理", "设计UX", "内容营销", "社交平台", "运营支持", "XR/空间计算", "基础工具", "其他"]; let a_idx = order.iter().position(|&x| x == a.0).unwrap_or(99); let b_idx = order.iter().position(|&x| x == b.0).unwrap_or(99); a_idx.cmp(&b_idx) }); result } /// Spawn a new agent pub async fn spawn_agent(&self, config: AgentConfig) -> Result { let id = config.id; // Validate capabilities self.capabilities.validate(&config.capabilities)?; // Register in memory self.memory.save_agent(&config).await?; // Register in registry let config_clone = config.clone(); self.registry.register(config); // Register with A2A router for multi-agent messaging #[cfg(feature = "multi-agent")] { let profile = Self::agent_config_to_a2a_profile(&config_clone); let rx = self.a2a_router.register_agent(profile).await; self.a2a_inboxes.insert(id, Arc::new(Mutex::new(rx))); } // Emit event self.events.publish(Event::AgentSpawned { agent_id: id, name: self.registry.get(&id).map(|a| a.name.clone()).unwrap_or_default(), }); Ok(id) } /// Kill an agent pub async fn kill_agent(&self, id: &AgentId) -> Result<()> { // Remove from registry self.registry.unregister(id); // Remove from memory self.memory.delete_agent(id).await?; // Unregister from A2A router #[cfg(feature = "multi-agent")] { self.a2a_router.unregister_agent(id).await; self.a2a_inboxes.remove(id); } // Emit event self.events.publish(Event::AgentTerminated { agent_id: *id, reason: "killed".to_string(), }); Ok(()) } /// List all agents pub fn list_agents(&self) -> Vec { self.registry.list() } /// Get agent info pub fn get_agent(&self, id: &AgentId) -> Option { self.registry.get_info(id) } /// Get agent config (for export) pub fn get_agent_config(&self, id: &AgentId) -> Option { self.registry.get(id) } /// Send a message to an agent pub async fn send_message(&self, agent_id: &AgentId, message: String) -> Result { let agent_config = self.registry.get(agent_id) .ok_or_else(|| zclaw_types::ZclawError::NotFound(format!("Agent not found: {}", agent_id)))?; // Create or get session let session_id = self.memory.create_session(agent_id).await?; // Always use Kernel's current model configuration // This ensures user's "模型与 API" settings are respected let model = self.config.model().to_string(); // Create agent loop with model configuration let tools = self.create_tool_registry(); let mut loop_runner = AgentLoop::new( *agent_id, self.driver.clone(), tools, self.memory.clone(), ) .with_model(&model) .with_skill_executor(self.skill_executor.clone()) .with_max_tokens(agent_config.max_tokens.unwrap_or_else(|| self.config.max_tokens())) .with_temperature(agent_config.temperature.unwrap_or_else(|| self.config.temperature())) .with_compaction_threshold( agent_config.compaction_threshold .map(|t| t as usize) .unwrap_or_else(|| self.config.compaction_threshold()), ); // Set path validator from agent's workspace directory (if configured) if let Some(ref workspace) = agent_config.workspace { let path_validator = PathValidator::new().with_workspace(workspace.clone()); tracing::info!( "[Kernel] Setting path_validator with workspace: {} for agent {}", workspace.display(), agent_id ); loop_runner = loop_runner.with_path_validator(path_validator); } // Inject middleware chain if available if let Some(chain) = self.create_middleware_chain() { loop_runner = loop_runner.with_middleware_chain(chain); } // Build system prompt with skill information injected let system_prompt = self.build_system_prompt_with_skills(agent_config.system_prompt.as_ref()).await; let loop_runner = loop_runner.with_system_prompt(&system_prompt); // Run the loop let result = loop_runner.run(session_id, message).await?; // Track message count self.registry.increment_message_count(agent_id); Ok(MessageResponse { content: result.response, input_tokens: result.input_tokens, output_tokens: result.output_tokens, }) } /// Send a message with streaming pub async fn send_message_stream( &self, agent_id: &AgentId, message: String, ) -> Result> { self.send_message_stream_with_prompt(agent_id, message, None, None).await } /// Send a message with streaming, optional system prompt, and optional session reuse pub async fn send_message_stream_with_prompt( &self, agent_id: &AgentId, message: String, system_prompt_override: Option, session_id_override: Option, ) -> Result> { let agent_config = self.registry.get(agent_id) .ok_or_else(|| zclaw_types::ZclawError::NotFound(format!("Agent not found: {}", agent_id)))?; // Reuse existing session or create new one let session_id = match session_id_override { Some(id) => { // Verify the session exists; if not, create a new one let existing = self.memory.get_messages(&id).await; match existing { Ok(msgs) if !msgs.is_empty() => id, _ => { tracing::debug!("Session {} not found or empty, creating new session", id); self.memory.create_session(agent_id).await? } } } None => self.memory.create_session(agent_id).await?, }; // Always use Kernel's current model configuration // This ensures user's "模型与 API" settings are respected let model = self.config.model().to_string(); // Create agent loop with model configuration let tools = self.create_tool_registry(); let mut loop_runner = AgentLoop::new( *agent_id, self.driver.clone(), tools, self.memory.clone(), ) .with_model(&model) .with_skill_executor(self.skill_executor.clone()) .with_max_tokens(agent_config.max_tokens.unwrap_or_else(|| self.config.max_tokens())) .with_temperature(agent_config.temperature.unwrap_or_else(|| self.config.temperature())) .with_compaction_threshold( agent_config.compaction_threshold .map(|t| t as usize) .unwrap_or_else(|| self.config.compaction_threshold()), ); // Set path validator from agent's workspace directory (if configured) // This enables file_read / file_write tools to access the workspace if let Some(ref workspace) = agent_config.workspace { let path_validator = PathValidator::new().with_workspace(workspace.clone()); tracing::info!( "[Kernel] Setting path_validator with workspace: {} for agent {}", workspace.display(), agent_id ); loop_runner = loop_runner.with_path_validator(path_validator); } // Inject middleware chain if available if let Some(chain) = self.create_middleware_chain() { loop_runner = loop_runner.with_middleware_chain(chain); } // Use external prompt if provided, otherwise build default let system_prompt = match system_prompt_override { Some(prompt) => prompt, None => self.build_system_prompt_with_skills(agent_config.system_prompt.as_ref()).await, }; let loop_runner = loop_runner.with_system_prompt(&system_prompt); // Run with streaming self.registry.increment_message_count(agent_id); loop_runner.run_streaming(session_id, message).await } /// Subscribe to events pub fn subscribe(&self) -> broadcast::Receiver { self.events.subscribe() } /// Shutdown the kernel pub async fn shutdown(&self) -> Result<()> { self.events.publish(Event::KernelShutdown); Ok(()) } /// Get the kernel configuration pub fn config(&self) -> &KernelConfig { &self.config } /// Get the LLM driver pub fn driver(&self) -> Arc { self.driver.clone() } /// Get the skills registry pub fn skills(&self) -> &Arc { &self.skills } /// List all discovered skills pub async fn list_skills(&self) -> Vec { self.skills.list().await } /// Refresh skills from a directory pub async fn refresh_skills(&self, dir: Option) -> Result<()> { if let Some(path) = dir { self.skills.add_skill_dir(path).await?; } else if let Some(ref skills_dir) = self.config.skills_dir { self.skills.add_skill_dir(skills_dir.clone()).await?; } Ok(()) } /// Execute a skill with the given ID and input pub async fn execute_skill( &self, id: &str, context: zclaw_skills::SkillContext, input: serde_json::Value, ) -> Result { // Inject LLM completer into context for PromptOnly skills let mut ctx = context; if ctx.llm.is_none() { ctx.llm = Some(self.llm_completer.clone()); } self.skills.execute(&zclaw_types::SkillId::new(id), &ctx, input).await } /// Get the hands registry pub fn hands(&self) -> &Arc { &self.hands } /// List all registered hands pub async fn list_hands(&self) -> Vec { self.hands.list().await } /// Execute a hand with the given input, tracking the run pub async fn execute_hand( &self, hand_id: &str, input: serde_json::Value, ) -> Result<(HandResult, HandRunId)> { let run_id = HandRunId::new(); let now = chrono::Utc::now().to_rfc3339(); // Create the initial HandRun record let mut run = HandRun { id: run_id, hand_name: hand_id.to_string(), trigger_source: TriggerSource::Manual, params: input.clone(), status: HandRunStatus::Pending, result: None, error: None, duration_ms: None, created_at: now.clone(), started_at: None, completed_at: None, }; self.memory.save_hand_run(&run).await?; // Transition to Running run.status = HandRunStatus::Running; run.started_at = Some(chrono::Utc::now().to_rfc3339()); self.memory.update_hand_run(&run).await?; // Register cancellation flag let cancel_flag = Arc::new(std::sync::atomic::AtomicBool::new(false)); self.running_hand_runs.insert(run_id, cancel_flag.clone()); // Execute the hand let context = HandContext::default(); let start = std::time::Instant::now(); let hand_result = self.hands.execute(hand_id, &context, input).await; let duration = start.elapsed(); // Check if cancelled during execution if cancel_flag.load(std::sync::atomic::Ordering::Relaxed) { let mut run_update = run.clone(); run_update.status = HandRunStatus::Cancelled; run_update.completed_at = Some(chrono::Utc::now().to_rfc3339()); run_update.duration_ms = Some(duration.as_millis() as u64); self.memory.update_hand_run(&run_update).await?; self.running_hand_runs.remove(&run_id); return Err(zclaw_types::ZclawError::Internal("Hand execution cancelled".to_string())); } // Remove from running map self.running_hand_runs.remove(&run_id); // Update HandRun with result let completed_at = chrono::Utc::now().to_rfc3339(); match &hand_result { Ok(res) => { run.status = HandRunStatus::Completed; run.result = Some(res.output.clone()); run.error = res.error.clone(); } Err(e) => { run.status = HandRunStatus::Failed; run.error = Some(e.to_string()); } } run.duration_ms = Some(duration.as_millis() as u64); run.completed_at = Some(completed_at); self.memory.update_hand_run(&run).await?; hand_result.map(|res| (res, run_id)) } /// Execute a hand with a specific trigger source (for scheduled/event triggers) pub async fn execute_hand_with_source( &self, hand_id: &str, input: serde_json::Value, trigger_source: TriggerSource, ) -> Result<(HandResult, HandRunId)> { let run_id = HandRunId::new(); let now = chrono::Utc::now().to_rfc3339(); let mut run = HandRun { id: run_id, hand_name: hand_id.to_string(), trigger_source, params: input.clone(), status: HandRunStatus::Pending, result: None, error: None, duration_ms: None, created_at: now, started_at: None, completed_at: None, }; self.memory.save_hand_run(&run).await?; run.status = HandRunStatus::Running; run.started_at = Some(chrono::Utc::now().to_rfc3339()); self.memory.update_hand_run(&run).await?; let cancel_flag = Arc::new(std::sync::atomic::AtomicBool::new(false)); self.running_hand_runs.insert(run_id, cancel_flag.clone()); let context = HandContext::default(); let start = std::time::Instant::now(); let hand_result = self.hands.execute(hand_id, &context, input).await; let duration = start.elapsed(); // Check if cancelled during execution if cancel_flag.load(std::sync::atomic::Ordering::Relaxed) { run.status = HandRunStatus::Cancelled; run.completed_at = Some(chrono::Utc::now().to_rfc3339()); run.duration_ms = Some(duration.as_millis() as u64); self.memory.update_hand_run(&run).await?; self.running_hand_runs.remove(&run_id); return Err(zclaw_types::ZclawError::Internal("Hand execution cancelled".to_string())); } self.running_hand_runs.remove(&run_id); let completed_at = chrono::Utc::now().to_rfc3339(); match &hand_result { Ok(res) => { run.status = HandRunStatus::Completed; run.result = Some(res.output.clone()); run.error = res.error.clone(); } Err(e) => { run.status = HandRunStatus::Failed; run.error = Some(e.to_string()); } } run.duration_ms = Some(duration.as_millis() as u64); run.completed_at = Some(completed_at); self.memory.update_hand_run(&run).await?; hand_result.map(|res| (res, run_id)) } // ============================================================ // Hand Run Tracking // ============================================================ /// Get a hand run by ID pub async fn get_hand_run(&self, id: &HandRunId) -> Result> { self.memory.get_hand_run(id).await } /// List hand runs with filter pub async fn list_hand_runs(&self, filter: &HandRunFilter) -> Result> { self.memory.list_hand_runs(filter).await } /// Count hand runs matching filter pub async fn count_hand_runs(&self, filter: &HandRunFilter) -> Result { self.memory.count_hand_runs(filter).await } /// Cancel a running hand execution pub async fn cancel_hand_run(&self, id: &HandRunId) -> Result<()> { if let Some((_, flag)) = self.running_hand_runs.remove(id) { flag.store(true, std::sync::atomic::Ordering::Relaxed); // Note: the actual status update happens in execute_hand_with_source // when it detects the cancel flag Ok(()) } else { // Not currently running — check if exists at all let run = self.memory.get_hand_run(id).await?; match run { Some(r) if r.status == HandRunStatus::Pending => { let mut updated = r; updated.status = HandRunStatus::Cancelled; updated.completed_at = Some(chrono::Utc::now().to_rfc3339()); self.memory.update_hand_run(&updated).await?; Ok(()) } Some(r) => Err(zclaw_types::ZclawError::InvalidInput( format!("Cannot cancel hand run {} with status {}", id, r.status) )), None => Err(zclaw_types::ZclawError::NotFound( format!("Hand run {} not found", id) )), } } } // ============================================================ // Trigger Management // ============================================================ /// List all triggers pub async fn list_triggers(&self) -> Vec { self.trigger_manager.list_triggers().await } /// Get a specific trigger pub async fn get_trigger(&self, id: &str) -> Option { self.trigger_manager.get_trigger(id).await } /// Create a new trigger pub async fn create_trigger( &self, config: zclaw_hands::TriggerConfig, ) -> Result { self.trigger_manager.create_trigger(config).await } /// Update a trigger pub async fn update_trigger( &self, id: &str, updates: crate::trigger_manager::TriggerUpdateRequest, ) -> Result { self.trigger_manager.update_trigger(id, updates).await } /// Delete a trigger pub async fn delete_trigger(&self, id: &str) -> Result<()> { self.trigger_manager.delete_trigger(id).await } /// Execute a trigger pub async fn execute_trigger( &self, id: &str, input: serde_json::Value, ) -> Result { self.trigger_manager.execute_trigger(id, input).await } // ============================================================ // Approval Management // ============================================================ /// List pending approvals pub async fn list_approvals(&self) -> Vec { let approvals = self.pending_approvals.lock().await; approvals.iter().filter(|a| a.status == "pending").cloned().collect() } /// Get a single approval by ID (any status, not just pending) /// /// Returns None if no approval with the given ID exists. pub async fn get_approval(&self, id: &str) -> Option { let approvals = self.pending_approvals.lock().await; approvals.iter().find(|a| a.id == id).cloned() } /// Create a pending approval (called when a needs_approval hand is triggered) pub async fn create_approval(&self, hand_id: String, input: serde_json::Value) -> ApprovalEntry { let entry = ApprovalEntry { id: uuid::Uuid::new_v4().to_string(), hand_id, status: "pending".to_string(), created_at: chrono::Utc::now(), input, reject_reason: None, }; let mut approvals = self.pending_approvals.lock().await; approvals.push(entry.clone()); entry } /// Respond to an approval pub async fn respond_to_approval( &self, id: &str, approved: bool, reason: Option, ) -> Result<()> { let mut approvals = self.pending_approvals.lock().await; let entry = approvals.iter_mut().find(|a| a.id == id && a.status == "pending") .ok_or_else(|| zclaw_types::ZclawError::NotFound(format!("Approval not found: {}", id)))?; entry.status = if approved { "approved".to_string() } else { "rejected".to_string() }; if let Some(r) = reason { entry.reject_reason = Some(r); } if approved { let hand_id = entry.hand_id.clone(); let input = entry.input.clone(); drop(approvals); // Release lock before async hand execution // Execute the hand in background with HandRun tracking let hands = self.hands.clone(); let approvals = self.pending_approvals.clone(); let memory = self.memory.clone(); let running_hand_runs = self.running_hand_runs.clone(); let id_owned = id.to_string(); tokio::spawn(async move { // Create HandRun record for tracking let run_id = HandRunId::new(); let now = chrono::Utc::now().to_rfc3339(); let mut run = HandRun { id: run_id, hand_name: hand_id.clone(), trigger_source: TriggerSource::Manual, params: input.clone(), status: HandRunStatus::Pending, result: None, error: None, duration_ms: None, created_at: now.clone(), started_at: None, completed_at: None, }; let _ = memory.save_hand_run(&run).await; run.status = HandRunStatus::Running; run.started_at = Some(chrono::Utc::now().to_rfc3339()); let _ = memory.update_hand_run(&run).await; // Register cancellation flag let cancel_flag = Arc::new(std::sync::atomic::AtomicBool::new(false)); running_hand_runs.insert(run.id, cancel_flag.clone()); let context = HandContext::default(); let start = std::time::Instant::now(); let result = hands.execute(&hand_id, &context, input).await; let duration = start.elapsed(); // Remove from running map running_hand_runs.remove(&run.id); // Update HandRun with result let completed_at = chrono::Utc::now().to_rfc3339(); match &result { Ok(res) => { run.status = HandRunStatus::Completed; run.result = Some(res.output.clone()); run.error = res.error.clone(); } Err(e) => { run.status = HandRunStatus::Failed; run.error = Some(e.to_string()); } } run.duration_ms = Some(duration.as_millis() as u64); run.completed_at = Some(completed_at); let _ = memory.update_hand_run(&run).await; // Update approval status based on execution result let mut approvals = approvals.lock().await; if let Some(entry) = approvals.iter_mut().find(|a| a.id == id_owned) { match result { Ok(_) => entry.status = "completed".to_string(), Err(e) => { entry.status = "failed".to_string(); if let Some(obj) = entry.input.as_object_mut() { obj.insert("error".to_string(), Value::String(format!("{}", e))); } } } } }); } Ok(()) } /// Cancel a pending approval pub async fn cancel_approval(&self, id: &str) -> Result<()> { let mut approvals = self.pending_approvals.lock().await; let entry = approvals.iter_mut().find(|a| a.id == id && a.status == "pending") .ok_or_else(|| zclaw_types::ZclawError::NotFound(format!("Approval not found: {}", id)))?; entry.status = "cancelled".to_string(); Ok(()) } // ============================================================ // A2A (Agent-to-Agent) Messaging // ============================================================ /// Derive an A2A agent profile from an AgentConfig #[cfg(feature = "multi-agent")] fn agent_config_to_a2a_profile(config: &AgentConfig) -> A2aAgentProfile { let caps: Vec = config.tools.iter().map(|tool_name| { A2aCapability { name: tool_name.clone(), description: format!("Tool: {}", tool_name), input_schema: None, output_schema: None, requires_approval: false, version: "1.0.0".to_string(), tags: vec![], } }).collect(); A2aAgentProfile { id: config.id, name: config.name.clone(), description: config.description.clone().unwrap_or_default(), capabilities: caps, protocols: vec!["a2a".to_string()], role: "worker".to_string(), priority: 5, metadata: std::collections::HashMap::new(), groups: vec![], last_seen: 0, } } /// Check if an agent is authorized to send messages to a target #[cfg(feature = "multi-agent")] fn check_a2a_permission(&self, from: &AgentId, to: &AgentId) -> Result<()> { let caps = self.capabilities.get(from); match caps { Some(cap_set) => { let has_permission = cap_set.capabilities.iter().any(|cap| { match cap { Capability::AgentMessage { pattern } => { pattern == "*" || to.to_string().starts_with(pattern) } _ => false, } }); if !has_permission { return Err(zclaw_types::ZclawError::PermissionDenied( format!("Agent {} does not have AgentMessage capability for {}", from, to) )); } Ok(()) } None => { // No capabilities registered — deny by default Err(zclaw_types::ZclawError::PermissionDenied( format!("Agent {} has no capabilities registered", from) )) } } } /// Send a direct A2A message from one agent to another #[cfg(feature = "multi-agent")] pub async fn a2a_send( &self, from: &AgentId, to: &AgentId, payload: serde_json::Value, message_type: Option, ) -> Result<()> { // Validate sender exists self.registry.get(from) .ok_or_else(|| zclaw_types::ZclawError::NotFound( format!("Sender agent not found: {}", from) ))?; // Validate receiver exists and is running self.registry.get(to) .ok_or_else(|| zclaw_types::ZclawError::NotFound( format!("Target agent not found: {}", to) ))?; // Check capability permission self.check_a2a_permission(from, to)?; // Build and route envelope let envelope = A2aEnvelope::new( *from, A2aRecipient::Direct { agent_id: *to }, message_type.unwrap_or(A2aMessageType::Notification), payload, ); self.a2a_router.route(envelope).await?; // Emit event self.events.publish(Event::A2aMessageSent { from: *from, to: format!("{}", to), message_type: "direct".to_string(), }); Ok(()) } /// Broadcast a message from one agent to all other agents #[cfg(feature = "multi-agent")] pub async fn a2a_broadcast( &self, from: &AgentId, payload: serde_json::Value, ) -> Result<()> { // Validate sender exists self.registry.get(from) .ok_or_else(|| zclaw_types::ZclawError::NotFound( format!("Sender agent not found: {}", from) ))?; let envelope = A2aEnvelope::new( *from, A2aRecipient::Broadcast, A2aMessageType::Notification, payload, ); self.a2a_router.route(envelope).await?; self.events.publish(Event::A2aMessageSent { from: *from, to: "broadcast".to_string(), message_type: "broadcast".to_string(), }); Ok(()) } /// Discover agents that have a specific capability #[cfg(feature = "multi-agent")] pub async fn a2a_discover(&self, capability: &str) -> Result> { let result = self.a2a_router.discover(capability).await?; self.events.publish(Event::A2aAgentDiscovered { agent_id: AgentId::new(), capabilities: vec![capability.to_string()], }); Ok(result) } /// Try to receive a pending A2A message for an agent (non-blocking) #[cfg(feature = "multi-agent")] pub async fn a2a_receive(&self, agent_id: &AgentId) -> Result> { let inbox = self.a2a_inboxes.get(agent_id) .ok_or_else(|| zclaw_types::ZclawError::NotFound( format!("No A2A inbox for agent: {}", agent_id) ))?; let mut rx = inbox.lock().await; match rx.try_recv() { Ok(envelope) => { self.events.publish(Event::A2aMessageReceived { from: envelope.from, to: format!("{}", agent_id), message_type: "direct".to_string(), }); Ok(Some(envelope)) } Err(_) => Ok(None), } } /// Delegate a task to another agent and wait for response with timeout #[cfg(feature = "multi-agent")] pub async fn a2a_delegate_task( &self, from: &AgentId, to: &AgentId, task_description: String, timeout_ms: u64, ) -> Result { // Validate both agents exist self.registry.get(from) .ok_or_else(|| zclaw_types::ZclawError::NotFound( format!("Sender agent not found: {}", from) ))?; self.registry.get(to) .ok_or_else(|| zclaw_types::ZclawError::NotFound( format!("Target agent not found: {}", to) ))?; // Check capability permission self.check_a2a_permission(from, to)?; // Send task request let task_id = uuid::Uuid::new_v4().to_string(); let envelope = A2aEnvelope::new( *from, A2aRecipient::Direct { agent_id: *to }, A2aMessageType::Task, serde_json::json!({ "task_id": task_id, "description": task_description, }), ).with_conversation(task_id.clone()); let envelope_id = envelope.id.clone(); self.a2a_router.route(envelope).await?; self.events.publish(Event::A2aMessageSent { from: *from, to: format!("{}", to), message_type: "task".to_string(), }); // Wait for response with timeout let timeout = tokio::time::Duration::from_millis(timeout_ms); let result = tokio::time::timeout(timeout, async { let inbox = self.a2a_inboxes.get(from) .ok_or_else(|| zclaw_types::ZclawError::NotFound( format!("No A2A inbox for agent: {}", from) ))?; let mut rx = inbox.lock().await; // Poll for matching response loop { match rx.recv().await { Some(msg) => { // Check if this is a response to our task if msg.message_type == A2aMessageType::Response && msg.reply_to.as_deref() == Some(&envelope_id) { return Ok::<_, zclaw_types::ZclawError>(msg.payload); } // Not our response — put it back by logging it (would need a re-queue mechanism for production) tracing::warn!("Received non-matching A2A response, discarding: {}", msg.id); } None => { return Err(zclaw_types::ZclawError::Internal( "A2A inbox channel closed".to_string() )); } } } }).await; match result { Ok(Ok(payload)) => Ok(payload), Ok(Err(e)) => Err(e), Err(_) => Err(zclaw_types::ZclawError::Timeout( format!("A2A task delegation timed out after {}ms", timeout_ms) )), } } /// Get all online agents via A2A profiles #[cfg(feature = "multi-agent")] pub async fn a2a_get_online_agents(&self) -> Result> { Ok(self.a2a_router.list_profiles().await) } } #[derive(Debug, Clone)] pub struct ApprovalEntry { pub id: String, pub hand_id: String, pub status: String, pub created_at: chrono::DateTime, pub input: serde_json::Value, pub reject_reason: Option, } /// Response from sending a message #[derive(Debug, Clone)] pub struct MessageResponse { pub content: String, pub input_tokens: u32, pub output_tokens: u32, }