use axum::Router; use axum::routing::{get, post, put}; use std::time::Duration; use uuid::Uuid; use erp_core::error::AppResult; use erp_core::events::EventBus; use erp_core::module::{ErpModule, PermissionDescriptor}; use crate::handler::{definition_handler, instance_handler, task_handler}; /// Workflow module implementing the `ErpModule` trait. /// /// Manages workflow definitions, process instances, tasks, /// and the token-driven execution engine. pub struct WorkflowModule; impl WorkflowModule { pub fn new() -> Self { Self } /// Build protected (authenticated) routes for the workflow module. pub fn protected_routes() -> Router where crate::workflow_state::WorkflowState: axum::extract::FromRef, S: Clone + Send + Sync + 'static, { Router::new() // Definition routes .route( "/workflow/definitions", get(definition_handler::list_definitions) .post(definition_handler::create_definition), ) .route( "/workflow/definitions/{id}", get(definition_handler::get_definition).put(definition_handler::update_definition), ) .route( "/workflow/definitions/{id}/publish", post(definition_handler::publish_definition), ) .route( "/workflow/definitions/{id}/deprecate", post(definition_handler::deprecate_definition), ) // Instance routes .route( "/workflow/instances", post(instance_handler::start_instance).get(instance_handler::list_instances), ) .route( "/workflow/instances/{id}", get(instance_handler::get_instance), ) .route( "/workflow/instances/{id}/suspend", post(instance_handler::suspend_instance), ) .route( "/workflow/instances/{id}/resume", post(instance_handler::resume_instance), ) .route( "/workflow/instances/{id}/terminate", post(instance_handler::terminate_instance), ) // Task routes .route( "/workflow/tasks/pending", get(task_handler::list_pending_tasks), ) .route( "/workflow/tasks/completed", get(task_handler::list_completed_tasks), ) .route( "/workflow/tasks/{id}/complete", post(task_handler::complete_task), ) .route( "/workflow/tasks/{id}/delegate", post(task_handler::delegate_task), ) .route("/workflow/tasks/{id}/claim", put(task_handler::claim_task)) } /// 启动超时检查后台任务。 /// /// 每 60 秒扫描一次 tasks 表,查找 due_date 已过期但仍处于 pending 状态的任务。 /// 发现超时任务时发布 `task.timeout` 事件到事件总线,并记录 warning 日志。 pub fn start_timeout_checker(db: sea_orm::DatabaseConnection, event_bus: EventBus) { tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(60)); // 首次跳过,等一个完整间隔再执行 interval.tick().await; loop { interval.tick().await; match crate::engine::timeout::TimeoutChecker::find_all_overdue_tasks_with_details( &db, ) .await { Ok(overdue) => { if !overdue.is_empty() { tracing::warn!( count = overdue.len(), "发现超时未完成的任务,发布 task.timeout 事件" ); for (task_id, tenant_id, instance_id, assignee_id) in &overdue { // 发布超时事件 let event = erp_core::events::DomainEvent::new( "task.timeout", *tenant_id, erp_core::events::build_event_payload(serde_json::json!({ "task_id": task_id, "instance_id": instance_id, "assignee_id": assignee_id, })), ); event_bus.publish(event, &db).await; } } } Err(e) => { tracing::warn!(error = %e, "超时检查任务执行失败"); } } } }); } } /// 处理 AI 行动工作流启动请求 async fn handle_ai_action_start( db: &sea_orm::DatabaseConnection, event_bus: &EventBus, event: &erp_core::events::DomainEvent, ) { let workflow_key = match event.payload.get("workflow_key").and_then(|v| v.as_str()) { Some(k) => k, None => { tracing::warn!("AI 行动工作流事件缺少 workflow_key,跳过"); return; } }; let tenant_id = event.tenant_id; // 查找对应的流程定义 use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; let def = crate::entity::process_definition::Entity::find() .filter(crate::entity::process_definition::Column::TenantId.eq(tenant_id)) .filter(crate::entity::process_definition::Column::Key.eq(workflow_key)) .filter(crate::entity::process_definition::Column::DeletedAt.is_null()) .filter(crate::entity::process_definition::Column::Status.eq("published")) .one(db) .await; let def = match def { Ok(Some(d)) => d, Ok(None) => { tracing::warn!( key = %workflow_key, tenant_id = %tenant_id, "AI 行动工作流定义未找到或未发布,跳过" ); return; } Err(e) => { tracing::warn!(error = %e, "查询工作流定义失败"); return; } }; // 构造启动变量 let risk_level = event .payload .get("risk_level") .and_then(|v| v.as_str()) .unwrap_or("medium") .to_string(); let variables = vec![ crate::dto::SetVariableReq { name: "risk_level".into(), var_type: Some("string".into()), value: serde_json::Value::String(risk_level.clone()), }, crate::dto::SetVariableReq { name: "patient_id".into(), var_type: Some("string".into()), value: event .payload .get("patient_id") .cloned() .unwrap_or(serde_json::Value::Null), }, crate::dto::SetVariableReq { name: "action_type".into(), var_type: Some("string".into()), value: event .payload .get("action_type") .and_then(|v| v.as_str()) .map(|s| serde_json::Value::String(s.to_string())) .unwrap_or(serde_json::Value::Null), }, crate::dto::SetVariableReq { name: "params".into(), var_type: Some("string".into()), value: event .payload .get("params") .cloned() .unwrap_or(serde_json::Value::Null), }, ]; let req = crate::dto::StartInstanceReq { definition_id: def.id, business_key: Some(format!( "ai_action_{}", chrono::Utc::now().timestamp_millis() )), variables: Some(variables), }; let system_id = Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap(); match crate::service::instance_service::InstanceService::start( tenant_id, system_id, &req, db, event_bus, ) .await { Ok(instance) => { tracing::info!( key = %workflow_key, instance_id = %instance.id, tenant_id = %tenant_id, risk_level = %risk_level, "AI 行动工作流实例已启动" ); } Err(e) => { tracing::warn!( key = %workflow_key, error = %e, "AI 行动工作流实例启动失败" ); } } } impl Default for WorkflowModule { fn default() -> Self { Self::new() } } #[async_trait::async_trait] impl ErpModule for WorkflowModule { fn name(&self) -> &str { "workflow" } fn version(&self) -> &str { env!("CARGO_PKG_VERSION") } fn dependencies(&self) -> Vec<&str> { vec!["auth"] } fn register_event_handlers(&self, _bus: &EventBus) { // 事件处理器已迁移到 on_startup(需要 DB 连接),此处保留空实现以兼容 trait 签名 } async fn on_startup( &self, ctx: &erp_core::module::ModuleContext, ) -> erp_core::error::AppResult<()> { let db = ctx.db.clone(); let bus = ctx.event_bus.clone(); // 订阅 user. 前缀事件,处理 user.deleted let (mut receiver, _handle) = bus.subscribe_filtered("user.".to_string()); tokio::spawn(async move { loop { match receiver.recv().await { Some(event) if event.event_type == "user.deleted" => { let user_id = match event.payload.get("user_id").and_then(|v| v.as_str()) { Some(id) => match Uuid::parse_str(id) { Ok(u) => u, Err(e) => { tracing::warn!( error = %e, "user.deleted 事件的 user_id 解析失败,跳过" ); continue; } }, _ => { tracing::warn!("user.deleted 事件缺少 user_id 字段,跳过"); continue; } }; tracing::info!( user_id = %user_id, tenant_id = %event.tenant_id, "收到 user.deleted 事件,查找并终止相关流程实例" ); // 查找该用户有活跃任务的流程实例 use chrono::Utc; use sea_orm::{ ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set, }; // 查找该用户作为 assignee 的 pending 任务 let active_tasks = crate::entity::task::Entity::find() .filter(crate::entity::task::Column::TenantId.eq(event.tenant_id)) .filter(crate::entity::task::Column::AssigneeId.eq(user_id)) .filter(crate::entity::task::Column::Status.eq("pending")) .filter(crate::entity::task::Column::DeletedAt.is_null()) .all(&db) .await; match active_tasks { Ok(tasks) if tasks.is_empty() => { tracing::info!( user_id = %user_id, "该用户没有活跃的待办任务,无需终止流程" ); } Ok(tasks) => { // 收集需要终止的实例 ID let instance_ids: std::collections::HashSet = tasks.iter().map(|t| t.instance_id).collect(); for instance_id in &instance_ids { // 将实例状态设置为 terminated let instance = crate::entity::process_instance::Entity::find_by_id( *instance_id, ) .one(&db) .await; if let Ok(Some(inst)) = instance && inst.tenant_id == event.tenant_id && inst.deleted_at.is_none() && inst.status == "running" { let ver = inst.version; let mut active: crate::entity::process_instance::ActiveModel = inst.into(); active.status = Set("terminated".to_string()); active.updated_at = Set(Utc::now()); active.version = Set(ver + 1); match active.update(&db).await { Ok(_) => { tracing::info!( instance_id = %instance_id, "流程实例已终止(用户被删除)" ); } Err(e) => { tracing::warn!( instance_id = %instance_id, error = %e, "终止流程实例失败" ); } } } } tracing::info!( user_id = %user_id, instance_count = instance_ids.len(), task_count = tasks.len(), "用户删除事件处理完成" ); } Err(e) => { tracing::warn!( error = %e, "查询用户活跃任务失败" ); } } } Some(event) => { // 其他 user. 前缀事件,忽略 tracing::debug!( event_type = %event.event_type, "忽略非 user.deleted 事件" ); } None => { // 通道关闭,退出循环 tracing::info!("Workflow 事件订阅通道已关闭"); break; } } } }); tracing::info!( module = "workflow", "Workflow 事件处理器已注册(监听 user.deleted)" ); // 订阅 AI 行动工作流启动请求 let (mut ai_rx, _ai_handle) = bus.subscribe_filtered("workflow.ai_action.".to_string()); let ai_db = ctx.db.clone(); let ai_bus = bus.clone(); tokio::spawn(async move { loop { match ai_rx.recv().await { Some(event) if event.event_type == "workflow.ai_action.start_requested" => { handle_ai_action_start(&ai_db, &ai_bus, &event).await; } Some(_) => {} None => { tracing::info!("AI 行动工作流事件订阅通道已关闭"); break; } } } }); Ok(()) } async fn on_tenant_created( &self, _tenant_id: Uuid, _db: &sea_orm::DatabaseConnection, _event_bus: &EventBus, ) -> AppResult<()> { Ok(()) } async fn on_tenant_deleted( &self, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> AppResult<()> { use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; // Delete in dependency order: variables → tasks → tokens → instances → definitions // process_variables crate::entity::process_variable::Entity::delete_many() .filter(crate::entity::process_variable::Column::TenantId.eq(tenant_id)) .exec(db) .await?; // tasks crate::entity::task::Entity::delete_many() .filter(crate::entity::task::Column::TenantId.eq(tenant_id)) .exec(db) .await?; // tokens crate::entity::token::Entity::delete_many() .filter(crate::entity::token::Column::TenantId.eq(tenant_id)) .exec(db) .await?; // process_instances crate::entity::process_instance::Entity::delete_many() .filter(crate::entity::process_instance::Column::TenantId.eq(tenant_id)) .exec(db) .await?; // process_definitions crate::entity::process_definition::Entity::delete_many() .filter(crate::entity::process_definition::Column::TenantId.eq(tenant_id)) .exec(db) .await?; tracing::info!(%tenant_id, "Workflow data cleaned up for deleted tenant"); Ok(()) } fn permissions(&self) -> Vec { vec![ PermissionDescriptor { code: "workflow.create".into(), name: "创建流程".into(), description: "创建流程定义".into(), module: "workflow".into(), }, PermissionDescriptor { code: "workflow.list".into(), name: "查看流程".into(), description: "查看流程列表".into(), module: "workflow".into(), }, PermissionDescriptor { code: "workflow.read".into(), name: "查看流程详情".into(), description: "查看流程定义详情".into(), module: "workflow".into(), }, PermissionDescriptor { code: "workflow.update".into(), name: "编辑流程".into(), description: "编辑流程定义".into(), module: "workflow".into(), }, PermissionDescriptor { code: "workflow.publish".into(), name: "发布流程".into(), description: "发布流程定义".into(), module: "workflow".into(), }, PermissionDescriptor { code: "workflow.start".into(), name: "发起流程".into(), description: "发起流程实例".into(), module: "workflow".into(), }, PermissionDescriptor { code: "workflow.approve".into(), name: "审批任务".into(), description: "审批流程任务".into(), module: "workflow".into(), }, PermissionDescriptor { code: "workflow.delegate".into(), name: "委派任务".into(), description: "委派流程任务".into(), module: "workflow".into(), }, ] } fn as_any(&self) -> &dyn std::any::Any { self } }