Files
hms/crates/erp-workflow/src/module.rs
iven 0a4825be99
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled
feat(health+workflow): 行动分发→工作流启动集成 — 事件驱动 BPMN 实例化
- create_pending_action 新增 workflow.ai_action.start_requested 事件发布
- 根据 action_type 映射到对应 BPMN 流程定义 key
- erp-workflow 消费启动请求,自动创建审批流程实例
- 流程变量包含 risk_level/patient_id/action_type/params
2026-05-01 08:53:57 +08:00

495 lines
19 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use axum::Router;
use axum::routing::{get, post, put};
use std::time::Duration;
use uuid::Uuid;
use erp_core::error::AppResult;
use erp_core::events::EventBus;
use erp_core::module::{ErpModule, PermissionDescriptor};
use crate::handler::{definition_handler, instance_handler, task_handler};
/// Workflow module implementing the `ErpModule` trait.
///
/// Manages workflow definitions, process instances, tasks,
/// and the token-driven execution engine.
pub struct WorkflowModule;
impl WorkflowModule {
pub fn new() -> Self {
Self
}
/// Build protected (authenticated) routes for the workflow module.
pub fn protected_routes<S>() -> Router<S>
where
crate::workflow_state::WorkflowState: axum::extract::FromRef<S>,
S: Clone + Send + Sync + 'static,
{
Router::new()
// Definition routes
.route(
"/workflow/definitions",
get(definition_handler::list_definitions)
.post(definition_handler::create_definition),
)
.route(
"/workflow/definitions/{id}",
get(definition_handler::get_definition).put(definition_handler::update_definition),
)
.route(
"/workflow/definitions/{id}/publish",
post(definition_handler::publish_definition),
)
.route(
"/workflow/definitions/{id}/deprecate",
post(definition_handler::deprecate_definition),
)
// Instance routes
.route(
"/workflow/instances",
post(instance_handler::start_instance).get(instance_handler::list_instances),
)
.route(
"/workflow/instances/{id}",
get(instance_handler::get_instance),
)
.route(
"/workflow/instances/{id}/suspend",
post(instance_handler::suspend_instance),
)
.route(
"/workflow/instances/{id}/resume",
post(instance_handler::resume_instance),
)
.route(
"/workflow/instances/{id}/terminate",
post(instance_handler::terminate_instance),
)
// Task routes
.route(
"/workflow/tasks/pending",
get(task_handler::list_pending_tasks),
)
.route(
"/workflow/tasks/completed",
get(task_handler::list_completed_tasks),
)
.route(
"/workflow/tasks/{id}/complete",
post(task_handler::complete_task),
)
.route(
"/workflow/tasks/{id}/delegate",
post(task_handler::delegate_task),
)
.route(
"/workflow/tasks/{id}/claim",
put(task_handler::claim_task),
)
}
/// 启动超时检查后台任务。
///
/// 每 60 秒扫描一次 tasks 表,查找 due_date 已过期但仍处于 pending 状态的任务。
/// 发现超时任务时发布 `task.timeout` 事件到事件总线,并记录 warning 日志。
pub fn start_timeout_checker(db: sea_orm::DatabaseConnection, event_bus: EventBus) {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
// 首次跳过,等一个完整间隔再执行
interval.tick().await;
loop {
interval.tick().await;
match crate::engine::timeout::TimeoutChecker::find_all_overdue_tasks_with_details(&db).await {
Ok(overdue) => {
if !overdue.is_empty() {
tracing::warn!(
count = overdue.len(),
"发现超时未完成的任务,发布 task.timeout 事件"
);
for (task_id, tenant_id, instance_id, assignee_id) in &overdue {
// 发布超时事件
let event = erp_core::events::DomainEvent::new(
"task.timeout",
*tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"task_id": task_id,
"instance_id": instance_id,
"assignee_id": assignee_id,
})),
);
event_bus.publish(event, &db).await;
}
}
}
Err(e) => {
tracing::warn!(error = %e, "超时检查任务执行失败");
}
}
}
});
}
}
/// 处理 AI 行动工作流启动请求
async fn handle_ai_action_start(
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
event: &erp_core::events::DomainEvent,
) {
let workflow_key = match event.payload.get("workflow_key").and_then(|v| v.as_str()) {
Some(k) => k,
None => {
tracing::warn!("AI 行动工作流事件缺少 workflow_key跳过");
return;
}
};
let tenant_id = event.tenant_id;
// 查找对应的流程定义
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
let def = crate::entity::process_definition::Entity::find()
.filter(crate::entity::process_definition::Column::TenantId.eq(tenant_id))
.filter(crate::entity::process_definition::Column::Key.eq(workflow_key))
.filter(crate::entity::process_definition::Column::DeletedAt.is_null())
.filter(crate::entity::process_definition::Column::Status.eq("published"))
.one(db)
.await;
let def = match def {
Ok(Some(d)) => d,
Ok(None) => {
tracing::warn!(
key = %workflow_key,
tenant_id = %tenant_id,
"AI 行动工作流定义未找到或未发布,跳过"
);
return;
}
Err(e) => {
tracing::warn!(error = %e, "查询工作流定义失败");
return;
}
};
// 构造启动变量
let risk_level = event.payload.get("risk_level")
.and_then(|v| v.as_str())
.unwrap_or("medium")
.to_string();
let variables = vec![
crate::dto::SetVariableReq {
name: "risk_level".into(),
var_type: Some("string".into()),
value: serde_json::Value::String(risk_level.clone()),
},
crate::dto::SetVariableReq {
name: "patient_id".into(),
var_type: Some("string".into()),
value: event.payload.get("patient_id")
.cloned()
.unwrap_or(serde_json::Value::Null),
},
crate::dto::SetVariableReq {
name: "action_type".into(),
var_type: Some("string".into()),
value: event.payload.get("action_type")
.and_then(|v| v.as_str())
.map(|s| serde_json::Value::String(s.to_string()))
.unwrap_or(serde_json::Value::Null),
},
crate::dto::SetVariableReq {
name: "params".into(),
var_type: Some("string".into()),
value: event.payload.get("params")
.cloned()
.unwrap_or(serde_json::Value::Null),
},
];
let req = crate::dto::StartInstanceReq {
definition_id: def.id,
business_key: Some(format!("ai_action_{}", chrono::Utc::now().timestamp_millis())),
variables: Some(variables),
};
let system_id = Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap();
match crate::service::instance_service::InstanceService::start(
tenant_id,
system_id,
&req,
db,
event_bus,
)
.await
{
Ok(instance) => {
tracing::info!(
key = %workflow_key,
instance_id = %instance.id,
tenant_id = %tenant_id,
risk_level = %risk_level,
"AI 行动工作流实例已启动"
);
}
Err(e) => {
tracing::warn!(
key = %workflow_key,
error = %e,
"AI 行动工作流实例启动失败"
);
}
}
}
impl Default for WorkflowModule {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl ErpModule for WorkflowModule {
fn name(&self) -> &str {
"workflow"
}
fn version(&self) -> &str {
env!("CARGO_PKG_VERSION")
}
fn dependencies(&self) -> Vec<&str> {
vec!["auth"]
}
fn register_event_handlers(&self, _bus: &EventBus) {
// 事件处理器已迁移到 on_startup需要 DB 连接),此处保留空实现以兼容 trait 签名
}
async fn on_startup(
&self,
ctx: &erp_core::module::ModuleContext,
) -> erp_core::error::AppResult<()> {
let db = ctx.db.clone();
let bus = ctx.event_bus.clone();
// 订阅 user. 前缀事件,处理 user.deleted
let (mut receiver, _handle) = bus.subscribe_filtered("user.".to_string());
tokio::spawn(async move {
loop {
match receiver.recv().await {
Some(event) if event.event_type == "user.deleted" => {
let user_id = match event.payload.get("user_id").and_then(|v| v.as_str()) {
Some(id) => match Uuid::parse_str(id) {
Ok(u) => u,
Err(e) => {
tracing::warn!(
error = %e,
"user.deleted 事件的 user_id 解析失败,跳过"
);
continue;
}
},
_ => {
tracing::warn!("user.deleted 事件缺少 user_id 字段,跳过");
continue;
}
};
tracing::info!(
user_id = %user_id,
tenant_id = %event.tenant_id,
"收到 user.deleted 事件,查找并终止相关流程实例"
);
// 查找该用户有活跃任务的流程实例
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set};
use chrono::Utc;
// 查找该用户作为 assignee 的 pending 任务
let active_tasks = crate::entity::task::Entity::find()
.filter(crate::entity::task::Column::TenantId.eq(event.tenant_id))
.filter(crate::entity::task::Column::AssigneeId.eq(user_id))
.filter(crate::entity::task::Column::Status.eq("pending"))
.filter(crate::entity::task::Column::DeletedAt.is_null())
.all(&db)
.await;
match active_tasks {
Ok(tasks) if tasks.is_empty() => {
tracing::info!(
user_id = %user_id,
"该用户没有活跃的待办任务,无需终止流程"
);
}
Ok(tasks) => {
// 收集需要终止的实例 ID
let instance_ids: std::collections::HashSet<Uuid> =
tasks.iter().map(|t| t.instance_id).collect();
for instance_id in &instance_ids {
// 将实例状态设置为 terminated
let instance = crate::entity::process_instance::Entity::find_by_id(*instance_id)
.one(&db)
.await;
if let Ok(Some(inst)) = instance {
if inst.tenant_id == event.tenant_id
&& inst.deleted_at.is_none()
&& inst.status == "running"
{
let ver = inst.version;
let mut active: crate::entity::process_instance::ActiveModel = inst.into();
active.status = Set("terminated".to_string());
active.updated_at = Set(Utc::now());
active.version = Set(ver + 1);
match active.update(&db).await {
Ok(_) => {
tracing::info!(
instance_id = %instance_id,
"流程实例已终止(用户被删除)"
);
}
Err(e) => {
tracing::warn!(
instance_id = %instance_id,
error = %e,
"终止流程实例失败"
);
}
}
}
}
}
tracing::info!(
user_id = %user_id,
instance_count = instance_ids.len(),
task_count = tasks.len(),
"用户删除事件处理完成"
);
}
Err(e) => {
tracing::warn!(
error = %e,
"查询用户活跃任务失败"
);
}
}
}
Some(event) => {
// 其他 user. 前缀事件,忽略
tracing::debug!(
event_type = %event.event_type,
"忽略非 user.deleted 事件"
);
}
None => {
// 通道关闭,退出循环
tracing::info!("Workflow 事件订阅通道已关闭");
break;
}
}
}
});
tracing::info!(module = "workflow", "Workflow 事件处理器已注册(监听 user.deleted");
// 订阅 AI 行动工作流启动请求
let (mut ai_rx, _ai_handle) = bus.subscribe_filtered("workflow.ai_action.".to_string());
let ai_db = ctx.db.clone();
let ai_bus = bus.clone();
tokio::spawn(async move {
loop {
match ai_rx.recv().await {
Some(event) if event.event_type == "workflow.ai_action.start_requested" => {
handle_ai_action_start(&ai_db, &ai_bus, &event).await;
}
Some(_) => {}
None => {
tracing::info!("AI 行动工作流事件订阅通道已关闭");
break;
}
}
}
});
Ok(())
}
async fn on_tenant_created(
&self,
_tenant_id: Uuid,
_db: &sea_orm::DatabaseConnection,
_event_bus: &EventBus,
) -> AppResult<()> {
Ok(())
}
async fn on_tenant_deleted(
&self,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> AppResult<()> {
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
// Delete in dependency order: variables → tasks → tokens → instances → definitions
// process_variables
crate::entity::process_variable::Entity::delete_many()
.filter(crate::entity::process_variable::Column::TenantId.eq(tenant_id))
.exec(db)
.await?;
// tasks
crate::entity::task::Entity::delete_many()
.filter(crate::entity::task::Column::TenantId.eq(tenant_id))
.exec(db)
.await?;
// tokens
crate::entity::token::Entity::delete_many()
.filter(crate::entity::token::Column::TenantId.eq(tenant_id))
.exec(db)
.await?;
// process_instances
crate::entity::process_instance::Entity::delete_many()
.filter(crate::entity::process_instance::Column::TenantId.eq(tenant_id))
.exec(db)
.await?;
// process_definitions
crate::entity::process_definition::Entity::delete_many()
.filter(crate::entity::process_definition::Column::TenantId.eq(tenant_id))
.exec(db)
.await?;
tracing::info!(%tenant_id, "Workflow data cleaned up for deleted tenant");
Ok(())
}
fn permissions(&self) -> Vec<PermissionDescriptor> {
vec![
PermissionDescriptor { code: "workflow.create".into(), name: "创建流程".into(), description: "创建流程定义".into(), module: "workflow".into() },
PermissionDescriptor { code: "workflow.list".into(), name: "查看流程".into(), description: "查看流程列表".into(), module: "workflow".into() },
PermissionDescriptor { code: "workflow.read".into(), name: "查看流程详情".into(), description: "查看流程定义详情".into(), module: "workflow".into() },
PermissionDescriptor { code: "workflow.update".into(), name: "编辑流程".into(), description: "编辑流程定义".into(), module: "workflow".into() },
PermissionDescriptor { code: "workflow.publish".into(), name: "发布流程".into(), description: "发布流程定义".into(), module: "workflow".into() },
PermissionDescriptor { code: "workflow.start".into(), name: "发起流程".into(), description: "发起流程实例".into(), module: "workflow".into() },
PermissionDescriptor { code: "workflow.approve".into(), name: "审批任务".into(), description: "审批流程任务".into(), module: "workflow".into() },
PermissionDescriptor { code: "workflow.delegate".into(), name: "委派任务".into(), description: "委派流程任务".into(), module: "workflow".into() },
]
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}