Some checks failed
CI / Rust Check (push) Has been cancelled
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
style: 统一代码格式和注释风格 docs: 更新多个功能文档的完整度和状态 feat(runtime): 添加路径验证工具支持 fix(pipeline): 改进条件判断和变量解析逻辑 test(types): 为ID类型添加全面测试用例 chore: 更新依赖项和Cargo.lock文件 perf(mcp): 优化MCP协议传输和错误处理
373 lines
12 KiB
Rust
373 lines
12 KiB
Rust
//! Trigger Manager
|
|
//!
|
|
//! Manages triggers for automated task execution.
|
|
//!
|
|
//! # Lock Order Safety
|
|
//!
|
|
//! This module uses a single `RwLock<InternalState>` to avoid potential deadlocks.
|
|
//! Previously, multiple locks (`triggers` and `states`) could cause deadlocks when
|
|
//! acquired in different orders across methods.
|
|
//!
|
|
//! The unified state structure ensures atomic access to all trigger-related data.
|
|
|
|
use std::collections::HashMap;
|
|
use std::sync::Arc;
|
|
use tokio::sync::RwLock;
|
|
use chrono::{DateTime, Utc};
|
|
use serde::{Deserialize, Serialize};
|
|
use zclaw_types::Result;
|
|
use zclaw_hands::{TriggerConfig, TriggerType, TriggerState, TriggerResult, HandRegistry};
|
|
|
|
/// Internal state container for all trigger-related data.
|
|
///
|
|
/// Using a single structure behind one RwLock eliminates the possibility of
|
|
/// deadlocks caused by inconsistent lock acquisition orders.
|
|
#[derive(Debug)]
|
|
struct InternalState {
|
|
/// Registered triggers
|
|
triggers: HashMap<String, TriggerEntry>,
|
|
/// Execution states
|
|
states: HashMap<String, TriggerState>,
|
|
}
|
|
|
|
impl InternalState {
|
|
fn new() -> Self {
|
|
Self {
|
|
triggers: HashMap::new(),
|
|
states: HashMap::new(),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Trigger manager for coordinating automated triggers
|
|
pub struct TriggerManager {
|
|
/// Unified internal state behind a single RwLock.
|
|
///
|
|
/// This prevents deadlocks by ensuring all trigger data is accessed
|
|
/// through a single lock acquisition point.
|
|
state: RwLock<InternalState>,
|
|
/// Hand registry
|
|
hand_registry: Arc<HandRegistry>,
|
|
/// Configuration
|
|
config: TriggerManagerConfig,
|
|
}
|
|
|
|
/// Trigger entry with additional metadata
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct TriggerEntry {
|
|
/// Core trigger configuration
|
|
#[serde(flatten)]
|
|
pub config: TriggerConfig,
|
|
/// Creation timestamp
|
|
pub created_at: DateTime<Utc>,
|
|
/// Last modification timestamp
|
|
pub modified_at: DateTime<Utc>,
|
|
/// Optional description
|
|
pub description: Option<String>,
|
|
/// Optional tags
|
|
#[serde(default)]
|
|
pub tags: Vec<String>,
|
|
}
|
|
|
|
/// Default max executions per hour
|
|
fn default_max_executions_per_hour() -> u32 { 10 }
|
|
/// Default persist value
|
|
fn default_persist() -> bool { true }
|
|
|
|
/// Trigger manager configuration
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct TriggerManagerConfig {
|
|
/// Maximum executions per hour (default)
|
|
#[serde(default = "default_max_executions_per_hour")]
|
|
pub max_executions_per_hour: u32,
|
|
/// Enable persistent storage
|
|
#[serde(default = "default_persist")]
|
|
pub persist: bool,
|
|
/// Storage path for trigger data
|
|
pub storage_path: Option<String>,
|
|
}
|
|
|
|
impl Default for TriggerManagerConfig {
|
|
fn default() -> Self {
|
|
Self {
|
|
max_executions_per_hour: 10,
|
|
persist: true,
|
|
storage_path: None,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl TriggerManager {
|
|
/// Create new trigger manager
|
|
pub fn new(hand_registry: Arc<HandRegistry>) -> Self {
|
|
Self {
|
|
state: RwLock::new(InternalState::new()),
|
|
hand_registry,
|
|
config: TriggerManagerConfig::default(),
|
|
}
|
|
}
|
|
|
|
/// Create with custom configuration
|
|
pub fn with_config(
|
|
hand_registry: Arc<HandRegistry>,
|
|
config: TriggerManagerConfig,
|
|
) -> Self {
|
|
Self {
|
|
state: RwLock::new(InternalState::new()),
|
|
hand_registry,
|
|
config,
|
|
}
|
|
}
|
|
|
|
/// List all triggers
|
|
pub async fn list_triggers(&self) -> Vec<TriggerEntry> {
|
|
let state = self.state.read().await;
|
|
state.triggers.values().cloned().collect()
|
|
}
|
|
|
|
/// Get a specific trigger
|
|
pub async fn get_trigger(&self, id: &str) -> Option<TriggerEntry> {
|
|
let state = self.state.read().await;
|
|
state.triggers.get(id).cloned()
|
|
}
|
|
|
|
/// Create a new trigger
|
|
pub async fn create_trigger(&self, config: TriggerConfig) -> Result<TriggerEntry> {
|
|
// Validate hand exists (outside of our lock to avoid holding two locks)
|
|
if self.hand_registry.get(&config.hand_id).await.is_none() {
|
|
return Err(zclaw_types::ZclawError::InvalidInput(
|
|
format!("Hand '{}' not found", config.hand_id)
|
|
));
|
|
}
|
|
|
|
let id = config.id.clone();
|
|
let now = Utc::now();
|
|
|
|
let entry = TriggerEntry {
|
|
config,
|
|
created_at: now,
|
|
modified_at: now,
|
|
description: None,
|
|
tags: Vec::new(),
|
|
};
|
|
|
|
// Initialize state and insert trigger atomically under single lock
|
|
let state = TriggerState::new(&id);
|
|
{
|
|
let mut internal = self.state.write().await;
|
|
internal.states.insert(id.clone(), state);
|
|
internal.triggers.insert(id.clone(), entry.clone());
|
|
}
|
|
|
|
Ok(entry)
|
|
}
|
|
|
|
/// Update an existing trigger
|
|
pub async fn update_trigger(
|
|
&self,
|
|
id: &str,
|
|
updates: TriggerUpdateRequest,
|
|
) -> Result<TriggerEntry> {
|
|
// Validate hand exists if being updated (outside of our lock)
|
|
if let Some(hand_id) = &updates.hand_id {
|
|
if self.hand_registry.get(hand_id).await.is_none() {
|
|
return Err(zclaw_types::ZclawError::InvalidInput(
|
|
format!("Hand '{}' not found", hand_id)
|
|
));
|
|
}
|
|
}
|
|
|
|
let mut internal = self.state.write().await;
|
|
|
|
let Some(entry) = internal.triggers.get_mut(id) else {
|
|
return Err(zclaw_types::ZclawError::NotFound(
|
|
format!("Trigger '{}' not found", id)
|
|
));
|
|
};
|
|
|
|
// Apply updates
|
|
if let Some(name) = &updates.name {
|
|
entry.config.name = name.clone();
|
|
}
|
|
if let Some(enabled) = updates.enabled {
|
|
entry.config.enabled = enabled;
|
|
}
|
|
if let Some(hand_id) = &updates.hand_id {
|
|
entry.config.hand_id = hand_id.clone();
|
|
}
|
|
if let Some(trigger_type) = &updates.trigger_type {
|
|
entry.config.trigger_type = trigger_type.clone();
|
|
}
|
|
|
|
entry.modified_at = Utc::now();
|
|
|
|
Ok(entry.clone())
|
|
}
|
|
|
|
/// Delete a trigger
|
|
pub async fn delete_trigger(&self, id: &str) -> Result<()> {
|
|
let mut internal = self.state.write().await;
|
|
if internal.triggers.remove(id).is_none() {
|
|
return Err(zclaw_types::ZclawError::NotFound(
|
|
format!("Trigger '{}' not found", id)
|
|
));
|
|
}
|
|
// Also remove associated state atomically
|
|
internal.states.remove(id);
|
|
Ok(())
|
|
}
|
|
|
|
/// Get trigger state
|
|
pub async fn get_state(&self, id: &str) -> Option<TriggerState> {
|
|
let state = self.state.read().await;
|
|
state.states.get(id).cloned()
|
|
}
|
|
|
|
/// Check if trigger should fire based on type and input.
|
|
///
|
|
/// This method performs rate limiting and condition checks using a single
|
|
/// read lock to avoid deadlocks.
|
|
pub async fn should_fire(&self, id: &str, input: &serde_json::Value) -> bool {
|
|
let internal = self.state.read().await;
|
|
let Some(entry) = internal.triggers.get(id) else {
|
|
return false;
|
|
};
|
|
|
|
// Check if enabled
|
|
if !entry.config.enabled {
|
|
return false;
|
|
}
|
|
|
|
// Check rate limiting using the same lock
|
|
if let Some(state) = internal.states.get(id) {
|
|
// Check execution count this hour
|
|
let one_hour_ago = Utc::now() - chrono::Duration::hours(1);
|
|
if let Some(last_exec) = state.last_execution {
|
|
if last_exec > one_hour_ago {
|
|
if state.execution_count >= self.config.max_executions_per_hour {
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Check trigger-specific conditions
|
|
match &entry.config.trigger_type {
|
|
TriggerType::Manual => false,
|
|
TriggerType::Schedule { cron: _ } => {
|
|
// For schedule triggers, use cron parser
|
|
// Simplified check - real implementation would use cron library
|
|
true
|
|
}
|
|
TriggerType::Event { pattern } => {
|
|
// Check if input matches pattern
|
|
input.to_string().contains(pattern)
|
|
}
|
|
TriggerType::Webhook { path: _, secret: _ } => {
|
|
// Webhook triggers are fired externally
|
|
false
|
|
}
|
|
TriggerType::MessagePattern { pattern } => {
|
|
// Check if message matches pattern
|
|
input.to_string().contains(pattern)
|
|
}
|
|
TriggerType::FileSystem { path: _, events: _ } => {
|
|
// File system triggers are fired by file watcher
|
|
false
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Execute a trigger.
|
|
///
|
|
/// This method carefully manages lock scope to avoid deadlocks:
|
|
/// 1. Acquires read lock to check trigger exists and get config
|
|
/// 2. Releases lock before calling external hand registry
|
|
/// 3. Acquires write lock to update state
|
|
pub async fn execute_trigger(&self, id: &str, input: serde_json::Value) -> Result<TriggerResult> {
|
|
// Check if should fire (uses its own lock scope)
|
|
if !self.should_fire(id, &input).await {
|
|
return Err(zclaw_types::ZclawError::InvalidInput(
|
|
format!("Trigger '{}' should not fire", id)
|
|
));
|
|
}
|
|
|
|
// Get hand_id (release lock before calling hand registry)
|
|
let hand_id = {
|
|
let internal = self.state.read().await;
|
|
let entry = internal.triggers.get(id)
|
|
.ok_or_else(|| zclaw_types::ZclawError::NotFound(
|
|
format!("Trigger '{}' not found", id)
|
|
))?;
|
|
entry.config.hand_id.clone()
|
|
};
|
|
|
|
// Get hand (outside of our lock to avoid potential deadlock with hand_registry)
|
|
let hand = self.hand_registry.get(&hand_id).await
|
|
.ok_or_else(|| zclaw_types::ZclawError::InvalidInput(
|
|
format!("Hand '{}' not found", hand_id)
|
|
))?;
|
|
|
|
// Update state before execution
|
|
{
|
|
let mut internal = self.state.write().await;
|
|
let state = internal.states.entry(id.to_string()).or_insert_with(|| TriggerState::new(id));
|
|
state.execution_count += 1;
|
|
}
|
|
|
|
// Execute hand (outside of lock to avoid blocking other operations)
|
|
let context = zclaw_hands::HandContext {
|
|
agent_id: zclaw_types::AgentId::new(),
|
|
working_dir: None,
|
|
env: std::collections::HashMap::new(),
|
|
timeout_secs: 300,
|
|
callback_url: None,
|
|
};
|
|
|
|
let hand_result = hand.execute(&context, input.clone()).await;
|
|
|
|
// Build trigger result from hand result
|
|
let trigger_result = match &hand_result {
|
|
Ok(res) => TriggerResult {
|
|
timestamp: Utc::now(),
|
|
success: res.success,
|
|
output: Some(res.output.clone()),
|
|
error: res.error.clone(),
|
|
trigger_input: input.clone(),
|
|
},
|
|
Err(e) => TriggerResult {
|
|
timestamp: Utc::now(),
|
|
success: false,
|
|
output: None,
|
|
error: Some(e.to_string()),
|
|
trigger_input: input.clone(),
|
|
},
|
|
};
|
|
|
|
// Update state after execution
|
|
{
|
|
let mut internal = self.state.write().await;
|
|
if let Some(state) = internal.states.get_mut(id) {
|
|
state.last_execution = Some(Utc::now());
|
|
state.last_result = Some(trigger_result.clone());
|
|
}
|
|
}
|
|
|
|
// Return the original hand result or convert to trigger result
|
|
hand_result.map(|_| trigger_result)
|
|
}
|
|
}
|
|
|
|
/// Request for updating a trigger
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct TriggerUpdateRequest {
|
|
/// New name
|
|
pub name: Option<String>,
|
|
/// Enable/disable
|
|
pub enabled: Option<bool>,
|
|
/// New hand ID
|
|
pub hand_id: Option<String>,
|
|
/// New trigger type
|
|
pub trigger_type: Option<TriggerType>,
|
|
}
|