Files
nj/crates/erp-plugin/src/notification.rs
iven c539e6fd83 feat: initialize Nuanji (Warm Notes) project
- Base platform from base.git (ERP base: auth, core, config, message, workflow, plugin)
- Created erp-diary module skeleton (lib.rs, dto.rs, error.rs, event.rs, state.rs)
- Integrated erp-diary into workspace and erp-server
- Added DiaryModule registration in main.rs
- Added DiaryState FromRef in state.rs
- Diary routes mounted (empty routes, ready for implementation)
- Product design spec v1.2 preserved in docs/
- Implementation plan preserved in plans/

Cargo check: OK
Cargo test: OK (78+ base tests passing)
2026-05-31 20:52:19 +08:00

110 lines
3.3 KiB
Rust

use chrono::Utc;
use sea_orm::{ConnectionTrait, FromQueryResult, Statement};
use uuid::Uuid;
use erp_core::error::AppResult;
use erp_core::events::{DomainEvent, EventBus};
/// 启动插件通知监听器 — 订阅 plugin.trigger.* 事件
pub fn start_notification_listener(db: sea_orm::DatabaseConnection, event_bus: EventBus) {
let (mut rx, _handle) = event_bus.subscribe_filtered("plugin.trigger.".to_string());
tokio::spawn(async move {
while let Some(event) = rx.recv().await {
if let Err(e) = handle_trigger_event(&event, &db).await {
tracing::warn!(
event_type = %event.event_type,
error = %e,
"Failed to handle plugin trigger notification"
);
}
}
tracing::info!("Plugin notification listener stopped");
});
}
async fn handle_trigger_event(
event: &DomainEvent,
db: &sea_orm::DatabaseConnection,
) -> AppResult<()> {
let plugin_id = event
.payload
.get("plugin_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let trigger_name = event
.payload
.get("trigger_name")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let entity = event
.payload
.get("entity")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let action = event
.payload
.get("action")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let title = format!("插件事件: {}.{}", plugin_id, trigger_name);
let body = format!(
"插件 [{}] 的实体 [{}] 触发了 [{}] 事件",
plugin_id, entity, action
);
// 查询所有管理员用户
#[derive(FromQueryResult)]
struct AdminUser {
id: Uuid,
}
let admins = AdminUser::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
r#"SELECT u.id FROM users u
JOIN user_roles ur ON ur.user_id = u.id
JOIN roles r ON r.id = ur.role_id
WHERE u.tenant_id = $1 AND r.name = 'admin' AND u.deleted_at IS NULL"#,
[event.tenant_id.into()],
))
.all(db)
.await
.map_err(|e| erp_core::error::AppError::Internal(e.to_string()))?;
// 为每个管理员插入消息记录
let now = Utc::now();
for admin in &admins {
let msg_id = Uuid::now_v7();
let sql = r#"
INSERT INTO messages (id, tenant_id, sender_type, recipient_id, recipient_type,
title, body, priority, is_read, created_at, updated_at, version)
VALUES ($1, $2, 'system', $3, 'user', $4, $5, 'normal', false, $6, $7, 1)
"#;
db.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
[
msg_id.into(),
event.tenant_id.into(),
admin.id.into(),
title.clone().into(),
body.clone().into(),
now.into(),
now.into(),
],
))
.await
.map_err(|e| erp_core::error::AppError::Internal(e.to_string()))?;
}
tracing::info!(
plugin_id = %plugin_id,
trigger = %trigger_name,
admin_count = admins.len(),
"Plugin trigger notification sent"
);
Ok(())
}