feat(health+workflow): 行动分发→工作流启动集成 — 事件驱动 BPMN 实例化
- create_pending_action 新增 workflow.ai_action.start_requested 事件发布 - 根据 action_type 映射到对应 BPMN 流程定义 key - erp-workflow 消费启动请求,自动创建审批流程实例 - 流程变量包含 risk_level/patient_id/action_type/params
This commit is contained in:
@@ -136,6 +136,7 @@ async fn create_pending_action(
|
||||
risk_level: &str,
|
||||
decision: &DispatchDecision,
|
||||
) {
|
||||
// 发布待审批事件(通知/日志用)
|
||||
let event = erp_core::events::DomainEvent::new(
|
||||
"health.ai_action.pending_approval",
|
||||
tenant_id,
|
||||
@@ -149,6 +150,28 @@ async fn create_pending_action(
|
||||
})),
|
||||
);
|
||||
event_bus.publish(event, db).await;
|
||||
|
||||
// 发布工作流启动请求事件(触发 BPMN 审批流程)
|
||||
let workflow_key = match action_type {
|
||||
"followup" => "ai_followup_workflow",
|
||||
"appointment" => "ai_appointment_workflow",
|
||||
"alert" => "ai_alert_workflow",
|
||||
_ => return,
|
||||
};
|
||||
|
||||
let workflow_event = erp_core::events::DomainEvent::new(
|
||||
"workflow.ai_action.start_requested",
|
||||
tenant_id,
|
||||
erp_core::events::build_event_payload(serde_json::json!({
|
||||
"workflow_key": workflow_key,
|
||||
"patient_id": patient_id,
|
||||
"doctor_id": doctor_id,
|
||||
"risk_level": risk_level,
|
||||
"action_type": action_type,
|
||||
"params": params,
|
||||
})),
|
||||
);
|
||||
event_bus.publish(workflow_event, db).await;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -134,6 +134,120 @@ impl WorkflowModule {
|
||||
}
|
||||
}
|
||||
|
||||
/// 处理 AI 行动工作流启动请求
|
||||
async fn handle_ai_action_start(
|
||||
db: &sea_orm::DatabaseConnection,
|
||||
event_bus: &EventBus,
|
||||
event: &erp_core::events::DomainEvent,
|
||||
) {
|
||||
let workflow_key = match event.payload.get("workflow_key").and_then(|v| v.as_str()) {
|
||||
Some(k) => k,
|
||||
None => {
|
||||
tracing::warn!("AI 行动工作流事件缺少 workflow_key,跳过");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let tenant_id = event.tenant_id;
|
||||
|
||||
// 查找对应的流程定义
|
||||
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
|
||||
let def = crate::entity::process_definition::Entity::find()
|
||||
.filter(crate::entity::process_definition::Column::TenantId.eq(tenant_id))
|
||||
.filter(crate::entity::process_definition::Column::Key.eq(workflow_key))
|
||||
.filter(crate::entity::process_definition::Column::DeletedAt.is_null())
|
||||
.filter(crate::entity::process_definition::Column::Status.eq("published"))
|
||||
.one(db)
|
||||
.await;
|
||||
|
||||
let def = match def {
|
||||
Ok(Some(d)) => d,
|
||||
Ok(None) => {
|
||||
tracing::warn!(
|
||||
key = %workflow_key,
|
||||
tenant_id = %tenant_id,
|
||||
"AI 行动工作流定义未找到或未发布,跳过"
|
||||
);
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "查询工作流定义失败");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// 构造启动变量
|
||||
let risk_level = event.payload.get("risk_level")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("medium")
|
||||
.to_string();
|
||||
|
||||
let variables = vec![
|
||||
crate::dto::SetVariableReq {
|
||||
name: "risk_level".into(),
|
||||
var_type: Some("string".into()),
|
||||
value: serde_json::Value::String(risk_level.clone()),
|
||||
},
|
||||
crate::dto::SetVariableReq {
|
||||
name: "patient_id".into(),
|
||||
var_type: Some("string".into()),
|
||||
value: event.payload.get("patient_id")
|
||||
.cloned()
|
||||
.unwrap_or(serde_json::Value::Null),
|
||||
},
|
||||
crate::dto::SetVariableReq {
|
||||
name: "action_type".into(),
|
||||
var_type: Some("string".into()),
|
||||
value: event.payload.get("action_type")
|
||||
.and_then(|v| v.as_str())
|
||||
.map(|s| serde_json::Value::String(s.to_string()))
|
||||
.unwrap_or(serde_json::Value::Null),
|
||||
},
|
||||
crate::dto::SetVariableReq {
|
||||
name: "params".into(),
|
||||
var_type: Some("string".into()),
|
||||
value: event.payload.get("params")
|
||||
.cloned()
|
||||
.unwrap_or(serde_json::Value::Null),
|
||||
},
|
||||
];
|
||||
|
||||
let req = crate::dto::StartInstanceReq {
|
||||
definition_id: def.id,
|
||||
business_key: Some(format!("ai_action_{}", chrono::Utc::now().timestamp_millis())),
|
||||
variables: Some(variables),
|
||||
};
|
||||
|
||||
let system_id = Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap();
|
||||
|
||||
match crate::service::instance_service::InstanceService::start(
|
||||
tenant_id,
|
||||
system_id,
|
||||
&req,
|
||||
db,
|
||||
event_bus,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(instance) => {
|
||||
tracing::info!(
|
||||
key = %workflow_key,
|
||||
instance_id = %instance.id,
|
||||
tenant_id = %tenant_id,
|
||||
risk_level = %risk_level,
|
||||
"AI 行动工作流实例已启动"
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
key = %workflow_key,
|
||||
error = %e,
|
||||
"AI 行动工作流实例启动失败"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for WorkflowModule {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
@@ -286,6 +400,27 @@ impl ErpModule for WorkflowModule {
|
||||
});
|
||||
|
||||
tracing::info!(module = "workflow", "Workflow 事件处理器已注册(监听 user.deleted)");
|
||||
|
||||
// 订阅 AI 行动工作流启动请求
|
||||
let (mut ai_rx, _ai_handle) = bus.subscribe_filtered("workflow.ai_action.".to_string());
|
||||
let ai_db = ctx.db.clone();
|
||||
let ai_bus = bus.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match ai_rx.recv().await {
|
||||
Some(event) if event.event_type == "workflow.ai_action.start_requested" => {
|
||||
handle_ai_action_start(&ai_db, &ai_bus, &event).await;
|
||||
}
|
||||
Some(_) => {}
|
||||
None => {
|
||||
tracing::info!("AI 行动工作流事件订阅通道已关闭");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user