//! Trigger Manager //! //! Manages triggers for automated task execution. //! //! # Lock Order Safety //! //! This module uses a single `RwLock` to avoid potential deadlocks. //! Previously, multiple locks (`triggers` and `states`) could cause deadlocks when //! acquired in different orders across methods. //! //! The unified state structure ensures atomic access to all trigger-related data. use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use zclaw_types::Result; use zclaw_hands::{TriggerConfig, TriggerType, TriggerState, TriggerResult, HandRegistry}; /// Internal state container for all trigger-related data. /// /// Using a single structure behind one RwLock eliminates the possibility of /// deadlocks caused by inconsistent lock acquisition orders. #[derive(Debug)] struct InternalState { /// Registered triggers triggers: HashMap, /// Execution states states: HashMap, } impl InternalState { fn new() -> Self { Self { triggers: HashMap::new(), states: HashMap::new(), } } } /// Trigger manager for coordinating automated triggers pub struct TriggerManager { /// Unified internal state behind a single RwLock. /// /// This prevents deadlocks by ensuring all trigger data is accessed /// through a single lock acquisition point. state: RwLock, /// Hand registry hand_registry: Arc, /// Configuration config: TriggerManagerConfig, } /// Trigger entry with additional metadata #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TriggerEntry { /// Core trigger configuration #[serde(flatten)] pub config: TriggerConfig, /// Creation timestamp pub created_at: DateTime, /// Last modification timestamp pub modified_at: DateTime, /// Optional description pub description: Option, /// Optional tags #[serde(default)] pub tags: Vec, } /// Default max executions per hour fn default_max_executions_per_hour() -> u32 { 10 } /// Default persist value fn default_persist() -> bool { true } /// Trigger manager configuration #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TriggerManagerConfig { /// Maximum executions per hour (default) #[serde(default = "default_max_executions_per_hour")] pub max_executions_per_hour: u32, /// Enable persistent storage #[serde(default = "default_persist")] pub persist: bool, /// Storage path for trigger data pub storage_path: Option, } impl Default for TriggerManagerConfig { fn default() -> Self { Self { max_executions_per_hour: 10, persist: true, storage_path: None, } } } impl TriggerManager { /// Create new trigger manager pub fn new(hand_registry: Arc) -> Self { Self { state: RwLock::new(InternalState::new()), hand_registry, config: TriggerManagerConfig::default(), } } /// Create with custom configuration pub fn with_config( hand_registry: Arc, config: TriggerManagerConfig, ) -> Self { Self { state: RwLock::new(InternalState::new()), hand_registry, config, } } /// List all triggers pub async fn list_triggers(&self) -> Vec { let state = self.state.read().await; state.triggers.values().cloned().collect() } /// Get a specific trigger pub async fn get_trigger(&self, id: &str) -> Option { let state = self.state.read().await; state.triggers.get(id).cloned() } /// Create a new trigger pub async fn create_trigger(&self, config: TriggerConfig) -> Result { // Validate hand exists (outside of our lock to avoid holding two locks) // System hands (prefixed with '_') are exempt from validation — they are // registered at boot but may not appear in the hand registry scan path. if !config.hand_id.starts_with('_') && self.hand_registry.get(&config.hand_id).await.is_none() { return Err(zclaw_types::ZclawError::InvalidInput( format!("Hand '{}' not found", config.hand_id) )); } let id = config.id.clone(); let now = Utc::now(); let entry = TriggerEntry { config, created_at: now, modified_at: now, description: None, tags: Vec::new(), }; // Initialize state and insert trigger atomically under single lock let state = TriggerState::new(&id); { let mut internal = self.state.write().await; internal.states.insert(id.clone(), state); internal.triggers.insert(id.clone(), entry.clone()); } Ok(entry) } /// Update an existing trigger pub async fn update_trigger( &self, id: &str, updates: TriggerUpdateRequest, ) -> Result { // Validate hand exists if being updated (outside of our lock) if let Some(hand_id) = &updates.hand_id { if !hand_id.starts_with('_') && self.hand_registry.get(hand_id).await.is_none() { return Err(zclaw_types::ZclawError::InvalidInput( format!("Hand '{}' not found", hand_id) )); } } let mut internal = self.state.write().await; let Some(entry) = internal.triggers.get_mut(id) else { return Err(zclaw_types::ZclawError::NotFound( format!("Trigger '{}' not found", id) )); }; // Apply updates if let Some(name) = &updates.name { entry.config.name = name.clone(); } if let Some(enabled) = updates.enabled { entry.config.enabled = enabled; } if let Some(hand_id) = &updates.hand_id { entry.config.hand_id = hand_id.clone(); } if let Some(trigger_type) = &updates.trigger_type { entry.config.trigger_type = trigger_type.clone(); } entry.modified_at = Utc::now(); Ok(entry.clone()) } /// Delete a trigger pub async fn delete_trigger(&self, id: &str) -> Result<()> { let mut internal = self.state.write().await; if internal.triggers.remove(id).is_none() { return Err(zclaw_types::ZclawError::NotFound( format!("Trigger '{}' not found", id) )); } // Also remove associated state atomically internal.states.remove(id); Ok(()) } /// Get trigger state pub async fn get_state(&self, id: &str) -> Option { let state = self.state.read().await; state.states.get(id).cloned() } /// Check if trigger should fire based on type and input. /// /// This method performs rate limiting and condition checks using a single /// read lock to avoid deadlocks. pub async fn should_fire(&self, id: &str, input: &serde_json::Value) -> bool { let internal = self.state.read().await; let Some(entry) = internal.triggers.get(id) else { return false; }; // Check if enabled if !entry.config.enabled { return false; } // Check rate limiting using the same lock if let Some(state) = internal.states.get(id) { // Check execution count this hour let one_hour_ago = Utc::now() - chrono::Duration::hours(1); if let Some(last_exec) = state.last_execution { if last_exec > one_hour_ago { if state.execution_count >= self.config.max_executions_per_hour { return false; } } } } // Check trigger-specific conditions match &entry.config.trigger_type { TriggerType::Manual => false, TriggerType::Schedule { cron: _ } => { // For schedule triggers, use cron parser // Simplified check - real implementation would use cron library true } TriggerType::Event { pattern } => { // Check if input matches pattern input.to_string().contains(pattern) } TriggerType::Webhook { path: _, secret: _ } => { // Webhook triggers are fired externally false } TriggerType::MessagePattern { pattern } => { // Check if message matches pattern input.to_string().contains(pattern) } TriggerType::FileSystem { path: _, events: _ } => { // File system triggers are fired by file watcher false } } } /// Execute a trigger. /// /// This method carefully manages lock scope to avoid deadlocks: /// 1. Acquires read lock to check trigger exists and get config /// 2. Releases lock before calling external hand registry /// 3. Acquires write lock to update state pub async fn execute_trigger(&self, id: &str, input: serde_json::Value) -> Result { // Check if should fire (uses its own lock scope) if !self.should_fire(id, &input).await { return Err(zclaw_types::ZclawError::InvalidInput( format!("Trigger '{}' should not fire", id) )); } // Get hand_id (release lock before calling hand registry) let hand_id = { let internal = self.state.read().await; let entry = internal.triggers.get(id) .ok_or_else(|| zclaw_types::ZclawError::NotFound( format!("Trigger '{}' not found", id) ))?; entry.config.hand_id.clone() }; // Get hand (outside of our lock to avoid potential deadlock with hand_registry) // System hands (prefixed with '_') must be registered at boot — same rule as create_trigger. let hand = self.hand_registry.get(&hand_id).await .ok_or_else(|| zclaw_types::ZclawError::InvalidInput( format!("Hand '{}' not found (system hands must be registered at boot)", hand_id) ))?; // Update state before execution { let mut internal = self.state.write().await; let state = internal.states.entry(id.to_string()).or_insert_with(|| TriggerState::new(id)); state.execution_count += 1; } // Execute hand (outside of lock to avoid blocking other operations) let context = zclaw_hands::HandContext { agent_id: zclaw_types::AgentId::new(), working_dir: None, env: std::collections::HashMap::new(), timeout_secs: 300, callback_url: None, }; let hand_result = hand.execute(&context, input.clone()).await; // Build trigger result from hand result let trigger_result = match &hand_result { Ok(res) => TriggerResult { timestamp: Utc::now(), success: res.success, output: Some(res.output.clone()), error: res.error.clone(), trigger_input: input.clone(), }, Err(e) => TriggerResult { timestamp: Utc::now(), success: false, output: None, error: Some(e.to_string()), trigger_input: input.clone(), }, }; // Update state after execution { let mut internal = self.state.write().await; if let Some(state) = internal.states.get_mut(id) { state.last_execution = Some(Utc::now()); state.last_result = Some(trigger_result.clone()); } } // Return the original hand result or convert to trigger result hand_result.map(|_| trigger_result) } } /// Request for updating a trigger #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TriggerUpdateRequest { /// New name pub name: Option, /// Enable/disable pub enabled: Option, /// New hand ID pub hand_id: Option, /// New trigger type pub trigger_type: Option, }