//! Scheduler service for automatic trigger execution //! //! Periodically scans scheduled triggers and fires them at the appropriate time. use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use chrono::{Datelike, Timelike}; use tokio::sync::Mutex; use tokio::time::{self, Duration}; use zclaw_types::Result; use crate::Kernel; /// Scheduler service that runs in the background and executes scheduled triggers pub struct SchedulerService { kernel: Arc>>, running: Arc, check_interval: Duration, } impl SchedulerService { /// Create a new scheduler service pub fn new(kernel: Arc>>, check_interval_secs: u64) -> Self { Self { kernel, running: Arc::new(AtomicBool::new(false)), check_interval: Duration::from_secs(check_interval_secs), } } /// Start the scheduler loop in the background pub fn start(&self) { if self.running.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_err() { tracing::warn!("[Scheduler] Already running, ignoring start request"); return; } let kernel = self.kernel.clone(); let running = self.running.clone(); let interval = self.check_interval; tokio::spawn(async move { tracing::info!("[Scheduler] Starting scheduler loop with {}s interval", interval.as_secs()); let mut ticker = time::interval(interval); // First tick fires immediately — skip it ticker.tick().await; while running.load(Ordering::Relaxed) { ticker.tick().await; if !running.load(Ordering::Relaxed) { break; } if let Err(e) = Self::check_and_fire_scheduled_triggers(&kernel).await { tracing::error!("[Scheduler] Error checking triggers: {}", e); } } tracing::info!("[Scheduler] Scheduler loop stopped"); }); } /// Stop the scheduler loop pub fn stop(&self) { self.running.store(false, Ordering::Relaxed); tracing::info!("[Scheduler] Stop requested"); } /// Check if the scheduler is running pub fn is_running(&self) -> bool { self.running.load(Ordering::Relaxed) } /// Check all scheduled triggers and fire those that are due async fn check_and_fire_scheduled_triggers( kernel_lock: &Arc>>, ) -> Result<()> { // Collect due triggers under lock let to_execute: Vec<(String, String, String)> = { let kernel_guard = kernel_lock.lock().await; let kernel = match kernel_guard.as_ref() { Some(k) => k, None => return Ok(()), }; let triggers = kernel.list_triggers().await; let now = chrono::Utc::now(); let scheduled: Vec<_> = triggers.iter() .filter(|t| { t.config.enabled && matches!(t.config.trigger_type, zclaw_hands::TriggerType::Schedule { .. }) }) .collect(); if scheduled.is_empty() { return Ok(()); } tracing::debug!("[Scheduler] Checking {} scheduled triggers", scheduled.len()); scheduled.iter() .filter_map(|t| { if let zclaw_hands::TriggerType::Schedule { ref cron } = t.config.trigger_type { if Self::should_fire_cron(cron, &now) { Some((t.config.id.clone(), t.config.hand_id.clone(), cron.clone())) } else { None } } else { None } }) .collect() }; // Lock dropped here // Execute due triggers (acquire lock per execution) // DESIGN NOTE: Triggers execute sequentially within each check cycle. This is intentional: // 1. Prevents concurrent hand runs from competing for shared resources // 2. Maintains deterministic ordering for trigger execution // 3. A long-running hand will delay subsequent triggers in the same cycle // but will NOT skip them — they are processed on the next tick. // If parallel execution is needed, spawn each execute_hand in a separate task // and collect results via JoinSet. let now = chrono::Utc::now(); for (trigger_id, hand_id, cron_expr) in to_execute { tracing::info!( "[Scheduler] Firing scheduled trigger '{}' → hand '{}' (cron: {})", trigger_id, hand_id, cron_expr ); let kernel_guard = kernel_lock.lock().await; if let Some(kernel) = kernel_guard.as_ref() { let trigger_source = zclaw_types::TriggerSource::Scheduled { trigger_id: trigger_id.clone(), }; let input = serde_json::json!({ "trigger_id": trigger_id, "trigger_type": "schedule", "cron": cron_expr, "fired_at": now.to_rfc3339(), }); match kernel.execute_hand_with_source(&hand_id, input, trigger_source).await { Ok((_result, run_id)) => { tracing::info!( "[Scheduler] Successfully fired trigger '{}' → run {}", trigger_id, run_id ); } Err(e) => { tracing::error!( "[Scheduler] Failed to execute trigger '{}': {}", trigger_id, e ); } } } } Ok(()) } /// Simple cron expression matcher /// /// Supports basic cron format: `minute hour day month weekday` /// Also supports interval shorthand: `every:Ns`, `every:Nm`, `every:Nh` fn should_fire_cron(cron: &str, now: &chrono::DateTime) -> bool { let cron = cron.trim(); // Handle interval shorthand: "every:30s", "every:5m", "every:1h" if let Some(interval_str) = cron.strip_prefix("every:") { return Self::check_interval_shorthand(interval_str, now); } // Handle ISO timestamp for one-shot: "2026-03-29T10:00:00Z" if cron.contains('T') && cron.contains('-') { if let Ok(target) = chrono::DateTime::parse_from_rfc3339(cron) { let target_utc = target.with_timezone(&chrono::Utc); // Fire if within the check window (± check_interval/2, approx 30s) let diff = (*now - target_utc).num_seconds().abs(); return diff <= 30; } } // Standard 5-field cron: minute hour day_of_month month day_of_week let parts: Vec<&str> = cron.split_whitespace().collect(); if parts.len() != 5 { tracing::warn!("[Scheduler] Invalid cron expression (expected 5 fields): '{}'", cron); return false; } let minute = now.minute() as i32; let hour = now.hour() as i32; let day = now.day() as i32; let month = now.month() as i32; let weekday = now.weekday().num_days_from_monday() as i32; // Mon=0..Sun=6 Self::cron_field_matches(parts[0], minute) && Self::cron_field_matches(parts[1], hour) && Self::cron_field_matches(parts[2], day) && Self::cron_field_matches(parts[3], month) && Self::cron_field_matches(parts[4], weekday) } /// Check if a single cron field matches the current value fn cron_field_matches(field: &str, value: i32) -> bool { if field == "*" || field == "?" { return true; } // Handle step: */N if let Some(step_str) = field.strip_prefix("*/") { if let Ok(step) = step_str.parse::() { if step > 0 { return value % step == 0; } } return false; } // Handle range: N-M if field.contains('-') { let range_parts: Vec<&str> = field.split('-').collect(); if range_parts.len() == 2 { if let (Ok(start), Ok(end)) = (range_parts[0].parse::(), range_parts[1].parse::()) { return value >= start && value <= end; } } return false; } // Handle list: N,M,O if field.contains(',') { return field.split(',').any(|part| { part.trim().parse::().map(|p| p == value).unwrap_or(false) }); } // Simple value field.parse::().map(|p| p == value).unwrap_or(false) } /// Check interval shorthand expressions fn check_interval_shorthand(interval: &str, now: &chrono::DateTime) -> bool { let (num_str, unit) = if interval.ends_with('s') { (&interval[..interval.len()-1], 's') } else if interval.ends_with('m') { (&interval[..interval.len()-1], 'm') } else if interval.ends_with('h') { (&interval[..interval.len()-1], 'h') } else { return false; }; let num: i64 = match num_str.parse() { Ok(n) => n, Err(_) => return false, }; if num <= 0 { return false; } let interval_secs = match unit { 's' => num, 'm' => num * 60, 'h' => num * 3600, _ => return false, }; // Check if current timestamp is within the scheduler check window of an interval boundary. // The scheduler checks every `check_interval` seconds (default 60s), so we use ±30s window. let timestamp = now.timestamp(); let remainder = timestamp % interval_secs; // Fire if we're within ±30 seconds of an interval boundary remainder <= 30 || remainder >= (interval_secs - 30) } } #[cfg(test)] mod tests { use super::*; use chrono::Timelike; #[test] fn test_cron_field_wildcard() { assert!(SchedulerService::cron_field_matches("*", 5)); assert!(SchedulerService::cron_field_matches("?", 5)); } #[test] fn test_cron_field_exact() { assert!(SchedulerService::cron_field_matches("5", 5)); assert!(!SchedulerService::cron_field_matches("5", 6)); } #[test] fn test_cron_field_step() { assert!(SchedulerService::cron_field_matches("*/5", 0)); assert!(SchedulerService::cron_field_matches("*/5", 5)); assert!(SchedulerService::cron_field_matches("*/5", 10)); assert!(!SchedulerService::cron_field_matches("*/5", 3)); } #[test] fn test_cron_field_range() { assert!(SchedulerService::cron_field_matches("1-5", 1)); assert!(SchedulerService::cron_field_matches("1-5", 3)); assert!(SchedulerService::cron_field_matches("1-5", 5)); assert!(!SchedulerService::cron_field_matches("1-5", 0)); assert!(!SchedulerService::cron_field_matches("1-5", 6)); } #[test] fn test_cron_field_list() { assert!(SchedulerService::cron_field_matches("1,3,5", 1)); assert!(SchedulerService::cron_field_matches("1,3,5", 3)); assert!(SchedulerService::cron_field_matches("1,3,5", 5)); assert!(!SchedulerService::cron_field_matches("1,3,5", 2)); } #[test] fn test_should_fire_every_minute() { let now = chrono::Utc::now(); assert!(SchedulerService::should_fire_cron("every:1m", &now)); } #[test] fn test_should_fire_cron_wildcard() { let now = chrono::Utc::now(); // Every minute match assert!(SchedulerService::should_fire_cron( &format!("{} * * * *", now.minute()), &now, )); } #[test] fn test_should_not_fire_cron() { let now = chrono::Utc::now(); let wrong_minute = if now.minute() < 59 { now.minute() + 1 } else { 0 }; assert!(!SchedulerService::should_fire_cron( &format!("{} * * * *", wrong_minute), &now, )); } }