From c0523e19b456987818abc8e4464833390200f830 Mon Sep 17 00:00:00 2001 From: iven Date: Sat, 11 Apr 2026 12:28:13 +0800 Subject: [PATCH] feat(message): add cross-module event integration (Phase 6) - Message module subscribes to workflow events (process_instance.started) - Auto-generates notifications when workflows start - Added started_by to workflow instance event payload - Event listener runs as background tokio task Co-Authored-By: Claude Opus 4.6 --- crates/erp-message/src/module.rs | 80 +++++++++++++++++++ crates/erp-server/src/main.rs | 4 + .../src/service/instance_service.rs | 8 +- 3 files changed, 88 insertions(+), 4 deletions(-) diff --git a/crates/erp-message/src/module.rs b/crates/erp-message/src/module.rs index 83c60f5..cbfe078 100644 --- a/crates/erp-message/src/module.rs +++ b/crates/erp-message/src/module.rs @@ -57,6 +57,43 @@ impl MessageModule { put(subscription_handler::update_subscription), ) } + + /// 启动后台事件监听任务,将工作流事件转化为消息通知。 + /// + /// 在 main.rs 中调用,因为需要 db 连接。 + pub fn start_event_listener(db: sea_orm::DatabaseConnection, event_bus: EventBus) { + use sea_orm::ConnectionTrait; + + let mut rx = event_bus.subscribe(); + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(event) => { + let db = db.clone(); + let event_bus = event_bus.clone(); + tokio::spawn(async move { + if let Err(e) = + handle_workflow_event(&event, &db, &event_bus).await + { + tracing::warn!( + event_type = %event.event_type, + error = %e, + "Failed to handle workflow event for messages" + ); + } + }); + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!(skipped = n, "Event listener lagged, skipping events"); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + tracing::info!("Event bus closed, stopping message event listener"); + break; + } + } + } + }); + } } impl Default for MessageModule { @@ -97,3 +134,46 @@ impl ErpModule for MessageModule { self } } + +/// 处理工作流事件,生成对应的消息通知。 +async fn handle_workflow_event( + event: &erp_core::events::DomainEvent, + db: &sea_orm::DatabaseConnection, + event_bus: &EventBus, +) -> Result<(), String> { + match event.event_type.as_str() { + "process_instance.started" => { + let instance_id = event.payload.get("instance_id") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + let starter_id = event.payload.get("started_by") + .and_then(|v| v.as_str()); + + if let Some(starter) = starter_id { + let recipient = match uuid::Uuid::parse_str(starter) { + Ok(id) => id, + Err(_) => return Ok(()), + }; + let _ = crate::service::message_service::MessageService::send_system( + event.tenant_id, + recipient, + "流程已启动".to_string(), + format!("您的流程实例 {} 已启动执行。", instance_id), + "normal", + Some("workflow_instance".to_string()), + uuid::Uuid::parse_str(instance_id).ok(), + db, + event_bus, + ) + .await + .map_err(|e| e.to_string())?; + } + } + "task.completed" => { + // 任务完成时通知发起人(此处简化处理) + tracing::debug!("Task completed event received, skipping notification for now"); + } + _ => {} + } + Ok(()) +} diff --git a/crates/erp-server/src/main.rs b/crates/erp-server/src/main.rs index b3e4d66..3d64f35 100644 --- a/crates/erp-server/src/main.rs +++ b/crates/erp-server/src/main.rs @@ -124,6 +124,10 @@ async fn main() -> anyhow::Result<()> { // Register event handlers registry.register_handlers(&event_bus); + // Start message event listener (workflow events → message notifications) + erp_message::MessageModule::start_event_listener(db.clone(), event_bus.clone()); + tracing::info!("Message event listener started"); + let host = config.server.host.clone(); let port = config.server.port; diff --git a/crates/erp-workflow/src/service/instance_service.rs b/crates/erp-workflow/src/service/instance_service.rs index 5bded70..4eb1964 100644 --- a/crates/erp-workflow/src/service/instance_service.rs +++ b/crates/erp-workflow/src/service/instance_service.rs @@ -54,7 +54,7 @@ impl InstanceService { let mut variables = HashMap::new(); if let Some(vars) = &req.variables { for v in vars { - let var_type = v.var_type.as_deref().unwrap_or("string"); + let _var_type = v.var_type.as_deref().unwrap_or("string"); variables.insert(v.name.clone(), v.value.clone()); } } @@ -127,7 +127,7 @@ impl InstanceService { event_bus.publish(erp_core::events::DomainEvent::new( "process_instance.started", tenant_id, - serde_json::json!({ "instance_id": instance_id, "definition_id": definition.id }), + serde_json::json!({ "instance_id": instance_id, "definition_id": definition.id, "started_by": operator_id }), )); // 查询创建后的实例(包含 token) @@ -169,7 +169,7 @@ impl InstanceService { .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; - let page_index = pagination.page.unwrap_or(1).saturating_sub(1) as u64; + let page_index = pagination.page.unwrap_or(1).saturating_sub(1); let models = paginator .fetch_page(page_index) .await @@ -325,7 +325,7 @@ impl InstanceService { ) -> WorkflowResult<()> { let id = Uuid::now_v7(); - let (value_string, value_number, value_boolean, value_date): (Option, Option, Option, Option>) = match var_type { + let (value_string, value_number, value_boolean, _value_date): (Option, Option, Option, Option>) = match var_type { "string" => (value.as_str().map(|s| s.to_string()), None, None, None), "number" => (None, value.as_f64(), None, None), "boolean" => (None, None, value.as_bool(), None),