use axum::Router; use axum::routing::{get, post, put}; use std::time::Duration; use uuid::Uuid; use erp_core::error::AppResult; use erp_core::events::EventBus; use erp_core::module::ErpModule; use crate::handler::{definition_handler, instance_handler, task_handler}; /// Workflow module implementing the `ErpModule` trait. /// /// Manages workflow definitions, process instances, tasks, /// and the token-driven execution engine. pub struct WorkflowModule; impl WorkflowModule { pub fn new() -> Self { Self } /// Build protected (authenticated) routes for the workflow module. pub fn protected_routes() -> Router where crate::workflow_state::WorkflowState: axum::extract::FromRef, S: Clone + Send + Sync + 'static, { Router::new() // Definition routes .route( "/workflow/definitions", get(definition_handler::list_definitions) .post(definition_handler::create_definition), ) .route( "/workflow/definitions/{id}", get(definition_handler::get_definition).put(definition_handler::update_definition), ) .route( "/workflow/definitions/{id}/publish", post(definition_handler::publish_definition), ) .route( "/workflow/definitions/{id}/deprecate", post(definition_handler::deprecate_definition), ) // Instance routes .route( "/workflow/instances", post(instance_handler::start_instance).get(instance_handler::list_instances), ) .route( "/workflow/instances/{id}", get(instance_handler::get_instance), ) .route( "/workflow/instances/{id}/suspend", post(instance_handler::suspend_instance), ) .route( "/workflow/instances/{id}/resume", post(instance_handler::resume_instance), ) .route( "/workflow/instances/{id}/terminate", post(instance_handler::terminate_instance), ) // Task routes .route( "/workflow/tasks/pending", get(task_handler::list_pending_tasks), ) .route( "/workflow/tasks/completed", get(task_handler::list_completed_tasks), ) .route( "/workflow/tasks/{id}/complete", post(task_handler::complete_task), ) .route( "/workflow/tasks/{id}/delegate", post(task_handler::delegate_task), ) .route( "/workflow/tasks/{id}/claim", put(task_handler::claim_task), ) } /// 启动超时检查后台任务。 /// /// 每 60 秒扫描一次 tasks 表,查找 due_date 已过期但仍处于 pending 状态的任务。 /// 发现超时任务时发布 `task.timeout` 事件到事件总线,并记录 warning 日志。 pub fn start_timeout_checker(db: sea_orm::DatabaseConnection, event_bus: EventBus) { tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(60)); // 首次跳过,等一个完整间隔再执行 interval.tick().await; loop { interval.tick().await; match crate::engine::timeout::TimeoutChecker::find_all_overdue_tasks_with_details(&db).await { Ok(overdue) => { if !overdue.is_empty() { tracing::warn!( count = overdue.len(), "发现超时未完成的任务,发布 task.timeout 事件" ); for (task_id, tenant_id, instance_id, assignee_id) in &overdue { // 发布超时事件 let event = erp_core::events::DomainEvent::new( "task.timeout", *tenant_id, erp_core::events::build_event_payload(serde_json::json!({ "task_id": task_id, "instance_id": instance_id, "assignee_id": assignee_id, })), ); event_bus.publish(event, &db).await; } } } Err(e) => { tracing::warn!(error = %e, "超时检查任务执行失败"); } } } }); } } impl Default for WorkflowModule { fn default() -> Self { Self::new() } } #[async_trait::async_trait] impl ErpModule for WorkflowModule { fn name(&self) -> &str { "workflow" } fn version(&self) -> &str { env!("CARGO_PKG_VERSION") } fn dependencies(&self) -> Vec<&str> { vec!["auth"] } fn register_event_handlers(&self, _bus: &EventBus) { // 事件处理器已迁移到 on_startup(需要 DB 连接),此处保留空实现以兼容 trait 签名 } async fn on_startup( &self, ctx: &erp_core::module::ModuleContext, ) -> erp_core::error::AppResult<()> { let db = ctx.db.clone(); let bus = ctx.event_bus.clone(); // 订阅 user. 前缀事件,处理 user.deleted let (mut receiver, _handle) = bus.subscribe_filtered("user.".to_string()); tokio::spawn(async move { loop { match receiver.recv().await { Some(event) if event.event_type == "user.deleted" => { let user_id = match event.payload.get("user_id").and_then(|v| v.as_str()) { Some(id) => match Uuid::parse_str(id) { Ok(u) => u, Err(e) => { tracing::warn!( error = %e, "user.deleted 事件的 user_id 解析失败,跳过" ); continue; } }, _ => { tracing::warn!("user.deleted 事件缺少 user_id 字段,跳过"); continue; } }; tracing::info!( user_id = %user_id, tenant_id = %event.tenant_id, "收到 user.deleted 事件,查找并终止相关流程实例" ); // 查找该用户有活跃任务的流程实例 use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set}; use chrono::Utc; // 查找该用户作为 assignee 的 pending 任务 let active_tasks = crate::entity::task::Entity::find() .filter(crate::entity::task::Column::TenantId.eq(event.tenant_id)) .filter(crate::entity::task::Column::AssigneeId.eq(user_id)) .filter(crate::entity::task::Column::Status.eq("pending")) .filter(crate::entity::task::Column::DeletedAt.is_null()) .all(&db) .await; match active_tasks { Ok(tasks) if tasks.is_empty() => { tracing::info!( user_id = %user_id, "该用户没有活跃的待办任务,无需终止流程" ); } Ok(tasks) => { // 收集需要终止的实例 ID let instance_ids: std::collections::HashSet = tasks.iter().map(|t| t.instance_id).collect(); for instance_id in &instance_ids { // 将实例状态设置为 terminated let instance = crate::entity::process_instance::Entity::find_by_id(*instance_id) .one(&db) .await; if let Ok(Some(inst)) = instance { if inst.tenant_id == event.tenant_id && inst.deleted_at.is_none() && inst.status == "running" { let ver = inst.version; let mut active: crate::entity::process_instance::ActiveModel = inst.into(); active.status = Set("terminated".to_string()); active.updated_at = Set(Utc::now()); active.version = Set(ver + 1); match active.update(&db).await { Ok(_) => { tracing::info!( instance_id = %instance_id, "流程实例已终止(用户被删除)" ); } Err(e) => { tracing::warn!( instance_id = %instance_id, error = %e, "终止流程实例失败" ); } } } } } tracing::info!( user_id = %user_id, instance_count = instance_ids.len(), task_count = tasks.len(), "用户删除事件处理完成" ); } Err(e) => { tracing::warn!( error = %e, "查询用户活跃任务失败" ); } } } Some(event) => { // 其他 user. 前缀事件,忽略 tracing::debug!( event_type = %event.event_type, "忽略非 user.deleted 事件" ); } None => { // 通道关闭,退出循环 tracing::info!("Workflow 事件订阅通道已关闭"); break; } } } }); tracing::info!(module = "workflow", "Workflow 事件处理器已注册(监听 user.deleted)"); Ok(()) } async fn on_tenant_created( &self, _tenant_id: Uuid, _db: &sea_orm::DatabaseConnection, _event_bus: &EventBus, ) -> AppResult<()> { Ok(()) } async fn on_tenant_deleted( &self, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> AppResult<()> { use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; // Delete in dependency order: variables → tasks → tokens → instances → definitions // process_variables crate::entity::process_variable::Entity::delete_many() .filter(crate::entity::process_variable::Column::TenantId.eq(tenant_id)) .exec(db) .await?; // tasks crate::entity::task::Entity::delete_many() .filter(crate::entity::task::Column::TenantId.eq(tenant_id)) .exec(db) .await?; // tokens crate::entity::token::Entity::delete_many() .filter(crate::entity::token::Column::TenantId.eq(tenant_id)) .exec(db) .await?; // process_instances crate::entity::process_instance::Entity::delete_many() .filter(crate::entity::process_instance::Column::TenantId.eq(tenant_id)) .exec(db) .await?; // process_definitions crate::entity::process_definition::Entity::delete_many() .filter(crate::entity::process_definition::Column::TenantId.eq(tenant_id)) .exec(db) .await?; tracing::info!(%tenant_id, "Workflow data cleaned up for deleted tenant"); Ok(()) } fn as_any(&self) -> &dyn std::any::Any { self } }