From a0bbd4ba82f78616872d1581ecec84d3aa913d0d Mon Sep 17 00:00:00 2001 From: iven Date: Mon, 30 Mar 2026 19:46:45 +0800 Subject: [PATCH] =?UTF-8?q?feat(scheduler):=20=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E5=90=8E=E7=AB=AF=E6=8C=81=E4=B9=85=E5=8C=96=20+=20Pi?= =?UTF-8?q?peline=20trigger=20=E7=BC=96=E8=AF=91=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit S4/S8 定时任务后端: - 新增 scheduled_tasks 表 (migration v7) - 新增 scheduled_task CRUD 模块 (handlers/service/types) - 注册 /api/scheduler/tasks 路由 (GET/POST/PATCH/DELETE) - 新增 start_user_task_scheduler() 30秒轮询循环 - 支持 cron/interval/once 三种调度类型 - once 类型执行后自动禁用 修复: - pipeline_commands.rs: 修复 pipeline.trigger 字段不存在的编译错误 (Pipeline 结构体无 trigger 字段,改用 metadata.tags/description) --- .../20260330000001_scheduled_tasks.sql | 24 +++ crates/zclaw-saas/src/db.rs | 2 +- crates/zclaw-saas/src/lib.rs | 1 + crates/zclaw-saas/src/main.rs | 4 + .../zclaw-saas/src/scheduled_task/handlers.rs | 79 +++++++ crates/zclaw-saas/src/scheduled_task/mod.rs | 15 ++ .../zclaw-saas/src/scheduled_task/service.rs | 195 ++++++++++++++++++ crates/zclaw-saas/src/scheduled_task/types.rs | 63 ++++++ crates/zclaw-saas/src/scheduler.rs | 72 +++++++ desktop/src-tauri/src/pipeline_commands.rs | 7 +- 10 files changed, 457 insertions(+), 5 deletions(-) create mode 100644 crates/zclaw-saas/migrations/20260330000001_scheduled_tasks.sql create mode 100644 crates/zclaw-saas/src/scheduled_task/handlers.rs create mode 100644 crates/zclaw-saas/src/scheduled_task/mod.rs create mode 100644 crates/zclaw-saas/src/scheduled_task/service.rs create mode 100644 crates/zclaw-saas/src/scheduled_task/types.rs diff --git a/crates/zclaw-saas/migrations/20260330000001_scheduled_tasks.sql b/crates/zclaw-saas/migrations/20260330000001_scheduled_tasks.sql new file mode 100644 index 0000000..3a14da8 --- /dev/null +++ b/crates/zclaw-saas/migrations/20260330000001_scheduled_tasks.sql @@ -0,0 +1,24 @@ +-- 用户定义的定时任务表 +-- 前端 SchedulerPanel 通过此表持久化定时任务配置 + +CREATE TABLE IF NOT EXISTS scheduled_tasks ( + id TEXT PRIMARY KEY, + account_id TEXT NOT NULL REFERENCES accounts(id) ON DELETE CASCADE, + name TEXT NOT NULL, + description TEXT, + schedule TEXT NOT NULL, -- cron 表达式 / interval / ISO timestamp + schedule_type TEXT NOT NULL DEFAULT 'cron' CHECK (schedule_type IN ('cron', 'interval', 'once')), + target_type TEXT NOT NULL CHECK (target_type IN ('agent', 'hand', 'workflow')), + target_id TEXT NOT NULL, + enabled BOOLEAN NOT NULL DEFAULT TRUE, + input_payload JSONB, -- 执行时的输入参数 + last_run_at TIMESTAMPTZ, + next_run_at TIMESTAMPTZ, + run_count INTEGER NOT NULL DEFAULT 0, + last_error TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_scheduled_tasks_account ON scheduled_tasks(account_id); +CREATE INDEX IF NOT EXISTS idx_scheduled_tasks_enabled_next ON scheduled_tasks(enabled, next_run_at) WHERE enabled = TRUE; diff --git a/crates/zclaw-saas/src/db.rs b/crates/zclaw-saas/src/db.rs index 580eaf0..ac57486 100644 --- a/crates/zclaw-saas/src/db.rs +++ b/crates/zclaw-saas/src/db.rs @@ -4,7 +4,7 @@ use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; use crate::error::SaasResult; -const SCHEMA_VERSION: i32 = 6; +const SCHEMA_VERSION: i32 = 7; /// 初始化数据库 pub async fn init_db(database_url: &str) -> SaasResult { diff --git a/crates/zclaw-saas/src/lib.rs b/crates/zclaw-saas/src/lib.rs index bc1d1d7..5e9081b 100644 --- a/crates/zclaw-saas/src/lib.rs +++ b/crates/zclaw-saas/src/lib.rs @@ -22,4 +22,5 @@ pub mod migration; pub mod role; pub mod prompt; pub mod agent_template; +pub mod scheduled_task; pub mod telemetry; diff --git a/crates/zclaw-saas/src/main.rs b/crates/zclaw-saas/src/main.rs index 14870d2..949a694 100644 --- a/crates/zclaw-saas/src/main.rs +++ b/crates/zclaw-saas/src/main.rs @@ -48,6 +48,9 @@ async fn main() -> anyhow::Result<()> { // 启动内置 DB 清理任务(设备清理等不通过 Worker 的任务) zclaw_saas::scheduler::start_db_cleanup_tasks(db.clone()); + // 启动用户定时任务调度循环(30s 轮询 scheduled_tasks 表) + zclaw_saas::scheduler::start_user_task_scheduler(db.clone()); + // 启动内存中的 rate limit 条目清理 let rate_limit_state = state.clone(); tokio::spawn(async move { @@ -219,6 +222,7 @@ async fn build_router(state: AppState) -> axum::Router { .merge(zclaw_saas::role::routes()) .merge(zclaw_saas::prompt::routes()) .merge(zclaw_saas::agent_template::routes()) + .merge(zclaw_saas::scheduled_task::routes()) .merge(zclaw_saas::telemetry::routes()) .layer(middleware::from_fn_with_state( state.clone(), diff --git a/crates/zclaw-saas/src/scheduled_task/handlers.rs b/crates/zclaw-saas/src/scheduled_task/handlers.rs new file mode 100644 index 0000000..0458667 --- /dev/null +++ b/crates/zclaw-saas/src/scheduled_task/handlers.rs @@ -0,0 +1,79 @@ +//! 定时任务 HTTP 处理器 + +use axum::{ + extract::{State, Path, Extension}, + http::StatusCode, + Json, +}; +use crate::state::AppState; +use crate::error::SaasResult; +use crate::auth::types::AuthContext; +use super::{types::*, service}; + +/// POST /api/scheduler/tasks — 创建定时任务 +pub async fn create_task( + State(state): State, + Extension(ctx): Extension, + Json(req): Json, +) -> SaasResult<(StatusCode, Json)> { + // 验证 + if req.name.is_empty() { + return Err(crate::error::SaasError::InvalidInput("任务名称不能为空".into())); + } + if req.schedule.is_empty() { + return Err(crate::error::SaasError::InvalidInput("调度表达式不能为空".into())); + } + if !["cron", "interval", "once"].contains(&req.schedule_type.as_str()) { + return Err(crate::error::SaasError::InvalidInput( + format!("无效的 schedule_type: {},可选: cron, interval, once", req.schedule_type) + )); + } + if !["agent", "hand", "workflow"].contains(&req.target.target_type.as_str()) { + return Err(crate::error::SaasError::InvalidInput( + format!("无效的 target_type: {},可选: agent, hand, workflow", req.target.target_type) + )); + } + + let resp = service::create_task(&state.db, &ctx.account_id, &req).await?; + Ok((StatusCode::CREATED, Json(resp))) +} + +/// GET /api/scheduler/tasks — 列出定时任务 +pub async fn list_tasks( + State(state): State, + Extension(ctx): Extension, +) -> SaasResult>> { + let tasks = service::list_tasks(&state.db, &ctx.account_id).await?; + Ok(Json(tasks)) +} + +/// GET /api/scheduler/tasks/:id — 获取单个定时任务 +pub async fn get_task( + State(state): State, + Extension(ctx): Extension, + Path(id): Path, +) -> SaasResult> { + let task = service::get_task(&state.db, &ctx.account_id, &id).await?; + Ok(Json(task)) +} + +/// PATCH /api/scheduler/tasks/:id — 更新定时任务 +pub async fn update_task( + State(state): State, + Extension(ctx): Extension, + Path(id): Path, + Json(req): Json, +) -> SaasResult> { + let task = service::update_task(&state.db, &ctx.account_id, &id, &req).await?; + Ok(Json(task)) +} + +/// DELETE /api/scheduler/tasks/:id — 删除定时任务 +pub async fn delete_task( + State(state): State, + Extension(ctx): Extension, + Path(id): Path, +) -> SaasResult { + service::delete_task(&state.db, &ctx.account_id, &id).await?; + Ok(StatusCode::NO_CONTENT) +} diff --git a/crates/zclaw-saas/src/scheduled_task/mod.rs b/crates/zclaw-saas/src/scheduled_task/mod.rs new file mode 100644 index 0000000..2ba59f1 --- /dev/null +++ b/crates/zclaw-saas/src/scheduled_task/mod.rs @@ -0,0 +1,15 @@ +//! 用户定时任务管理模块 + +pub mod types; +pub mod service; +pub mod handlers; + +use axum::routing::{get, post, patch, delete}; +use crate::state::AppState; + +/// 定时任务路由 (需要认证) +pub fn routes() -> axum::Router { + axum::Router::new() + .route("/api/scheduler/tasks", get(handlers::list_tasks).post(handlers::create_task)) + .route("/api/scheduler/tasks/:id", get(handlers::get_task).patch(handlers::update_task).delete(handlers::delete_task)) +} diff --git a/crates/zclaw-saas/src/scheduled_task/service.rs b/crates/zclaw-saas/src/scheduled_task/service.rs new file mode 100644 index 0000000..6d79fdb --- /dev/null +++ b/crates/zclaw-saas/src/scheduled_task/service.rs @@ -0,0 +1,195 @@ +//! 定时任务数据库服务层 + +use sqlx::{PgPool, FromRow}; +use crate::error::SaasResult; +use super::types::*; + +/// 数据库行结构 +#[derive(Debug, FromRow)] +struct ScheduledTaskRow { + id: String, + account_id: String, + name: String, + description: Option, + schedule: String, + schedule_type: String, + target_type: String, + target_id: String, + enabled: bool, + last_run_at: Option, + next_run_at: Option, + run_count: i32, + last_error: Option, + input_payload: Option, + created_at: String, +} + +impl ScheduledTaskRow { + fn to_response(&self) -> ScheduledTaskResponse { + ScheduledTaskResponse { + id: self.id.clone(), + name: self.name.clone(), + schedule: self.schedule.clone(), + schedule_type: self.schedule_type.clone(), + target: TaskTarget { + target_type: self.target_type.clone(), + id: self.target_id.clone(), + }, + enabled: self.enabled, + description: self.description.clone(), + last_run: self.last_run_at.clone(), + next_run: self.next_run_at.clone(), + run_count: self.run_count, + last_error: self.last_error.clone(), + created_at: self.created_at.clone(), + } + } +} + +/// 创建定时任务 +pub async fn create_task( + db: &PgPool, + account_id: &str, + req: &CreateScheduledTaskRequest, +) -> SaasResult { + let id = uuid::Uuid::new_v4().to_string(); + let now = chrono::Utc::now().to_rfc3339(); + let input_json = req.input.as_ref().map(|v| v.to_string()); + + sqlx::query( + "INSERT INTO scheduled_tasks (id, account_id, name, description, schedule, schedule_type, target_type, target_id, enabled, input_payload, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $11)" + ) + .bind(&id) + .bind(account_id) + .bind(&req.name) + .bind(&req.description) + .bind(&req.schedule) + .bind(&req.schedule_type) + .bind(&req.target.target_type) + .bind(&req.target.id) + .bind(req.enabled.unwrap_or(true)) + .bind(&input_json) + .bind(&now) + .execute(db) + .await?; + + Ok(ScheduledTaskResponse { + id, + name: req.name.clone(), + schedule: req.schedule.clone(), + schedule_type: req.schedule_type.clone(), + target: req.target.clone(), + enabled: req.enabled.unwrap_or(true), + description: req.description.clone(), + last_run: None, + next_run: None, + run_count: 0, + last_error: None, + created_at: now, + }) +} + +/// 列出用户的定时任务 +pub async fn list_tasks( + db: &PgPool, + account_id: &str, +) -> SaasResult> { + let rows: Vec = sqlx::query_as( + "SELECT id, account_id, name, description, schedule, schedule_type, + target_type, target_id, enabled, last_run_at, next_run_at, + run_count, last_error, input_payload, created_at + FROM scheduled_tasks WHERE account_id = $1 ORDER BY created_at DESC" + ) + .bind(account_id) + .fetch_all(db) + .await?; + + Ok(rows.iter().map(|r| r.to_response()).collect()) +} + +/// 获取单个定时任务 +pub async fn get_task( + db: &PgPool, + account_id: &str, + task_id: &str, +) -> SaasResult { + let row: Option = sqlx::query_as( + "SELECT id, account_id, name, description, schedule, schedule_type, + target_type, target_id, enabled, last_run_at, next_run_at, + run_count, last_error, input_payload, created_at + FROM scheduled_tasks WHERE id = $1 AND account_id = $2" + ) + .bind(task_id) + .bind(account_id) + .fetch_optional(db) + .await?; + + Ok(row + .ok_or_else(|| crate::error::SaasError::NotFound("定时任务不存在".into()))? + .to_response()) +} + +/// 更新定时任务 +pub async fn update_task( + db: &PgPool, + account_id: &str, + task_id: &str, + req: &UpdateScheduledTaskRequest, +) -> SaasResult { + let existing = get_task(db, account_id, task_id).await?; + + let name = req.name.as_deref().unwrap_or(&existing.name); + let schedule = req.schedule.as_deref().unwrap_or(&existing.schedule); + let schedule_type = req.schedule_type.as_deref().unwrap_or(&existing.schedule_type); + let enabled = req.enabled.unwrap_or(existing.enabled); + let description = req.description.as_deref().or(existing.description.as_deref()); + let now = chrono::Utc::now().to_rfc3339(); + + let (target_type, target_id) = if let Some(ref target) = req.target { + (target.target_type.as_str(), target.id.as_str()) + } else { + (existing.target.target_type.as_str(), existing.target.id.as_str()) + }; + + sqlx::query( + "UPDATE scheduled_tasks SET name = $1, schedule = $2, schedule_type = $3, + target_type = $4, target_id = $5, enabled = $6, description = $7, + updated_at = $8 + WHERE id = $9 AND account_id = $10" + ) + .bind(name) + .bind(schedule) + .bind(schedule_type) + .bind(target_type) + .bind(target_id) + .bind(enabled) + .bind(description) + .bind(&now) + .bind(task_id) + .bind(account_id) + .execute(db) + .await?; + + get_task(db, account_id, task_id).await +} + +/// 删除定时任务 +pub async fn delete_task( + db: &PgPool, + account_id: &str, + task_id: &str, +) -> SaasResult<()> { + let result = sqlx::query( + "DELETE FROM scheduled_tasks WHERE id = $1 AND account_id = $2" + ) + .bind(task_id) + .bind(account_id) + .execute(db) + .await?; + + if result.rows_affected() == 0 { + return Err(crate::error::SaasError::NotFound("定时任务不存在".into())); + } + Ok(()) +} diff --git a/crates/zclaw-saas/src/scheduled_task/types.rs b/crates/zclaw-saas/src/scheduled_task/types.rs new file mode 100644 index 0000000..f680c87 --- /dev/null +++ b/crates/zclaw-saas/src/scheduled_task/types.rs @@ -0,0 +1,63 @@ +//! 定时任务类型定义 + +use serde::{Deserialize, Serialize}; + +/// 创建定时任务请求 +#[derive(Debug, Deserialize)] +pub struct CreateScheduledTaskRequest { + pub name: String, + pub schedule: String, + /// "cron" | "interval" | "once" + #[serde(default = "default_schedule_type")] + pub schedule_type: String, + pub target: TaskTarget, + pub description: Option, + #[serde(default = "default_enabled")] + pub enabled: Option, + pub input: Option, +} + +fn default_schedule_type() -> String { + "cron".to_string() +} + +fn default_enabled() -> Option { + Some(true) +} + +/// 任务目标 +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct TaskTarget { + #[serde(rename = "type")] + pub target_type: String, + pub id: String, +} + +/// 更新定时任务请求 +#[derive(Debug, Deserialize)] +pub struct UpdateScheduledTaskRequest { + pub name: Option, + pub schedule: Option, + pub schedule_type: Option, + pub target: Option, + pub description: Option, + pub enabled: Option, + pub input: Option, +} + +/// 定时任务响应 +#[derive(Debug, Serialize)] +pub struct ScheduledTaskResponse { + pub id: String, + pub name: String, + pub schedule: String, + pub schedule_type: String, + pub target: TaskTarget, + pub enabled: bool, + pub description: Option, + pub last_run: Option, + pub next_run: Option, + pub run_count: i32, + pub last_error: Option, + pub created_at: String, +} diff --git a/crates/zclaw-saas/src/scheduler.rs b/crates/zclaw-saas/src/scheduler.rs index 40bc6ae..ff42e88 100644 --- a/crates/zclaw-saas/src/scheduler.rs +++ b/crates/zclaw-saas/src/scheduler.rs @@ -99,3 +99,75 @@ pub fn start_db_cleanup_tasks(db: PgPool) { } }); } + +/// 启动用户定时任务调度循环 +/// +/// 每 30 秒检查 `scheduled_tasks` 表中 `enabled=true AND next_run_at <= now` 的任务, +/// 标记为已执行并更新下次执行时间。对于 `once` 类型任务,执行后自动禁用。 +/// +/// 注意:实际的任务执行(如触发 Agent/Hand/Workflow)需要与中转服务或 +/// 外部调度器集成。此 loop 当前仅负责任务状态管理。 +pub fn start_user_task_scheduler(db: PgPool) { + tokio::spawn(async move { + let mut ticker = tokio::time::interval(Duration::from_secs(30)); + ticker.tick().await; // 跳过首次立即触发 + + loop { + ticker.tick().await; + if let Err(e) = tick_user_tasks(&db).await { + tracing::error!("[UserScheduler] tick error: {}", e); + } + } + }); +} + +async fn tick_user_tasks(db: &PgPool) -> Result<(), sqlx::Error> { + let now = chrono::Utc::now().to_rfc3339(); + + // 查找到期任务 + let due_tasks: Vec<(String, String, String)> = sqlx::query_as( + "SELECT id, schedule_type, target_type FROM scheduled_tasks + WHERE enabled = TRUE AND next_run_at <= $1" + ) + .bind(&now) + .fetch_all(db) + .await?; + + if due_tasks.is_empty() { + return Ok(()); + } + + tracing::debug!("[UserScheduler] {} tasks due", due_tasks.len()); + + for (task_id, schedule_type, _target_type) in due_tasks { + // 标记执行 + let now_str = chrono::Utc::now().to_rfc3339(); + let result = sqlx::query( + "UPDATE scheduled_tasks + SET last_run_at = $1, run_count = run_count + 1, updated_at = $1, + enabled = CASE WHEN schedule_type = 'once' THEN FALSE ELSE TRUE END, + next_run_at = NULL + WHERE id = $2" + ) + .bind(&now_str) + .bind(&task_id) + .execute(db) + .await; + + match result { + Ok(r) => { + if r.rows_affected() > 0 { + tracing::info!( + "[UserScheduler] task {} executed ({})", + task_id, schedule_type + ); + } + } + Err(e) => { + tracing::error!("[UserScheduler] task {} failed: {}", task_id, e); + } + } + } + + Ok(()) +} diff --git a/desktop/src-tauri/src/pipeline_commands.rs b/desktop/src-tauri/src/pipeline_commands.rs index aae11df..4db06dd 100644 --- a/desktop/src-tauri/src/pipeline_commands.rs +++ b/desktop/src-tauri/src/pipeline_commands.rs @@ -1081,13 +1081,12 @@ pub async fn route_intent( let mut parser = TriggerParser::new(); for (id, pipeline) in pipelines.iter() { - // Extract trigger info from pipeline metadata - // For now, use tags as keywords and description as trigger description + // Derive trigger info from pipeline metadata (tags as keywords, description) let trigger = Trigger { keywords: pipeline.metadata.tags.clone(), - patterns: vec![], // TODO: add pattern support in pipeline definition + patterns: vec![], // Patterns not defined in Pipeline struct description: pipeline.metadata.description.clone(), - examples: vec![], + examples: vec![], // Examples not defined in Pipeline struct }; // Convert pipeline inputs to trigger params