Files
hms/crates/erp-workflow/src/module.rs
iven 6d5a711d2c
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled
fix: 修复测试发现的 7 个问题 + 全 workspace clippy 清零
功能修复:
1. 患者创建空名称验证:后端添加 name.trim().is_empty() 检查
2. 仪表盘统计容错:单个查询失败返回零值而非 500
3. FHIR 路由修复:从 /fhir 移到 /api/v1/fhir 保持一致
4. 冻结模块后端中间件:新增 frozen_module_middleware 拦截冻结路径
5. 积分端点权限码:health.health-data.list → health.points.list
6. 角色权限迁移:护士补充 devices.list,运营补充 points.list/manage
7. 测试结果文档:R01-R05 角色测试 + T00/T10 结果归档

Clippy 全 workspace 清零(14→0 errors):
- erp-core: 修复 empty doc line、collapsible if、redundant closure 等 9 处
- erp-health: 修复 too_many_arguments、unused var、unnecessary parens 等 58 处
- erp-ai: 修复 dead_code、unused import 等 11 处
- erp-plugin: 修复 too_many_arguments、wildcard pattern 等 11 处
- erp-server-migration: 修复 enum_variant_names 5 处
- erp-auth/config/workflow/message: 各 1-3 处

工程改进:
- lint-staged 配置迁移到 .lintstagedrc.js(函数式避免文件列表传给 clippy)
- cargo fmt 统一格式化
2026-05-07 23:43:14 +08:00

550 lines
20 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use axum::Router;
use axum::routing::{get, post, put};
use std::time::Duration;
use uuid::Uuid;
use erp_core::error::AppResult;
use erp_core::events::EventBus;
use erp_core::module::{ErpModule, PermissionDescriptor};
use crate::handler::{definition_handler, instance_handler, task_handler};
/// Workflow module implementing the `ErpModule` trait.
///
/// Manages workflow definitions, process instances, tasks,
/// and the token-driven execution engine.
pub struct WorkflowModule;
impl WorkflowModule {
pub fn new() -> Self {
Self
}
/// Build protected (authenticated) routes for the workflow module.
pub fn protected_routes<S>() -> Router<S>
where
crate::workflow_state::WorkflowState: axum::extract::FromRef<S>,
S: Clone + Send + Sync + 'static,
{
Router::new()
// Definition routes
.route(
"/workflow/definitions",
get(definition_handler::list_definitions)
.post(definition_handler::create_definition),
)
.route(
"/workflow/definitions/{id}",
get(definition_handler::get_definition).put(definition_handler::update_definition),
)
.route(
"/workflow/definitions/{id}/publish",
post(definition_handler::publish_definition),
)
.route(
"/workflow/definitions/{id}/deprecate",
post(definition_handler::deprecate_definition),
)
// Instance routes
.route(
"/workflow/instances",
post(instance_handler::start_instance).get(instance_handler::list_instances),
)
.route(
"/workflow/instances/{id}",
get(instance_handler::get_instance),
)
.route(
"/workflow/instances/{id}/suspend",
post(instance_handler::suspend_instance),
)
.route(
"/workflow/instances/{id}/resume",
post(instance_handler::resume_instance),
)
.route(
"/workflow/instances/{id}/terminate",
post(instance_handler::terminate_instance),
)
// Task routes
.route(
"/workflow/tasks/pending",
get(task_handler::list_pending_tasks),
)
.route(
"/workflow/tasks/completed",
get(task_handler::list_completed_tasks),
)
.route(
"/workflow/tasks/{id}/complete",
post(task_handler::complete_task),
)
.route(
"/workflow/tasks/{id}/delegate",
post(task_handler::delegate_task),
)
.route("/workflow/tasks/{id}/claim", put(task_handler::claim_task))
}
/// 启动超时检查后台任务。
///
/// 每 60 秒扫描一次 tasks 表,查找 due_date 已过期但仍处于 pending 状态的任务。
/// 发现超时任务时发布 `task.timeout` 事件到事件总线,并记录 warning 日志。
pub fn start_timeout_checker(db: sea_orm::DatabaseConnection, event_bus: EventBus) {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
// 首次跳过,等一个完整间隔再执行
interval.tick().await;
loop {
interval.tick().await;
match crate::engine::timeout::TimeoutChecker::find_all_overdue_tasks_with_details(
&db,
)
.await
{
Ok(overdue) => {
if !overdue.is_empty() {
tracing::warn!(
count = overdue.len(),
"发现超时未完成的任务,发布 task.timeout 事件"
);
for (task_id, tenant_id, instance_id, assignee_id) in &overdue {
// 发布超时事件
let event = erp_core::events::DomainEvent::new(
"task.timeout",
*tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"task_id": task_id,
"instance_id": instance_id,
"assignee_id": assignee_id,
})),
);
event_bus.publish(event, &db).await;
}
}
}
Err(e) => {
tracing::warn!(error = %e, "超时检查任务执行失败");
}
}
}
});
}
}
/// 处理 AI 行动工作流启动请求
async fn handle_ai_action_start(
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
event: &erp_core::events::DomainEvent,
) {
let workflow_key = match event.payload.get("workflow_key").and_then(|v| v.as_str()) {
Some(k) => k,
None => {
tracing::warn!("AI 行动工作流事件缺少 workflow_key跳过");
return;
}
};
let tenant_id = event.tenant_id;
// 查找对应的流程定义
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
let def = crate::entity::process_definition::Entity::find()
.filter(crate::entity::process_definition::Column::TenantId.eq(tenant_id))
.filter(crate::entity::process_definition::Column::Key.eq(workflow_key))
.filter(crate::entity::process_definition::Column::DeletedAt.is_null())
.filter(crate::entity::process_definition::Column::Status.eq("published"))
.one(db)
.await;
let def = match def {
Ok(Some(d)) => d,
Ok(None) => {
tracing::warn!(
key = %workflow_key,
tenant_id = %tenant_id,
"AI 行动工作流定义未找到或未发布,跳过"
);
return;
}
Err(e) => {
tracing::warn!(error = %e, "查询工作流定义失败");
return;
}
};
// 构造启动变量
let risk_level = event
.payload
.get("risk_level")
.and_then(|v| v.as_str())
.unwrap_or("medium")
.to_string();
let variables = vec![
crate::dto::SetVariableReq {
name: "risk_level".into(),
var_type: Some("string".into()),
value: serde_json::Value::String(risk_level.clone()),
},
crate::dto::SetVariableReq {
name: "patient_id".into(),
var_type: Some("string".into()),
value: event
.payload
.get("patient_id")
.cloned()
.unwrap_or(serde_json::Value::Null),
},
crate::dto::SetVariableReq {
name: "action_type".into(),
var_type: Some("string".into()),
value: event
.payload
.get("action_type")
.and_then(|v| v.as_str())
.map(|s| serde_json::Value::String(s.to_string()))
.unwrap_or(serde_json::Value::Null),
},
crate::dto::SetVariableReq {
name: "params".into(),
var_type: Some("string".into()),
value: event
.payload
.get("params")
.cloned()
.unwrap_or(serde_json::Value::Null),
},
];
let req = crate::dto::StartInstanceReq {
definition_id: def.id,
business_key: Some(format!(
"ai_action_{}",
chrono::Utc::now().timestamp_millis()
)),
variables: Some(variables),
};
let system_id = Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap();
match crate::service::instance_service::InstanceService::start(
tenant_id, system_id, &req, db, event_bus,
)
.await
{
Ok(instance) => {
tracing::info!(
key = %workflow_key,
instance_id = %instance.id,
tenant_id = %tenant_id,
risk_level = %risk_level,
"AI 行动工作流实例已启动"
);
}
Err(e) => {
tracing::warn!(
key = %workflow_key,
error = %e,
"AI 行动工作流实例启动失败"
);
}
}
}
impl Default for WorkflowModule {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl ErpModule for WorkflowModule {
fn name(&self) -> &str {
"workflow"
}
fn version(&self) -> &str {
env!("CARGO_PKG_VERSION")
}
fn dependencies(&self) -> Vec<&str> {
vec!["auth"]
}
fn register_event_handlers(&self, _bus: &EventBus) {
// 事件处理器已迁移到 on_startup需要 DB 连接),此处保留空实现以兼容 trait 签名
}
async fn on_startup(
&self,
ctx: &erp_core::module::ModuleContext,
) -> erp_core::error::AppResult<()> {
let db = ctx.db.clone();
let bus = ctx.event_bus.clone();
// 订阅 user. 前缀事件,处理 user.deleted
let (mut receiver, _handle) = bus.subscribe_filtered("user.".to_string());
tokio::spawn(async move {
loop {
match receiver.recv().await {
Some(event) if event.event_type == "user.deleted" => {
let user_id = match event.payload.get("user_id").and_then(|v| v.as_str()) {
Some(id) => match Uuid::parse_str(id) {
Ok(u) => u,
Err(e) => {
tracing::warn!(
error = %e,
"user.deleted 事件的 user_id 解析失败,跳过"
);
continue;
}
},
_ => {
tracing::warn!("user.deleted 事件缺少 user_id 字段,跳过");
continue;
}
};
tracing::info!(
user_id = %user_id,
tenant_id = %event.tenant_id,
"收到 user.deleted 事件,查找并终止相关流程实例"
);
// 查找该用户有活跃任务的流程实例
use chrono::Utc;
use sea_orm::{
ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set,
};
// 查找该用户作为 assignee 的 pending 任务
let active_tasks = crate::entity::task::Entity::find()
.filter(crate::entity::task::Column::TenantId.eq(event.tenant_id))
.filter(crate::entity::task::Column::AssigneeId.eq(user_id))
.filter(crate::entity::task::Column::Status.eq("pending"))
.filter(crate::entity::task::Column::DeletedAt.is_null())
.all(&db)
.await;
match active_tasks {
Ok(tasks) if tasks.is_empty() => {
tracing::info!(
user_id = %user_id,
"该用户没有活跃的待办任务,无需终止流程"
);
}
Ok(tasks) => {
// 收集需要终止的实例 ID
let instance_ids: std::collections::HashSet<Uuid> =
tasks.iter().map(|t| t.instance_id).collect();
for instance_id in &instance_ids {
// 将实例状态设置为 terminated
let instance =
crate::entity::process_instance::Entity::find_by_id(
*instance_id,
)
.one(&db)
.await;
if let Ok(Some(inst)) = instance
&& inst.tenant_id == event.tenant_id
&& inst.deleted_at.is_none()
&& inst.status == "running"
{
let ver = inst.version;
let mut active: crate::entity::process_instance::ActiveModel = inst.into();
active.status = Set("terminated".to_string());
active.updated_at = Set(Utc::now());
active.version = Set(ver + 1);
match active.update(&db).await {
Ok(_) => {
tracing::info!(
instance_id = %instance_id,
"流程实例已终止(用户被删除)"
);
}
Err(e) => {
tracing::warn!(
instance_id = %instance_id,
error = %e,
"终止流程实例失败"
);
}
}
}
}
tracing::info!(
user_id = %user_id,
instance_count = instance_ids.len(),
task_count = tasks.len(),
"用户删除事件处理完成"
);
}
Err(e) => {
tracing::warn!(
error = %e,
"查询用户活跃任务失败"
);
}
}
}
Some(event) => {
// 其他 user. 前缀事件,忽略
tracing::debug!(
event_type = %event.event_type,
"忽略非 user.deleted 事件"
);
}
None => {
// 通道关闭,退出循环
tracing::info!("Workflow 事件订阅通道已关闭");
break;
}
}
}
});
tracing::info!(
module = "workflow",
"Workflow 事件处理器已注册(监听 user.deleted"
);
// 订阅 AI 行动工作流启动请求
let (mut ai_rx, _ai_handle) = bus.subscribe_filtered("workflow.ai_action.".to_string());
let ai_db = ctx.db.clone();
let ai_bus = bus.clone();
tokio::spawn(async move {
loop {
match ai_rx.recv().await {
Some(event) if event.event_type == "workflow.ai_action.start_requested" => {
handle_ai_action_start(&ai_db, &ai_bus, &event).await;
}
Some(_) => {}
None => {
tracing::info!("AI 行动工作流事件订阅通道已关闭");
break;
}
}
}
});
Ok(())
}
async fn on_tenant_created(
&self,
_tenant_id: Uuid,
_db: &sea_orm::DatabaseConnection,
_event_bus: &EventBus,
) -> AppResult<()> {
Ok(())
}
async fn on_tenant_deleted(
&self,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> AppResult<()> {
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
// Delete in dependency order: variables → tasks → tokens → instances → definitions
// process_variables
crate::entity::process_variable::Entity::delete_many()
.filter(crate::entity::process_variable::Column::TenantId.eq(tenant_id))
.exec(db)
.await?;
// tasks
crate::entity::task::Entity::delete_many()
.filter(crate::entity::task::Column::TenantId.eq(tenant_id))
.exec(db)
.await?;
// tokens
crate::entity::token::Entity::delete_many()
.filter(crate::entity::token::Column::TenantId.eq(tenant_id))
.exec(db)
.await?;
// process_instances
crate::entity::process_instance::Entity::delete_many()
.filter(crate::entity::process_instance::Column::TenantId.eq(tenant_id))
.exec(db)
.await?;
// process_definitions
crate::entity::process_definition::Entity::delete_many()
.filter(crate::entity::process_definition::Column::TenantId.eq(tenant_id))
.exec(db)
.await?;
tracing::info!(%tenant_id, "Workflow data cleaned up for deleted tenant");
Ok(())
}
fn permissions(&self) -> Vec<PermissionDescriptor> {
vec![
PermissionDescriptor {
code: "workflow.create".into(),
name: "创建流程".into(),
description: "创建流程定义".into(),
module: "workflow".into(),
},
PermissionDescriptor {
code: "workflow.list".into(),
name: "查看流程".into(),
description: "查看流程列表".into(),
module: "workflow".into(),
},
PermissionDescriptor {
code: "workflow.read".into(),
name: "查看流程详情".into(),
description: "查看流程定义详情".into(),
module: "workflow".into(),
},
PermissionDescriptor {
code: "workflow.update".into(),
name: "编辑流程".into(),
description: "编辑流程定义".into(),
module: "workflow".into(),
},
PermissionDescriptor {
code: "workflow.publish".into(),
name: "发布流程".into(),
description: "发布流程定义".into(),
module: "workflow".into(),
},
PermissionDescriptor {
code: "workflow.start".into(),
name: "发起流程".into(),
description: "发起流程实例".into(),
module: "workflow".into(),
},
PermissionDescriptor {
code: "workflow.approve".into(),
name: "审批任务".into(),
description: "审批流程任务".into(),
module: "workflow".into(),
},
PermissionDescriptor {
code: "workflow.delegate".into(),
name: "委派任务".into(),
description: "委派流程任务".into(),
module: "workflow".into(),
},
]
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}