Files
hms/crates/erp-plugin/src/engine.rs
iven e429448c42 feat(plugin): P2-P4 插件平台演进 — 通用服务 + 质量保障 + 市场
P2 平台通用服务:
- manifest 扩展: settings/numbering/templates/trigger_events/importable/exportable 声明
- 插件配置 UI: PluginSettingsForm 自动表单 + 后端校验 + 详情抽屉 Settings 标签页
- 编号规则: Host API numbering-generate + PostgreSQL 序列 + manifest 绑定
- 触发事件: data_service create/update/delete 自动发布 DomainEvent
- WIT 接口: 新增 numbering-generate/setting-get Host API

P3 质量保障:
- plugin_validator.rs: 安全扫描(WASM大小/实体数量/字段校验) + 复杂度评分
- 运行时监控指标: RuntimeMetrics (错误率/响应时间/Fuel/内存)
- 性能基准: BenchmarkResult 阈值定义
- 上传时自动安全扫描 + /validate API 端点

P4 插件市场:
- 数据库迁移: plugin_market_entries + plugin_market_reviews 表
- 前端 PluginMarket 页面: 分类浏览/搜索/详情/评分
- 路由注册: /plugins/market

测试: 269 全通过 (71 erp-plugin + 41 auth + 57 config + 34 core + 50 message + 16 workflow)
2026-04-19 12:16:24 +08:00

778 lines
27 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use std::collections::HashMap;
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use dashmap::DashMap;
use sea_orm::{ColumnTrait, ConnectionTrait, DatabaseConnection, EntityTrait, QueryFilter, Statement, TransactionTrait};
use serde_json::json;
use tokio::sync::RwLock;
use uuid::Uuid;
use wasmtime::component::{Component, HasSelf, Linker};
use wasmtime::{Config, Engine, Store};
use erp_core::events::EventBus;
use crate::PluginWorld;
use crate::dynamic_table::DynamicTableManager;
use crate::error::{PluginError, PluginResult};
use crate::host::{HostState, NumberingRule, PendingOp};
use crate::manifest::PluginManifest;
/// 从 manifest 的 numbering 声明构建 HostState 缓存映射
fn numbering_rules_from_manifest(manifest: &PluginManifest) -> HashMap<String, NumberingRule> {
let mut rules = HashMap::new();
if let Some(numbering) = &manifest.numbering {
for n in numbering {
rules.insert(
n.entity.clone(),
NumberingRule {
prefix: n.prefix.clone(),
format: n.format.clone(),
seq_length: n.seq_length,
reset_rule: format!("{:?}", n.reset_rule).to_lowercase(),
},
);
}
}
rules
}
/// 插件引擎配置
#[derive(Debug, Clone)]
pub struct PluginEngineConfig {
/// 默认 Fuel 限制
pub default_fuel: u64,
/// 执行超时(秒)
pub execution_timeout_secs: u64,
}
impl Default for PluginEngineConfig {
fn default() -> Self {
Self {
default_fuel: 10_000_000,
execution_timeout_secs: 30,
}
}
}
/// 插件运行状态
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PluginStatus {
/// 已加载到内存
Loaded,
/// 已初始化init() 已调用)
Initialized,
/// 运行中(事件监听已启动)
Running,
/// 错误状态
Error(String),
/// 已禁用
Disabled,
}
/// 已加载的插件实例
pub struct LoadedPlugin {
pub id: String,
pub manifest: PluginManifest,
pub component: Component,
pub linker: Linker<HostState>,
pub status: RwLock<PluginStatus>,
pub event_handles: RwLock<Vec<tokio::task::JoinHandle<()>>>,
}
/// WASM 执行上下文 — 传递真实的租户和用户信息
#[derive(Debug, Clone)]
pub struct ExecutionContext {
pub tenant_id: Uuid,
pub user_id: Uuid,
pub permissions: Vec<String>,
}
/// 插件引擎 — 管理所有已加载插件的 WASM 运行时
#[derive(Clone)]
pub struct PluginEngine {
engine: Arc<Engine>,
db: DatabaseConnection,
event_bus: EventBus,
plugins: Arc<DashMap<String, Arc<LoadedPlugin>>>,
config: PluginEngineConfig,
}
impl PluginEngine {
/// 创建新的插件引擎
pub fn new(
db: DatabaseConnection,
event_bus: EventBus,
config: PluginEngineConfig,
) -> PluginResult<Self> {
let mut wasm_config = Config::new();
wasm_config.wasm_component_model(true);
wasm_config.consume_fuel(true);
let engine = Engine::new(&wasm_config)
.map_err(|e| PluginError::InstantiationError(e.to_string()))?;
Ok(Self {
engine: Arc::new(engine),
db,
event_bus,
plugins: Arc::new(DashMap::new()),
config,
})
}
/// 加载插件到内存(不初始化)
pub async fn load(
&self,
plugin_id: &str,
wasm_bytes: &[u8],
manifest: PluginManifest,
) -> PluginResult<()> {
if self.plugins.contains_key(plugin_id) {
return Err(PluginError::AlreadyExists(plugin_id.to_string()));
}
let component = Component::from_binary(&self.engine, wasm_bytes)
.map_err(|e| PluginError::InstantiationError(e.to_string()))?;
let mut linker = Linker::new(&self.engine);
// 注册 Host API 到 Linker
PluginWorld::add_to_linker::<_, HasSelf<HostState>>(&mut linker, |state| state)
.map_err(|e| PluginError::InstantiationError(e.to_string()))?;
let loaded = Arc::new(LoadedPlugin {
id: plugin_id.to_string(),
manifest,
component,
linker,
status: RwLock::new(PluginStatus::Loaded),
event_handles: RwLock::new(vec![]),
});
self.plugins.insert(plugin_id.to_string(), loaded);
tracing::info!(plugin_id, "Plugin loaded into memory");
Ok(())
}
/// 初始化插件(调用 init()
pub async fn initialize(&self, plugin_id: &str) -> PluginResult<()> {
let loaded = self.get_loaded(plugin_id)?;
// 检查状态
{
let status = loaded.status.read().await;
if *status != PluginStatus::Loaded {
return Err(PluginError::InvalidState {
expected: "Loaded".to_string(),
actual: format!("{:?}", *status),
});
}
}
let ctx = ExecutionContext {
tenant_id: Uuid::nil(),
user_id: Uuid::nil(),
permissions: vec![],
};
let result = self
.execute_wasm(plugin_id, &ctx, |store, instance| {
instance.erp_plugin_plugin_api().call_init(store)
.map_err(|e| PluginError::ExecutionError(e.to_string()))?
.map_err(|e| PluginError::ExecutionError(e))?;
Ok(())
})
.await;
match result {
Ok(()) => {
*loaded.status.write().await = PluginStatus::Initialized;
tracing::info!(plugin_id, "Plugin initialized");
Ok(())
}
Err(e) => {
*loaded.status.write().await = PluginStatus::Error(e.to_string());
Err(e)
}
}
}
/// 启动事件监听
pub async fn start_event_listener(&self, plugin_id: &str) -> PluginResult<()> {
let loaded = self.get_loaded(plugin_id)?;
// 检查状态
{
let status = loaded.status.read().await;
if *status != PluginStatus::Initialized {
return Err(PluginError::InvalidState {
expected: "Initialized".to_string(),
actual: format!("{:?}", *status),
});
}
}
let events_config = &loaded.manifest.events;
if let Some(events) = events_config {
for pattern in &events.subscribe {
let (mut rx, sub_handle) = self.event_bus.subscribe_filtered(pattern.clone());
let pid = plugin_id.to_string();
let engine = self.clone();
let join_handle = tokio::spawn(async move {
// sub_handle 保存在此 task 中task 结束时自动 drop 触发优雅取消
let _sub_guard = sub_handle;
while let Some(event) = rx.recv().await {
if let Err(e) = engine
.handle_event_inner(
&pid,
&event.event_type,
&event.payload,
event.tenant_id,
)
.await
{
tracing::error!(
plugin_id = %pid,
error = %e,
"Plugin event handler failed"
);
}
}
});
loaded.event_handles.write().await.push(join_handle);
}
}
*loaded.status.write().await = PluginStatus::Running;
tracing::info!(plugin_id, "Plugin event listener started");
Ok(())
}
/// 处理单个事件
pub async fn handle_event(
&self,
plugin_id: &str,
event_type: &str,
payload: &serde_json::Value,
tenant_id: Uuid,
) -> PluginResult<()> {
self.handle_event_inner(plugin_id, event_type, payload, tenant_id)
.await
}
async fn handle_event_inner(
&self,
plugin_id: &str,
event_type: &str,
payload: &serde_json::Value,
tenant_id: Uuid,
) -> PluginResult<()> {
let payload_bytes = serde_json::to_vec(payload).unwrap_or_default();
let event_type = event_type.to_owned();
let ctx = ExecutionContext {
tenant_id,
user_id: Uuid::nil(),
permissions: vec![],
};
self.execute_wasm(plugin_id, &ctx, move |store, instance| {
instance
.erp_plugin_plugin_api()
.call_handle_event(store, &event_type, &payload_bytes)
.map_err(|e| PluginError::ExecutionError(e.to_string()))?
.map_err(|e| PluginError::ExecutionError(e))?;
Ok(())
})
.await
}
/// 租户创建时调用插件的 on_tenant_created
pub async fn on_tenant_created(&self, plugin_id: &str, tenant_id: Uuid) -> PluginResult<()> {
let tenant_id_str = tenant_id.to_string();
let ctx = ExecutionContext {
tenant_id,
user_id: Uuid::nil(),
permissions: vec![],
};
self.execute_wasm(plugin_id, &ctx, move |store, instance| {
instance
.erp_plugin_plugin_api()
.call_on_tenant_created(store, &tenant_id_str)
.map_err(|e| PluginError::ExecutionError(e.to_string()))?
.map_err(|e| PluginError::ExecutionError(e))?;
Ok(())
})
.await
}
/// 禁用插件(停止事件监听 + 更新状态)
pub async fn disable(&self, plugin_id: &str) -> PluginResult<()> {
let loaded = self.get_loaded(plugin_id)?;
// 取消所有事件监听
let mut handles = loaded.event_handles.write().await;
for handle in handles.drain(..) {
handle.abort();
}
drop(handles);
*loaded.status.write().await = PluginStatus::Disabled;
tracing::info!(plugin_id, "Plugin disabled");
Ok(())
}
/// 从内存卸载插件
pub async fn unload(&self, plugin_id: &str) -> PluginResult<()> {
if self.plugins.contains_key(plugin_id) {
self.disable(plugin_id).await.ok();
}
self.plugins.remove(plugin_id);
tracing::info!(plugin_id, "Plugin unloaded");
Ok(())
}
/// 将插件从一个 key 重命名为另一个 key用于热更新的原子替换
pub async fn rename_plugin(&self, old_id: &str, new_id: &str) -> PluginResult<()> {
let (_, loaded) = self.plugins.remove(old_id)
.ok_or_else(|| PluginError::NotFound(old_id.to_string()))?;
let mut loaded = Arc::try_unwrap(loaded)
.map_err(|_| PluginError::ExecutionError("插件仍被引用,无法重命名".to_string()))?;
loaded.id = new_id.to_string();
self.plugins.insert(new_id.to_string(), Arc::new(loaded));
tracing::info!(old_id, new_id, "Plugin renamed");
Ok(())
}
/// 健康检查
pub async fn health_check(&self, plugin_id: &str) -> PluginResult<serde_json::Value> {
let loaded = self.get_loaded(plugin_id)?;
let status = loaded.status.read().await;
match &*status {
PluginStatus::Running => Ok(json!({
"status": "healthy",
"plugin_id": plugin_id,
})),
PluginStatus::Error(e) => Ok(json!({
"status": "error",
"plugin_id": plugin_id,
"error": e,
})),
other => Ok(json!({
"status": "unhealthy",
"plugin_id": plugin_id,
"state": format!("{:?}", other),
})),
}
}
/// 列出所有已加载插件的信息
pub fn list_plugins(&self) -> Vec<PluginInfo> {
self.plugins
.iter()
.map(|entry| {
let loaded = entry.value();
PluginInfo {
id: loaded.id.clone(),
name: loaded.manifest.metadata.name.clone(),
version: loaded.manifest.metadata.version.clone(),
}
})
.collect()
}
/// 获取插件清单
pub fn get_manifest(&self, plugin_id: &str) -> Option<PluginManifest> {
self.plugins
.get(plugin_id)
.map(|entry| entry.manifest.clone())
}
/// 检查插件是否正在运行
pub async fn is_running(&self, plugin_id: &str) -> bool {
if let Some(loaded) = self.plugins.get(plugin_id) {
matches!(*loaded.status.read().await, PluginStatus::Running)
} else {
false
}
}
/// 恢复数据库中状态为 running/enabled 的插件。
///
/// 服务器重启后调用此方法,重新加载 WASM 到内存并启动事件监听。
pub async fn recover_plugins(
&self,
db: &DatabaseConnection,
) -> PluginResult<Vec<String>> {
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use crate::entity::plugin;
// 查询所有运行中的插件
let running_plugins = plugin::Entity::find()
.filter(plugin::Column::Status.eq("running"))
.filter(plugin::Column::DeletedAt.is_null())
.all(db)
.await
.map_err(|e| PluginError::DatabaseError(e.to_string()))?;
let mut recovered = Vec::new();
for model in running_plugins {
let manifest: PluginManifest = serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let plugin_id_str = &manifest.metadata.id;
// 加载 WASM 到内存
if let Err(e) = self.load(plugin_id_str, &model.wasm_binary, manifest.clone()).await {
tracing::error!(
plugin_id = %plugin_id_str,
error = %e,
"Failed to recover plugin (load)"
);
continue;
}
// 初始化
if let Err(e) = self.initialize(plugin_id_str).await {
tracing::error!(
plugin_id = %plugin_id_str,
error = %e,
"Failed to recover plugin (initialize)"
);
continue;
}
// 启动事件监听
if let Err(e) = self.start_event_listener(plugin_id_str).await {
tracing::error!(
plugin_id = %plugin_id_str,
error = %e,
"Failed to recover plugin (start_event_listener)"
);
continue;
}
tracing::info!(plugin_id = %plugin_id_str, "Plugin recovered");
recovered.push(plugin_id_str.clone());
}
tracing::info!(count = recovered.len(), "Plugins recovered");
Ok(recovered)
}
// ---- 内部方法 ----
fn get_loaded(&self, plugin_id: &str) -> PluginResult<Arc<LoadedPlugin>> {
self.plugins
.get(plugin_id)
.map(|e| e.value().clone())
.ok_or_else(|| PluginError::NotFound(plugin_id.to_string()))
}
/// 在 spawn_blocking + catch_unwind + fuel + timeout 中执行 WASM 操作,
/// 执行完成后自动刷新 pending_ops 到数据库。
async fn execute_wasm<F, R>(
&self,
plugin_id: &str,
exec_ctx: &ExecutionContext,
operation: F,
) -> PluginResult<R>
where
F: FnOnce(&mut Store<HostState>, &PluginWorld) -> PluginResult<R>
+ Send
+ std::panic::UnwindSafe
+ 'static,
R: Send + 'static,
{
let loaded = self.get_loaded(plugin_id)?;
// 构建跨插件实体映射(从 manifest 的 ref_plugin 字段提取)
let cross_plugin_entities = Self::build_cross_plugin_map(&loaded.manifest, &self.db, exec_ctx.tenant_id).await;
// 加载插件配置(从数据库)
let plugin_config = Self::load_plugin_config(plugin_id, exec_ctx.tenant_id, &self.db).await;
// 创建新的 Store + HostState使用真实的租户/用户上下文
// 传入 db 和 event_bus 启用混合执行模式(插件可自主查询数据)
let mut state = HostState::new_with_db(
plugin_id.to_string(),
exec_ctx.tenant_id,
exec_ctx.user_id,
exec_ctx.permissions.clone(),
self.db.clone(),
self.event_bus.clone(),
);
state.cross_plugin_entities = cross_plugin_entities;
// 注入编号规则和插件配置
state.numbering_rules = numbering_rules_from_manifest(&loaded.manifest);
state.plugin_config = plugin_config;
let mut store = Store::new(&self.engine, state);
store
.set_fuel(self.config.default_fuel)
.map_err(|e| PluginError::ExecutionError(e.to_string()))?;
store.limiter(|state| &mut state.limits);
// 实例化
let instance = PluginWorld::instantiate_async(&mut store, &loaded.component, &loaded.linker)
.await
.map_err(|e| PluginError::InstantiationError(e.to_string()))?;
let timeout_secs = self.config.execution_timeout_secs;
let pid_owned = plugin_id.to_owned();
// spawn_blocking 闭包执行 WASM正常完成时收集 pending_ops
let (result, pending_ops): (PluginResult<R>, Vec<PendingOp>) =
tokio::time::timeout(
std::time::Duration::from_secs(timeout_secs),
tokio::task::spawn_blocking(move || {
match std::panic::catch_unwind(AssertUnwindSafe(|| {
let r = operation(&mut store, &instance);
// catch_unwind 内部不能调用 into_data需要 &mut self
// 但这里 operation 已完成store 仍可用
let ops = std::mem::take(&mut store.data_mut().pending_ops);
(r, ops)
})) {
Ok((r, ops)) => (r, ops),
Err(_) => {
// panic 后丢弃所有 pending_ops避免半完成状态写入数据库
tracing::warn!(plugin = %pid_owned, "WASM panic, discarding pending ops");
(
Err(PluginError::ExecutionError("WASM panic".to_string())),
Vec::new(),
)
}
}
}),
)
.await
.map_err(|_| {
PluginError::ExecutionError(format!("插件执行超时 ({}s)", timeout_secs))
})?
.map_err(|e| PluginError::ExecutionError(e.to_string()))?;
// 刷新写操作到数据库
Self::flush_ops(
&self.db,
plugin_id,
pending_ops,
exec_ctx.tenant_id,
exec_ctx.user_id,
&self.event_bus,
)
.await?;
result
}
/// 从数据库加载插件配置(通过 manifest metadata.id 匹配)
fn load_plugin_config(
plugin_id: &str,
tenant_id: Uuid,
db: &DatabaseConnection,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = serde_json::Value> + Send + 'static>> {
let db = db.clone();
let pid = plugin_id.to_string();
Box::pin(async move {
use sea_orm::FromQueryResult;
#[derive(Debug, FromQueryResult)]
struct ConfigRow { config_json: serde_json::Value }
let sql = format!(
"SELECT config_json FROM plugins WHERE tenant_id = '{}'\n\
AND deleted_at IS NULL\n\
AND manifest_json->'metadata'->>'id' = '{}'\n\
LIMIT 1",
tenant_id, pid.replace('\'', "''")
);
ConfigRow::find_by_statement(Statement::from_string(
sea_orm::DatabaseBackend::Postgres,
sql,
))
.one(&db)
.await
.ok()
.flatten()
.map(|r| r.config_json)
.unwrap_or_default()
})
}
/// 从 manifest 的 ref_plugin 字段构建跨插件实体映射
/// 返回: { "erp-crm.customer" → "plugin_erp_crm__customer", ... }
async fn build_cross_plugin_map(
manifest: &crate::manifest::PluginManifest,
db: &DatabaseConnection,
tenant_id: Uuid,
) -> HashMap<String, String> {
let mut map = HashMap::new();
let Some(schema) = &manifest.schema else { return map };
for entity in &schema.entities {
for field in &entity.fields {
if let (Some(target_plugin), Some(ref_entity)) = (&field.ref_plugin, &field.ref_entity) {
let key = format!("{}.{}", target_plugin, ref_entity);
// 从 plugin_entities 表查找目标表名
let table_name = crate::entity::plugin_entity::Entity::find()
.filter(crate::entity::plugin_entity::Column::ManifestId.eq(target_plugin.as_str()))
.filter(crate::entity::plugin_entity::Column::EntityName.eq(ref_entity.as_str()))
.filter(crate::entity::plugin_entity::Column::TenantId.eq(tenant_id))
.filter(crate::entity::plugin_entity::Column::DeletedAt.is_null())
.one(db)
.await
.ok()
.flatten()
.map(|e| e.table_name);
if let Some(tn) = table_name {
map.insert(key, tn);
}
}
}
}
map
}
/// 刷新 HostState 中的 pending_ops 到数据库。
///
/// 使用事务包裹所有数据库操作确保原子性。
/// 事件发布在事务提交后执行best-effort
pub(crate) async fn flush_ops(
db: &DatabaseConnection,
plugin_id: &str,
ops: Vec<PendingOp>,
tenant_id: Uuid,
user_id: Uuid,
event_bus: &EventBus,
) -> PluginResult<()> {
if ops.is_empty() {
return Ok(());
}
// 使用事务确保所有数据库操作的原子性
let txn = db.begin().await.map_err(|e| PluginError::DatabaseError(e.to_string()))?;
for op in &ops {
match op {
PendingOp::Insert { id, entity, data } => {
let table_name = DynamicTableManager::table_name(plugin_id, entity);
let parsed_data: serde_json::Value =
serde_json::from_slice(data).unwrap_or_default();
let id_uuid = id.parse::<Uuid>().map_err(|e| {
PluginError::ExecutionError(format!("无效的 ID: {}", e))
})?;
let (sql, values) =
DynamicTableManager::build_insert_sql_with_id(&table_name, id_uuid, tenant_id, user_id, &parsed_data);
txn.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.await
.map_err(|e| PluginError::DatabaseError(e.to_string()))?;
tracing::debug!(
plugin_id,
entity = %entity,
"Flushed INSERT op"
);
}
PendingOp::Update {
entity,
id,
data,
version,
} => {
let table_name = DynamicTableManager::table_name(plugin_id, entity);
let parsed_data: serde_json::Value =
serde_json::from_slice(data).unwrap_or_default();
let id_uuid = id.parse::<Uuid>().map_err(|e| {
PluginError::ExecutionError(format!("无效的 ID: {}", e))
})?;
let (sql, values) = DynamicTableManager::build_update_sql(
&table_name,
id_uuid,
tenant_id,
user_id,
&parsed_data,
*version as i32,
);
txn.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.await
.map_err(|e| PluginError::DatabaseError(e.to_string()))?;
tracing::debug!(
plugin_id,
entity = %entity,
id = %id,
"Flushed UPDATE op"
);
}
PendingOp::Delete { entity, id } => {
let table_name = DynamicTableManager::table_name(plugin_id, entity);
let id_uuid = id.parse::<Uuid>().map_err(|e| {
PluginError::ExecutionError(format!("无效的 ID: {}", e))
})?;
let (sql, values) =
DynamicTableManager::build_delete_sql(&table_name, id_uuid, tenant_id);
txn.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.await
.map_err(|e| PluginError::DatabaseError(e.to_string()))?;
tracing::debug!(
plugin_id,
entity = %entity,
id = %id,
"Flushed DELETE op"
);
}
PendingOp::PublishEvent { .. } => {
// 事件发布在事务提交后处理
}
}
}
// 提交事务
txn.commit().await.map_err(|e| PluginError::DatabaseError(e.to_string()))?;
// 事务提交成功后发布事件best-effort不阻塞主流程
for op in ops {
if let PendingOp::PublishEvent { event_type, payload } = op {
let parsed_payload: serde_json::Value =
serde_json::from_slice(&payload).unwrap_or_default();
let event = erp_core::events::DomainEvent::new(
&event_type,
tenant_id,
parsed_payload,
);
event_bus.publish(event, db).await;
tracing::debug!(
plugin_id,
event_type = %event_type,
"Flushed PUBLISH_EVENT op"
);
}
}
Ok(())
}
}
/// 插件信息摘要
#[derive(Debug, Clone, serde::Serialize)]
pub struct PluginInfo {
pub id: String,
pub name: String,
pub version: String,
}