feat(plugin): P1-P4 审计修复 — 第二批 (运行时监控 + 通知引擎 + 编号reset)
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled

2.1 运行时监控:
- LoadedPlugin 新增 RuntimeMetrics (调用次数/错误/响应时间/燃料消耗)
- execute_wasm 自动采集每次调用的耗时和状态
- GET /admin/plugins/{id}/metrics 端点

2.2 通知规则引擎:
- notification.rs: 订阅 plugin.trigger.* 事件
- 触发时自动给管理员发送消息通知
- emit_trigger_events 增加 manifest_id 到 payload

2.3 编号 reset_rule:
- 替换 PostgreSQL SEQUENCE 为表行 + pg_advisory_xact_lock
- 支持 daily/monthly/yearly/never 重置周期
- 每个周期独立计数,切换时自动重置为 1
This commit is contained in:
iven
2026-04-19 14:41:17 +08:00
parent 4bcb4beaa5
commit 0a041c3d22
8 changed files with 283 additions and 28 deletions

View File

@@ -47,6 +47,7 @@ async fn emit_trigger_events(
data: Option<&serde_json::Value>, data: Option<&serde_json::Value>,
event_bus: &EventBus, event_bus: &EventBus,
db: &sea_orm::DatabaseConnection, db: &sea_orm::DatabaseConnection,
manifest_id: &str,
) { ) {
use crate::manifest::PluginTriggerOn; use crate::manifest::PluginTriggerOn;
for trigger in triggers { for trigger in triggers {
@@ -62,13 +63,25 @@ async fn emit_trigger_events(
"entity": entity_name, "entity": entity_name,
"record_id": record_id, "record_id": record_id,
"data": data, "data": data,
"plugin_id": manifest_id,
"trigger_name": trigger.name,
"action": action,
}); });
// 发布原始触发事件
let event = erp_core::events::DomainEvent::new( let event = erp_core::events::DomainEvent::new(
&trigger.name, &trigger.name,
tenant_id, tenant_id,
payload, payload.clone(),
); );
event_bus.publish(event, db).await; event_bus.publish(event, db).await;
// 同时发布 plugin.trigger.{manifest_id} 事件用于通知引擎
let notify_event = erp_core::events::DomainEvent::new(
format!("plugin.trigger.{}.{}", manifest_id, trigger.name),
tenant_id,
payload,
);
event_bus.publish(notify_event, db).await;
} }
} }
} }
@@ -129,7 +142,9 @@ impl PluginDataService {
// 触发事件发布 // 触发事件发布
if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await { if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await {
emit_trigger_events(&triggers, "create", entity_name, &result.id.to_string(), tenant_id, Some(&result.data), _event_bus, db).await; if let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await {
emit_trigger_events(&triggers, "create", entity_name, &result.id.to_string(), tenant_id, Some(&result.data), _event_bus, db, &mid).await;
}
} }
Ok(PluginDataResp { Ok(PluginDataResp {
@@ -345,7 +360,9 @@ impl PluginDataService {
// 触发事件发布 // 触发事件发布
if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await { if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await {
emit_trigger_events(&triggers, "update", entity_name, &result.id.to_string(), tenant_id, Some(&result.data), _event_bus, db).await; if let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await {
emit_trigger_events(&triggers, "update", entity_name, &result.id.to_string(), tenant_id, Some(&result.data), _event_bus, db, &mid).await;
}
} }
Ok(PluginDataResp { Ok(PluginDataResp {
@@ -499,7 +516,9 @@ impl PluginDataService {
// 触发事件发布 // 触发事件发布
if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await { if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await {
emit_trigger_events(&triggers, "delete", entity_name, &id.to_string(), tenant_id, None, _event_bus, db).await; if let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await {
emit_trigger_events(&triggers, "delete", entity_name, &id.to_string(), tenant_id, None, _event_bus, db, &mid).await;
}
} }
Ok(()) Ok(())
@@ -693,11 +712,13 @@ impl PluginDataService {
.await; .await;
if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await { if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await {
emit_trigger_events( if let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await {
&triggers, "create", entity_name, emit_trigger_events(
&format!("batch_import:{}", success_count), &triggers, "create", entity_name,
tenant_id, None, event_bus, db, &format!("batch_import:{}", success_count),
).await; tenant_id, None, event_bus, db, &mid,
).await;
}
} }
Ok(ImportResult { Ok(ImportResult {

View File

@@ -78,6 +78,19 @@ pub struct LoadedPlugin {
pub linker: Linker<HostState>, pub linker: Linker<HostState>,
pub status: RwLock<PluginStatus>, pub status: RwLock<PluginStatus>,
pub event_handles: RwLock<Vec<tokio::task::JoinHandle<()>>>, pub event_handles: RwLock<Vec<tokio::task::JoinHandle<()>>>,
pub metrics: Arc<RwLock<RuntimeMetrics>>,
}
/// 插件运行时指标
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct RuntimeMetrics {
pub total_invocations: u64,
pub error_count: u64,
pub total_response_ms: f64,
pub fuel_consumed_total: u64,
pub memory_peak_bytes: u64,
pub last_error: Option<String>,
pub last_invocation_at: Option<chrono::DateTime<chrono::Utc>>,
} }
/// WASM 执行上下文 — 传递真实的租户和用户信息 /// WASM 执行上下文 — 传递真实的租户和用户信息
@@ -146,6 +159,7 @@ impl PluginEngine {
linker, linker,
status: RwLock::new(PluginStatus::Loaded), status: RwLock::new(PluginStatus::Loaded),
event_handles: RwLock::new(vec![]), event_handles: RwLock::new(vec![]),
metrics: Arc::new(RwLock::new(RuntimeMetrics::default())),
}); });
self.plugins.insert(plugin_id.to_string(), loaded); self.plugins.insert(plugin_id.to_string(), loaded);
@@ -391,6 +405,13 @@ impl PluginEngine {
.map(|entry| entry.manifest.clone()) .map(|entry| entry.manifest.clone())
} }
/// 获取插件运行时指标
pub async fn get_metrics(&self, plugin_id: &str) -> PluginResult<RuntimeMetrics> {
let loaded = self.get_loaded(plugin_id)?;
let metrics = loaded.metrics.read().await;
Ok(metrics.clone())
}
/// 检查插件是否正在运行 /// 检查插件是否正在运行
pub async fn is_running(&self, plugin_id: &str) -> bool { pub async fn is_running(&self, plugin_id: &str) -> bool {
if let Some(loaded) = self.plugins.get(plugin_id) { if let Some(loaded) = self.plugins.get(plugin_id) {
@@ -521,6 +542,7 @@ impl PluginEngine {
let timeout_secs = self.config.execution_timeout_secs; let timeout_secs = self.config.execution_timeout_secs;
let pid_owned = plugin_id.to_owned(); let pid_owned = plugin_id.to_owned();
let start = std::time::Instant::now();
// spawn_blocking 闭包执行 WASM正常完成时收集 pending_ops // spawn_blocking 闭包执行 WASM正常完成时收集 pending_ops
let (result, pending_ops): (PluginResult<R>, Vec<PendingOp>) = let (result, pending_ops): (PluginResult<R>, Vec<PendingOp>) =
@@ -552,6 +574,19 @@ impl PluginEngine {
})? })?
.map_err(|e| PluginError::ExecutionError(e.to_string()))?; .map_err(|e| PluginError::ExecutionError(e.to_string()))?;
// 更新运行时指标
let elapsed_ms = start.elapsed().as_millis() as f64;
{
let mut metrics = loaded.metrics.write().await;
metrics.total_invocations += 1;
metrics.total_response_ms += elapsed_ms;
metrics.last_invocation_at = Some(chrono::Utc::now());
if result.is_err() {
metrics.error_count += 1;
metrics.last_error = result.as_ref().err().map(|e| e.to_string());
}
}
// 刷新写操作到数据库 // 刷新写操作到数据库
Self::flush_ops( Self::flush_ops(
&self.db, &self.db,

View File

@@ -351,6 +351,48 @@ where
Ok(Json(ApiResponse::ok(result))) Ok(Json(ApiResponse::ok(result)))
} }
#[utoipa::path(
get,
path = "/api/v1/admin/plugins/{id}/metrics",
responses(
(status = 200, description = "运行时指标", body = ApiResponse<serde_json::Value>),
),
security(("bearer_auth" = [])),
tag = "插件管理"
)]
/// GET /api/v1/admin/plugins/{id}/metrics — 运行时指标
pub async fn get_plugin_metrics<S>(
State(state): State<PluginState>,
Extension(ctx): Extension<TenantContext>,
Path(id): Path<Uuid>,
) -> Result<Json<ApiResponse<serde_json::Value>>, AppError>
where
PluginState: FromRef<S>,
S: Clone + Send + Sync + 'static,
{
require_permission(&ctx, "plugin.list")?;
// 通过 plugin_id 找到 manifest_id再查询 metrics
let manifest_id = crate::data_service::resolve_manifest_id(id, ctx.tenant_id, &state.db).await?;
let metrics = state.engine.get_metrics(&manifest_id).await
.map_err(|e| AppError::Internal(e.to_string()))?;
let avg_ms = if metrics.total_invocations > 0 {
metrics.total_response_ms / metrics.total_invocations as f64
} else {
0.0
};
Ok(Json(ApiResponse::ok(serde_json::json!({
"plugin_id": manifest_id,
"total_invocations": metrics.total_invocations,
"error_count": metrics.error_count,
"avg_response_ms": avg_ms,
"last_error": metrics.last_error,
"last_invocation_at": metrics.last_invocation_at,
}))))
}
#[utoipa::path( #[utoipa::path(
put, put,
path = "/api/v1/admin/plugins/{id}/config", path = "/api/v1/admin/plugins/{id}/config",

View File

@@ -308,49 +308,101 @@ impl host_api::Host for HostState {
fn numbering_generate(&mut self, rule_key: String) -> Result<String, String> { fn numbering_generate(&mut self, rule_key: String) -> Result<String, String> {
let rule = self.numbering_rules let rule = self.numbering_rules
.get(&rule_key) .get(&rule_key)
.ok_or_else(|| format!("编号规则 '{}' 未声明", rule_key))?; .ok_or_else(|| format!("编号规则 '{}' 未声明", rule_key))?
.clone();
let db = self.db.clone() let db = self.db.clone()
.ok_or("编号生成需要数据库连接")?; .ok_or("编号生成需要数据库连接")?;
// 使用 advisory lock 生成编号 let tenant_id = self.tenant_id;
let plugin_id = self.plugin_id.clone();
let rt = tokio::runtime::Handle::current(); let rt = tokio::runtime::Handle::current();
rt.block_on(async { rt.block_on(async {
// 简单实现:基于日期+序列 use sea_orm::{Statement, FromQueryResult, ConnectionTrait};
let now = chrono::Utc::now(); let now = chrono::Utc::now();
let year = now.format("%Y").to_string(); let year = now.format("%Y").to_string();
let month = now.format("%m").to_string(); let month = now.format("%m").to_string();
let day = now.format("%d").to_string();
// 使用 PostgreSQL 序列确保并发安全 // 计算当前周期的 key用于 reset_rule 判断)
use sea_orm::{Statement, FromQueryResult}; let period_key = match rule.reset_rule.as_str() {
#[derive(Debug, FromQueryResult)] "daily" => format!("{}-{}-{}", year, month, day),
struct SeqVal { nextval: i64 } "monthly" => format!("{}-{}", year, month),
"yearly" => year.clone(),
_ => String::new(), // "never" — 不需要周期 key
};
let seq_name = format!("plugin_{}_{}_seq", self.plugin_id.replace('-', "_"), rule_key); // 序列表名
let table_name = format!("plugin_numbering_seq_{}", plugin_id.replace('-', "_"));
// 确保序列表存在
let create_sql = format!( let create_sql = format!(
"CREATE SEQUENCE IF NOT EXISTS {} START WITH 1 INCREMENT BY 1", "CREATE TABLE IF NOT EXISTS {} (\
seq_name rule_key VARCHAR(255) NOT NULL, \
period_key VARCHAR(64) NOT NULL DEFAULT '', \
current_val BIGINT NOT NULL DEFAULT 0, \
PRIMARY KEY (rule_key, period_key)\
)",
table_name
); );
let result: Result<sea_orm::ExecResult, sea_orm::DbErr> = db.execute(Statement::from_string( db.execute(Statement::from_string(
sea_orm::DatabaseBackend::Postgres, sea_orm::DatabaseBackend::Postgres,
create_sql, create_sql,
)).await; )).await.map_err(|e| format!("创建序列表失败: {}", e))?;
result.map_err(|e| format!("创建序列失败: {}", e))?;
let seq_sql = format!("SELECT nextval('{}') as nextval", seq_name); // 使用 advisory lock 保证并发安全
let result: Option<SeqVal> = SeqVal::find_by_statement(Statement::from_string( // lock_id 基于规则名哈希
let lock_id: i64 = {
use std::hash::{Hash, Hasher};
let mut hasher = std::collections::hash_map::DefaultHasher::new();
(plugin_id.clone() + &rule_key).hash(&mut hasher);
(hasher.finish() as i64).abs()
};
let lock_sql = format!("SELECT pg_advisory_xact_lock({})", lock_id);
db.execute(Statement::from_string(
sea_orm::DatabaseBackend::Postgres, sea_orm::DatabaseBackend::Postgres,
seq_sql, lock_sql,
)).one(&db).await.map_err(|e| format!("获取序列失败: {}", e))?; )).await.map_err(|e| format!("获取失败: {}", e))?;
let seq = result.map(|r| r.nextval).unwrap_or(1); // 读取当前值
let seq_str = format!("{:0>width$}", seq, width = rule.seq_length as usize); #[derive(Debug, FromQueryResult)]
struct SeqRow { current_val: i64 }
let read_sql = format!(
"SELECT current_val FROM {} WHERE rule_key = $1 AND period_key = $2",
table_name
);
let current = SeqRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
read_sql,
[rule_key.clone().into(), period_key.clone().into()],
)).one(&db).await.map_err(|e| format!("读取序列失败: {}", e))?;
let next_val = current.map(|r| r.current_val + 1).unwrap_or(1);
// UPSERT 新值
let upsert_sql = format!(
"INSERT INTO {} (rule_key, period_key, current_val) VALUES ($1, $2, $3) \
ON CONFLICT (rule_key, period_key) DO UPDATE SET current_val = $3",
table_name
);
db.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
upsert_sql,
[rule_key.clone().into(), period_key.clone().into(), next_val.into()],
)).await.map_err(|e| format!("更新序列失败: {}", e))?;
let seq_str = format!("{:0>width$}", next_val, width = rule.seq_length as usize);
let number = rule.format let number = rule.format
.replace("{PREFIX}", &rule.prefix) .replace("{PREFIX}", &rule.prefix)
.replace("{YEAR}", &year) .replace("{YEAR}", &year)
.replace("{MONTH}", &month) .replace("{MONTH}", &month)
.replace("{DAY}", &day)
.replace(&format!("{{SEQ:{}}}", rule.seq_length), &seq_str) .replace(&format!("{{SEQ:{}}}", rule.seq_length), &seq_str)
.replace("{SEQ}", &seq_str); .replace("{SEQ}", &seq_str);

View File

@@ -20,6 +20,7 @@ pub mod handler;
pub mod host; pub mod host;
pub mod manifest; pub mod manifest;
pub mod module; pub mod module;
pub mod notification;
pub mod plugin_validator; pub mod plugin_validator;
pub mod service; pub mod service;
pub mod state; pub mod state;

View File

@@ -59,6 +59,10 @@ impl PluginModule {
"/admin/plugins/{id}/health", "/admin/plugins/{id}/health",
get(crate::handler::plugin_handler::health_check_plugin::<S>), get(crate::handler::plugin_handler::health_check_plugin::<S>),
) )
.route(
"/admin/plugins/{id}/metrics",
get(crate::handler::plugin_handler::get_plugin_metrics::<S>),
)
.route( .route(
"/admin/plugins/{id}/config", "/admin/plugins/{id}/config",
put(crate::handler::plugin_handler::update_plugin_config::<S>), put(crate::handler::plugin_handler::update_plugin_config::<S>),

View File

@@ -0,0 +1,96 @@
use sea_orm::{ConnectionTrait, Statement, FromQueryResult};
use uuid::Uuid;
use chrono::Utc;
use erp_core::events::{DomainEvent, EventBus};
use erp_core::error::AppResult;
/// 启动插件通知监听器 — 订阅 plugin.trigger.* 事件
pub fn start_notification_listener(db: sea_orm::DatabaseConnection, event_bus: EventBus) {
let (mut rx, _handle) = event_bus.subscribe_filtered("plugin.trigger.".to_string());
tokio::spawn(async move {
while let Some(event) = rx.recv().await {
if let Err(e) = handle_trigger_event(&event, &db).await {
tracing::warn!(
event_type = %event.event_type,
error = %e,
"Failed to handle plugin trigger notification"
);
}
}
tracing::info!("Plugin notification listener stopped");
});
}
async fn handle_trigger_event(event: &DomainEvent, db: &sea_orm::DatabaseConnection) -> AppResult<()> {
let plugin_id = event.payload.get("plugin_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let trigger_name = event.payload.get("trigger_name")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let entity = event.payload.get("entity")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let action = event.payload.get("action")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let title = format!("插件事件: {}.{}", plugin_id, trigger_name);
let body = format!(
"插件 [{}] 的实体 [{}] 触发了 [{}] 事件",
plugin_id, entity, action
);
// 查询所有管理员用户
#[derive(FromQueryResult)]
struct AdminUser { id: Uuid }
let admins = AdminUser::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
r#"SELECT u.id FROM users u
JOIN user_roles ur ON ur.user_id = u.id
JOIN roles r ON r.id = ur.role_id
WHERE u.tenant_id = $1 AND r.name = 'admin' AND u.deleted_at IS NULL"#,
[event.tenant_id.into()],
))
.all(db)
.await
.map_err(|e| erp_core::error::AppError::Internal(e.to_string()))?;
// 为每个管理员插入消息记录
let now = Utc::now();
for admin in &admins {
let msg_id = Uuid::now_v7();
let sql = r#"
INSERT INTO messages (id, tenant_id, sender_type, recipient_id, recipient_type,
title, body, priority, is_read, created_at, updated_at, version)
VALUES ($1, $2, 'system', $3, 'user', $4, $5, 'normal', false, $6, $7, 1)
"#;
db.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
[
msg_id.into(),
event.tenant_id.into(),
admin.id.into(),
title.clone().into(),
body.clone().into(),
now.into(),
now.into(),
],
))
.await
.map_err(|e| erp_core::error::AppError::Internal(e.to_string()))?;
}
tracing::info!(
plugin_id = %plugin_id,
trigger = %trigger_name,
admin_count = admins.len(),
"Plugin trigger notification sent"
);
Ok(())
}

View File

@@ -368,6 +368,10 @@ async fn main() -> anyhow::Result<()> {
erp_message::MessageModule::start_event_listener(db.clone(), event_bus.clone()); erp_message::MessageModule::start_event_listener(db.clone(), event_bus.clone());
tracing::info!("Message event listener started"); tracing::info!("Message event listener started");
// Start plugin notification listener (plugin.trigger.* → admin notifications)
erp_plugin::notification::start_notification_listener(db.clone(), event_bus.clone());
tracing::info!("Plugin notification listener started");
// Start outbox relay (re-publish pending domain events) // Start outbox relay (re-publish pending domain events)
outbox::start_outbox_relay(db.clone(), event_bus.clone()); outbox::start_outbox_relay(db.clone(), event_bus.clone());
tracing::info!("Outbox relay started"); tracing::info!("Outbox relay started");