use axum::Router; use axum::routing::{delete, get, put}; use uuid::Uuid; use erp_core::error::AppResult; use erp_core::events::EventBus; use erp_core::module::ErpModule; use crate::handler::{ message_handler, subscription_handler, template_handler, }; /// 消息中心模块,实现 ErpModule trait。 pub struct MessageModule; impl MessageModule { pub fn new() -> Self { Self } /// 构建需要认证的路由。 pub fn protected_routes() -> Router where crate::message_state::MessageState: axum::extract::FromRef, S: Clone + Send + Sync + 'static, { Router::new() // 消息路由 .route( "/messages", get(message_handler::list_messages).post(message_handler::send_message), ) .route( "/messages/unread-count", get(message_handler::unread_count), ) .route( "/messages/{id}/read", put(message_handler::mark_read), ) .route( "/messages/read-all", put(message_handler::mark_all_read), ) .route( "/messages/{id}", delete(message_handler::delete_message), ) // 模板路由 .route( "/message-templates", get(template_handler::list_templates).post(template_handler::create_template), ) // 订阅偏好路由 .route( "/message-subscriptions", put(subscription_handler::update_subscription), ) } /// 启动后台事件监听任务,将工作流事件转化为消息通知。 /// /// 在 main.rs 中调用,因为需要 db 连接。 pub fn start_event_listener(db: sea_orm::DatabaseConnection, event_bus: EventBus) { let mut rx = event_bus.subscribe(); tokio::spawn(async move { loop { match rx.recv().await { Ok(event) => { let db = db.clone(); let event_bus = event_bus.clone(); tokio::spawn(async move { if let Err(e) = handle_workflow_event(&event, &db, &event_bus).await { tracing::warn!( event_type = %event.event_type, error = %e, "Failed to handle workflow event for messages" ); } }); } Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { tracing::warn!(skipped = n, "Event listener lagged, skipping events"); } Err(tokio::sync::broadcast::error::RecvError::Closed) => { tracing::info!("Event bus closed, stopping message event listener"); break; } } } }); } } impl Default for MessageModule { fn default() -> Self { Self::new() } } #[async_trait::async_trait] impl ErpModule for MessageModule { fn name(&self) -> &str { "message" } fn version(&self) -> &str { env!("CARGO_PKG_VERSION") } fn dependencies(&self) -> Vec<&str> { vec!["auth"] } fn register_routes(&self, router: Router) -> Router { router } fn register_event_handlers(&self, _bus: &EventBus) {} async fn on_tenant_created(&self, _tenant_id: Uuid) -> AppResult<()> { Ok(()) } async fn on_tenant_deleted(&self, _tenant_id: Uuid) -> AppResult<()> { Ok(()) } fn as_any(&self) -> &dyn std::any::Any { self } } /// 处理工作流事件,生成对应的消息通知。 async fn handle_workflow_event( event: &erp_core::events::DomainEvent, db: &sea_orm::DatabaseConnection, event_bus: &EventBus, ) -> Result<(), String> { match event.event_type.as_str() { "process_instance.started" => { let instance_id = event.payload.get("instance_id") .and_then(|v| v.as_str()) .unwrap_or("unknown"); let starter_id = event.payload.get("started_by") .and_then(|v| v.as_str()); if let Some(starter) = starter_id { let recipient = match uuid::Uuid::parse_str(starter) { Ok(id) => id, Err(_) => return Ok(()), }; let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, recipient, "流程已启动".to_string(), format!("您的流程实例 {} 已启动执行。", instance_id), "normal", Some("workflow_instance".to_string()), uuid::Uuid::parse_str(instance_id).ok(), db, event_bus, ) .await .map_err(|e| e.to_string())?; } } "task.completed" => { // 任务完成时通知发起人(此处简化处理) tracing::debug!("Task completed event received, skipping notification for now"); } _ => {} } Ok(()) }