use erp_core::events::EventBus; use sea_orm::ActiveValue::Set; use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter}; use uuid::Uuid; /// 启动透析会话工作流编排器 /// 订阅 dialysis.record.created → 自动查找并启动 dialysis_session BPMN 工作流 pub fn start_dialysis_workflow_orchestrator(db: sea_orm::DatabaseConnection, event_bus: EventBus) { let (mut receiver, _handle) = event_bus.subscribe_filtered("dialysis.".to_string()); tokio::spawn(async move { loop { match receiver.recv().await { Some(event) if event.event_type == "dialysis.record.created" => { if let Err(e) = handle_dialysis_record_created(&db, &event_bus, &event).await { tracing::warn!( error = %e, record_id = ?event.payload.get("record_id"), "透析会话工作流启动失败" ); } } Some(_) => {} None => { tracing::debug!("透析工作流事件接收器关闭"); break; } } } }); } async fn handle_dialysis_record_created( db: &sea_orm::DatabaseConnection, event_bus: &EventBus, event: &erp_core::events::DomainEvent, ) -> Result<(), Box> { let record_id = event .payload .get("record_id") .and_then(|v| v.as_str()) .ok_or("缺少 record_id")?; let record_uuid = Uuid::parse_str(record_id)?; // 查找 dialysis_session 流程定义 let definition = erp_workflow::entity::process_definition::Entity::find() .filter(erp_workflow::entity::process_definition::Column::Key.eq("dialysis_session")) .filter(erp_workflow::entity::process_definition::Column::TenantId.eq(event.tenant_id)) .filter(erp_workflow::entity::process_definition::Column::Status.eq("published")) .filter(erp_workflow::entity::process_definition::Column::DeletedAt.is_null()) .one(db) .await?; let definition = match definition { Some(d) => d, None => { tracing::debug!( tenant_id = %event.tenant_id, "未找到 dialysis_session 流程定义,跳过工作流启动" ); return Ok(()); } }; let patient_id = event .payload .get("patient_id") .and_then(|v| v.as_str()) .unwrap_or("unknown"); let req = erp_workflow::dto::StartInstanceReq { definition_id: definition.id, business_key: Some(format!("dialysis:{}", record_id)), variables: Some(vec![ erp_workflow::dto::SetVariableReq { name: "patient_id".into(), var_type: Some("string".into()), value: patient_id.into(), }, erp_workflow::dto::SetVariableReq { name: "record_id".into(), var_type: Some("string".into()), value: record_id.into(), }, ]), }; let operator_id = event .payload .get("created_by") .and_then(|v| v.as_str()) .and_then(|s| Uuid::parse_str(s).ok()) .unwrap_or_else(Uuid::nil); let result = erp_workflow::service::instance_service::InstanceService::start( event.tenant_id, operator_id, &req, db, event_bus, ) .await?; // 更新 dialysis_record 的 workflow_instance_id let record = erp_dialysis::entity::dialysis_record::Entity::find_by_id(record_uuid) .one(db) .await? .ok_or("透析记录不存在")?; let mut active: erp_dialysis::entity::dialysis_record::ActiveModel = record.into(); active.version = Set(active.version.take().unwrap_or(0) + 1); active.workflow_instance_id = Set(Some(result.id)); active.updated_at = Set(chrono::Utc::now()); active.update(db).await?; tracing::info!( record_id = %record_id, instance_id = %result.id, "透析会话工作流已启动" ); Ok(()) } /// 为指定租户种子透析会话 BPMN 流程定义 /// 流程:透前评估 → 上机确认 → 透中监测 → 透后评估 → 医生审核 pub async fn seed_dialysis_session_workflow( db: &sea_orm::DatabaseConnection, tenant_id: Uuid, operator_id: Uuid, ) -> Result<(), sea_orm::DbErr> { use erp_workflow::entity::process_definition; let existing = process_definition::Entity::find() .filter(process_definition::Column::TenantId.eq(tenant_id)) .filter(process_definition::Column::Key.eq("dialysis_session")) .filter(process_definition::Column::DeletedAt.is_null()) .one(db) .await?; if existing.is_some() { tracing::debug!(tenant_id = %tenant_id, "dialysis_session 流程定义已存在"); return Ok(()); } let nodes = serde_json::json!([ {"id": "start", "type": "StartEvent", "name": "开始"}, {"id": "pre_assessment", "type": "UserTask", "name": "透前评估", "assignee_type": "candidate_groups", "candidate_groups": ["dialysis_nurse"]}, {"id": "start_dialysis", "type": "UserTask", "name": "上机确认", "assignee_type": "candidate_groups", "candidate_groups": ["dialysis_nurse"]}, {"id": "monitoring", "type": "UserTask", "name": "透中监测", "assignee_type": "candidate_groups", "candidate_groups": ["dialysis_nurse"]}, {"id": "post_assessment", "type": "UserTask", "name": "透后评估", "assignee_type": "candidate_groups", "candidate_groups": ["dialysis_nurse"]}, {"id": "review", "type": "UserTask", "name": "医生审核", "assignee_type": "candidate_groups", "candidate_groups": ["dialysis_doctor"]}, {"id": "end", "type": "EndEvent", "name": "结束"} ]); let edges = serde_json::json!([ {"id": "e1", "from": "start", "to": "pre_assessment"}, {"id": "e2", "from": "pre_assessment", "to": "start_dialysis"}, {"id": "e3", "from": "start_dialysis", "to": "monitoring"}, {"id": "e4", "from": "monitoring", "to": "post_assessment"}, {"id": "e5", "from": "post_assessment", "to": "review"}, {"id": "e6", "from": "review", "to": "end"} ]); let now = chrono::Utc::now(); let active = process_definition::ActiveModel { id: Set(Uuid::now_v7()), tenant_id: Set(tenant_id), name: Set("透析会话工作流".into()), key: Set("dialysis_session".into()), version: Set(1), category: Set(Some("dialysis".into())), description: Set(Some( "透析会话标准化流程:透前评估 → 上机确认 → 透中监测 → 透后评估 → 医生审核".into(), )), nodes: Set(nodes), edges: Set(edges), status: Set("published".into()), created_at: Set(now), updated_at: Set(now), created_by: Set(operator_id), updated_by: Set(operator_id), deleted_at: Set(None), version_field: Set(1), }; active.insert(db).await?; tracing::info!(tenant_id = %tenant_id, "透析会话工作流模板已创建并发布"); Ok(()) }