refactor: 清理未使用代码并添加未来功能标记
Some checks failed
CI / Rust Check (push) Has been cancelled
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
Some checks failed
CI / Rust Check (push) Has been cancelled
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
style: 统一代码格式和注释风格 docs: 更新多个功能文档的完整度和状态 feat(runtime): 添加路径验证工具支持 fix(pipeline): 改进条件判断和变量解析逻辑 test(types): 为ID类型添加全面测试用例 chore: 更新依赖项和Cargo.lock文件 perf(mcp): 优化MCP协议传输和错误处理
This commit is contained in:
372
crates/zclaw-kernel/src/trigger_manager.rs
Normal file
372
crates/zclaw-kernel/src/trigger_manager.rs
Normal file
@@ -0,0 +1,372 @@
|
||||
//! Trigger Manager
|
||||
//!
|
||||
//! Manages triggers for automated task execution.
|
||||
//!
|
||||
//! # Lock Order Safety
|
||||
//!
|
||||
//! This module uses a single `RwLock<InternalState>` to avoid potential deadlocks.
|
||||
//! Previously, multiple locks (`triggers` and `states`) could cause deadlocks when
|
||||
//! acquired in different orders across methods.
|
||||
//!
|
||||
//! The unified state structure ensures atomic access to all trigger-related data.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use zclaw_types::Result;
|
||||
use zclaw_hands::{TriggerConfig, TriggerType, TriggerState, TriggerResult, HandRegistry};
|
||||
|
||||
/// Internal state container for all trigger-related data.
|
||||
///
|
||||
/// Using a single structure behind one RwLock eliminates the possibility of
|
||||
/// deadlocks caused by inconsistent lock acquisition orders.
|
||||
#[derive(Debug)]
|
||||
struct InternalState {
|
||||
/// Registered triggers
|
||||
triggers: HashMap<String, TriggerEntry>,
|
||||
/// Execution states
|
||||
states: HashMap<String, TriggerState>,
|
||||
}
|
||||
|
||||
impl InternalState {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
triggers: HashMap::new(),
|
||||
states: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Trigger manager for coordinating automated triggers
|
||||
pub struct TriggerManager {
|
||||
/// Unified internal state behind a single RwLock.
|
||||
///
|
||||
/// This prevents deadlocks by ensuring all trigger data is accessed
|
||||
/// through a single lock acquisition point.
|
||||
state: RwLock<InternalState>,
|
||||
/// Hand registry
|
||||
hand_registry: Arc<HandRegistry>,
|
||||
/// Configuration
|
||||
config: TriggerManagerConfig,
|
||||
}
|
||||
|
||||
/// Trigger entry with additional metadata
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct TriggerEntry {
|
||||
/// Core trigger configuration
|
||||
#[serde(flatten)]
|
||||
pub config: TriggerConfig,
|
||||
/// Creation timestamp
|
||||
pub created_at: DateTime<Utc>,
|
||||
/// Last modification timestamp
|
||||
pub modified_at: DateTime<Utc>,
|
||||
/// Optional description
|
||||
pub description: Option<String>,
|
||||
/// Optional tags
|
||||
#[serde(default)]
|
||||
pub tags: Vec<String>,
|
||||
}
|
||||
|
||||
/// Default max executions per hour
|
||||
fn default_max_executions_per_hour() -> u32 { 10 }
|
||||
/// Default persist value
|
||||
fn default_persist() -> bool { true }
|
||||
|
||||
/// Trigger manager configuration
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct TriggerManagerConfig {
|
||||
/// Maximum executions per hour (default)
|
||||
#[serde(default = "default_max_executions_per_hour")]
|
||||
pub max_executions_per_hour: u32,
|
||||
/// Enable persistent storage
|
||||
#[serde(default = "default_persist")]
|
||||
pub persist: bool,
|
||||
/// Storage path for trigger data
|
||||
pub storage_path: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for TriggerManagerConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
max_executions_per_hour: 10,
|
||||
persist: true,
|
||||
storage_path: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TriggerManager {
|
||||
/// Create new trigger manager
|
||||
pub fn new(hand_registry: Arc<HandRegistry>) -> Self {
|
||||
Self {
|
||||
state: RwLock::new(InternalState::new()),
|
||||
hand_registry,
|
||||
config: TriggerManagerConfig::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create with custom configuration
|
||||
pub fn with_config(
|
||||
hand_registry: Arc<HandRegistry>,
|
||||
config: TriggerManagerConfig,
|
||||
) -> Self {
|
||||
Self {
|
||||
state: RwLock::new(InternalState::new()),
|
||||
hand_registry,
|
||||
config,
|
||||
}
|
||||
}
|
||||
|
||||
/// List all triggers
|
||||
pub async fn list_triggers(&self) -> Vec<TriggerEntry> {
|
||||
let state = self.state.read().await;
|
||||
state.triggers.values().cloned().collect()
|
||||
}
|
||||
|
||||
/// Get a specific trigger
|
||||
pub async fn get_trigger(&self, id: &str) -> Option<TriggerEntry> {
|
||||
let state = self.state.read().await;
|
||||
state.triggers.get(id).cloned()
|
||||
}
|
||||
|
||||
/// Create a new trigger
|
||||
pub async fn create_trigger(&self, config: TriggerConfig) -> Result<TriggerEntry> {
|
||||
// Validate hand exists (outside of our lock to avoid holding two locks)
|
||||
if self.hand_registry.get(&config.hand_id).await.is_none() {
|
||||
return Err(zclaw_types::ZclawError::InvalidInput(
|
||||
format!("Hand '{}' not found", config.hand_id)
|
||||
));
|
||||
}
|
||||
|
||||
let id = config.id.clone();
|
||||
let now = Utc::now();
|
||||
|
||||
let entry = TriggerEntry {
|
||||
config,
|
||||
created_at: now,
|
||||
modified_at: now,
|
||||
description: None,
|
||||
tags: Vec::new(),
|
||||
};
|
||||
|
||||
// Initialize state and insert trigger atomically under single lock
|
||||
let state = TriggerState::new(&id);
|
||||
{
|
||||
let mut internal = self.state.write().await;
|
||||
internal.states.insert(id.clone(), state);
|
||||
internal.triggers.insert(id.clone(), entry.clone());
|
||||
}
|
||||
|
||||
Ok(entry)
|
||||
}
|
||||
|
||||
/// Update an existing trigger
|
||||
pub async fn update_trigger(
|
||||
&self,
|
||||
id: &str,
|
||||
updates: TriggerUpdateRequest,
|
||||
) -> Result<TriggerEntry> {
|
||||
// Validate hand exists if being updated (outside of our lock)
|
||||
if let Some(hand_id) = &updates.hand_id {
|
||||
if self.hand_registry.get(hand_id).await.is_none() {
|
||||
return Err(zclaw_types::ZclawError::InvalidInput(
|
||||
format!("Hand '{}' not found", hand_id)
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
let mut internal = self.state.write().await;
|
||||
|
||||
let Some(entry) = internal.triggers.get_mut(id) else {
|
||||
return Err(zclaw_types::ZclawError::NotFound(
|
||||
format!("Trigger '{}' not found", id)
|
||||
));
|
||||
};
|
||||
|
||||
// Apply updates
|
||||
if let Some(name) = &updates.name {
|
||||
entry.config.name = name.clone();
|
||||
}
|
||||
if let Some(enabled) = updates.enabled {
|
||||
entry.config.enabled = enabled;
|
||||
}
|
||||
if let Some(hand_id) = &updates.hand_id {
|
||||
entry.config.hand_id = hand_id.clone();
|
||||
}
|
||||
if let Some(trigger_type) = &updates.trigger_type {
|
||||
entry.config.trigger_type = trigger_type.clone();
|
||||
}
|
||||
|
||||
entry.modified_at = Utc::now();
|
||||
|
||||
Ok(entry.clone())
|
||||
}
|
||||
|
||||
/// Delete a trigger
|
||||
pub async fn delete_trigger(&self, id: &str) -> Result<()> {
|
||||
let mut internal = self.state.write().await;
|
||||
if internal.triggers.remove(id).is_none() {
|
||||
return Err(zclaw_types::ZclawError::NotFound(
|
||||
format!("Trigger '{}' not found", id)
|
||||
));
|
||||
}
|
||||
// Also remove associated state atomically
|
||||
internal.states.remove(id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get trigger state
|
||||
pub async fn get_state(&self, id: &str) -> Option<TriggerState> {
|
||||
let state = self.state.read().await;
|
||||
state.states.get(id).cloned()
|
||||
}
|
||||
|
||||
/// Check if trigger should fire based on type and input.
|
||||
///
|
||||
/// This method performs rate limiting and condition checks using a single
|
||||
/// read lock to avoid deadlocks.
|
||||
pub async fn should_fire(&self, id: &str, input: &serde_json::Value) -> bool {
|
||||
let internal = self.state.read().await;
|
||||
let Some(entry) = internal.triggers.get(id) else {
|
||||
return false;
|
||||
};
|
||||
|
||||
// Check if enabled
|
||||
if !entry.config.enabled {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check rate limiting using the same lock
|
||||
if let Some(state) = internal.states.get(id) {
|
||||
// Check execution count this hour
|
||||
let one_hour_ago = Utc::now() - chrono::Duration::hours(1);
|
||||
if let Some(last_exec) = state.last_execution {
|
||||
if last_exec > one_hour_ago {
|
||||
if state.execution_count >= self.config.max_executions_per_hour {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check trigger-specific conditions
|
||||
match &entry.config.trigger_type {
|
||||
TriggerType::Manual => false,
|
||||
TriggerType::Schedule { cron: _ } => {
|
||||
// For schedule triggers, use cron parser
|
||||
// Simplified check - real implementation would use cron library
|
||||
true
|
||||
}
|
||||
TriggerType::Event { pattern } => {
|
||||
// Check if input matches pattern
|
||||
input.to_string().contains(pattern)
|
||||
}
|
||||
TriggerType::Webhook { path: _, secret: _ } => {
|
||||
// Webhook triggers are fired externally
|
||||
false
|
||||
}
|
||||
TriggerType::MessagePattern { pattern } => {
|
||||
// Check if message matches pattern
|
||||
input.to_string().contains(pattern)
|
||||
}
|
||||
TriggerType::FileSystem { path: _, events: _ } => {
|
||||
// File system triggers are fired by file watcher
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute a trigger.
|
||||
///
|
||||
/// This method carefully manages lock scope to avoid deadlocks:
|
||||
/// 1. Acquires read lock to check trigger exists and get config
|
||||
/// 2. Releases lock before calling external hand registry
|
||||
/// 3. Acquires write lock to update state
|
||||
pub async fn execute_trigger(&self, id: &str, input: serde_json::Value) -> Result<TriggerResult> {
|
||||
// Check if should fire (uses its own lock scope)
|
||||
if !self.should_fire(id, &input).await {
|
||||
return Err(zclaw_types::ZclawError::InvalidInput(
|
||||
format!("Trigger '{}' should not fire", id)
|
||||
));
|
||||
}
|
||||
|
||||
// Get hand_id (release lock before calling hand registry)
|
||||
let hand_id = {
|
||||
let internal = self.state.read().await;
|
||||
let entry = internal.triggers.get(id)
|
||||
.ok_or_else(|| zclaw_types::ZclawError::NotFound(
|
||||
format!("Trigger '{}' not found", id)
|
||||
))?;
|
||||
entry.config.hand_id.clone()
|
||||
};
|
||||
|
||||
// Get hand (outside of our lock to avoid potential deadlock with hand_registry)
|
||||
let hand = self.hand_registry.get(&hand_id).await
|
||||
.ok_or_else(|| zclaw_types::ZclawError::InvalidInput(
|
||||
format!("Hand '{}' not found", hand_id)
|
||||
))?;
|
||||
|
||||
// Update state before execution
|
||||
{
|
||||
let mut internal = self.state.write().await;
|
||||
let state = internal.states.entry(id.to_string()).or_insert_with(|| TriggerState::new(id));
|
||||
state.execution_count += 1;
|
||||
}
|
||||
|
||||
// Execute hand (outside of lock to avoid blocking other operations)
|
||||
let context = zclaw_hands::HandContext {
|
||||
agent_id: zclaw_types::AgentId::new(),
|
||||
working_dir: None,
|
||||
env: std::collections::HashMap::new(),
|
||||
timeout_secs: 300,
|
||||
callback_url: None,
|
||||
};
|
||||
|
||||
let hand_result = hand.execute(&context, input.clone()).await;
|
||||
|
||||
// Build trigger result from hand result
|
||||
let trigger_result = match &hand_result {
|
||||
Ok(res) => TriggerResult {
|
||||
timestamp: Utc::now(),
|
||||
success: res.success,
|
||||
output: Some(res.output.clone()),
|
||||
error: res.error.clone(),
|
||||
trigger_input: input.clone(),
|
||||
},
|
||||
Err(e) => TriggerResult {
|
||||
timestamp: Utc::now(),
|
||||
success: false,
|
||||
output: None,
|
||||
error: Some(e.to_string()),
|
||||
trigger_input: input.clone(),
|
||||
},
|
||||
};
|
||||
|
||||
// Update state after execution
|
||||
{
|
||||
let mut internal = self.state.write().await;
|
||||
if let Some(state) = internal.states.get_mut(id) {
|
||||
state.last_execution = Some(Utc::now());
|
||||
state.last_result = Some(trigger_result.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// Return the original hand result or convert to trigger result
|
||||
hand_result.map(|_| trigger_result)
|
||||
}
|
||||
}
|
||||
|
||||
/// Request for updating a trigger
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct TriggerUpdateRequest {
|
||||
/// New name
|
||||
pub name: Option<String>,
|
||||
/// Enable/disable
|
||||
pub enabled: Option<bool>,
|
||||
/// New hand ID
|
||||
pub hand_id: Option<String>,
|
||||
/// New trigger type
|
||||
pub trigger_type: Option<TriggerType>,
|
||||
}
|
||||
Reference in New Issue
Block a user