feat(scheduler): 定时任务后端持久化 + Pipeline trigger 编译修复

S4/S8 定时任务后端:
- 新增 scheduled_tasks 表 (migration v7)
- 新增 scheduled_task CRUD 模块 (handlers/service/types)
- 注册 /api/scheduler/tasks 路由 (GET/POST/PATCH/DELETE)
- 新增 start_user_task_scheduler() 30秒轮询循环
- 支持 cron/interval/once 三种调度类型
- once 类型执行后自动禁用

修复:
- pipeline_commands.rs: 修复 pipeline.trigger 字段不存在的编译错误
  (Pipeline 结构体无 trigger 字段,改用 metadata.tags/description)
This commit is contained in:
iven
2026-03-30 19:46:45 +08:00
parent c2aff09811
commit a0bbd4ba82
10 changed files with 457 additions and 5 deletions

View File

@@ -0,0 +1,24 @@
-- 用户定义的定时任务表
-- 前端 SchedulerPanel 通过此表持久化定时任务配置
CREATE TABLE IF NOT EXISTS scheduled_tasks (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL REFERENCES accounts(id) ON DELETE CASCADE,
name TEXT NOT NULL,
description TEXT,
schedule TEXT NOT NULL, -- cron 表达式 / interval / ISO timestamp
schedule_type TEXT NOT NULL DEFAULT 'cron' CHECK (schedule_type IN ('cron', 'interval', 'once')),
target_type TEXT NOT NULL CHECK (target_type IN ('agent', 'hand', 'workflow')),
target_id TEXT NOT NULL,
enabled BOOLEAN NOT NULL DEFAULT TRUE,
input_payload JSONB, -- 执行时的输入参数
last_run_at TIMESTAMPTZ,
next_run_at TIMESTAMPTZ,
run_count INTEGER NOT NULL DEFAULT 0,
last_error TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_scheduled_tasks_account ON scheduled_tasks(account_id);
CREATE INDEX IF NOT EXISTS idx_scheduled_tasks_enabled_next ON scheduled_tasks(enabled, next_run_at) WHERE enabled = TRUE;

View File

@@ -4,7 +4,7 @@ use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use crate::error::SaasResult;
const SCHEMA_VERSION: i32 = 6;
const SCHEMA_VERSION: i32 = 7;
/// 初始化数据库
pub async fn init_db(database_url: &str) -> SaasResult<PgPool> {

View File

@@ -22,4 +22,5 @@ pub mod migration;
pub mod role;
pub mod prompt;
pub mod agent_template;
pub mod scheduled_task;
pub mod telemetry;

View File

@@ -48,6 +48,9 @@ async fn main() -> anyhow::Result<()> {
// 启动内置 DB 清理任务(设备清理等不通过 Worker 的任务)
zclaw_saas::scheduler::start_db_cleanup_tasks(db.clone());
// 启动用户定时任务调度循环30s 轮询 scheduled_tasks 表)
zclaw_saas::scheduler::start_user_task_scheduler(db.clone());
// 启动内存中的 rate limit 条目清理
let rate_limit_state = state.clone();
tokio::spawn(async move {
@@ -219,6 +222,7 @@ async fn build_router(state: AppState) -> axum::Router {
.merge(zclaw_saas::role::routes())
.merge(zclaw_saas::prompt::routes())
.merge(zclaw_saas::agent_template::routes())
.merge(zclaw_saas::scheduled_task::routes())
.merge(zclaw_saas::telemetry::routes())
.layer(middleware::from_fn_with_state(
state.clone(),

View File

@@ -0,0 +1,79 @@
//! 定时任务 HTTP 处理器
use axum::{
extract::{State, Path, Extension},
http::StatusCode,
Json,
};
use crate::state::AppState;
use crate::error::SaasResult;
use crate::auth::types::AuthContext;
use super::{types::*, service};
/// POST /api/scheduler/tasks — 创建定时任务
pub async fn create_task(
State(state): State<AppState>,
Extension(ctx): Extension<AuthContext>,
Json(req): Json<CreateScheduledTaskRequest>,
) -> SaasResult<(StatusCode, Json<ScheduledTaskResponse>)> {
// 验证
if req.name.is_empty() {
return Err(crate::error::SaasError::InvalidInput("任务名称不能为空".into()));
}
if req.schedule.is_empty() {
return Err(crate::error::SaasError::InvalidInput("调度表达式不能为空".into()));
}
if !["cron", "interval", "once"].contains(&req.schedule_type.as_str()) {
return Err(crate::error::SaasError::InvalidInput(
format!("无效的 schedule_type: {},可选: cron, interval, once", req.schedule_type)
));
}
if !["agent", "hand", "workflow"].contains(&req.target.target_type.as_str()) {
return Err(crate::error::SaasError::InvalidInput(
format!("无效的 target_type: {},可选: agent, hand, workflow", req.target.target_type)
));
}
let resp = service::create_task(&state.db, &ctx.account_id, &req).await?;
Ok((StatusCode::CREATED, Json(resp)))
}
/// GET /api/scheduler/tasks — 列出定时任务
pub async fn list_tasks(
State(state): State<AppState>,
Extension(ctx): Extension<AuthContext>,
) -> SaasResult<Json<Vec<ScheduledTaskResponse>>> {
let tasks = service::list_tasks(&state.db, &ctx.account_id).await?;
Ok(Json(tasks))
}
/// GET /api/scheduler/tasks/:id — 获取单个定时任务
pub async fn get_task(
State(state): State<AppState>,
Extension(ctx): Extension<AuthContext>,
Path(id): Path<String>,
) -> SaasResult<Json<ScheduledTaskResponse>> {
let task = service::get_task(&state.db, &ctx.account_id, &id).await?;
Ok(Json(task))
}
/// PATCH /api/scheduler/tasks/:id — 更新定时任务
pub async fn update_task(
State(state): State<AppState>,
Extension(ctx): Extension<AuthContext>,
Path(id): Path<String>,
Json(req): Json<UpdateScheduledTaskRequest>,
) -> SaasResult<Json<ScheduledTaskResponse>> {
let task = service::update_task(&state.db, &ctx.account_id, &id, &req).await?;
Ok(Json(task))
}
/// DELETE /api/scheduler/tasks/:id — 删除定时任务
pub async fn delete_task(
State(state): State<AppState>,
Extension(ctx): Extension<AuthContext>,
Path(id): Path<String>,
) -> SaasResult<StatusCode> {
service::delete_task(&state.db, &ctx.account_id, &id).await?;
Ok(StatusCode::NO_CONTENT)
}

View File

@@ -0,0 +1,15 @@
//! 用户定时任务管理模块
pub mod types;
pub mod service;
pub mod handlers;
use axum::routing::{get, post, patch, delete};
use crate::state::AppState;
/// 定时任务路由 (需要认证)
pub fn routes() -> axum::Router<AppState> {
axum::Router::new()
.route("/api/scheduler/tasks", get(handlers::list_tasks).post(handlers::create_task))
.route("/api/scheduler/tasks/:id", get(handlers::get_task).patch(handlers::update_task).delete(handlers::delete_task))
}

View File

@@ -0,0 +1,195 @@
//! 定时任务数据库服务层
use sqlx::{PgPool, FromRow};
use crate::error::SaasResult;
use super::types::*;
/// 数据库行结构
#[derive(Debug, FromRow)]
struct ScheduledTaskRow {
id: String,
account_id: String,
name: String,
description: Option<String>,
schedule: String,
schedule_type: String,
target_type: String,
target_id: String,
enabled: bool,
last_run_at: Option<String>,
next_run_at: Option<String>,
run_count: i32,
last_error: Option<String>,
input_payload: Option<serde_json::Value>,
created_at: String,
}
impl ScheduledTaskRow {
fn to_response(&self) -> ScheduledTaskResponse {
ScheduledTaskResponse {
id: self.id.clone(),
name: self.name.clone(),
schedule: self.schedule.clone(),
schedule_type: self.schedule_type.clone(),
target: TaskTarget {
target_type: self.target_type.clone(),
id: self.target_id.clone(),
},
enabled: self.enabled,
description: self.description.clone(),
last_run: self.last_run_at.clone(),
next_run: self.next_run_at.clone(),
run_count: self.run_count,
last_error: self.last_error.clone(),
created_at: self.created_at.clone(),
}
}
}
/// 创建定时任务
pub async fn create_task(
db: &PgPool,
account_id: &str,
req: &CreateScheduledTaskRequest,
) -> SaasResult<ScheduledTaskResponse> {
let id = uuid::Uuid::new_v4().to_string();
let now = chrono::Utc::now().to_rfc3339();
let input_json = req.input.as_ref().map(|v| v.to_string());
sqlx::query(
"INSERT INTO scheduled_tasks (id, account_id, name, description, schedule, schedule_type, target_type, target_id, enabled, input_payload, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $11)"
)
.bind(&id)
.bind(account_id)
.bind(&req.name)
.bind(&req.description)
.bind(&req.schedule)
.bind(&req.schedule_type)
.bind(&req.target.target_type)
.bind(&req.target.id)
.bind(req.enabled.unwrap_or(true))
.bind(&input_json)
.bind(&now)
.execute(db)
.await?;
Ok(ScheduledTaskResponse {
id,
name: req.name.clone(),
schedule: req.schedule.clone(),
schedule_type: req.schedule_type.clone(),
target: req.target.clone(),
enabled: req.enabled.unwrap_or(true),
description: req.description.clone(),
last_run: None,
next_run: None,
run_count: 0,
last_error: None,
created_at: now,
})
}
/// 列出用户的定时任务
pub async fn list_tasks(
db: &PgPool,
account_id: &str,
) -> SaasResult<Vec<ScheduledTaskResponse>> {
let rows: Vec<ScheduledTaskRow> = sqlx::query_as(
"SELECT id, account_id, name, description, schedule, schedule_type,
target_type, target_id, enabled, last_run_at, next_run_at,
run_count, last_error, input_payload, created_at
FROM scheduled_tasks WHERE account_id = $1 ORDER BY created_at DESC"
)
.bind(account_id)
.fetch_all(db)
.await?;
Ok(rows.iter().map(|r| r.to_response()).collect())
}
/// 获取单个定时任务
pub async fn get_task(
db: &PgPool,
account_id: &str,
task_id: &str,
) -> SaasResult<ScheduledTaskResponse> {
let row: Option<ScheduledTaskRow> = sqlx::query_as(
"SELECT id, account_id, name, description, schedule, schedule_type,
target_type, target_id, enabled, last_run_at, next_run_at,
run_count, last_error, input_payload, created_at
FROM scheduled_tasks WHERE id = $1 AND account_id = $2"
)
.bind(task_id)
.bind(account_id)
.fetch_optional(db)
.await?;
Ok(row
.ok_or_else(|| crate::error::SaasError::NotFound("定时任务不存在".into()))?
.to_response())
}
/// 更新定时任务
pub async fn update_task(
db: &PgPool,
account_id: &str,
task_id: &str,
req: &UpdateScheduledTaskRequest,
) -> SaasResult<ScheduledTaskResponse> {
let existing = get_task(db, account_id, task_id).await?;
let name = req.name.as_deref().unwrap_or(&existing.name);
let schedule = req.schedule.as_deref().unwrap_or(&existing.schedule);
let schedule_type = req.schedule_type.as_deref().unwrap_or(&existing.schedule_type);
let enabled = req.enabled.unwrap_or(existing.enabled);
let description = req.description.as_deref().or(existing.description.as_deref());
let now = chrono::Utc::now().to_rfc3339();
let (target_type, target_id) = if let Some(ref target) = req.target {
(target.target_type.as_str(), target.id.as_str())
} else {
(existing.target.target_type.as_str(), existing.target.id.as_str())
};
sqlx::query(
"UPDATE scheduled_tasks SET name = $1, schedule = $2, schedule_type = $3,
target_type = $4, target_id = $5, enabled = $6, description = $7,
updated_at = $8
WHERE id = $9 AND account_id = $10"
)
.bind(name)
.bind(schedule)
.bind(schedule_type)
.bind(target_type)
.bind(target_id)
.bind(enabled)
.bind(description)
.bind(&now)
.bind(task_id)
.bind(account_id)
.execute(db)
.await?;
get_task(db, account_id, task_id).await
}
/// 删除定时任务
pub async fn delete_task(
db: &PgPool,
account_id: &str,
task_id: &str,
) -> SaasResult<()> {
let result = sqlx::query(
"DELETE FROM scheduled_tasks WHERE id = $1 AND account_id = $2"
)
.bind(task_id)
.bind(account_id)
.execute(db)
.await?;
if result.rows_affected() == 0 {
return Err(crate::error::SaasError::NotFound("定时任务不存在".into()));
}
Ok(())
}

View File

@@ -0,0 +1,63 @@
//! 定时任务类型定义
use serde::{Deserialize, Serialize};
/// 创建定时任务请求
#[derive(Debug, Deserialize)]
pub struct CreateScheduledTaskRequest {
pub name: String,
pub schedule: String,
/// "cron" | "interval" | "once"
#[serde(default = "default_schedule_type")]
pub schedule_type: String,
pub target: TaskTarget,
pub description: Option<String>,
#[serde(default = "default_enabled")]
pub enabled: Option<bool>,
pub input: Option<serde_json::Value>,
}
fn default_schedule_type() -> String {
"cron".to_string()
}
fn default_enabled() -> Option<bool> {
Some(true)
}
/// 任务目标
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct TaskTarget {
#[serde(rename = "type")]
pub target_type: String,
pub id: String,
}
/// 更新定时任务请求
#[derive(Debug, Deserialize)]
pub struct UpdateScheduledTaskRequest {
pub name: Option<String>,
pub schedule: Option<String>,
pub schedule_type: Option<String>,
pub target: Option<TaskTarget>,
pub description: Option<String>,
pub enabled: Option<bool>,
pub input: Option<serde_json::Value>,
}
/// 定时任务响应
#[derive(Debug, Serialize)]
pub struct ScheduledTaskResponse {
pub id: String,
pub name: String,
pub schedule: String,
pub schedule_type: String,
pub target: TaskTarget,
pub enabled: bool,
pub description: Option<String>,
pub last_run: Option<String>,
pub next_run: Option<String>,
pub run_count: i32,
pub last_error: Option<String>,
pub created_at: String,
}

View File

@@ -99,3 +99,75 @@ pub fn start_db_cleanup_tasks(db: PgPool) {
}
});
}
/// 启动用户定时任务调度循环
///
/// 每 30 秒检查 `scheduled_tasks` 表中 `enabled=true AND next_run_at <= now` 的任务,
/// 标记为已执行并更新下次执行时间。对于 `once` 类型任务,执行后自动禁用。
///
/// 注意:实际的任务执行(如触发 Agent/Hand/Workflow需要与中转服务或
/// 外部调度器集成。此 loop 当前仅负责任务状态管理。
pub fn start_user_task_scheduler(db: PgPool) {
tokio::spawn(async move {
let mut ticker = tokio::time::interval(Duration::from_secs(30));
ticker.tick().await; // 跳过首次立即触发
loop {
ticker.tick().await;
if let Err(e) = tick_user_tasks(&db).await {
tracing::error!("[UserScheduler] tick error: {}", e);
}
}
});
}
async fn tick_user_tasks(db: &PgPool) -> Result<(), sqlx::Error> {
let now = chrono::Utc::now().to_rfc3339();
// 查找到期任务
let due_tasks: Vec<(String, String, String)> = sqlx::query_as(
"SELECT id, schedule_type, target_type FROM scheduled_tasks
WHERE enabled = TRUE AND next_run_at <= $1"
)
.bind(&now)
.fetch_all(db)
.await?;
if due_tasks.is_empty() {
return Ok(());
}
tracing::debug!("[UserScheduler] {} tasks due", due_tasks.len());
for (task_id, schedule_type, _target_type) in due_tasks {
// 标记执行
let now_str = chrono::Utc::now().to_rfc3339();
let result = sqlx::query(
"UPDATE scheduled_tasks
SET last_run_at = $1, run_count = run_count + 1, updated_at = $1,
enabled = CASE WHEN schedule_type = 'once' THEN FALSE ELSE TRUE END,
next_run_at = NULL
WHERE id = $2"
)
.bind(&now_str)
.bind(&task_id)
.execute(db)
.await;
match result {
Ok(r) => {
if r.rows_affected() > 0 {
tracing::info!(
"[UserScheduler] task {} executed ({})",
task_id, schedule_type
);
}
}
Err(e) => {
tracing::error!("[UserScheduler] task {} failed: {}", task_id, e);
}
}
}
Ok(())
}

View File

@@ -1081,13 +1081,12 @@ pub async fn route_intent(
let mut parser = TriggerParser::new();
for (id, pipeline) in pipelines.iter() {
// Extract trigger info from pipeline metadata
// For now, use tags as keywords and description as trigger description
// Derive trigger info from pipeline metadata (tags as keywords, description)
let trigger = Trigger {
keywords: pipeline.metadata.tags.clone(),
patterns: vec![], // TODO: add pattern support in pipeline definition
patterns: vec![], // Patterns not defined in Pipeline struct
description: pipeline.metadata.description.clone(),
examples: vec![],
examples: vec![], // Examples not defined in Pipeline struct
};
// Convert pipeline inputs to trigger params