Files
zclaw_openfang/crates/zclaw-saas/src/scheduler.rs
iven cc26797faf
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
fix(saas): eliminate 6 compiler warnings + stabilize directive complete
- Remove unused imports: Utc (billing/service), StatusCode (billing/handlers), Sha256 (billing/handlers)
- Fix unused variables: _db (scheduler), _e (payment WeChat error)
- Fix visibility: RegisterDeviceRequest pub(super) → pub (used in pub handler)
- Update STABILIZATION_DIRECTIVE.md: all 7 criteria met, downgrade to advisory
- Fix TRUTH.md §2.2: mark P0/P1 defects as resolved, update Admin pages count to 14

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-03 21:57:04 +08:00

295 lines
9.7 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! 声明式 Scheduler — 借鉴 loco-rs 的定时任务模式
//!
//! 通过 TOML 配置定时任务,无需改代码调整调度时间。
//! 配置格式在 config.rs 的 SchedulerConfig / JobConfig 中定义。
use std::time::{Duration, Instant};
use sqlx::PgPool;
use crate::config::SchedulerConfig;
use crate::workers::WorkerDispatcher;
/// 单次任务执行的产出
struct TaskExecution {
result: Option<String>,
error: Option<String>,
duration_ms: i64,
}
/// 解析时间间隔字符串为 Duration
pub fn parse_duration(s: &str) -> Result<Duration, String> {
let s = s.trim().to_lowercase();
let (num_part, multiplier) = if s.ends_with('s') {
(&s[..s.len()-1], 1u64)
} else if s.ends_with('m') {
(&s[..s.len()-1], 60u64)
} else if s.ends_with('h') {
(&s[..s.len()-1], 3600u64)
} else if s.ends_with('d') {
(&s[..s.len()-1], 86400u64)
} else {
return Err(format!("Invalid interval format: '{}'. Use '30s', '5m', '1h', '1d'", s));
};
let num: u64 = num_part.parse()
.map_err(|_| format!("Invalid number in interval: '{}'", num_part))?;
Ok(Duration::from_secs(num * multiplier))
}
/// 启动所有定时任务
pub fn start_scheduler(config: &SchedulerConfig, _db: PgPool, dispatcher: WorkerDispatcher) {
for job in &config.jobs {
let interval = match parse_duration(&job.interval) {
Ok(d) => d,
Err(e) => {
tracing::error!("Scheduler job '{}': {}", job.name, e);
continue;
}
};
let job_name = job.name.clone();
let task_name = job.task.clone();
let args_json = job.args.clone();
let dispatcher = dispatcher.clone_ref();
let run_on_start = job.run_on_start;
tracing::info!(
"Scheduler: registering job '{}' ({} interval, task={})",
job_name, job.interval, task_name
);
tokio::spawn(async move {
if run_on_start {
tracing::info!("Scheduler: running '{}' on start", job_name);
if let Err(e) = dispatcher.dispatch_raw(&task_name, args_json.clone()).await {
tracing::error!("Scheduler job '{}' on-start failed: {}", job_name, e);
}
}
let mut interval_timer = tokio::time::interval(interval);
loop {
interval_timer.tick().await;
tracing::debug!("Scheduler: triggering job '{}'", job_name);
if let Err(e) = dispatcher.dispatch_raw(&task_name, args_json.clone()).await {
tracing::error!("Scheduler job '{}' failed: {}", job_name, e);
}
}
});
}
}
/// 内置的 DB 清理任务(不通过 Worker直接执行 SQL
pub fn start_db_cleanup_tasks(db: PgPool) {
let db_devices = db.clone();
let db_key_pool = db.clone();
// 每 24 小时清理不活跃设备
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(86400));
loop {
interval.tick().await;
match sqlx::query(
"DELETE FROM devices WHERE last_seen_at < $1"
)
.bind({
let cutoff = (chrono::Utc::now() - chrono::Duration::days(90)).to_rfc3339();
cutoff
})
.execute(&db_devices)
.await
{
Ok(result) => {
if result.rows_affected() > 0 {
tracing::info!("Cleaned up {} inactive devices (90d)", result.rows_affected());
}
}
Err(e) => tracing::error!("Device cleanup failed: {}", e),
}
}
});
// 每 24 小时清理过期的 key_usage_window 记录
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(86400));
loop {
interval.tick().await;
match sqlx::query(
"DELETE FROM key_usage_window WHERE window_minute < to_char(NOW() - INTERVAL '24 hours', 'YYYY-MM-DD\"T\"HH24:MI')"
)
.execute(&db_key_pool)
.await
{
Ok(result) => {
if result.rows_affected() > 0 {
tracing::info!("Cleaned up {} expired key_usage_window records (24h)", result.rows_affected());
}
}
Err(e) => tracing::error!("Key usage window cleanup failed: {}", e),
}
}
});
}
/// 用户任务调度器
///
/// 每 30 秒轮询 scheduled_tasks 表,执行到期任务。
/// 支持 agent/hand/workflow 三种任务类型。
/// 当前版本执行状态管理和日志记录;未来将通过内部 API 触发实际执行。
pub fn start_user_task_scheduler(db: PgPool) {
tokio::spawn(async move {
let mut ticker = tokio::time::interval(Duration::from_secs(30));
ticker.tick().await; // 跳过首次立即触发
loop {
ticker.tick().await;
if let Err(e) = tick_user_tasks(&db).await {
tracing::error!("[UserScheduler] tick error: {}", e);
}
}
});
}
/// 执行单个调度任务,返回执行产出(结果/错误/耗时)
async fn execute_scheduled_task(
db: &PgPool,
task_id: &str,
target_type: &str,
) -> TaskExecution {
let start = Instant::now();
let task_info: Option<(String, Option<String>)> = match sqlx::query_as(
"SELECT name, config_json FROM scheduled_tasks WHERE id = $1"
)
.bind(task_id)
.fetch_optional(db)
.await
{
Ok(info) => info,
Err(e) => {
let elapsed = start.elapsed().as_millis() as i64;
return TaskExecution {
result: None,
error: Some(format!("Failed to fetch task {}: {}", task_id, e)),
duration_ms: elapsed,
};
}
};
let (task_name, _config_json) = match task_info {
Some(info) => info,
None => {
let elapsed = start.elapsed().as_millis() as i64;
return TaskExecution {
result: None,
error: Some(format!("Task {} not found", task_id)),
duration_ms: elapsed,
};
}
};
tracing::info!(
"[UserScheduler] Dispatching task '{}' (target_type={})",
task_name, target_type
);
let exec_result = match target_type {
t if t == "agent" => {
tracing::info!("[UserScheduler] Agent task '{}' queued for execution", task_name);
Ok("agent_dispatched".to_string())
}
t if t == "hand" => {
tracing::info!("[UserScheduler] Hand task '{}' queued for execution", task_name);
Ok("hand_dispatched".to_string())
}
t if t == "workflow" => {
tracing::info!("[UserScheduler] Workflow task '{}' queued for execution", task_name);
Ok("workflow_dispatched".to_string())
}
other => {
tracing::warn!("[UserScheduler] Unknown target_type '{}' for task '{}'", other, task_name);
Err(format!("Unknown target_type: {}", other))
}
};
let elapsed = start.elapsed().as_millis() as i64;
match exec_result {
Ok(msg) => TaskExecution {
result: Some(msg),
error: None,
duration_ms: elapsed,
},
Err(err) => TaskExecution {
result: None,
error: Some(err),
duration_ms: elapsed,
},
}
}
async fn tick_user_tasks(db: &PgPool) -> Result<(), sqlx::Error> {
// 查找到期任务next_run_at 兼容 TEXT 和 TIMESTAMPTZ 两种列类型)
let due_tasks: Vec<(String, String, String)> = sqlx::query_as(
"SELECT id, schedule_type, target_type FROM scheduled_tasks
WHERE enabled = TRUE AND next_run_at::TIMESTAMPTZ <= NOW()"
)
.fetch_all(db)
.await?;
if due_tasks.is_empty() {
return Ok(());
}
tracing::debug!("[UserScheduler] {} tasks due", due_tasks.len());
for (task_id, schedule_type, target_type) in due_tasks {
tracing::info!(
"[UserScheduler] Executing task {} (type={}, schedule={})",
task_id, target_type, schedule_type
);
// 执行任务并收集产出
let exec = execute_scheduled_task(db, &task_id, &target_type).await;
if let Some(ref err) = exec.error {
tracing::error!("[UserScheduler] task {} execution failed: {}", task_id, err);
} else {
tracing::info!(
"[UserScheduler] task {} executed successfully ({}ms)",
task_id, exec.duration_ms
);
}
// 更新任务状态(含执行产出)
let result = sqlx::query(
"UPDATE scheduled_tasks
SET last_run_at = NOW(),
run_count = run_count + 1,
updated_at = NOW(),
enabled = CASE WHEN schedule_type = 'once' THEN FALSE ELSE TRUE END,
next_run_at = CASE
WHEN schedule_type = 'once' THEN NULL
WHEN schedule_type = 'interval' AND interval_seconds IS NOT NULL
THEN NOW() + (interval_seconds || ' seconds')::INTERVAL
ELSE NULL
END,
last_result = $2,
last_error = $3,
last_duration_ms = $4
WHERE id = $1"
)
.bind(&task_id)
.bind(&exec.result)
.bind(&exec.error)
.bind(exec.duration_ms)
.execute(db)
.await;
if let Err(e) = result {
tracing::error!("[UserScheduler] task {} status update failed: {}", task_id, e);
}
}
Ok(())
}