Files
zclaw_openfang/crates/zclaw-kernel/src/scheduler.rs
iven bd12bdb62b
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
fix(chat): 定时功能审计修复 — 消除重复解析 + ID碰撞 + 输入补全
审计发现修复:
- H-01: 存储 ParsedSchedule 避免重复 parse_nl_schedule 调用
- H-03: trigger ID 追加 UUID 片段防止高并发碰撞
- C-02: execute_trigger 验证错误信息明确系统 Hand 必须注册
- M-02: SchedulerService 传递 trigger_name 作为 task_description
- M-01: 添加拦截路径跳过 post_hook 的设计注释
2026-04-15 10:02:49 +08:00

352 lines
12 KiB
Rust

//! Scheduler service for automatic trigger execution
//!
//! Periodically scans scheduled triggers and fires them at the appropriate time.
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use chrono::{Datelike, Timelike};
use tokio::sync::Mutex;
use tokio::time::{self, Duration};
use zclaw_types::Result;
use crate::Kernel;
/// Scheduler service that runs in the background and executes scheduled triggers
pub struct SchedulerService {
kernel: Arc<Mutex<Option<Kernel>>>,
running: Arc<AtomicBool>,
check_interval: Duration,
}
impl SchedulerService {
/// Create a new scheduler service
pub fn new(kernel: Arc<Mutex<Option<Kernel>>>, check_interval_secs: u64) -> Self {
Self {
kernel,
running: Arc::new(AtomicBool::new(false)),
check_interval: Duration::from_secs(check_interval_secs),
}
}
/// Start the scheduler loop in the background
pub fn start(&self) {
if self.running.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_err() {
tracing::warn!("[Scheduler] Already running, ignoring start request");
return;
}
let kernel = self.kernel.clone();
let running = self.running.clone();
let interval = self.check_interval;
tokio::spawn(async move {
tracing::info!("[Scheduler] Starting scheduler loop with {}s interval", interval.as_secs());
let mut ticker = time::interval(interval);
// First tick fires immediately — skip it
ticker.tick().await;
while running.load(Ordering::Relaxed) {
ticker.tick().await;
if !running.load(Ordering::Relaxed) {
break;
}
if let Err(e) = Self::check_and_fire_scheduled_triggers(&kernel).await {
tracing::error!("[Scheduler] Error checking triggers: {}", e);
}
}
tracing::info!("[Scheduler] Scheduler loop stopped");
});
}
/// Stop the scheduler loop
pub fn stop(&self) {
self.running.store(false, Ordering::Relaxed);
tracing::info!("[Scheduler] Stop requested");
}
/// Check if the scheduler is running
pub fn is_running(&self) -> bool {
self.running.load(Ordering::Relaxed)
}
/// Check all scheduled triggers and fire those that are due
async fn check_and_fire_scheduled_triggers(
kernel_lock: &Arc<Mutex<Option<Kernel>>>,
) -> Result<()> {
// Collect due triggers under lock
let to_execute: Vec<(String, String, String, String)> = {
let kernel_guard = kernel_lock.lock().await;
let kernel = match kernel_guard.as_ref() {
Some(k) => k,
None => return Ok(()),
};
let triggers = kernel.list_triggers().await;
let now = chrono::Utc::now();
let scheduled: Vec<_> = triggers.iter()
.filter(|t| {
t.config.enabled && matches!(t.config.trigger_type, zclaw_hands::TriggerType::Schedule { .. })
})
.collect();
if scheduled.is_empty() {
return Ok(());
}
tracing::debug!("[Scheduler] Checking {} scheduled triggers", scheduled.len());
scheduled.iter()
.filter_map(|t| {
if let zclaw_hands::TriggerType::Schedule { ref cron } = t.config.trigger_type {
if Self::should_fire_cron(cron, &now) {
// (trigger_id, hand_id, cron_expr, trigger_name)
Some((t.config.id.clone(), t.config.hand_id.clone(), cron.clone(), t.config.name.clone()))
} else {
None
}
} else {
None
}
})
.collect()
}; // Lock dropped here
// Execute due triggers (acquire lock per execution)
// DESIGN NOTE: Triggers execute sequentially within each check cycle. This is intentional:
// 1. Prevents concurrent hand runs from competing for shared resources
// 2. Maintains deterministic ordering for trigger execution
// 3. A long-running hand will delay subsequent triggers in the same cycle
// but will NOT skip them — they are processed on the next tick.
// If parallel execution is needed, spawn each execute_hand in a separate task
// and collect results via JoinSet.
let now = chrono::Utc::now();
for (trigger_id, hand_id, cron_expr, trigger_name) in to_execute {
tracing::info!(
"[Scheduler] Firing scheduled trigger '{}' → hand '{}' (cron: {})",
trigger_id, hand_id, cron_expr
);
let kernel_guard = kernel_lock.lock().await;
if let Some(kernel) = kernel_guard.as_ref() {
let trigger_source = zclaw_types::TriggerSource::Scheduled {
trigger_id: trigger_id.clone(),
};
let input = serde_json::json!({
"trigger_id": trigger_id,
"trigger_type": "schedule",
"task_description": trigger_name,
"cron": cron_expr,
"fired_at": now.to_rfc3339(),
});
match kernel.execute_hand_with_source(&hand_id, input, trigger_source).await {
Ok((_result, run_id)) => {
tracing::info!(
"[Scheduler] Successfully fired trigger '{}' → run {}",
trigger_id, run_id
);
}
Err(e) => {
tracing::error!(
"[Scheduler] Failed to execute trigger '{}': {}",
trigger_id, e
);
}
}
}
}
Ok(())
}
/// Simple cron expression matcher
///
/// Supports basic cron format: `minute hour day month weekday`
/// Also supports interval shorthand: `every:Ns`, `every:Nm`, `every:Nh`
fn should_fire_cron(cron: &str, now: &chrono::DateTime<chrono::Utc>) -> bool {
let cron = cron.trim();
// Handle interval shorthand: "every:30s", "every:5m", "every:1h"
if let Some(interval_str) = cron.strip_prefix("every:") {
return Self::check_interval_shorthand(interval_str, now);
}
// Handle ISO timestamp for one-shot: "2026-03-29T10:00:00Z"
if cron.contains('T') && cron.contains('-') {
if let Ok(target) = chrono::DateTime::parse_from_rfc3339(cron) {
let target_utc = target.with_timezone(&chrono::Utc);
// Fire if within the check window (± check_interval/2, approx 30s)
let diff = (*now - target_utc).num_seconds().abs();
return diff <= 30;
}
}
// Standard 5-field cron: minute hour day_of_month month day_of_week
let parts: Vec<&str> = cron.split_whitespace().collect();
if parts.len() != 5 {
tracing::warn!("[Scheduler] Invalid cron expression (expected 5 fields): '{}'", cron);
return false;
}
let minute = now.minute() as i32;
let hour = now.hour() as i32;
let day = now.day() as i32;
let month = now.month() as i32;
let weekday = now.weekday().num_days_from_monday() as i32; // Mon=0..Sun=6
Self::cron_field_matches(parts[0], minute)
&& Self::cron_field_matches(parts[1], hour)
&& Self::cron_field_matches(parts[2], day)
&& Self::cron_field_matches(parts[3], month)
&& Self::cron_field_matches(parts[4], weekday)
}
/// Check if a single cron field matches the current value
fn cron_field_matches(field: &str, value: i32) -> bool {
if field == "*" || field == "?" {
return true;
}
// Handle step: */N
if let Some(step_str) = field.strip_prefix("*/") {
if let Ok(step) = step_str.parse::<i32>() {
if step > 0 {
return value % step == 0;
}
}
return false;
}
// Handle range: N-M
if field.contains('-') {
let range_parts: Vec<&str> = field.split('-').collect();
if range_parts.len() == 2 {
if let (Ok(start), Ok(end)) = (range_parts[0].parse::<i32>(), range_parts[1].parse::<i32>()) {
return value >= start && value <= end;
}
}
return false;
}
// Handle list: N,M,O
if field.contains(',') {
return field.split(',').any(|part| {
part.trim().parse::<i32>().map(|p| p == value).unwrap_or(false)
});
}
// Simple value
field.parse::<i32>().map(|p| p == value).unwrap_or(false)
}
/// Check interval shorthand expressions
fn check_interval_shorthand(interval: &str, now: &chrono::DateTime<chrono::Utc>) -> bool {
let (num_str, unit) = if interval.ends_with('s') {
(&interval[..interval.len()-1], 's')
} else if interval.ends_with('m') {
(&interval[..interval.len()-1], 'm')
} else if interval.ends_with('h') {
(&interval[..interval.len()-1], 'h')
} else {
return false;
};
let num: i64 = match num_str.parse() {
Ok(n) => n,
Err(_) => return false,
};
if num <= 0 {
return false;
}
let interval_secs = match unit {
's' => num,
'm' => num * 60,
'h' => num * 3600,
_ => return false,
};
// Check if current timestamp is within the scheduler check window of an interval boundary.
// The scheduler checks every `check_interval` seconds (default 60s), so we use ±30s window.
let timestamp = now.timestamp();
let remainder = timestamp % interval_secs;
// Fire if we're within ±30 seconds of an interval boundary
remainder <= 30 || remainder >= (interval_secs - 30)
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Timelike;
#[test]
fn test_cron_field_wildcard() {
assert!(SchedulerService::cron_field_matches("*", 5));
assert!(SchedulerService::cron_field_matches("?", 5));
}
#[test]
fn test_cron_field_exact() {
assert!(SchedulerService::cron_field_matches("5", 5));
assert!(!SchedulerService::cron_field_matches("5", 6));
}
#[test]
fn test_cron_field_step() {
assert!(SchedulerService::cron_field_matches("*/5", 0));
assert!(SchedulerService::cron_field_matches("*/5", 5));
assert!(SchedulerService::cron_field_matches("*/5", 10));
assert!(!SchedulerService::cron_field_matches("*/5", 3));
}
#[test]
fn test_cron_field_range() {
assert!(SchedulerService::cron_field_matches("1-5", 1));
assert!(SchedulerService::cron_field_matches("1-5", 3));
assert!(SchedulerService::cron_field_matches("1-5", 5));
assert!(!SchedulerService::cron_field_matches("1-5", 0));
assert!(!SchedulerService::cron_field_matches("1-5", 6));
}
#[test]
fn test_cron_field_list() {
assert!(SchedulerService::cron_field_matches("1,3,5", 1));
assert!(SchedulerService::cron_field_matches("1,3,5", 3));
assert!(SchedulerService::cron_field_matches("1,3,5", 5));
assert!(!SchedulerService::cron_field_matches("1,3,5", 2));
}
#[test]
fn test_should_fire_every_minute() {
let now = chrono::Utc::now();
assert!(SchedulerService::should_fire_cron("every:1m", &now));
}
#[test]
fn test_should_fire_cron_wildcard() {
let now = chrono::Utc::now();
// Every minute match
assert!(SchedulerService::should_fire_cron(
&format!("{} * * * *", now.minute()),
&now,
));
}
#[test]
fn test_should_not_fire_cron() {
let now = chrono::Utc::now();
let wrong_minute = if now.minute() < 59 { now.minute() + 1 } else { 0 };
assert!(!SchedulerService::should_fire_cron(
&format!("{} * * * *", wrong_minute),
&now,
));
}
}