From 0a4825be999851ee2b5adc6b69aa0f88ea40f99c Mon Sep 17 00:00:00 2001 From: iven Date: Fri, 1 May 2026 08:53:57 +0800 Subject: [PATCH] =?UTF-8?q?feat(health+workflow):=20=E8=A1=8C=E5=8A=A8?= =?UTF-8?q?=E5=88=86=E5=8F=91=E2=86=92=E5=B7=A5=E4=BD=9C=E6=B5=81=E5=90=AF?= =?UTF-8?q?=E5=8A=A8=E9=9B=86=E6=88=90=20=E2=80=94=20=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=E9=A9=B1=E5=8A=A8=20BPMN=20=E5=AE=9E=E4=BE=8B=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - create_pending_action 新增 workflow.ai_action.start_requested 事件发布 - 根据 action_type 映射到对应 BPMN 流程定义 key - erp-workflow 消费启动请求,自动创建审批流程实例 - 流程变量包含 risk_level/patient_id/action_type/params --- .../src/service/ai_action_dispatcher.rs | 23 +++ crates/erp-workflow/src/module.rs | 135 ++++++++++++++++++ 2 files changed, 158 insertions(+) diff --git a/crates/erp-health/src/service/ai_action_dispatcher.rs b/crates/erp-health/src/service/ai_action_dispatcher.rs index db1950b..f279ae3 100644 --- a/crates/erp-health/src/service/ai_action_dispatcher.rs +++ b/crates/erp-health/src/service/ai_action_dispatcher.rs @@ -136,6 +136,7 @@ async fn create_pending_action( risk_level: &str, decision: &DispatchDecision, ) { + // 发布待审批事件(通知/日志用) let event = erp_core::events::DomainEvent::new( "health.ai_action.pending_approval", tenant_id, @@ -149,6 +150,28 @@ async fn create_pending_action( })), ); event_bus.publish(event, db).await; + + // 发布工作流启动请求事件(触发 BPMN 审批流程) + let workflow_key = match action_type { + "followup" => "ai_followup_workflow", + "appointment" => "ai_appointment_workflow", + "alert" => "ai_alert_workflow", + _ => return, + }; + + let workflow_event = erp_core::events::DomainEvent::new( + "workflow.ai_action.start_requested", + tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "workflow_key": workflow_key, + "patient_id": patient_id, + "doctor_id": doctor_id, + "risk_level": risk_level, + "action_type": action_type, + "params": params, + })), + ); + event_bus.publish(workflow_event, db).await; } #[cfg(test)] diff --git a/crates/erp-workflow/src/module.rs b/crates/erp-workflow/src/module.rs index c5e8063..8de8de5 100644 --- a/crates/erp-workflow/src/module.rs +++ b/crates/erp-workflow/src/module.rs @@ -134,6 +134,120 @@ impl WorkflowModule { } } +/// 处理 AI 行动工作流启动请求 +async fn handle_ai_action_start( + db: &sea_orm::DatabaseConnection, + event_bus: &EventBus, + event: &erp_core::events::DomainEvent, +) { + let workflow_key = match event.payload.get("workflow_key").and_then(|v| v.as_str()) { + Some(k) => k, + None => { + tracing::warn!("AI 行动工作流事件缺少 workflow_key,跳过"); + return; + } + }; + + let tenant_id = event.tenant_id; + + // 查找对应的流程定义 + use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; + let def = crate::entity::process_definition::Entity::find() + .filter(crate::entity::process_definition::Column::TenantId.eq(tenant_id)) + .filter(crate::entity::process_definition::Column::Key.eq(workflow_key)) + .filter(crate::entity::process_definition::Column::DeletedAt.is_null()) + .filter(crate::entity::process_definition::Column::Status.eq("published")) + .one(db) + .await; + + let def = match def { + Ok(Some(d)) => d, + Ok(None) => { + tracing::warn!( + key = %workflow_key, + tenant_id = %tenant_id, + "AI 行动工作流定义未找到或未发布,跳过" + ); + return; + } + Err(e) => { + tracing::warn!(error = %e, "查询工作流定义失败"); + return; + } + }; + + // 构造启动变量 + let risk_level = event.payload.get("risk_level") + .and_then(|v| v.as_str()) + .unwrap_or("medium") + .to_string(); + + let variables = vec![ + crate::dto::SetVariableReq { + name: "risk_level".into(), + var_type: Some("string".into()), + value: serde_json::Value::String(risk_level.clone()), + }, + crate::dto::SetVariableReq { + name: "patient_id".into(), + var_type: Some("string".into()), + value: event.payload.get("patient_id") + .cloned() + .unwrap_or(serde_json::Value::Null), + }, + crate::dto::SetVariableReq { + name: "action_type".into(), + var_type: Some("string".into()), + value: event.payload.get("action_type") + .and_then(|v| v.as_str()) + .map(|s| serde_json::Value::String(s.to_string())) + .unwrap_or(serde_json::Value::Null), + }, + crate::dto::SetVariableReq { + name: "params".into(), + var_type: Some("string".into()), + value: event.payload.get("params") + .cloned() + .unwrap_or(serde_json::Value::Null), + }, + ]; + + let req = crate::dto::StartInstanceReq { + definition_id: def.id, + business_key: Some(format!("ai_action_{}", chrono::Utc::now().timestamp_millis())), + variables: Some(variables), + }; + + let system_id = Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap(); + + match crate::service::instance_service::InstanceService::start( + tenant_id, + system_id, + &req, + db, + event_bus, + ) + .await + { + Ok(instance) => { + tracing::info!( + key = %workflow_key, + instance_id = %instance.id, + tenant_id = %tenant_id, + risk_level = %risk_level, + "AI 行动工作流实例已启动" + ); + } + Err(e) => { + tracing::warn!( + key = %workflow_key, + error = %e, + "AI 行动工作流实例启动失败" + ); + } + } +} + impl Default for WorkflowModule { fn default() -> Self { Self::new() @@ -286,6 +400,27 @@ impl ErpModule for WorkflowModule { }); tracing::info!(module = "workflow", "Workflow 事件处理器已注册(监听 user.deleted)"); + + // 订阅 AI 行动工作流启动请求 + let (mut ai_rx, _ai_handle) = bus.subscribe_filtered("workflow.ai_action.".to_string()); + let ai_db = ctx.db.clone(); + let ai_bus = bus.clone(); + + tokio::spawn(async move { + loop { + match ai_rx.recv().await { + Some(event) if event.event_type == "workflow.ai_action.start_requested" => { + handle_ai_action_start(&ai_db, &ai_bus, &event).await; + } + Some(_) => {} + None => { + tracing::info!("AI 行动工作流事件订阅通道已关闭"); + break; + } + } + } + }); + Ok(()) }