Files
hms/crates/erp-server/src/dialysis_workflow.rs
iven 212c08b7ae feat(health,ai): 后端服务优化 + 媒体文件处理
- erp-health: article/banner/consultation/media 服务层优化
- erp-ai: analysis/insight/prompt 服务增强
- erp-auth: auth/role/token 服务改进
- erp-workflow: executor 执行引擎修复
- erp-plugin: 服务层改进
- 新增媒体上传文件样例
2026-05-13 23:28:57 +08:00

191 lines
7.2 KiB
Rust

use erp_core::events::EventBus;
use sea_orm::ActiveValue::Set;
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter};
use uuid::Uuid;
/// 启动透析会话工作流编排器
/// 订阅 dialysis.record.created → 自动查找并启动 dialysis_session BPMN 工作流
pub fn start_dialysis_workflow_orchestrator(db: sea_orm::DatabaseConnection, event_bus: EventBus) {
let (mut receiver, _handle) = event_bus.subscribe_filtered("dialysis.".to_string());
tokio::spawn(async move {
loop {
match receiver.recv().await {
Some(event) if event.event_type == "dialysis.record.created" => {
if let Err(e) = handle_dialysis_record_created(&db, &event_bus, &event).await {
tracing::warn!(
error = %e,
record_id = ?event.payload.get("record_id"),
"透析会话工作流启动失败"
);
}
}
Some(_) => {}
None => {
tracing::debug!("透析工作流事件接收器关闭");
break;
}
}
}
});
}
async fn handle_dialysis_record_created(
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
event: &erp_core::events::DomainEvent,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let record_id = event
.payload
.get("record_id")
.and_then(|v| v.as_str())
.ok_or("缺少 record_id")?;
let record_uuid = Uuid::parse_str(record_id)?;
// 查找 dialysis_session 流程定义
let definition = erp_workflow::entity::process_definition::Entity::find()
.filter(erp_workflow::entity::process_definition::Column::Key.eq("dialysis_session"))
.filter(erp_workflow::entity::process_definition::Column::TenantId.eq(event.tenant_id))
.filter(erp_workflow::entity::process_definition::Column::Status.eq("published"))
.filter(erp_workflow::entity::process_definition::Column::DeletedAt.is_null())
.one(db)
.await?;
let definition = match definition {
Some(d) => d,
None => {
tracing::debug!(
tenant_id = %event.tenant_id,
"未找到 dialysis_session 流程定义,跳过工作流启动"
);
return Ok(());
}
};
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let req = erp_workflow::dto::StartInstanceReq {
definition_id: definition.id,
business_key: Some(format!("dialysis:{}", record_id)),
variables: Some(vec![
erp_workflow::dto::SetVariableReq {
name: "patient_id".into(),
var_type: Some("string".into()),
value: patient_id.into(),
},
erp_workflow::dto::SetVariableReq {
name: "record_id".into(),
var_type: Some("string".into()),
value: record_id.into(),
},
]),
};
let operator_id = event
.payload
.get("created_by")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok())
.unwrap_or_else(Uuid::nil);
let result = erp_workflow::service::instance_service::InstanceService::start(
event.tenant_id,
operator_id,
&req,
db,
event_bus,
)
.await?;
// 更新 dialysis_record 的 workflow_instance_id
let record = erp_dialysis::entity::dialysis_record::Entity::find_by_id(record_uuid)
.one(db)
.await?
.ok_or("透析记录不存在")?;
let mut active: erp_dialysis::entity::dialysis_record::ActiveModel = record.into();
active.version = Set(active.version.unwrap() + 1);
active.workflow_instance_id = Set(Some(result.id));
active.updated_at = Set(chrono::Utc::now());
active.update(db).await?;
tracing::info!(
record_id = %record_id,
instance_id = %result.id,
"透析会话工作流已启动"
);
Ok(())
}
/// 为指定租户种子透析会话 BPMN 流程定义
/// 流程:透前评估 → 上机确认 → 透中监测 → 透后评估 → 医生审核
pub async fn seed_dialysis_session_workflow(
db: &sea_orm::DatabaseConnection,
tenant_id: Uuid,
operator_id: Uuid,
) -> Result<(), sea_orm::DbErr> {
use erp_workflow::entity::process_definition;
let existing = process_definition::Entity::find()
.filter(process_definition::Column::TenantId.eq(tenant_id))
.filter(process_definition::Column::Key.eq("dialysis_session"))
.filter(process_definition::Column::DeletedAt.is_null())
.one(db)
.await?;
if existing.is_some() {
tracing::debug!(tenant_id = %tenant_id, "dialysis_session 流程定义已存在");
return Ok(());
}
let nodes = serde_json::json!([
{"id": "start", "type": "StartEvent", "name": "开始"},
{"id": "pre_assessment", "type": "UserTask", "name": "透前评估", "assignee_type": "candidate_groups", "candidate_groups": ["dialysis_nurse"]},
{"id": "start_dialysis", "type": "UserTask", "name": "上机确认", "assignee_type": "candidate_groups", "candidate_groups": ["dialysis_nurse"]},
{"id": "monitoring", "type": "UserTask", "name": "透中监测", "assignee_type": "candidate_groups", "candidate_groups": ["dialysis_nurse"]},
{"id": "post_assessment", "type": "UserTask", "name": "透后评估", "assignee_type": "candidate_groups", "candidate_groups": ["dialysis_nurse"]},
{"id": "review", "type": "UserTask", "name": "医生审核", "assignee_type": "candidate_groups", "candidate_groups": ["dialysis_doctor"]},
{"id": "end", "type": "EndEvent", "name": "结束"}
]);
let edges = serde_json::json!([
{"id": "e1", "from": "start", "to": "pre_assessment"},
{"id": "e2", "from": "pre_assessment", "to": "start_dialysis"},
{"id": "e3", "from": "start_dialysis", "to": "monitoring"},
{"id": "e4", "from": "monitoring", "to": "post_assessment"},
{"id": "e5", "from": "post_assessment", "to": "review"},
{"id": "e6", "from": "review", "to": "end"}
]);
let now = chrono::Utc::now();
let active = process_definition::ActiveModel {
id: Set(Uuid::now_v7()),
tenant_id: Set(tenant_id),
name: Set("透析会话工作流".into()),
key: Set("dialysis_session".into()),
version: Set(1),
category: Set(Some("dialysis".into())),
description: Set(Some(
"透析会话标准化流程:透前评估 → 上机确认 → 透中监测 → 透后评估 → 医生审核".into(),
)),
nodes: Set(nodes),
edges: Set(edges),
status: Set("published".into()),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(operator_id),
updated_by: Set(operator_id),
deleted_at: Set(None),
version_field: Set(1),
};
active.insert(db).await?;
tracing::info!(tenant_id = %tenant_id, "透析会话工作流模板已创建并发布");
Ok(())
}