use chrono::Utc; use sea_orm::{ConnectionTrait, FromQueryResult, Statement}; use uuid::Uuid; use erp_core::error::AppResult; use erp_core::events::{DomainEvent, EventBus}; /// 启动插件通知监听器 — 订阅 plugin.trigger.* 事件 pub fn start_notification_listener(db: sea_orm::DatabaseConnection, event_bus: EventBus) { let (mut rx, _handle) = event_bus.subscribe_filtered("plugin.trigger.".to_string()); tokio::spawn(async move { while let Some(event) = rx.recv().await { if let Err(e) = handle_trigger_event(&event, &db).await { tracing::warn!( event_type = %event.event_type, error = %e, "Failed to handle plugin trigger notification" ); } } tracing::info!("Plugin notification listener stopped"); }); } async fn handle_trigger_event( event: &DomainEvent, db: &sea_orm::DatabaseConnection, ) -> AppResult<()> { let plugin_id = event .payload .get("plugin_id") .and_then(|v| v.as_str()) .unwrap_or("unknown"); let trigger_name = event .payload .get("trigger_name") .and_then(|v| v.as_str()) .unwrap_or("unknown"); let entity = event .payload .get("entity") .and_then(|v| v.as_str()) .unwrap_or("unknown"); let action = event .payload .get("action") .and_then(|v| v.as_str()) .unwrap_or("unknown"); let title = format!("插件事件: {}.{}", plugin_id, trigger_name); let body = format!( "插件 [{}] 的实体 [{}] 触发了 [{}] 事件", plugin_id, entity, action ); // 查询所有管理员用户 #[derive(FromQueryResult)] struct AdminUser { id: Uuid, } let admins = AdminUser::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, r#"SELECT u.id FROM users u JOIN user_roles ur ON ur.user_id = u.id JOIN roles r ON r.id = ur.role_id WHERE u.tenant_id = $1 AND r.name = 'admin' AND u.deleted_at IS NULL"#, [event.tenant_id.into()], )) .all(db) .await .map_err(|e| erp_core::error::AppError::Internal(e.to_string()))?; // 为每个管理员插入消息记录 let now = Utc::now(); for admin in &admins { let msg_id = Uuid::now_v7(); let sql = r#" INSERT INTO messages (id, tenant_id, sender_type, recipient_id, recipient_type, title, body, priority, is_read, created_at, updated_at, version) VALUES ($1, $2, 'system', $3, 'user', $4, $5, 'normal', false, $6, $7, 1) "#; db.execute(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, [ msg_id.into(), event.tenant_id.into(), admin.id.into(), title.clone().into(), body.clone().into(), now.into(), now.into(), ], )) .await .map_err(|e| erp_core::error::AppError::Internal(e.to_string()))?; } tracing::info!( plugin_id = %plugin_id, trigger = %trigger_name, admin_count = admins.len(), "Plugin trigger notification sent" ); Ok(()) }