diff --git a/crates/erp-plugin/src/data_service.rs b/crates/erp-plugin/src/data_service.rs index 917d585..adc59ec 100644 --- a/crates/erp-plugin/src/data_service.rs +++ b/crates/erp-plugin/src/data_service.rs @@ -47,6 +47,7 @@ async fn emit_trigger_events( data: Option<&serde_json::Value>, event_bus: &EventBus, db: &sea_orm::DatabaseConnection, + manifest_id: &str, ) { use crate::manifest::PluginTriggerOn; for trigger in triggers { @@ -62,13 +63,25 @@ async fn emit_trigger_events( "entity": entity_name, "record_id": record_id, "data": data, + "plugin_id": manifest_id, + "trigger_name": trigger.name, + "action": action, }); + // 发布原始触发事件 let event = erp_core::events::DomainEvent::new( &trigger.name, tenant_id, - payload, + payload.clone(), ); event_bus.publish(event, db).await; + + // 同时发布 plugin.trigger.{manifest_id} 事件用于通知引擎 + let notify_event = erp_core::events::DomainEvent::new( + format!("plugin.trigger.{}.{}", manifest_id, trigger.name), + tenant_id, + payload, + ); + event_bus.publish(notify_event, db).await; } } } @@ -129,7 +142,9 @@ impl PluginDataService { // 触发事件发布 if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await { - emit_trigger_events(&triggers, "create", entity_name, &result.id.to_string(), tenant_id, Some(&result.data), _event_bus, db).await; + if let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await { + emit_trigger_events(&triggers, "create", entity_name, &result.id.to_string(), tenant_id, Some(&result.data), _event_bus, db, &mid).await; + } } Ok(PluginDataResp { @@ -345,7 +360,9 @@ impl PluginDataService { // 触发事件发布 if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await { - emit_trigger_events(&triggers, "update", entity_name, &result.id.to_string(), tenant_id, Some(&result.data), _event_bus, db).await; + if let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await { + emit_trigger_events(&triggers, "update", entity_name, &result.id.to_string(), tenant_id, Some(&result.data), _event_bus, db, &mid).await; + } } Ok(PluginDataResp { @@ -499,7 +516,9 @@ impl PluginDataService { // 触发事件发布 if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await { - emit_trigger_events(&triggers, "delete", entity_name, &id.to_string(), tenant_id, None, _event_bus, db).await; + if let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await { + emit_trigger_events(&triggers, "delete", entity_name, &id.to_string(), tenant_id, None, _event_bus, db, &mid).await; + } } Ok(()) @@ -693,11 +712,13 @@ impl PluginDataService { .await; if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await { - emit_trigger_events( - &triggers, "create", entity_name, - &format!("batch_import:{}", success_count), - tenant_id, None, event_bus, db, - ).await; + if let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await { + emit_trigger_events( + &triggers, "create", entity_name, + &format!("batch_import:{}", success_count), + tenant_id, None, event_bus, db, &mid, + ).await; + } } Ok(ImportResult { diff --git a/crates/erp-plugin/src/engine.rs b/crates/erp-plugin/src/engine.rs index fcbc637..a0674d5 100644 --- a/crates/erp-plugin/src/engine.rs +++ b/crates/erp-plugin/src/engine.rs @@ -78,6 +78,19 @@ pub struct LoadedPlugin { pub linker: Linker, pub status: RwLock, pub event_handles: RwLock>>, + pub metrics: Arc>, +} + +/// 插件运行时指标 +#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)] +pub struct RuntimeMetrics { + pub total_invocations: u64, + pub error_count: u64, + pub total_response_ms: f64, + pub fuel_consumed_total: u64, + pub memory_peak_bytes: u64, + pub last_error: Option, + pub last_invocation_at: Option>, } /// WASM 执行上下文 — 传递真实的租户和用户信息 @@ -146,6 +159,7 @@ impl PluginEngine { linker, status: RwLock::new(PluginStatus::Loaded), event_handles: RwLock::new(vec![]), + metrics: Arc::new(RwLock::new(RuntimeMetrics::default())), }); self.plugins.insert(plugin_id.to_string(), loaded); @@ -391,6 +405,13 @@ impl PluginEngine { .map(|entry| entry.manifest.clone()) } + /// 获取插件运行时指标 + pub async fn get_metrics(&self, plugin_id: &str) -> PluginResult { + let loaded = self.get_loaded(plugin_id)?; + let metrics = loaded.metrics.read().await; + Ok(metrics.clone()) + } + /// 检查插件是否正在运行 pub async fn is_running(&self, plugin_id: &str) -> bool { if let Some(loaded) = self.plugins.get(plugin_id) { @@ -521,6 +542,7 @@ impl PluginEngine { let timeout_secs = self.config.execution_timeout_secs; let pid_owned = plugin_id.to_owned(); + let start = std::time::Instant::now(); // spawn_blocking 闭包执行 WASM,正常完成时收集 pending_ops let (result, pending_ops): (PluginResult, Vec) = @@ -552,6 +574,19 @@ impl PluginEngine { })? .map_err(|e| PluginError::ExecutionError(e.to_string()))?; + // 更新运行时指标 + let elapsed_ms = start.elapsed().as_millis() as f64; + { + let mut metrics = loaded.metrics.write().await; + metrics.total_invocations += 1; + metrics.total_response_ms += elapsed_ms; + metrics.last_invocation_at = Some(chrono::Utc::now()); + if result.is_err() { + metrics.error_count += 1; + metrics.last_error = result.as_ref().err().map(|e| e.to_string()); + } + } + // 刷新写操作到数据库 Self::flush_ops( &self.db, diff --git a/crates/erp-plugin/src/handler/plugin_handler.rs b/crates/erp-plugin/src/handler/plugin_handler.rs index 24cb669..0fb2af7 100644 --- a/crates/erp-plugin/src/handler/plugin_handler.rs +++ b/crates/erp-plugin/src/handler/plugin_handler.rs @@ -351,6 +351,48 @@ where Ok(Json(ApiResponse::ok(result))) } +#[utoipa::path( + get, + path = "/api/v1/admin/plugins/{id}/metrics", + responses( + (status = 200, description = "运行时指标", body = ApiResponse), + ), + security(("bearer_auth" = [])), + tag = "插件管理" +)] +/// GET /api/v1/admin/plugins/{id}/metrics — 运行时指标 +pub async fn get_plugin_metrics( + State(state): State, + Extension(ctx): Extension, + Path(id): Path, +) -> Result>, AppError> +where + PluginState: FromRef, + S: Clone + Send + Sync + 'static, +{ + require_permission(&ctx, "plugin.list")?; + + // 通过 plugin_id 找到 manifest_id,再查询 metrics + let manifest_id = crate::data_service::resolve_manifest_id(id, ctx.tenant_id, &state.db).await?; + let metrics = state.engine.get_metrics(&manifest_id).await + .map_err(|e| AppError::Internal(e.to_string()))?; + + let avg_ms = if metrics.total_invocations > 0 { + metrics.total_response_ms / metrics.total_invocations as f64 + } else { + 0.0 + }; + + Ok(Json(ApiResponse::ok(serde_json::json!({ + "plugin_id": manifest_id, + "total_invocations": metrics.total_invocations, + "error_count": metrics.error_count, + "avg_response_ms": avg_ms, + "last_error": metrics.last_error, + "last_invocation_at": metrics.last_invocation_at, + })))) +} + #[utoipa::path( put, path = "/api/v1/admin/plugins/{id}/config", diff --git a/crates/erp-plugin/src/host.rs b/crates/erp-plugin/src/host.rs index 1bbce82..4227fed 100644 --- a/crates/erp-plugin/src/host.rs +++ b/crates/erp-plugin/src/host.rs @@ -308,49 +308,101 @@ impl host_api::Host for HostState { fn numbering_generate(&mut self, rule_key: String) -> Result { let rule = self.numbering_rules .get(&rule_key) - .ok_or_else(|| format!("编号规则 '{}' 未声明", rule_key))?; + .ok_or_else(|| format!("编号规则 '{}' 未声明", rule_key))? + .clone(); let db = self.db.clone() .ok_or("编号生成需要数据库连接")?; - // 使用 advisory lock 生成编号 + let tenant_id = self.tenant_id; + let plugin_id = self.plugin_id.clone(); + let rt = tokio::runtime::Handle::current(); rt.block_on(async { - // 简单实现:基于日期+序列 + use sea_orm::{Statement, FromQueryResult, ConnectionTrait}; + let now = chrono::Utc::now(); let year = now.format("%Y").to_string(); let month = now.format("%m").to_string(); + let day = now.format("%d").to_string(); - // 使用 PostgreSQL 序列确保并发安全 - use sea_orm::{Statement, FromQueryResult}; - #[derive(Debug, FromQueryResult)] - struct SeqVal { nextval: i64 } + // 计算当前周期的 key(用于 reset_rule 判断) + let period_key = match rule.reset_rule.as_str() { + "daily" => format!("{}-{}-{}", year, month, day), + "monthly" => format!("{}-{}", year, month), + "yearly" => year.clone(), + _ => String::new(), // "never" — 不需要周期 key + }; - let seq_name = format!("plugin_{}_{}_seq", self.plugin_id.replace('-', "_"), rule_key); + // 序列表名 + let table_name = format!("plugin_numbering_seq_{}", plugin_id.replace('-', "_")); + + // 确保序列表存在 let create_sql = format!( - "CREATE SEQUENCE IF NOT EXISTS {} START WITH 1 INCREMENT BY 1", - seq_name + "CREATE TABLE IF NOT EXISTS {} (\ + rule_key VARCHAR(255) NOT NULL, \ + period_key VARCHAR(64) NOT NULL DEFAULT '', \ + current_val BIGINT NOT NULL DEFAULT 0, \ + PRIMARY KEY (rule_key, period_key)\ + )", + table_name ); - let result: Result = db.execute(Statement::from_string( + db.execute(Statement::from_string( sea_orm::DatabaseBackend::Postgres, create_sql, - )).await; - result.map_err(|e| format!("创建序列失败: {}", e))?; + )).await.map_err(|e| format!("创建序列表失败: {}", e))?; - let seq_sql = format!("SELECT nextval('{}') as nextval", seq_name); - let result: Option = SeqVal::find_by_statement(Statement::from_string( + // 使用 advisory lock 保证并发安全 + // lock_id 基于规则名哈希 + let lock_id: i64 = { + use std::hash::{Hash, Hasher}; + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + (plugin_id.clone() + &rule_key).hash(&mut hasher); + (hasher.finish() as i64).abs() + }; + + let lock_sql = format!("SELECT pg_advisory_xact_lock({})", lock_id); + db.execute(Statement::from_string( sea_orm::DatabaseBackend::Postgres, - seq_sql, - )).one(&db).await.map_err(|e| format!("获取序列失败: {}", e))?; + lock_sql, + )).await.map_err(|e| format!("获取锁失败: {}", e))?; - let seq = result.map(|r| r.nextval).unwrap_or(1); - let seq_str = format!("{:0>width$}", seq, width = rule.seq_length as usize); + // 读取当前值 + #[derive(Debug, FromQueryResult)] + struct SeqRow { current_val: i64 } + + let read_sql = format!( + "SELECT current_val FROM {} WHERE rule_key = $1 AND period_key = $2", + table_name + ); + let current = SeqRow::find_by_statement(Statement::from_sql_and_values( + sea_orm::DatabaseBackend::Postgres, + read_sql, + [rule_key.clone().into(), period_key.clone().into()], + )).one(&db).await.map_err(|e| format!("读取序列失败: {}", e))?; + + let next_val = current.map(|r| r.current_val + 1).unwrap_or(1); + + // UPSERT 新值 + let upsert_sql = format!( + "INSERT INTO {} (rule_key, period_key, current_val) VALUES ($1, $2, $3) \ + ON CONFLICT (rule_key, period_key) DO UPDATE SET current_val = $3", + table_name + ); + db.execute(Statement::from_sql_and_values( + sea_orm::DatabaseBackend::Postgres, + upsert_sql, + [rule_key.clone().into(), period_key.clone().into(), next_val.into()], + )).await.map_err(|e| format!("更新序列失败: {}", e))?; + + let seq_str = format!("{:0>width$}", next_val, width = rule.seq_length as usize); let number = rule.format .replace("{PREFIX}", &rule.prefix) .replace("{YEAR}", &year) .replace("{MONTH}", &month) + .replace("{DAY}", &day) .replace(&format!("{{SEQ:{}}}", rule.seq_length), &seq_str) .replace("{SEQ}", &seq_str); diff --git a/crates/erp-plugin/src/lib.rs b/crates/erp-plugin/src/lib.rs index ae307e9..92a917e 100644 --- a/crates/erp-plugin/src/lib.rs +++ b/crates/erp-plugin/src/lib.rs @@ -20,6 +20,7 @@ pub mod handler; pub mod host; pub mod manifest; pub mod module; +pub mod notification; pub mod plugin_validator; pub mod service; pub mod state; diff --git a/crates/erp-plugin/src/module.rs b/crates/erp-plugin/src/module.rs index 189bafc..6f6aae3 100644 --- a/crates/erp-plugin/src/module.rs +++ b/crates/erp-plugin/src/module.rs @@ -59,6 +59,10 @@ impl PluginModule { "/admin/plugins/{id}/health", get(crate::handler::plugin_handler::health_check_plugin::), ) + .route( + "/admin/plugins/{id}/metrics", + get(crate::handler::plugin_handler::get_plugin_metrics::), + ) .route( "/admin/plugins/{id}/config", put(crate::handler::plugin_handler::update_plugin_config::), diff --git a/crates/erp-plugin/src/notification.rs b/crates/erp-plugin/src/notification.rs new file mode 100644 index 0000000..4c37edb --- /dev/null +++ b/crates/erp-plugin/src/notification.rs @@ -0,0 +1,96 @@ +use sea_orm::{ConnectionTrait, Statement, FromQueryResult}; +use uuid::Uuid; +use chrono::Utc; + +use erp_core::events::{DomainEvent, EventBus}; +use erp_core::error::AppResult; + +/// 启动插件通知监听器 — 订阅 plugin.trigger.* 事件 +pub fn start_notification_listener(db: sea_orm::DatabaseConnection, event_bus: EventBus) { + let (mut rx, _handle) = event_bus.subscribe_filtered("plugin.trigger.".to_string()); + + tokio::spawn(async move { + while let Some(event) = rx.recv().await { + if let Err(e) = handle_trigger_event(&event, &db).await { + tracing::warn!( + event_type = %event.event_type, + error = %e, + "Failed to handle plugin trigger notification" + ); + } + } + tracing::info!("Plugin notification listener stopped"); + }); +} + +async fn handle_trigger_event(event: &DomainEvent, db: &sea_orm::DatabaseConnection) -> AppResult<()> { + let plugin_id = event.payload.get("plugin_id") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + let trigger_name = event.payload.get("trigger_name") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + let entity = event.payload.get("entity") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + let action = event.payload.get("action") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + + let title = format!("插件事件: {}.{}", plugin_id, trigger_name); + let body = format!( + "插件 [{}] 的实体 [{}] 触发了 [{}] 事件", + plugin_id, entity, action + ); + + // 查询所有管理员用户 + #[derive(FromQueryResult)] + struct AdminUser { id: Uuid } + + let admins = AdminUser::find_by_statement(Statement::from_sql_and_values( + sea_orm::DatabaseBackend::Postgres, + r#"SELECT u.id FROM users u + JOIN user_roles ur ON ur.user_id = u.id + JOIN roles r ON r.id = ur.role_id + WHERE u.tenant_id = $1 AND r.name = 'admin' AND u.deleted_at IS NULL"#, + [event.tenant_id.into()], + )) + .all(db) + .await + .map_err(|e| erp_core::error::AppError::Internal(e.to_string()))?; + + // 为每个管理员插入消息记录 + let now = Utc::now(); + for admin in &admins { + let msg_id = Uuid::now_v7(); + let sql = r#" + INSERT INTO messages (id, tenant_id, sender_type, recipient_id, recipient_type, + title, body, priority, is_read, created_at, updated_at, version) + VALUES ($1, $2, 'system', $3, 'user', $4, $5, 'normal', false, $6, $7, 1) + "#; + db.execute(Statement::from_sql_and_values( + sea_orm::DatabaseBackend::Postgres, + sql, + [ + msg_id.into(), + event.tenant_id.into(), + admin.id.into(), + title.clone().into(), + body.clone().into(), + now.into(), + now.into(), + ], + )) + .await + .map_err(|e| erp_core::error::AppError::Internal(e.to_string()))?; + } + + tracing::info!( + plugin_id = %plugin_id, + trigger = %trigger_name, + admin_count = admins.len(), + "Plugin trigger notification sent" + ); + + Ok(()) +} diff --git a/crates/erp-server/src/main.rs b/crates/erp-server/src/main.rs index 59dbd0a..388b698 100644 --- a/crates/erp-server/src/main.rs +++ b/crates/erp-server/src/main.rs @@ -368,6 +368,10 @@ async fn main() -> anyhow::Result<()> { erp_message::MessageModule::start_event_listener(db.clone(), event_bus.clone()); tracing::info!("Message event listener started"); + // Start plugin notification listener (plugin.trigger.* → admin notifications) + erp_plugin::notification::start_notification_listener(db.clone(), event_bus.clone()); + tracing::info!("Plugin notification listener started"); + // Start outbox relay (re-publish pending domain events) outbox::start_outbox_relay(db.clone(), event_bus.clone()); tracing::info!("Outbox relay started");