Files
hms/crates/erp-plugin/src/notification.rs
iven 0a041c3d22 feat(plugin): P1-P4 审计修复 — 第二批 (运行时监控 + 通知引擎 + 编号reset)
2.1 运行时监控:
- LoadedPlugin 新增 RuntimeMetrics (调用次数/错误/响应时间/燃料消耗)
- execute_wasm 自动采集每次调用的耗时和状态
- GET /admin/plugins/{id}/metrics 端点

2.2 通知规则引擎:
- notification.rs: 订阅 plugin.trigger.* 事件
- 触发时自动给管理员发送消息通知
- emit_trigger_events 增加 manifest_id 到 payload

2.3 编号 reset_rule:
- 替换 PostgreSQL SEQUENCE 为表行 + pg_advisory_xact_lock
- 支持 daily/monthly/yearly/never 重置周期
- 每个周期独立计数,切换时自动重置为 1
2026-04-19 14:41:17 +08:00

97 lines
3.3 KiB
Rust

use sea_orm::{ConnectionTrait, Statement, FromQueryResult};
use uuid::Uuid;
use chrono::Utc;
use erp_core::events::{DomainEvent, EventBus};
use erp_core::error::AppResult;
/// Starts the plugin notification listener.
///
/// Subscribes on `event_bus` via `subscribe_filtered` with the `plugin.trigger.` filter string
/// and spawns a Tokio task that hands every received event to `handle_trigger_event`.
/// A handler failure is only logged as a warning, so the loop keeps consuming events; once the
/// receiver yields `None` the task logs an info message and finishes.
pub fn start_notification_listener(db: sea_orm::DatabaseConnection, event_bus: EventBus) {
    let filter = String::from("plugin.trigger.");
    let (mut receiver, _handle) = event_bus.subscribe_filtered(filter);

    tokio::spawn(async move {
        loop {
            let event = match receiver.recv().await {
                Some(event) => event,
                // The receiver is exhausted: leave the loop and let the task end.
                None => break,
            };

            match handle_trigger_event(&event, &db).await {
                Ok(()) => {}
                Err(err) => {
                    // Best effort: a failed notification must not stop the listener.
                    tracing::warn!(
                        event_type = %event.event_type,
                        error = %err,
                        "Failed to handle plugin trigger notification"
                    );
                }
            }
        }
        tracing::info!("Plugin notification listener stopped");
    });

    // NOTE(review): `_handle` is not captured by the spawned task, so it is dropped when this
    // function returns — confirm that dropping it does not cancel the subscription.
}
async fn handle_trigger_event(event: &DomainEvent, db: &sea_orm::DatabaseConnection) -> AppResult<()> {
let plugin_id = event.payload.get("plugin_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let trigger_name = event.payload.get("trigger_name")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let entity = event.payload.get("entity")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let action = event.payload.get("action")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let title = format!("插件事件: {}.{}", plugin_id, trigger_name);
let body = format!(
"插件 [{}] 的实体 [{}] 触发了 [{}] 事件",
plugin_id, entity, action
);
// 查询所有管理员用户
#[derive(FromQueryResult)]
struct AdminUser { id: Uuid }
let admins = AdminUser::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
r#"SELECT u.id FROM users u
JOIN user_roles ur ON ur.user_id = u.id
JOIN roles r ON r.id = ur.role_id
WHERE u.tenant_id = $1 AND r.name = 'admin' AND u.deleted_at IS NULL"#,
[event.tenant_id.into()],
))
.all(db)
.await
.map_err(|e| erp_core::error::AppError::Internal(e.to_string()))?;
// 为每个管理员插入消息记录
let now = Utc::now();
for admin in &admins {
let msg_id = Uuid::now_v7();
let sql = r#"
INSERT INTO messages (id, tenant_id, sender_type, recipient_id, recipient_type,
title, body, priority, is_read, created_at, updated_at, version)
VALUES ($1, $2, 'system', $3, 'user', $4, $5, 'normal', false, $6, $7, 1)
"#;
db.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
[
msg_id.into(),
event.tenant_id.into(),
admin.id.into(),
title.clone().into(),
body.clone().into(),
now.into(),
now.into(),
],
))
.await
.map_err(|e| erp_core::error::AppError::Internal(e.to_string()))?;
}
tracing::info!(
plugin_id = %plugin_id,
trigger = %trigger_name,
admin_count = admins.len(),
"Plugin trigger notification sent"
);
Ok(())
}