//! 声明式 Scheduler — 借鉴 loco-rs 的定时任务模式 //! //! 通过 TOML 配置定时任务,无需改代码调整调度时间。 //! 配置格式在 config.rs 的 SchedulerConfig / JobConfig 中定义。 use std::time::{Duration, Instant}; use sqlx::PgPool; use crate::config::SchedulerConfig; use crate::workers::WorkerDispatcher; /// 单次任务执行的产出 struct TaskExecution { result: Option, error: Option, duration_ms: i64, } /// 解析时间间隔字符串为 Duration pub fn parse_duration(s: &str) -> Result { let s = s.trim().to_lowercase(); let (num_part, multiplier) = if s.ends_with('s') { (&s[..s.len()-1], 1u64) } else if s.ends_with('m') { (&s[..s.len()-1], 60u64) } else if s.ends_with('h') { (&s[..s.len()-1], 3600u64) } else if s.ends_with('d') { (&s[..s.len()-1], 86400u64) } else { return Err(format!("Invalid interval format: '{}'. Use '30s', '5m', '1h', '1d'", s)); }; let num: u64 = num_part.parse() .map_err(|_| format!("Invalid number in interval: '{}'", num_part))?; Ok(Duration::from_secs(num * multiplier)) } /// 启动所有定时任务 pub fn start_scheduler(config: &SchedulerConfig, _db: PgPool, dispatcher: WorkerDispatcher) { for job in &config.jobs { let interval = match parse_duration(&job.interval) { Ok(d) => d, Err(e) => { tracing::error!("Scheduler job '{}': {}", job.name, e); continue; } }; let job_name = job.name.clone(); let task_name = job.task.clone(); let args_json = job.args.clone(); let dispatcher = dispatcher.clone_ref(); let run_on_start = job.run_on_start; tracing::info!( "Scheduler: registering job '{}' ({} interval, task={})", job_name, job.interval, task_name ); tokio::spawn(async move { if run_on_start { tracing::info!("Scheduler: running '{}' on start", job_name); if let Err(e) = dispatcher.dispatch_raw(&task_name, args_json.clone()).await { tracing::error!("Scheduler job '{}' on-start failed: {}", job_name, e); } } let mut interval_timer = tokio::time::interval(interval); loop { interval_timer.tick().await; tracing::debug!("Scheduler: triggering job '{}'", job_name); if let Err(e) = dispatcher.dispatch_raw(&task_name, args_json.clone()).await { tracing::error!("Scheduler job '{}' failed: {}", job_name, e); } } }); } } /// 内置的 DB 清理任务(不通过 Worker,直接执行 SQL) pub fn start_db_cleanup_tasks(db: PgPool) { let db_devices = db.clone(); let db_key_pool = db.clone(); let db_relay = db.clone(); // 每 24 小时清理不活跃设备 tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(86400)); loop { interval.tick().await; match sqlx::query( "DELETE FROM devices WHERE last_seen_at::timestamptz < $1" ) .bind({ let cutoff = (chrono::Utc::now() - chrono::Duration::days(90)); cutoff }) .execute(&db_devices) .await { Ok(result) => { if result.rows_affected() > 0 { tracing::info!("Cleaned up {} inactive devices (90d)", result.rows_affected()); } } Err(e) => tracing::error!("Device cleanup failed: {}", e), } } }); // 每 24 小时清理过期的 key_usage_window 记录 tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(86400)); loop { interval.tick().await; match sqlx::query( "DELETE FROM key_usage_window WHERE window_minute < to_char(NOW() - INTERVAL '24 hours', 'YYYY-MM-DD\"T\"HH24:MI')" ) .execute(&db_key_pool) .await { Ok(result) => { if result.rows_affected() > 0 { tracing::info!("Cleaned up {} expired key_usage_window records (24h)", result.rows_affected()); } } Err(e) => tracing::error!("Key usage window cleanup failed: {}", e), } } }); // 每 5 分钟清理超时的 relay_tasks(status=processing 且 updated_at 超过 10 分钟) tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(300)); loop { interval.tick().await; match sqlx::query( "UPDATE relay_tasks SET status = 'failed', error_message = 'timeout: upstream not responding', completed_at = NOW() \ WHERE status = 'processing' AND updated_at < NOW() - INTERVAL '10 minutes'" ) .execute(&db_relay) .await { Ok(result) => { if result.rows_affected() > 0 { tracing::warn!("Cleaned up {} timed-out relay tasks (>10m processing)", result.rows_affected()); } } Err(e) => tracing::error!("Relay task timeout cleanup failed: {}", e), } } }); } /// 用户任务调度器 /// /// 每 30 秒轮询 scheduled_tasks 表,执行到期任务。 /// 支持 agent/hand/workflow 三种任务类型。 /// 当前版本执行状态管理和日志记录;未来将通过内部 API 触发实际执行。 pub fn start_user_task_scheduler(db: PgPool) { tokio::spawn(async move { let mut ticker = tokio::time::interval(Duration::from_secs(30)); ticker.tick().await; // 跳过首次立即触发 loop { ticker.tick().await; if let Err(e) = tick_user_tasks(&db).await { tracing::error!("[UserScheduler] tick error: {}", e); } } }); } /// 执行单个调度任务,返回执行产出(结果/错误/耗时) async fn execute_scheduled_task( db: &PgPool, task_id: &str, target_type: &str, ) -> TaskExecution { let start = Instant::now(); let task_info: Option<(String, Option)> = match sqlx::query_as( "SELECT name, config_json FROM scheduled_tasks WHERE id = $1" ) .bind(task_id) .fetch_optional(db) .await { Ok(info) => info, Err(e) => { let elapsed = start.elapsed().as_millis() as i64; return TaskExecution { result: None, error: Some(format!("Failed to fetch task {}: {}", task_id, e)), duration_ms: elapsed, }; } }; let (task_name, _config_json) = match task_info { Some(info) => info, None => { let elapsed = start.elapsed().as_millis() as i64; return TaskExecution { result: None, error: Some(format!("Task {} not found", task_id)), duration_ms: elapsed, }; } }; tracing::info!( "[UserScheduler] Dispatching task '{}' (target_type={})", task_name, target_type ); let exec_result = match target_type { t if t == "agent" => { tracing::info!("[UserScheduler] Agent task '{}' queued for execution", task_name); Ok("agent_dispatched".to_string()) } t if t == "hand" => { tracing::info!("[UserScheduler] Hand task '{}' queued for execution", task_name); Ok("hand_dispatched".to_string()) } t if t == "workflow" => { tracing::info!("[UserScheduler] Workflow task '{}' queued for execution", task_name); Ok("workflow_dispatched".to_string()) } other => { tracing::warn!("[UserScheduler] Unknown target_type '{}' for task '{}'", other, task_name); Err(format!("Unknown target_type: {}", other)) } }; let elapsed = start.elapsed().as_millis() as i64; match exec_result { Ok(msg) => TaskExecution { result: Some(msg), error: None, duration_ms: elapsed, }, Err(err) => TaskExecution { result: None, error: Some(err), duration_ms: elapsed, }, } } async fn tick_user_tasks(db: &PgPool) -> Result<(), sqlx::Error> { // 查找到期任务(next_run_at 兼容 TEXT 和 TIMESTAMPTZ 两种列类型) let due_tasks: Vec<(String, String, String)> = sqlx::query_as( "SELECT id, schedule_type, target_type FROM scheduled_tasks WHERE enabled = TRUE AND next_run_at::TIMESTAMPTZ <= NOW()" ) .fetch_all(db) .await?; if due_tasks.is_empty() { return Ok(()); } tracing::debug!("[UserScheduler] {} tasks due", due_tasks.len()); for (task_id, schedule_type, target_type) in due_tasks { tracing::info!( "[UserScheduler] Executing task {} (type={}, schedule={})", task_id, target_type, schedule_type ); // 执行任务并收集产出 let exec = execute_scheduled_task(db, &task_id, &target_type).await; if let Some(ref err) = exec.error { tracing::error!("[UserScheduler] task {} execution failed: {}", task_id, err); } else { tracing::info!( "[UserScheduler] task {} executed successfully ({}ms)", task_id, exec.duration_ms ); } // 更新任务状态(含执行产出) let result = sqlx::query( "UPDATE scheduled_tasks SET last_run_at = NOW(), run_count = run_count + 1, updated_at = NOW(), enabled = CASE WHEN schedule_type = 'once' THEN FALSE ELSE TRUE END, next_run_at = CASE WHEN schedule_type = 'once' THEN NULL WHEN schedule_type = 'interval' AND interval_seconds IS NOT NULL THEN NOW() + (interval_seconds || ' seconds')::INTERVAL ELSE NULL END, last_result = $2, last_error = $3, last_duration_ms = $4 WHERE id = $1" ) .bind(&task_id) .bind(&exec.result) .bind(&exec.error) .bind(exec.duration_ms) .execute(db) .await; if let Err(e) = result { tracing::error!("[UserScheduler] task {} status update failed: {}", task_id, e); } } Ok(()) }