16 KiB
插件系统增强设计规格
Context
插件系统是 ERP 平台的核心差异化能力,当前声明式层面(manifest schema、动态表、前端页面)已达 90% 成熟度。但 WASM 逻辑层存在根本性限制:
- 插件无法自主查询数据 —
db_query的 filter/pagination 参数被忽略,只能使用预填充结果 - 无读后写一致性 — 延迟刷新模型导致插件在一次调用中无法读取自己刚写入的数据
- 聚合只有 COUNT — 缺少 SUM/AVG/MAX/MIN,无法支撑财务、统计类场景
- 热更新无原子回滚 — 旧版本先卸载再加载新版本,中间失败无保障
- Schema 变更只支持新增实体 — 不支持已有实体的字段演进
这些限制使插件系统只能支撑"数据管理+展示"型轻量场景(CRM、简单进销存),无法支撑需要复杂业务逻辑的行业(财务、制造、电商)。
本次增强的目标:让插件逻辑层从 40% 提升到 80%+,使系统能真正承载不同行业的定制化需求。
改动 1:混合执行模型(解决查询和读后写一致性)
问题
host.rs:99-109 — db_query 忽略 _filter 和 _pagination 参数,只从 query_results 预填充缓存取数据。插件无法自主构造查询。
方案:读操作走实时 SQL + 写操作保持延迟批量 + 读前自动 flush
核心流程变更:
当前:
WASM 调用 db_insert() → 入队 pending_ops
WASM 调用 db_query() → 从预填充缓存读(忽略 filter/pagination)
WASM 结束 → flush 全部 pending_ops
改为:
WASM 调用 db_insert() → 入队 pending_ops
WASM 调用 db_query() → 先 flush pending_ops → 执行真实 SQL 查询 → 返回结果
WASM 结束 → flush 剩余 pending_ops
改动文件
1. crates/erp-plugin/src/host.rs
HostState 新增字段:
pub struct HostState {
// ... 现有字段保留 ...
// 新增:用于实时查询的数据服务引用和数据库连接
pub(crate) db: Option<DatabaseConnection>,
pub(crate) data_service_ready: bool,
}
db_query 实现变更:
fn db_query(&mut self, entity: String, filter: Vec<u8>, pagination: Vec<u8>)
-> Result<Vec<u8>, String>
{
// 如果没有数据库连接(向后兼容预填充模式),走旧路径
if self.db.is_none() {
return self.query_results
.get(&entity)
.cloned()
.ok_or_else(|| format!("实体 '{}' 的查询结果未预填充", entity));
}
// 解析 filter 和 pagination 参数
let filter_val: Option<serde_json::Value> = if filter.is_empty() {
None
} else {
serde_json::from_slice(&filter).ok()
};
let pagination_val: Option<serde_json::Value> = if pagination.is_empty() {
None
} else {
serde_json::from_slice(&pagination).ok()
};
// 先同步 flush pending writes(确保读后写一致性)
// 注意:在 WASM 的 spawn_blocking 上下文中,需要同步执行
// 方案:将 pending_ops 暂存到临时变量,由调用方在 execute_wasm 中处理
// 使用 pre_query_ops 标记,让 engine 在 execute_wasm 中间阶段 flush
self.pre_query_ops = std::mem::take(&mut self.pending_ops);
self.pending_query = Some(PendingQuery { entity, filter_val, pagination_val });
// 返回占位符 — 真正的查询在 execute_wasm 的两阶段执行中完成
Ok(serde_json::to_vec(&serde_json::json!({"status": "query_pending"})).unwrap_or_default())
}
实际实现策略 — 采用回调模式:
由于 db_query 在 spawn_blocking 内执行,不能直接 await 异步数据库操作。采用两阶段执行:
- WASM 执行期间:
db_query收集查询参数,设置needs_flush_and_query = true execute_wasm的spawn_blocking结束后:检查标志,如果需要查询则:- flush pending_ops
- 执行查询
- 用查询结果重新调用 WASM(继续执行后续逻辑)
更好的方案 — 分段执行:
将 execute_wasm 改为分段执行模型:
async fn execute_wasm(&self, ...) -> PluginResult<R> {
// 阶段 1:执行 WASM,遇到 db_query 时暂停
let (result, pending_ops, pending_queries) = tokio::task::spawn_blocking(move || {
// WASM 执行中遇到 db_query 时,收集查询参数并设置标志
// 标志在 HostState 中:self.needs_query = true, self.query_params = ...
// WASM 继续执行(db_query 返回空结果集作为占位)
// ...
}).await?;
// 中间阶段:flush writes + execute queries
Self::flush_ops(&self.db, plugin_id, pending_ops, ...).await?;
let query_results = Self::execute_queries(&self.db, plugin_id, pending_queries, ...).await?;
// 阶段 2:如果有待处理的查询,重新执行 WASM(或继续后续逻辑)
// ...
}
最终推荐方案 — 简化版:
实际上最简单的做法是:让 db_query 同步执行真实查询。在 spawn_blocking 中使用 tokio::runtime::Handle 来在阻塞线程中执行异步代码。
fn db_query(&mut self, entity: String, filter: Vec<u8>, pagination: Vec<u8>)
-> Result<Vec<u8>, String>
{
let db = self.db.as_ref().ok_or("数据库连接不可用")?;
// 先 flush pending writes(通过 tokio handle 在阻塞上下文中执行异步)
let rt = tokio::runtime::Handle::current();
let ops = std::mem::take(&mut self.pending_ops);
if !ops.is_empty() {
rt.block_on(Self::flush_ops_static(db, &self.plugin_id, ops,
self.tenant_id, self.user_id, &self.event_bus))
.map_err(|e| format!("flush 失败: {}", e))?;
}
// 解析 filter
let filter_val: Option<serde_json::Value> = if filter.is_empty() {
None
} else {
serde_json::from_slice(&filter).ok()
};
// 构建并执行查询
let table_name = DynamicTableManager::table_name(&self.plugin_id, &entity);
let (sql, values) = DynamicTableManager::build_query_sql(
&table_name, self.tenant_id, filter_val, pagination_val
).map_err(|e| e.to_string())?;
let rows: Vec<serde_json::Value> = rt.block_on(async {
// 执行查询
}).map_err(|e| e.to_string())?;
serde_json::to_vec(&rows).map_err(|e| e.to_string())
}
改动影响:
HostState增加db: Option<DatabaseConnection>和event_bus: Option<EventBus>字段execute_wasm创建 HostState 时传入 db 和 event_busdb_query从忽略参数改为实时查询PluginEngine::new已持有 db 和 event_bus,无需新增依赖
2. crates/erp-plugin/src/dynamic_table.rs
新增 build_query_sql 方法,复用现有 data_service.rs 的查询构建逻辑:
pub fn build_query_sql(
table_name: &str,
tenant_id: Uuid,
filter: Option<serde_json::Value>,
pagination: Option<serde_json::Value>,
) -> Result<(String, Vec<sea_orm::Value>)>
向后兼容
HostState::new()不传 db →db = None→ 走旧的预填充路径execute_wasm()传 db → 走新的实时查询路径- 现有 WASM 插件无需修改(旧路径仍然可用)
改动 2:扩展聚合查询
问题
data_service.rs:655 的 aggregate 方法只支持 GROUP BY + COUNT(*),返回 Vec<(String, i64)>。
方案
扩展聚合函数支持 SUM/AVG/MAX/MIN。
改动文件
1. crates/erp-plugin/src/data_service.rs
新增多聚合函数方法:
pub struct AggregateResult {
pub key: String,
pub metrics: HashMap<String, f64>, // "count" -> 10, "total_amount" -> 5000.0
}
pub async fn aggregate_multi(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
db: &DatabaseConnection,
group_by_field: &str,
aggregations: &[AggregateDef], // [{field: "amount", func: "sum"}, ...]
filter: Option<serde_json::Value>,
scope: Option<DataScopeParams>,
) -> AppResult<Vec<AggregateResult>>
pub struct AggregateDef {
pub field: String,
pub func: AggregateFunc,
}
pub enum AggregateFunc {
Count,
Sum,
Avg,
Min,
Max,
}
SQL 构建示例:
SELECT _f_status as key,
COUNT(*) as count,
COALESCE(SUM(_f_amount), 0) as sum_amount,
COALESCE(AVG(_f_price), 0) as avg_price
FROM plugin_erp_crm__order
WHERE tenant_id = $1 AND deleted_at IS NULL
GROUP BY _f_status
2. crates/erp-plugin/src/dynamic_table.rs
新增 build_aggregate_multi_sql 方法,构建多聚合 SQL。
3. crates/erp-plugin/src/data_handler.rs
扩展聚合 API 端点,接受 aggregations 参数:
POST /api/v1/plugins/{pluginId}/data/{entityName}/aggregate
{
"group_by": "status",
"aggregations": [
{"field": "amount", "func": "sum"},
{"field": "price", "func": "avg"}
],
"filter": {"status": "active"}
}
4. 前端 Dashboard Widget 适配
PluginDashboardPage.tsx 中的 stat_card 和图表 widget 需要适配新的多聚合返回格式。
改动 3:热更新原子回滚
问题
service.rs:578-585 — 升级时先 unload(old) 再 load(new),如果 load 失败,旧版本已不在内存中。
方案:先加载新版本,成功后原子替换
改动文件
crates/erp-plugin/src/service.rs — upgrade 方法:
// 当前(有风险):
engine.unload(plugin_manifest_id).await.ok(); // 旧版本已卸载
engine.load(plugin_manifest_id, &new_wasm, manifest) // 如果这里失败 → 无回滚
.await?;
// 改为(安全):
// 1. 先加载新版本(用临时 key)
let temp_id = format!("{}__upgrade_{}", plugin_manifest_id, Uuid::now_v7());
engine.load(&temp_id, &new_wasm, new_manifest.clone()).await
.map_err(|e| {
tracing::error!(error = %e, "新版本 WASM 加载失败,旧版本仍在运行");
e
})?;
// 2. 卸载旧版本
engine.unload(plugin_manifest_id).await.ok();
// 3. 将新版本从临时 key 改为正式 key
engine.rename_plugin(&temp_id, plugin_manifest_id).await?;
// 4. 更新数据库记录
crates/erp-plugin/src/engine.rs — 新增 rename_plugin 方法:
pub async fn rename_plugin(&self, old_id: &str, new_id: &str) -> PluginResult<()> {
let loaded = self.plugins.remove(old_id)
.ok_or_else(|| PluginError::NotFound(old_id.to_string()))?;
let mut loaded = Arc::try_unwrap(loaded.1)
.map_err(|_| PluginError::ExecutionError("插件仍被引用".to_string()))?;
loaded.id = new_id.to_string();
self.plugins.insert(new_id.to_string(), Arc::new(loaded));
Ok(())
}
改进后的安全保证:
- 新版本加载失败 → 旧版本仍在运行,零停机
- 数据库记录只在 WASM 替换成功后才更新
- 事务性:要么完全切换到新版本,要么保持旧版本
改动 4:Schema 演进(ALTER TABLE 支持)
问题
service.rs:562-575 — 升级时只处理新增实体(CREATE TABLE),不处理已有实体的字段变更。
方案:利用 JSONB 特性实现轻量级 Schema 演进
由于核心数据在 JSONB 的 data 列中,大部分字段变更不需要 DDL:
- 新增字段:JSONB 天然支持,只需更新 manifest
- 新增 filterable/sortable 字段:需要 ALTER TABLE ADD Generated Column + 索引
- 删除字段:JSONB 中多余字段不影响,Generated Column 可保留(无害)
- 重命名字段:添加新 Generated Column,旧的保留
- 修改字段类型:Generated Column 需要 DROP + ADD(JSONB 数据不需要改)
改动文件
crates/erp-plugin/src/service.rs — upgrade 方法增加 schema diff 逻辑:
// 对比 schema 变更
if let Some(new_schema) = &new_manifest.schema {
let old_schema = old_manifest.schema.as_ref();
for new_entity in &new_schema.entities {
let old_entity = old_schema
.and_then(|s| s.entities.iter().find(|e| e.name == new_entity.name));
match old_entity {
None => {
// 全新实体 — CREATE TABLE
DynamicTableManager::create_table(db, plugin_manifest_id, new_entity).await?;
}
Some(old) => {
// 已有实体 — diff 字段
let diff = diff_entity_fields(old, new_entity);
if !diff.new_filterable.is_empty() || !diff.new_sortable.is_empty() {
DynamicTableManager::alter_add_generated_columns(
db, plugin_manifest_id, new_entity, &diff
).await?;
}
}
}
}
}
crates/erp-plugin/src/dynamic_table.rs — 新增:
pub struct FieldDiff {
pub new_filterable: Vec<PluginField>, // 新增的需要 Generated Column 的字段
pub new_sortable: Vec<PluginField>,
pub new_searchable: Vec<PluginField>, // 新增的需要 pg_trgm 索引的字段
}
pub fn diff_entity_fields(old: &PluginEntity, new: &PluginEntity) -> FieldDiff
pub async fn alter_add_generated_columns(
db: &DatabaseConnection,
plugin_id: &str,
entity: &PluginEntity,
diff: &FieldDiff,
) -> PluginResult<()>
ALTER TABLE 示例:
-- 新增 filterable 字段
ALTER TABLE plugin_erp_crm__customer
ADD COLUMN IF NOT EXISTS _f_source TEXT GENERATED ALWAYS AS (data->>'source') STORED;
-- 新增索引
CREATE INDEX IF NOT EXISTS idx_plugin_erp_crm__customer__f_source
ON plugin_erp_crm__customer (_f_source) WHERE deleted_at IS NULL;
-- 新增 searchable 字段的 pg_trgm 索引
CREATE INDEX IF NOT EXISTS idx_plugin_erp_crm__customer__f_source_trgm
ON plugin_erp_crm__customer USING gin (_f_source gin_trgm_ops)
WHERE deleted_at IS NULL;
实施顺序
| 阶段 | 改动 | 复杂度 | 影响范围 |
|---|---|---|---|
| 1 | 热更新原子回滚 | 低 | engine.rs + service.rs |
| 2 | Schema 演进(ALTER TABLE) | 中低 | service.rs + dynamic_table.rs |
| 3 | 扩展聚合查询 | 中 | data_service.rs + data_handler.rs + dynamic_table.rs |
| 4 | 混合执行模型(查询能力) | 高 | host.rs + engine.rs + dynamic_table.rs |
建议按复杂度从低到高实施,每个阶段独立可验证。
验证方案
阶段 1:热更新回滚
- 准备两个版本的 CRM 插件 WASM(v1.0.0 和 v2.0.0)
- 上传 v2.0.0 但故意让 WASM 二进制损坏
- 验证:旧版本 v1.0.0 仍在正常运行
- 上传正确的 v2.0.0
- 验证:成功切换到 v2.0.0
阶段 2:Schema 演进
- 创建 CRM 插件 v1.0.0(含 customer 实体,3 个字段)
- 升级到 v1.1.0(customer 增加 2 个 filterable 字段 + 1 个新实体 contact)
- 验证:新字段可以过滤/排序,旧数据不受影响
- 在已有数据上验证新 Generated Column 的值正确填充
阶段 3:聚合查询
- 创建测试数据(不同状态的订单,含 amount 字段)
- 调用聚合 API:group_by=status, aggregations=[sum(amount), avg(amount)]
- 验证返回结果正确
- 前端 Dashboard stat_card 展示正确的聚合数据
阶段 4:混合执行模型
- 在插件 WASM 中调用 db_insert 后立即 db_query
- 验证能读取到刚插入的数据(读后写一致性)
- 验证带 filter 参数的 db_query 返回正确过滤结果
- 验证旧插件(使用预填充模式)仍能正常工作
- 压力测试:多次连续 db_query 不超过 Fuel 限制
关键文件清单
| 文件 | 改动类型 |
|---|---|
crates/erp-plugin/src/host.rs |
重构 db_query + 新增 db/事件总线字段 |
crates/erp-plugin/src/engine.rs |
调整 execute_wasm + 新增 rename_plugin |
crates/erp-plugin/src/service.rs |
升级流程回滚安全 + schema diff |
crates/erp-plugin/src/dynamic_table.rs |
新增 build_query_sql + alter_add_generated_columns + diff_entity_fields |
crates/erp-plugin/src/data_service.rs |
新增 aggregate_multi + AggregateDef |
crates/erp-plugin/src/data_handler.rs |
扩展聚合 API |
apps/web/src/pages/PluginDashboardPage.tsx |
适配多聚合返回格式 |
可复用的现有函数
DynamicTableManager::build_query_sql— 可复用data_service.rs中的查询构建逻辑DynamicTableManager::build_insert_sql— flush 时已有,无需改动sanitize_identifier— 已有,用于新字段名的安全检查flush_ops— 已有事务性 flush 逻辑,混合模型中复用