From 0a9272bcf612b5520168fad77d9db36e473a035c Mon Sep 17 00:00:00 2001 From: iven Date: Mon, 4 May 2026 20:38:56 +0800 Subject: [PATCH] =?UTF-8?q?feat(dialysis+workflow):=20=E9=80=8F=E6=9E=90?= =?UTF-8?q?=E4=BC=9A=E8=AF=9D=20BPMN=20=E5=B7=A5=E4=BD=9C=E6=B5=81?= =?UTF-8?q?=E9=9B=86=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - dialysis_record 新增 workflow_instance_id 列,关联工作流实例 - 种子 dialysis_session BPMN 流程定义:透前评估→上机确认→透中监测→透后评估→医生审核 - 事件驱动编排器:dialysis.record.created → 自动启动 BPMN 工作流 - 工作流启动后自动回写 instance_id 到透析记录 - 编排器在 erp-server 层实现(遵循星型依赖架构) --- .../src/entity/dialysis_record.rs | 2 + .../src/service/dialysis_service.rs | 1 + crates/erp-server/migration/src/lib.rs | 2 + ...4_dialysis_record_add_workflow_instance.rs | 33 +++ crates/erp-server/src/dialysis_workflow.rs | 204 ++++++++++++++++++ crates/erp-server/src/main.rs | 10 + 6 files changed, 252 insertions(+) create mode 100644 crates/erp-server/migration/src/m20260505_000114_dialysis_record_add_workflow_instance.rs create mode 100644 crates/erp-server/src/dialysis_workflow.rs diff --git a/crates/erp-dialysis/src/entity/dialysis_record.rs b/crates/erp-dialysis/src/entity/dialysis_record.rs index c15176d..ea1ddf6 100644 --- a/crates/erp-dialysis/src/entity/dialysis_record.rs +++ b/crates/erp-dialysis/src/entity/dialysis_record.rs @@ -60,6 +60,8 @@ pub struct Model { pub version: i32, #[sea_orm(skip_serializing_if = "Option::is_none")] pub key_version: Option, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub workflow_instance_id: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/crates/erp-dialysis/src/service/dialysis_service.rs b/crates/erp-dialysis/src/service/dialysis_service.rs index 97de4a7..6de5f98 100644 --- a/crates/erp-dialysis/src/service/dialysis_service.rs +++ b/crates/erp-dialysis/src/service/dialysis_service.rs @@ -122,6 +122,7 @@ pub async fn create_dialysis_record( deleted_at: Set(None), version: Set(1), key_version: Set(Some(1)), + workflow_instance_id: Set(None), }; let m = active.insert(&state.db).await?; diff --git a/crates/erp-server/migration/src/lib.rs b/crates/erp-server/migration/src/lib.rs index 2515119..b83215d 100644 --- a/crates/erp-server/migration/src/lib.rs +++ b/crates/erp-server/migration/src/lib.rs @@ -113,6 +113,7 @@ mod m20260504_000110_alter_critical_alerts_version_i32; mod m20260505_000111_create_care_plan; mod m20260505_000112_create_shift_management; mod m20260505_000113_create_ble_gateways; +mod m20260505_000114_dialysis_record_add_workflow_instance; pub struct Migrator; @@ -233,6 +234,7 @@ impl MigratorTrait for Migrator { Box::new(m20260505_000111_create_care_plan::Migration), Box::new(m20260505_000112_create_shift_management::Migration), Box::new(m20260505_000113_create_ble_gateways::Migration), + Box::new(m20260505_000114_dialysis_record_add_workflow_instance::Migration), ] } } diff --git a/crates/erp-server/migration/src/m20260505_000114_dialysis_record_add_workflow_instance.rs b/crates/erp-server/migration/src/m20260505_000114_dialysis_record_add_workflow_instance.rs new file mode 100644 index 0000000..0890184 --- /dev/null +++ b/crates/erp-server/migration/src/m20260505_000114_dialysis_record_add_workflow_instance.rs @@ -0,0 +1,33 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Alias::new("dialysis_record")) + .add_column( + ColumnDef::new(Alias::new("workflow_instance_id")) + .uuid() + .null(), + ) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Alias::new("dialysis_record")) + .drop_column(Alias::new("workflow_instance_id")) + .to_owned(), + ) + .await + } +} diff --git a/crates/erp-server/src/dialysis_workflow.rs b/crates/erp-server/src/dialysis_workflow.rs new file mode 100644 index 0000000..513fe4c --- /dev/null +++ b/crates/erp-server/src/dialysis_workflow.rs @@ -0,0 +1,204 @@ +use erp_core::events::EventBus; +use sea_orm::ActiveValue::Set; +use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter}; +use uuid::Uuid; + +/// 启动透析会话工作流编排器 +/// 订阅 dialysis.record.created → 自动查找并启动 dialysis_session BPMN 工作流 +pub fn start_dialysis_workflow_orchestrator( + db: sea_orm::DatabaseConnection, + event_bus: EventBus, +) { + let (mut receiver, _handle) = event_bus.subscribe_filtered("dialysis.".to_string()); + + tokio::spawn(async move { + loop { + match receiver.recv().await { + Some(event) if event.event_type == "dialysis.record.created" => { + if let Err(e) = + handle_dialysis_record_created(&db, &event_bus, &event).await + { + tracing::warn!( + error = %e, + record_id = ?event.payload.get("record_id"), + "透析会话工作流启动失败" + ); + } + } + Some(_) => {} + None => { + tracing::debug!("透析工作流事件接收器关闭"); + break; + } + } + } + }); +} + +async fn handle_dialysis_record_created( + db: &sea_orm::DatabaseConnection, + event_bus: &EventBus, + event: &erp_core::events::DomainEvent, +) -> Result<(), Box> { + let record_id = event + .payload + .get("record_id") + .and_then(|v| v.as_str()) + .ok_or("缺少 record_id")?; + let record_uuid = Uuid::parse_str(record_id)?; + + // 查找 dialysis_session 流程定义 + let definition = + erp_workflow::entity::process_definition::Entity::find() + .filter( + erp_workflow::entity::process_definition::Column::Key + .eq("dialysis_session"), + ) + .filter( + erp_workflow::entity::process_definition::Column::TenantId + .eq(event.tenant_id), + ) + .filter( + erp_workflow::entity::process_definition::Column::Status.eq("published"), + ) + .filter( + erp_workflow::entity::process_definition::Column::DeletedAt.is_null(), + ) + .one(db) + .await?; + + let definition = match definition { + Some(d) => d, + None => { + tracing::debug!( + tenant_id = %event.tenant_id, + "未找到 dialysis_session 流程定义,跳过工作流启动" + ); + return Ok(()); + } + }; + + let patient_id = event + .payload + .get("patient_id") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + + let req = erp_workflow::dto::StartInstanceReq { + definition_id: definition.id, + business_key: Some(format!("dialysis:{}", record_id)), + variables: Some(vec![ + erp_workflow::dto::SetVariableReq { + name: "patient_id".into(), + var_type: Some("string".into()), + value: patient_id.into(), + }, + erp_workflow::dto::SetVariableReq { + name: "record_id".into(), + var_type: Some("string".into()), + value: record_id.into(), + }, + ]), + }; + + let operator_id = event + .payload + .get("created_by") + .and_then(|v| v.as_str()) + .and_then(|s| Uuid::parse_str(s).ok()) + .unwrap_or_else(Uuid::nil); + + let result = erp_workflow::service::instance_service::InstanceService::start( + event.tenant_id, + operator_id, + &req, + db, + event_bus, + ) + .await?; + + // 更新 dialysis_record 的 workflow_instance_id + let record = erp_dialysis::entity::dialysis_record::Entity::find_by_id(record_uuid) + .one(db) + .await? + .ok_or("透析记录不存在")?; + + let mut active: erp_dialysis::entity::dialysis_record::ActiveModel = record.into(); + active.workflow_instance_id = Set(Some(result.id)); + active.update(db).await?; + + tracing::info!( + record_id = %record_id, + instance_id = %result.id, + "透析会话工作流已启动" + ); + + Ok(()) +} + +/// 为指定租户种子透析会话 BPMN 流程定义 +/// 流程:透前评估 → 上机确认 → 透中监测 → 透后评估 → 医生审核 +pub async fn seed_dialysis_session_workflow( + db: &sea_orm::DatabaseConnection, + tenant_id: Uuid, + operator_id: Uuid, +) -> Result<(), sea_orm::DbErr> { + use erp_workflow::entity::process_definition; + + let existing = process_definition::Entity::find() + .filter(process_definition::Column::TenantId.eq(tenant_id)) + .filter(process_definition::Column::Key.eq("dialysis_session")) + .filter(process_definition::Column::DeletedAt.is_null()) + .one(db) + .await?; + + if existing.is_some() { + tracing::debug!(tenant_id = %tenant_id, "dialysis_session 流程定义已存在"); + return Ok(()); + } + + let nodes = serde_json::json!([ + {"id": "start", "type": "StartEvent", "name": "开始"}, + {"id": "pre_assessment", "type": "UserTask", "name": "透前评估", "assignee_type": "candidate_groups", "candidate_groups": ["dialysis_nurse"]}, + {"id": "start_dialysis", "type": "UserTask", "name": "上机确认", "assignee_type": "candidate_groups", "candidate_groups": ["dialysis_nurse"]}, + {"id": "monitoring", "type": "UserTask", "name": "透中监测", "assignee_type": "candidate_groups", "candidate_groups": ["dialysis_nurse"]}, + {"id": "post_assessment", "type": "UserTask", "name": "透后评估", "assignee_type": "candidate_groups", "candidate_groups": ["dialysis_nurse"]}, + {"id": "review", "type": "UserTask", "name": "医生审核", "assignee_type": "candidate_groups", "candidate_groups": ["dialysis_doctor"]}, + {"id": "end", "type": "EndEvent", "name": "结束"} + ]); + + let edges = serde_json::json!([ + {"id": "e1", "from": "start", "to": "pre_assessment"}, + {"id": "e2", "from": "pre_assessment", "to": "start_dialysis"}, + {"id": "e3", "from": "start_dialysis", "to": "monitoring"}, + {"id": "e4", "from": "monitoring", "to": "post_assessment"}, + {"id": "e5", "from": "post_assessment", "to": "review"}, + {"id": "e6", "from": "review", "to": "end"} + ]); + + let now = chrono::Utc::now(); + let active = process_definition::ActiveModel { + id: Set(Uuid::now_v7()), + tenant_id: Set(tenant_id), + name: Set("透析会话工作流".into()), + key: Set("dialysis_session".into()), + version: Set(1), + category: Set(Some("dialysis".into())), + description: Set(Some( + "透析会话标准化流程:透前评估 → 上机确认 → 透中监测 → 透后评估 → 医生审核".into(), + )), + nodes: Set(nodes), + edges: Set(edges), + status: Set("published".into()), + created_at: Set(now), + updated_at: Set(now), + created_by: Set(operator_id), + updated_by: Set(operator_id), + deleted_at: Set(None), + version_field: Set(1), + }; + + active.insert(db).await?; + tracing::info!(tenant_id = %tenant_id, "透析会话工作流模板已创建并发布"); + Ok(()) +} diff --git a/crates/erp-server/src/main.rs b/crates/erp-server/src/main.rs index 0282372..ea310d5 100644 --- a/crates/erp-server/src/main.rs +++ b/crates/erp-server/src/main.rs @@ -1,5 +1,6 @@ mod config; mod db; +mod dialysis_workflow; mod handlers; mod middleware; mod outbox; @@ -295,6 +296,11 @@ async fn main() -> anyhow::Result<()> { tracing::warn!(error = %e, "Failed to seed AI workflow definitions"); } + // Seed dialysis session workflow definition + if let Err(e) = dialysis_workflow::seed_dialysis_session_workflow(&db, new_tenant_id, new_tenant_id).await { + tracing::warn!(error = %e, "Failed to seed dialysis session workflow"); + } + new_tenant_id } } @@ -425,6 +431,10 @@ async fn main() -> anyhow::Result<()> { erp_plugin::notification::start_notification_listener(db.clone(), event_bus.clone()); tracing::info!("Plugin notification listener started"); + // Start dialysis workflow orchestrator (dialysis.record.created → BPMN workflow) + dialysis_workflow::start_dialysis_workflow_orchestrator(db.clone(), event_bus.clone()); + tracing::info!("Dialysis workflow orchestrator started"); + // Start outbox relay (LISTEN/NOTIFY + fallback poll for pending domain events) outbox::start_outbox_relay(db.clone(), event_bus.clone(), config.database.url.clone()); tracing::info!("Outbox relay started");