chore: 干净 ERP 基座 — 删除 health/ai/wechat 业务代码

删除内容:
- 前端: health/(67文件), ai/(2文件), Copilot, MediaPicker, 相关API/Store/Hook
- 后端: wechat_handler, wechat_service, wechat_user entity, analytics handler, ai_workflow_seed
- 配置: WechatConfig, AppConfig.wechat, AuthState wechat 字段
- 启动: 微信凭据检查块, ensure_ai_workflows() 调用
- 迁移: 新增 m20260613_000170_drop_wechat_users.rs
- 脚本: api_test_health_alert.py, api_test_mp.py, mpsync.sh/ps1
- E2E: health-data page, flows/ 目录

保留: erp-core/auth/workflow/message/config/plugin + 基座前端 + 通用组件
This commit is contained in:
iven
2026-06-13 00:32:50 +08:00
commit 3772afd987
438 changed files with 86511 additions and 0 deletions

View File

@@ -0,0 +1,875 @@
use std::collections::HashMap;
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use dashmap::DashMap;
use sea_orm::{
ColumnTrait, ConnectionTrait, DatabaseConnection, EntityTrait, QueryFilter, Statement,
TransactionTrait,
};
use serde_json::json;
use tokio::sync::RwLock;
use uuid::Uuid;
use wasmtime::component::{Component, HasSelf, Linker};
use wasmtime::{Config, Engine, Store};
use erp_core::events::EventBus;
use crate::PluginWorld;
use crate::dynamic_table::DynamicTableManager;
use crate::error::{PluginError, PluginResult};
use crate::host::{HostState, NumberingRule, PendingOp};
use crate::manifest::PluginManifest;
/// 从 manifest 的 numbering 声明构建 HostState 缓存映射
fn numbering_rules_from_manifest(manifest: &PluginManifest) -> HashMap<String, NumberingRule> {
let mut rules = HashMap::new();
if let Some(numbering) = &manifest.numbering {
for n in numbering {
rules.insert(
n.entity.clone(),
NumberingRule {
prefix: n.prefix.clone(),
format: n.format.clone(),
seq_length: n.seq_length,
reset_rule: format!("{:?}", n.reset_rule).to_lowercase(),
},
);
}
}
rules
}
/// 插件引擎配置
#[derive(Debug, Clone)]
pub struct PluginEngineConfig {
/// 默认 Fuel 限制
pub default_fuel: u64,
/// 执行超时(秒)
pub execution_timeout_secs: u64,
}
impl Default for PluginEngineConfig {
fn default() -> Self {
Self {
default_fuel: 10_000_000,
execution_timeout_secs: 30,
}
}
}
/// 插件运行状态
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PluginStatus {
/// 已加载到内存
Loaded,
/// 已初始化init() 已调用)
Initialized,
/// 运行中(事件监听已启动)
Running,
/// 错误状态
Error(String),
/// 已禁用
Disabled,
}
/// 已加载的插件实例
pub struct LoadedPlugin {
pub id: String,
pub manifest: PluginManifest,
pub component: Component,
pub linker: Linker<HostState>,
pub status: RwLock<PluginStatus>,
pub event_handles: RwLock<Vec<tokio::task::JoinHandle<()>>>,
pub metrics: Arc<RwLock<RuntimeMetrics>>,
}
/// 插件运行时指标
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct RuntimeMetrics {
pub total_invocations: u64,
pub error_count: u64,
pub total_response_ms: f64,
pub fuel_consumed_total: u64,
pub memory_peak_bytes: u64,
pub last_error: Option<String>,
pub last_invocation_at: Option<chrono::DateTime<chrono::Utc>>,
}
/// WASM 执行上下文 — 传递真实的租户和用户信息
#[derive(Debug, Clone)]
pub struct ExecutionContext {
pub tenant_id: Uuid,
pub user_id: Uuid,
pub permissions: Vec<String>,
}
/// 插件引擎 — 管理所有已加载插件的 WASM 运行时
#[derive(Clone)]
pub struct PluginEngine {
engine: Arc<Engine>,
db: DatabaseConnection,
event_bus: EventBus,
plugins: Arc<DashMap<String, Arc<LoadedPlugin>>>,
config: PluginEngineConfig,
}
impl PluginEngine {
/// 创建新的插件引擎
pub fn new(
db: DatabaseConnection,
event_bus: EventBus,
config: PluginEngineConfig,
) -> PluginResult<Self> {
let mut wasm_config = Config::new();
wasm_config.wasm_component_model(true);
wasm_config.consume_fuel(true);
let engine = Engine::new(&wasm_config)
.map_err(|e| PluginError::InstantiationError(e.to_string()))?;
Ok(Self {
engine: Arc::new(engine),
db,
event_bus,
plugins: Arc::new(DashMap::new()),
config,
})
}
/// 加载插件到内存(不初始化)
pub async fn load(
&self,
plugin_id: &str,
wasm_bytes: &[u8],
manifest: PluginManifest,
) -> PluginResult<()> {
if self.plugins.contains_key(plugin_id) {
return Err(PluginError::AlreadyExists(plugin_id.to_string()));
}
let component = Component::from_binary(&self.engine, wasm_bytes)
.map_err(|e| PluginError::InstantiationError(e.to_string()))?;
let mut linker = Linker::new(&self.engine);
// 注册 Host API 到 Linker
PluginWorld::add_to_linker::<_, HasSelf<HostState>>(&mut linker, |state| state)
.map_err(|e| PluginError::InstantiationError(e.to_string()))?;
let loaded = Arc::new(LoadedPlugin {
id: plugin_id.to_string(),
manifest,
component,
linker,
status: RwLock::new(PluginStatus::Loaded),
event_handles: RwLock::new(vec![]),
metrics: Arc::new(RwLock::new(RuntimeMetrics::default())),
});
self.plugins.insert(plugin_id.to_string(), loaded);
tracing::info!(plugin_id, "Plugin loaded into memory");
Ok(())
}
/// 初始化插件(调用 init()
pub async fn initialize(&self, plugin_id: &str) -> PluginResult<()> {
let loaded = self.get_loaded(plugin_id)?;
// 检查状态
{
let status = loaded.status.read().await;
if *status != PluginStatus::Loaded {
return Err(PluginError::InvalidState {
expected: "Loaded".to_string(),
actual: format!("{:?}", *status),
});
}
}
let ctx = ExecutionContext {
tenant_id: Uuid::nil(),
user_id: Uuid::nil(),
permissions: vec![],
};
let result = self
.execute_wasm(plugin_id, &ctx, |store, instance| {
instance
.erp_plugin_plugin_api()
.call_init(store)
.map_err(|e| PluginError::ExecutionError(e.to_string()))?
.map_err(PluginError::ExecutionError)?;
Ok(())
})
.await;
match result {
Ok(()) => {
*loaded.status.write().await = PluginStatus::Initialized;
tracing::info!(plugin_id, "Plugin initialized");
Ok(())
}
Err(e) => {
*loaded.status.write().await = PluginStatus::Error(e.to_string());
Err(e)
}
}
}
/// 启动事件监听
pub async fn start_event_listener(&self, plugin_id: &str) -> PluginResult<()> {
let loaded = self.get_loaded(plugin_id)?;
// 检查状态
{
let status = loaded.status.read().await;
if *status != PluginStatus::Initialized {
return Err(PluginError::InvalidState {
expected: "Initialized".to_string(),
actual: format!("{:?}", *status),
});
}
}
let events_config = &loaded.manifest.events;
if let Some(events) = events_config {
for pattern in &events.subscribe {
let (mut rx, sub_handle) = self.event_bus.subscribe_filtered(pattern.clone());
let pid = plugin_id.to_string();
let engine = self.clone();
let join_handle = tokio::spawn(async move {
// sub_handle 保存在此 task 中task 结束时自动 drop 触发优雅取消
let _sub_guard = sub_handle;
while let Some(event) = rx.recv().await {
if let Err(e) = engine
.handle_event_inner(
&pid,
&event.event_type,
&event.payload,
event.tenant_id,
)
.await
{
tracing::error!(
plugin_id = %pid,
error = %e,
"Plugin event handler failed"
);
}
}
});
loaded.event_handles.write().await.push(join_handle);
}
}
*loaded.status.write().await = PluginStatus::Running;
tracing::info!(plugin_id, "Plugin event listener started");
Ok(())
}
/// 处理单个事件
pub async fn handle_event(
&self,
plugin_id: &str,
event_type: &str,
payload: &serde_json::Value,
tenant_id: Uuid,
) -> PluginResult<()> {
self.handle_event_inner(plugin_id, event_type, payload, tenant_id)
.await
}
async fn handle_event_inner(
&self,
plugin_id: &str,
event_type: &str,
payload: &serde_json::Value,
tenant_id: Uuid,
) -> PluginResult<()> {
let payload_bytes = serde_json::to_vec(payload).unwrap_or_default();
let event_type = event_type.to_owned();
let ctx = ExecutionContext {
tenant_id,
user_id: Uuid::nil(),
permissions: vec![],
};
self.execute_wasm(plugin_id, &ctx, move |store, instance| {
instance
.erp_plugin_plugin_api()
.call_handle_event(store, &event_type, &payload_bytes)
.map_err(|e| PluginError::ExecutionError(e.to_string()))?
.map_err(PluginError::ExecutionError)?;
Ok(())
})
.await
}
/// 租户创建时调用插件的 on_tenant_created
pub async fn on_tenant_created(&self, plugin_id: &str, tenant_id: Uuid) -> PluginResult<()> {
let tenant_id_str = tenant_id.to_string();
let ctx = ExecutionContext {
tenant_id,
user_id: Uuid::nil(),
permissions: vec![],
};
self.execute_wasm(plugin_id, &ctx, move |store, instance| {
instance
.erp_plugin_plugin_api()
.call_on_tenant_created(store, &tenant_id_str)
.map_err(|e| PluginError::ExecutionError(e.to_string()))?
.map_err(PluginError::ExecutionError)?;
Ok(())
})
.await
}
/// 禁用插件(停止事件监听 + 更新状态)
pub async fn disable(&self, plugin_id: &str) -> PluginResult<()> {
let loaded = self.get_loaded(plugin_id)?;
// 取消所有事件监听
let mut handles = loaded.event_handles.write().await;
for handle in handles.drain(..) {
handle.abort();
}
drop(handles);
*loaded.status.write().await = PluginStatus::Disabled;
tracing::info!(plugin_id, "Plugin disabled");
Ok(())
}
/// 从内存卸载插件
pub async fn unload(&self, plugin_id: &str) -> PluginResult<()> {
if self.plugins.contains_key(plugin_id) {
self.disable(plugin_id).await.ok();
}
self.plugins.remove(plugin_id);
tracing::info!(plugin_id, "Plugin unloaded");
Ok(())
}
/// 将插件从一个 key 重命名为另一个 key用于热更新的原子替换
pub async fn rename_plugin(&self, old_id: &str, new_id: &str) -> PluginResult<()> {
let (_, loaded) = self
.plugins
.remove(old_id)
.ok_or_else(|| PluginError::NotFound(old_id.to_string()))?;
let mut loaded = Arc::try_unwrap(loaded)
.map_err(|_| PluginError::ExecutionError("插件仍被引用,无法重命名".to_string()))?;
loaded.id = new_id.to_string();
self.plugins.insert(new_id.to_string(), Arc::new(loaded));
tracing::info!(old_id, new_id, "Plugin renamed");
Ok(())
}
/// 健康检查
pub async fn health_check(&self, plugin_id: &str) -> PluginResult<serde_json::Value> {
let loaded = self.get_loaded(plugin_id)?;
let status = loaded.status.read().await;
match &*status {
PluginStatus::Running => Ok(json!({
"status": "healthy",
"plugin_id": plugin_id,
})),
PluginStatus::Error(e) => Ok(json!({
"status": "error",
"plugin_id": plugin_id,
"error": e,
})),
other => Ok(json!({
"status": "unhealthy",
"plugin_id": plugin_id,
"state": format!("{:?}", other),
})),
}
}
/// 列出所有已加载插件的信息
pub fn list_plugins(&self) -> Vec<PluginInfo> {
self.plugins
.iter()
.map(|entry| {
let loaded = entry.value();
PluginInfo {
id: loaded.id.clone(),
name: loaded.manifest.metadata.name.clone(),
version: loaded.manifest.metadata.version.clone(),
}
})
.collect()
}
/// 获取插件清单
pub fn get_manifest(&self, plugin_id: &str) -> Option<PluginManifest> {
self.plugins
.get(plugin_id)
.map(|entry| entry.manifest.clone())
}
/// 获取插件运行时指标
pub async fn get_metrics(&self, plugin_id: &str) -> PluginResult<RuntimeMetrics> {
let loaded = self.get_loaded(plugin_id)?;
let metrics = loaded.metrics.read().await;
Ok(metrics.clone())
}
/// 刷新插件内存配置(配置变更后调用)
pub async fn refresh_config(&self, plugin_id: &str) -> PluginResult<()> {
// 扫描所有已加载插件,找到匹配 manifest_id 的插件
for entry in self.plugins.iter() {
if entry.value().id == plugin_id {
// 配置会在下次 execute_wasm 时从数据库自动重新加载
// 这里只清理可能缓存的旧配置
tracing::info!(
plugin_id,
"Plugin config refresh scheduled (loaded on next invocation)"
);
return Ok(());
}
}
Ok(())
}
/// 检查插件是否正在运行
pub async fn is_running(&self, plugin_id: &str) -> bool {
if let Some(loaded) = self.plugins.get(plugin_id) {
matches!(*loaded.status.read().await, PluginStatus::Running)
} else {
false
}
}
/// 恢复数据库中状态为 running/enabled 的插件。
///
/// 服务器重启后调用此方法,重新加载 WASM 到内存并启动事件监听。
pub async fn recover_plugins(&self, db: &DatabaseConnection) -> PluginResult<Vec<String>> {
use crate::entity::plugin;
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
// 查询所有运行中的插件
let running_plugins = plugin::Entity::find()
.filter(plugin::Column::Status.eq("running"))
.filter(plugin::Column::DeletedAt.is_null())
.all(db)
.await
.map_err(|e| PluginError::DatabaseError(e.to_string()))?;
let mut recovered = Vec::new();
for model in running_plugins {
let tenant_id = model.tenant_id;
let manifest: PluginManifest = serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let plugin_id_str = &manifest.metadata.id;
// 跳过已被其他租户加载的同 ID 插件WASM 二进制相同,数据隔离在 DB 层)
if self.plugins.contains_key(plugin_id_str) {
tracing::info!(
plugin_id = %plugin_id_str,
tenant_id = %tenant_id,
"Plugin already loaded by another tenant, skipping duplicate load"
);
recovered.push(plugin_id_str.clone());
continue;
}
// 加载 WASM 到内存
if let Err(e) = self
.load(plugin_id_str, &model.wasm_binary, manifest.clone())
.await
{
tracing::error!(
plugin_id = %plugin_id_str,
tenant_id = %tenant_id,
error = %e,
"Failed to recover plugin (load)"
);
continue;
}
// 初始化
if let Err(e) = self.initialize(plugin_id_str).await {
tracing::error!(
plugin_id = %plugin_id_str,
tenant_id = %tenant_id,
error = %e,
"Failed to recover plugin (initialize)"
);
continue;
}
// 启动事件监听
if let Err(e) = self.start_event_listener(plugin_id_str).await {
tracing::error!(
plugin_id = %plugin_id_str,
tenant_id = %tenant_id,
error = %e,
"Failed to recover plugin (start_event_listener)"
);
continue;
}
tracing::info!(
plugin_id = %plugin_id_str,
tenant_id = %tenant_id,
"Plugin recovered"
);
recovered.push(plugin_id_str.clone());
}
tracing::info!(count = recovered.len(), "Plugins recovered");
Ok(recovered)
}
// ---- 内部方法 ----
fn get_loaded(&self, plugin_id: &str) -> PluginResult<Arc<LoadedPlugin>> {
self.plugins
.get(plugin_id)
.map(|e| e.value().clone())
.ok_or_else(|| PluginError::NotFound(plugin_id.to_string()))
}
/// 在 spawn_blocking + catch_unwind + fuel + timeout 中执行 WASM 操作,
/// 执行完成后自动刷新 pending_ops 到数据库。
async fn execute_wasm<F, R>(
&self,
plugin_id: &str,
exec_ctx: &ExecutionContext,
operation: F,
) -> PluginResult<R>
where
F: FnOnce(&mut Store<HostState>, &PluginWorld) -> PluginResult<R>
+ Send
+ std::panic::UnwindSafe
+ 'static,
R: Send + 'static,
{
let loaded = self.get_loaded(plugin_id)?;
// 构建跨插件实体映射(从 manifest 的 ref_plugin 字段提取)
let cross_plugin_entities =
Self::build_cross_plugin_map(&loaded.manifest, &self.db, exec_ctx.tenant_id).await;
// 加载插件配置(从数据库)
let plugin_config = Self::load_plugin_config(plugin_id, exec_ctx.tenant_id, &self.db).await;
// 创建新的 Store + HostState使用真实的租户/用户上下文
// 传入 db 和 event_bus 启用混合执行模式(插件可自主查询数据)
let mut state = HostState::new_with_db(
plugin_id.to_string(),
exec_ctx.tenant_id,
exec_ctx.user_id,
exec_ctx.permissions.clone(),
self.db.clone(),
self.event_bus.clone(),
);
state.cross_plugin_entities = cross_plugin_entities;
// 注入编号规则和插件配置
state.numbering_rules = numbering_rules_from_manifest(&loaded.manifest);
state.plugin_config = plugin_config;
let mut store = Store::new(&self.engine, state);
store
.set_fuel(self.config.default_fuel)
.map_err(|e| PluginError::ExecutionError(e.to_string()))?;
store.limiter(|state| &mut state.limits);
// 实例化
let instance =
PluginWorld::instantiate_async(&mut store, &loaded.component, &loaded.linker)
.await
.map_err(|e| PluginError::InstantiationError(e.to_string()))?;
let timeout_secs = self.config.execution_timeout_secs;
let pid_owned = plugin_id.to_owned();
let start = std::time::Instant::now();
// spawn_blocking 闭包执行 WASM正常完成时收集 pending_ops
let (result, pending_ops): (PluginResult<R>, Vec<PendingOp>) = tokio::time::timeout(
std::time::Duration::from_secs(timeout_secs),
tokio::task::spawn_blocking(move || {
match std::panic::catch_unwind(AssertUnwindSafe(|| {
let r = operation(&mut store, &instance);
// catch_unwind 内部不能调用 into_data需要 &mut self
// 但这里 operation 已完成store 仍可用
let ops = std::mem::take(&mut store.data_mut().pending_ops);
(r, ops)
})) {
Ok((r, ops)) => (r, ops),
Err(_) => {
// panic 后丢弃所有 pending_ops避免半完成状态写入数据库
tracing::warn!(plugin = %pid_owned, "WASM panic, discarding pending ops");
(
Err(PluginError::ExecutionError("WASM panic".to_string())),
Vec::new(),
)
}
}
}),
)
.await
.map_err(|_| PluginError::ExecutionError(format!("插件执行超时 ({}s)", timeout_secs)))?
.map_err(|e| PluginError::ExecutionError(e.to_string()))?;
// 更新运行时指标
let elapsed_ms = start.elapsed().as_millis() as f64;
{
let mut metrics = loaded.metrics.write().await;
metrics.total_invocations += 1;
metrics.total_response_ms += elapsed_ms;
metrics.last_invocation_at = Some(chrono::Utc::now());
if result.is_err() {
metrics.error_count += 1;
metrics.last_error = result.as_ref().err().map(|e| e.to_string());
}
}
// 刷新写操作到数据库
Self::flush_ops(
&self.db,
plugin_id,
pending_ops,
exec_ctx.tenant_id,
exec_ctx.user_id,
&self.event_bus,
)
.await?;
result
}
/// 从数据库加载插件配置(通过 manifest metadata.id 匹配)
fn load_plugin_config(
plugin_id: &str,
tenant_id: Uuid,
db: &DatabaseConnection,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = serde_json::Value> + Send + 'static>>
{
let db = db.clone();
let pid = plugin_id.to_string();
Box::pin(async move {
use sea_orm::FromQueryResult;
#[derive(Debug, FromQueryResult)]
struct ConfigRow {
config_json: serde_json::Value,
}
ConfigRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
"SELECT config_json FROM plugins WHERE tenant_id = $1\n\
AND deleted_at IS NULL\n\
AND manifest_json->'metadata'->>'id' = $2\n\
LIMIT 1",
[tenant_id.into(), pid.into()],
))
.one(&db)
.await
.ok()
.flatten()
.map(|r| r.config_json)
.unwrap_or_default()
})
}
/// 从 manifest 的 ref_plugin 字段构建跨插件实体映射
/// 返回: { "erp-crm.customer" → "plugin_erp_crm__customer", ... }
async fn build_cross_plugin_map(
manifest: &crate::manifest::PluginManifest,
db: &DatabaseConnection,
tenant_id: Uuid,
) -> HashMap<String, String> {
let mut map = HashMap::new();
let Some(schema) = &manifest.schema else {
return map;
};
for entity in &schema.entities {
for field in &entity.fields {
if let (Some(target_plugin), Some(ref_entity)) =
(&field.ref_plugin, &field.ref_entity)
{
let key = format!("{}.{}", target_plugin, ref_entity);
// 从 plugin_entities 表查找目标表名
let table_name = crate::entity::plugin_entity::Entity::find()
.filter(
crate::entity::plugin_entity::Column::ManifestId
.eq(target_plugin.as_str()),
)
.filter(
crate::entity::plugin_entity::Column::EntityName
.eq(ref_entity.as_str()),
)
.filter(crate::entity::plugin_entity::Column::TenantId.eq(tenant_id))
.filter(crate::entity::plugin_entity::Column::DeletedAt.is_null())
.one(db)
.await
.ok()
.flatten()
.map(|e| e.table_name);
if let Some(tn) = table_name {
map.insert(key, tn);
}
}
}
}
map
}
/// 刷新 HostState 中的 pending_ops 到数据库。
///
/// 使用事务包裹所有数据库操作确保原子性。
/// 事件发布在事务提交后执行best-effort
pub(crate) async fn flush_ops(
db: &DatabaseConnection,
plugin_id: &str,
ops: Vec<PendingOp>,
tenant_id: Uuid,
user_id: Uuid,
event_bus: &EventBus,
) -> PluginResult<()> {
if ops.is_empty() {
return Ok(());
}
// 使用事务确保所有数据库操作的原子性
let txn = db
.begin()
.await
.map_err(|e| PluginError::DatabaseError(e.to_string()))?;
for op in &ops {
match op {
PendingOp::Insert { id, entity, data } => {
let table_name = DynamicTableManager::table_name(plugin_id, entity);
let parsed_data: serde_json::Value =
serde_json::from_slice(data).unwrap_or_default();
let id_uuid = id
.parse::<Uuid>()
.map_err(|e| PluginError::ExecutionError(format!("无效的 ID: {}", e)))?;
let (sql, values) = DynamicTableManager::build_insert_sql_with_id(
&table_name,
id_uuid,
tenant_id,
user_id,
&parsed_data,
);
txn.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.await
.map_err(|e| PluginError::DatabaseError(e.to_string()))?;
tracing::debug!(
plugin_id,
entity = %entity,
"Flushed INSERT op"
);
}
PendingOp::Update {
entity,
id,
data,
version,
} => {
let table_name = DynamicTableManager::table_name(plugin_id, entity);
let parsed_data: serde_json::Value =
serde_json::from_slice(data).unwrap_or_default();
let id_uuid = id
.parse::<Uuid>()
.map_err(|e| PluginError::ExecutionError(format!("无效的 ID: {}", e)))?;
let (sql, values) = DynamicTableManager::build_update_sql(
&table_name,
id_uuid,
tenant_id,
user_id,
&parsed_data,
*version as i32,
);
txn.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.await
.map_err(|e| PluginError::DatabaseError(e.to_string()))?;
tracing::debug!(
plugin_id,
entity = %entity,
id = %id,
"Flushed UPDATE op"
);
}
PendingOp::Delete { entity, id } => {
let table_name = DynamicTableManager::table_name(plugin_id, entity);
let id_uuid = id
.parse::<Uuid>()
.map_err(|e| PluginError::ExecutionError(format!("无效的 ID: {}", e)))?;
let (sql, values) =
DynamicTableManager::build_delete_sql(&table_name, id_uuid, tenant_id);
txn.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.await
.map_err(|e| PluginError::DatabaseError(e.to_string()))?;
tracing::debug!(
plugin_id,
entity = %entity,
id = %id,
"Flushed DELETE op"
);
}
PendingOp::PublishEvent { .. } => {
// 事件发布在事务提交后处理
}
}
}
// 提交事务
txn.commit()
.await
.map_err(|e| PluginError::DatabaseError(e.to_string()))?;
// 事务提交成功后发布事件best-effort不阻塞主流程
for op in ops {
if let PendingOp::PublishEvent {
event_type,
payload,
} = op
{
let parsed_payload: serde_json::Value =
serde_json::from_slice(&payload).unwrap_or_default();
let event =
erp_core::events::DomainEvent::new(&event_type, tenant_id, parsed_payload);
event_bus.publish(event, db).await;
tracing::debug!(
plugin_id,
event_type = %event_type,
"Flushed PUBLISH_EVENT op"
);
}
}
Ok(())
}
}
/// 插件信息摘要
#[derive(Debug, Clone, serde::Serialize)]
pub struct PluginInfo {
pub id: String,
pub name: String,
pub version: String,
}