feat(dialysis+workflow): 透析会话 BPMN 工作流集成
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled

- dialysis_record 新增 workflow_instance_id 列,关联工作流实例
- 种子 dialysis_session BPMN 流程定义:透前评估→上机确认→透中监测→透后评估→医生审核
- 事件驱动编排器:dialysis.record.created → 自动启动 BPMN 工作流
- 工作流启动后自动回写 instance_id 到透析记录
- 编排器在 erp-server 层实现(遵循星型依赖架构)
This commit is contained in:
iven
2026-05-04 20:38:56 +08:00
parent 7e57565ecd
commit 0a9272bcf6
6 changed files with 252 additions and 0 deletions

View File

@@ -60,6 +60,8 @@ pub struct Model {
pub version: i32,
#[sea_orm(skip_serializing_if = "Option::is_none")]
pub key_version: Option<i32>,
#[sea_orm(skip_serializing_if = "Option::is_none")]
pub workflow_instance_id: Option<Uuid>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@@ -122,6 +122,7 @@ pub async fn create_dialysis_record(
deleted_at: Set(None),
version: Set(1),
key_version: Set(Some(1)),
workflow_instance_id: Set(None),
};
let m = active.insert(&state.db).await?;

View File

@@ -113,6 +113,7 @@ mod m20260504_000110_alter_critical_alerts_version_i32;
mod m20260505_000111_create_care_plan;
mod m20260505_000112_create_shift_management;
mod m20260505_000113_create_ble_gateways;
mod m20260505_000114_dialysis_record_add_workflow_instance;
pub struct Migrator;
@@ -233,6 +234,7 @@ impl MigratorTrait for Migrator {
Box::new(m20260505_000111_create_care_plan::Migration),
Box::new(m20260505_000112_create_shift_management::Migration),
Box::new(m20260505_000113_create_ble_gateways::Migration),
Box::new(m20260505_000114_dialysis_record_add_workflow_instance::Migration),
]
}
}

View File

@@ -0,0 +1,33 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Alias::new("dialysis_record"))
.add_column(
ColumnDef::new(Alias::new("workflow_instance_id"))
.uuid()
.null(),
)
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Alias::new("dialysis_record"))
.drop_column(Alias::new("workflow_instance_id"))
.to_owned(),
)
.await
}
}

View File

@@ -0,0 +1,204 @@
use erp_core::events::EventBus;
use sea_orm::ActiveValue::Set;
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter};
use uuid::Uuid;
/// 启动透析会话工作流编排器
/// 订阅 dialysis.record.created → 自动查找并启动 dialysis_session BPMN 工作流
pub fn start_dialysis_workflow_orchestrator(
db: sea_orm::DatabaseConnection,
event_bus: EventBus,
) {
let (mut receiver, _handle) = event_bus.subscribe_filtered("dialysis.".to_string());
tokio::spawn(async move {
loop {
match receiver.recv().await {
Some(event) if event.event_type == "dialysis.record.created" => {
if let Err(e) =
handle_dialysis_record_created(&db, &event_bus, &event).await
{
tracing::warn!(
error = %e,
record_id = ?event.payload.get("record_id"),
"透析会话工作流启动失败"
);
}
}
Some(_) => {}
None => {
tracing::debug!("透析工作流事件接收器关闭");
break;
}
}
}
});
}
async fn handle_dialysis_record_created(
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
event: &erp_core::events::DomainEvent,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let record_id = event
.payload
.get("record_id")
.and_then(|v| v.as_str())
.ok_or("缺少 record_id")?;
let record_uuid = Uuid::parse_str(record_id)?;
// 查找 dialysis_session 流程定义
let definition =
erp_workflow::entity::process_definition::Entity::find()
.filter(
erp_workflow::entity::process_definition::Column::Key
.eq("dialysis_session"),
)
.filter(
erp_workflow::entity::process_definition::Column::TenantId
.eq(event.tenant_id),
)
.filter(
erp_workflow::entity::process_definition::Column::Status.eq("published"),
)
.filter(
erp_workflow::entity::process_definition::Column::DeletedAt.is_null(),
)
.one(db)
.await?;
let definition = match definition {
Some(d) => d,
None => {
tracing::debug!(
tenant_id = %event.tenant_id,
"未找到 dialysis_session 流程定义,跳过工作流启动"
);
return Ok(());
}
};
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let req = erp_workflow::dto::StartInstanceReq {
definition_id: definition.id,
business_key: Some(format!("dialysis:{}", record_id)),
variables: Some(vec![
erp_workflow::dto::SetVariableReq {
name: "patient_id".into(),
var_type: Some("string".into()),
value: patient_id.into(),
},
erp_workflow::dto::SetVariableReq {
name: "record_id".into(),
var_type: Some("string".into()),
value: record_id.into(),
},
]),
};
let operator_id = event
.payload
.get("created_by")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok())
.unwrap_or_else(Uuid::nil);
let result = erp_workflow::service::instance_service::InstanceService::start(
event.tenant_id,
operator_id,
&req,
db,
event_bus,
)
.await?;
// 更新 dialysis_record 的 workflow_instance_id
let record = erp_dialysis::entity::dialysis_record::Entity::find_by_id(record_uuid)
.one(db)
.await?
.ok_or("透析记录不存在")?;
let mut active: erp_dialysis::entity::dialysis_record::ActiveModel = record.into();
active.workflow_instance_id = Set(Some(result.id));
active.update(db).await?;
tracing::info!(
record_id = %record_id,
instance_id = %result.id,
"透析会话工作流已启动"
);
Ok(())
}
/// 为指定租户种子透析会话 BPMN 流程定义
/// 流程:透前评估 → 上机确认 → 透中监测 → 透后评估 → 医生审核
pub async fn seed_dialysis_session_workflow(
db: &sea_orm::DatabaseConnection,
tenant_id: Uuid,
operator_id: Uuid,
) -> Result<(), sea_orm::DbErr> {
use erp_workflow::entity::process_definition;
let existing = process_definition::Entity::find()
.filter(process_definition::Column::TenantId.eq(tenant_id))
.filter(process_definition::Column::Key.eq("dialysis_session"))
.filter(process_definition::Column::DeletedAt.is_null())
.one(db)
.await?;
if existing.is_some() {
tracing::debug!(tenant_id = %tenant_id, "dialysis_session 流程定义已存在");
return Ok(());
}
let nodes = serde_json::json!([
{"id": "start", "type": "StartEvent", "name": "开始"},
{"id": "pre_assessment", "type": "UserTask", "name": "透前评估", "assignee_type": "candidate_groups", "candidate_groups": ["dialysis_nurse"]},
{"id": "start_dialysis", "type": "UserTask", "name": "上机确认", "assignee_type": "candidate_groups", "candidate_groups": ["dialysis_nurse"]},
{"id": "monitoring", "type": "UserTask", "name": "透中监测", "assignee_type": "candidate_groups", "candidate_groups": ["dialysis_nurse"]},
{"id": "post_assessment", "type": "UserTask", "name": "透后评估", "assignee_type": "candidate_groups", "candidate_groups": ["dialysis_nurse"]},
{"id": "review", "type": "UserTask", "name": "医生审核", "assignee_type": "candidate_groups", "candidate_groups": ["dialysis_doctor"]},
{"id": "end", "type": "EndEvent", "name": "结束"}
]);
let edges = serde_json::json!([
{"id": "e1", "from": "start", "to": "pre_assessment"},
{"id": "e2", "from": "pre_assessment", "to": "start_dialysis"},
{"id": "e3", "from": "start_dialysis", "to": "monitoring"},
{"id": "e4", "from": "monitoring", "to": "post_assessment"},
{"id": "e5", "from": "post_assessment", "to": "review"},
{"id": "e6", "from": "review", "to": "end"}
]);
let now = chrono::Utc::now();
let active = process_definition::ActiveModel {
id: Set(Uuid::now_v7()),
tenant_id: Set(tenant_id),
name: Set("透析会话工作流".into()),
key: Set("dialysis_session".into()),
version: Set(1),
category: Set(Some("dialysis".into())),
description: Set(Some(
"透析会话标准化流程:透前评估 → 上机确认 → 透中监测 → 透后评估 → 医生审核".into(),
)),
nodes: Set(nodes),
edges: Set(edges),
status: Set("published".into()),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(operator_id),
updated_by: Set(operator_id),
deleted_at: Set(None),
version_field: Set(1),
};
active.insert(db).await?;
tracing::info!(tenant_id = %tenant_id, "透析会话工作流模板已创建并发布");
Ok(())
}

View File

@@ -1,5 +1,6 @@
mod config;
mod db;
mod dialysis_workflow;
mod handlers;
mod middleware;
mod outbox;
@@ -295,6 +296,11 @@ async fn main() -> anyhow::Result<()> {
tracing::warn!(error = %e, "Failed to seed AI workflow definitions");
}
// Seed dialysis session workflow definition
if let Err(e) = dialysis_workflow::seed_dialysis_session_workflow(&db, new_tenant_id, new_tenant_id).await {
tracing::warn!(error = %e, "Failed to seed dialysis session workflow");
}
new_tenant_id
}
}
@@ -425,6 +431,10 @@ async fn main() -> anyhow::Result<()> {
erp_plugin::notification::start_notification_listener(db.clone(), event_bus.clone());
tracing::info!("Plugin notification listener started");
// Start dialysis workflow orchestrator (dialysis.record.created → BPMN workflow)
dialysis_workflow::start_dialysis_workflow_orchestrator(db.clone(), event_bus.clone());
tracing::info!("Dialysis workflow orchestrator started");
// Start outbox relay (LISTEN/NOTIFY + fallback poll for pending domain events)
outbox::start_outbox_relay(db.clone(), event_bus.clone(), config.database.url.clone());
tracing::info!("Outbox relay started");