use std::panic::AssertUnwindSafe; use std::sync::Arc; use dashmap::DashMap; use sea_orm::{ConnectionTrait, DatabaseConnection, Statement, TransactionTrait}; use serde_json::json; use tokio::sync::RwLock; use uuid::Uuid; use wasmtime::component::{Component, HasSelf, Linker}; use wasmtime::{Config, Engine, Store}; use erp_core::events::EventBus; use crate::PluginWorld; use crate::dynamic_table::DynamicTableManager; use crate::error::{PluginError, PluginResult}; use crate::host::{HostState, PendingOp}; use crate::manifest::PluginManifest; /// 插件引擎配置 #[derive(Debug, Clone)] pub struct PluginEngineConfig { /// 默认 Fuel 限制 pub default_fuel: u64, /// 执行超时(秒) pub execution_timeout_secs: u64, } impl Default for PluginEngineConfig { fn default() -> Self { Self { default_fuel: 10_000_000, execution_timeout_secs: 30, } } } /// 插件运行状态 #[derive(Debug, Clone, PartialEq, Eq)] pub enum PluginStatus { /// 已加载到内存 Loaded, /// 已初始化(init() 已调用) Initialized, /// 运行中(事件监听已启动) Running, /// 错误状态 Error(String), /// 已禁用 Disabled, } /// 已加载的插件实例 pub struct LoadedPlugin { pub id: String, pub manifest: PluginManifest, pub component: Component, pub linker: Linker, pub status: RwLock, pub event_handles: RwLock>>, } /// WASM 执行上下文 — 传递真实的租户和用户信息 #[derive(Debug, Clone)] pub struct ExecutionContext { pub tenant_id: Uuid, pub user_id: Uuid, pub permissions: Vec, } /// 插件引擎 — 管理所有已加载插件的 WASM 运行时 #[derive(Clone)] pub struct PluginEngine { engine: Arc, db: DatabaseConnection, event_bus: EventBus, plugins: Arc>>, config: PluginEngineConfig, } impl PluginEngine { /// 创建新的插件引擎 pub fn new( db: DatabaseConnection, event_bus: EventBus, config: PluginEngineConfig, ) -> PluginResult { let mut wasm_config = Config::new(); wasm_config.wasm_component_model(true); wasm_config.consume_fuel(true); let engine = Engine::new(&wasm_config) .map_err(|e| PluginError::InstantiationError(e.to_string()))?; Ok(Self { engine: Arc::new(engine), db, event_bus, plugins: Arc::new(DashMap::new()), config, }) } /// 加载插件到内存(不初始化) pub async fn load( &self, plugin_id: &str, wasm_bytes: &[u8], manifest: PluginManifest, ) -> PluginResult<()> { if self.plugins.contains_key(plugin_id) { return Err(PluginError::AlreadyExists(plugin_id.to_string())); } let component = Component::from_binary(&self.engine, wasm_bytes) .map_err(|e| PluginError::InstantiationError(e.to_string()))?; let mut linker = Linker::new(&self.engine); // 注册 Host API 到 Linker PluginWorld::add_to_linker::<_, HasSelf>(&mut linker, |state| state) .map_err(|e| PluginError::InstantiationError(e.to_string()))?; let loaded = Arc::new(LoadedPlugin { id: plugin_id.to_string(), manifest, component, linker, status: RwLock::new(PluginStatus::Loaded), event_handles: RwLock::new(vec![]), }); self.plugins.insert(plugin_id.to_string(), loaded); tracing::info!(plugin_id, "Plugin loaded into memory"); Ok(()) } /// 初始化插件(调用 init()) pub async fn initialize(&self, plugin_id: &str) -> PluginResult<()> { let loaded = self.get_loaded(plugin_id)?; // 检查状态 { let status = loaded.status.read().await; if *status != PluginStatus::Loaded { return Err(PluginError::InvalidState { expected: "Loaded".to_string(), actual: format!("{:?}", *status), }); } } let ctx = ExecutionContext { tenant_id: Uuid::nil(), user_id: Uuid::nil(), permissions: vec![], }; let result = self .execute_wasm(plugin_id, &ctx, |store, instance| { instance.erp_plugin_plugin_api().call_init(store) .map_err(|e| PluginError::ExecutionError(e.to_string()))? .map_err(|e| PluginError::ExecutionError(e))?; Ok(()) }) .await; match result { Ok(()) => { *loaded.status.write().await = PluginStatus::Initialized; tracing::info!(plugin_id, "Plugin initialized"); Ok(()) } Err(e) => { *loaded.status.write().await = PluginStatus::Error(e.to_string()); Err(e) } } } /// 启动事件监听 pub async fn start_event_listener(&self, plugin_id: &str) -> PluginResult<()> { let loaded = self.get_loaded(plugin_id)?; // 检查状态 { let status = loaded.status.read().await; if *status != PluginStatus::Initialized { return Err(PluginError::InvalidState { expected: "Initialized".to_string(), actual: format!("{:?}", *status), }); } } let events_config = &loaded.manifest.events; if let Some(events) = events_config { for pattern in &events.subscribe { let (mut rx, sub_handle) = self.event_bus.subscribe_filtered(pattern.clone()); let pid = plugin_id.to_string(); let engine = self.clone(); let join_handle = tokio::spawn(async move { // sub_handle 保存在此 task 中,task 结束时自动 drop 触发优雅取消 let _sub_guard = sub_handle; while let Some(event) = rx.recv().await { if let Err(e) = engine .handle_event_inner( &pid, &event.event_type, &event.payload, event.tenant_id, ) .await { tracing::error!( plugin_id = %pid, error = %e, "Plugin event handler failed" ); } } }); loaded.event_handles.write().await.push(join_handle); } } *loaded.status.write().await = PluginStatus::Running; tracing::info!(plugin_id, "Plugin event listener started"); Ok(()) } /// 处理单个事件 pub async fn handle_event( &self, plugin_id: &str, event_type: &str, payload: &serde_json::Value, tenant_id: Uuid, ) -> PluginResult<()> { self.handle_event_inner(plugin_id, event_type, payload, tenant_id) .await } async fn handle_event_inner( &self, plugin_id: &str, event_type: &str, payload: &serde_json::Value, tenant_id: Uuid, ) -> PluginResult<()> { let payload_bytes = serde_json::to_vec(payload).unwrap_or_default(); let event_type = event_type.to_owned(); let ctx = ExecutionContext { tenant_id, user_id: Uuid::nil(), permissions: vec![], }; self.execute_wasm(plugin_id, &ctx, move |store, instance| { instance .erp_plugin_plugin_api() .call_handle_event(store, &event_type, &payload_bytes) .map_err(|e| PluginError::ExecutionError(e.to_string()))? .map_err(|e| PluginError::ExecutionError(e))?; Ok(()) }) .await } /// 租户创建时调用插件的 on_tenant_created pub async fn on_tenant_created(&self, plugin_id: &str, tenant_id: Uuid) -> PluginResult<()> { let tenant_id_str = tenant_id.to_string(); let ctx = ExecutionContext { tenant_id, user_id: Uuid::nil(), permissions: vec![], }; self.execute_wasm(plugin_id, &ctx, move |store, instance| { instance .erp_plugin_plugin_api() .call_on_tenant_created(store, &tenant_id_str) .map_err(|e| PluginError::ExecutionError(e.to_string()))? .map_err(|e| PluginError::ExecutionError(e))?; Ok(()) }) .await } /// 禁用插件(停止事件监听 + 更新状态) pub async fn disable(&self, plugin_id: &str) -> PluginResult<()> { let loaded = self.get_loaded(plugin_id)?; // 取消所有事件监听 let mut handles = loaded.event_handles.write().await; for handle in handles.drain(..) { handle.abort(); } drop(handles); *loaded.status.write().await = PluginStatus::Disabled; tracing::info!(plugin_id, "Plugin disabled"); Ok(()) } /// 从内存卸载插件 pub async fn unload(&self, plugin_id: &str) -> PluginResult<()> { if self.plugins.contains_key(plugin_id) { self.disable(plugin_id).await.ok(); } self.plugins.remove(plugin_id); tracing::info!(plugin_id, "Plugin unloaded"); Ok(()) } /// 健康检查 pub async fn health_check(&self, plugin_id: &str) -> PluginResult { let loaded = self.get_loaded(plugin_id)?; let status = loaded.status.read().await; match &*status { PluginStatus::Running => Ok(json!({ "status": "healthy", "plugin_id": plugin_id, })), PluginStatus::Error(e) => Ok(json!({ "status": "error", "plugin_id": plugin_id, "error": e, })), other => Ok(json!({ "status": "unhealthy", "plugin_id": plugin_id, "state": format!("{:?}", other), })), } } /// 列出所有已加载插件的信息 pub fn list_plugins(&self) -> Vec { self.plugins .iter() .map(|entry| { let loaded = entry.value(); PluginInfo { id: loaded.id.clone(), name: loaded.manifest.metadata.name.clone(), version: loaded.manifest.metadata.version.clone(), } }) .collect() } /// 获取插件清单 pub fn get_manifest(&self, plugin_id: &str) -> Option { self.plugins .get(plugin_id) .map(|entry| entry.manifest.clone()) } /// 检查插件是否正在运行 pub async fn is_running(&self, plugin_id: &str) -> bool { if let Some(loaded) = self.plugins.get(plugin_id) { matches!(*loaded.status.read().await, PluginStatus::Running) } else { false } } /// 恢复数据库中状态为 running/enabled 的插件。 /// /// 服务器重启后调用此方法,重新加载 WASM 到内存并启动事件监听。 pub async fn recover_plugins( &self, db: &DatabaseConnection, ) -> PluginResult> { use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; use crate::entity::plugin; // 查询所有运行中的插件 let running_plugins = plugin::Entity::find() .filter(plugin::Column::Status.eq("running")) .filter(plugin::Column::DeletedAt.is_null()) .all(db) .await .map_err(|e| PluginError::DatabaseError(e.to_string()))?; let mut recovered = Vec::new(); for model in running_plugins { let manifest: PluginManifest = serde_json::from_value(model.manifest_json.clone()) .map_err(|e| PluginError::InvalidManifest(e.to_string()))?; let plugin_id_str = &manifest.metadata.id; // 加载 WASM 到内存 if let Err(e) = self.load(plugin_id_str, &model.wasm_binary, manifest.clone()).await { tracing::error!( plugin_id = %plugin_id_str, error = %e, "Failed to recover plugin (load)" ); continue; } // 初始化 if let Err(e) = self.initialize(plugin_id_str).await { tracing::error!( plugin_id = %plugin_id_str, error = %e, "Failed to recover plugin (initialize)" ); continue; } // 启动事件监听 if let Err(e) = self.start_event_listener(plugin_id_str).await { tracing::error!( plugin_id = %plugin_id_str, error = %e, "Failed to recover plugin (start_event_listener)" ); continue; } tracing::info!(plugin_id = %plugin_id_str, "Plugin recovered"); recovered.push(plugin_id_str.clone()); } tracing::info!(count = recovered.len(), "Plugins recovered"); Ok(recovered) } // ---- 内部方法 ---- fn get_loaded(&self, plugin_id: &str) -> PluginResult> { self.plugins .get(plugin_id) .map(|e| e.value().clone()) .ok_or_else(|| PluginError::NotFound(plugin_id.to_string())) } /// 在 spawn_blocking + catch_unwind + fuel + timeout 中执行 WASM 操作, /// 执行完成后自动刷新 pending_ops 到数据库。 async fn execute_wasm( &self, plugin_id: &str, exec_ctx: &ExecutionContext, operation: F, ) -> PluginResult where F: FnOnce(&mut Store, &PluginWorld) -> PluginResult + Send + std::panic::UnwindSafe + 'static, R: Send + 'static, { let loaded = self.get_loaded(plugin_id)?; // 创建新的 Store + HostState,使用真实的租户/用户上下文 let state = HostState::new( plugin_id.to_string(), exec_ctx.tenant_id, exec_ctx.user_id, exec_ctx.permissions.clone(), ); let mut store = Store::new(&self.engine, state); store .set_fuel(self.config.default_fuel) .map_err(|e| PluginError::ExecutionError(e.to_string()))?; store.limiter(|state| &mut state.limits); // 实例化 let instance = PluginWorld::instantiate_async(&mut store, &loaded.component, &loaded.linker) .await .map_err(|e| PluginError::InstantiationError(e.to_string()))?; let timeout_secs = self.config.execution_timeout_secs; let pid_owned = plugin_id.to_owned(); // spawn_blocking 闭包执行 WASM,正常完成时收集 pending_ops let (result, pending_ops): (PluginResult, Vec) = tokio::time::timeout( std::time::Duration::from_secs(timeout_secs), tokio::task::spawn_blocking(move || { match std::panic::catch_unwind(AssertUnwindSafe(|| { let r = operation(&mut store, &instance); // catch_unwind 内部不能调用 into_data(需要 &mut self), // 但这里 operation 已完成,store 仍可用 let ops = std::mem::take(&mut store.data_mut().pending_ops); (r, ops) })) { Ok((r, ops)) => (r, ops), Err(_) => { // panic 后丢弃所有 pending_ops,避免半完成状态写入数据库 tracing::warn!(plugin = %pid_owned, "WASM panic, discarding pending ops"); ( Err(PluginError::ExecutionError("WASM panic".to_string())), Vec::new(), ) } } }), ) .await .map_err(|_| { PluginError::ExecutionError(format!("插件执行超时 ({}s)", timeout_secs)) })? .map_err(|e| PluginError::ExecutionError(e.to_string()))?; // 刷新写操作到数据库 Self::flush_ops( &self.db, plugin_id, pending_ops, exec_ctx.tenant_id, exec_ctx.user_id, &self.event_bus, ) .await?; result } /// 刷新 HostState 中的 pending_ops 到数据库。 /// /// 使用事务包裹所有数据库操作确保原子性。 /// 事件发布在事务提交后执行(best-effort)。 pub(crate) async fn flush_ops( db: &DatabaseConnection, plugin_id: &str, ops: Vec, tenant_id: Uuid, user_id: Uuid, event_bus: &EventBus, ) -> PluginResult<()> { if ops.is_empty() { return Ok(()); } // 使用事务确保所有数据库操作的原子性 let txn = db.begin().await.map_err(|e| PluginError::DatabaseError(e.to_string()))?; for op in &ops { match op { PendingOp::Insert { id, entity, data } => { let table_name = DynamicTableManager::table_name(plugin_id, entity); let parsed_data: serde_json::Value = serde_json::from_slice(data).unwrap_or_default(); let id_uuid = id.parse::().map_err(|e| { PluginError::ExecutionError(format!("无效的 ID: {}", e)) })?; let (sql, values) = DynamicTableManager::build_insert_sql_with_id(&table_name, id_uuid, tenant_id, user_id, &parsed_data); txn.execute(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .await .map_err(|e| PluginError::DatabaseError(e.to_string()))?; tracing::debug!( plugin_id, entity = %entity, "Flushed INSERT op" ); } PendingOp::Update { entity, id, data, version, } => { let table_name = DynamicTableManager::table_name(plugin_id, entity); let parsed_data: serde_json::Value = serde_json::from_slice(data).unwrap_or_default(); let id_uuid = id.parse::().map_err(|e| { PluginError::ExecutionError(format!("无效的 ID: {}", e)) })?; let (sql, values) = DynamicTableManager::build_update_sql( &table_name, id_uuid, tenant_id, user_id, &parsed_data, *version as i32, ); txn.execute(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .await .map_err(|e| PluginError::DatabaseError(e.to_string()))?; tracing::debug!( plugin_id, entity = %entity, id = %id, "Flushed UPDATE op" ); } PendingOp::Delete { entity, id } => { let table_name = DynamicTableManager::table_name(plugin_id, entity); let id_uuid = id.parse::().map_err(|e| { PluginError::ExecutionError(format!("无效的 ID: {}", e)) })?; let (sql, values) = DynamicTableManager::build_delete_sql(&table_name, id_uuid, tenant_id); txn.execute(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .await .map_err(|e| PluginError::DatabaseError(e.to_string()))?; tracing::debug!( plugin_id, entity = %entity, id = %id, "Flushed DELETE op" ); } PendingOp::PublishEvent { .. } => { // 事件发布在事务提交后处理 } } } // 提交事务 txn.commit().await.map_err(|e| PluginError::DatabaseError(e.to_string()))?; // 事务提交成功后发布事件(best-effort,不阻塞主流程) for op in ops { if let PendingOp::PublishEvent { event_type, payload } = op { let parsed_payload: serde_json::Value = serde_json::from_slice(&payload).unwrap_or_default(); let event = erp_core::events::DomainEvent::new( &event_type, tenant_id, parsed_payload, ); event_bus.publish(event, db).await; tracing::debug!( plugin_id, event_type = %event_type, "Flushed PUBLISH_EVENT op" ); } } Ok(()) } } /// 插件信息摘要 #[derive(Debug, Clone, serde::Serialize)] pub struct PluginInfo { pub id: String, pub name: String, pub version: String, }