# 插件系统增强设计规格 ## Context 插件系统是 ERP 平台的核心差异化能力,当前声明式层面(manifest schema、动态表、前端页面)已达 90% 成熟度。但 WASM 逻辑层存在根本性限制: 1. **插件无法自主查询数据** — `db_query` 的 filter/pagination 参数被忽略,只能使用预填充结果 2. **无读后写一致性** — 延迟刷新模型导致插件在一次调用中无法读取自己刚写入的数据 3. **聚合只有 COUNT** — 缺少 SUM/AVG/MAX/MIN,无法支撑财务、统计类场景 4. **热更新无原子回滚** — 旧版本先卸载再加载新版本,中间失败无保障 5. **Schema 变更只支持新增实体** — 不支持已有实体的字段演进 这些限制使插件系统只能支撑"数据管理+展示"型轻量场景(CRM、简单进销存),无法支撑需要复杂业务逻辑的行业(财务、制造、电商)。 本次增强的目标:**让插件逻辑层从 40% 提升到 80%+,使系统能真正承载不同行业的定制化需求。** --- ## 改动 1:混合执行模型(解决查询和读后写一致性) ### 问题 `host.rs:99-109` — `db_query` 忽略 `_filter` 和 `_pagination` 参数,只从 `query_results` 预填充缓存取数据。插件无法自主构造查询。 ### 方案:读操作走实时 SQL + 写操作保持延迟批量 + 读前自动 flush **核心流程变更:** ``` 当前: WASM 调用 db_insert() → 入队 pending_ops WASM 调用 db_query() → 从预填充缓存读(忽略 filter/pagination) WASM 结束 → flush 全部 pending_ops 改为: WASM 调用 db_insert() → 入队 pending_ops WASM 调用 db_query() → 先 flush pending_ops → 执行真实 SQL 查询 → 返回结果 WASM 结束 → flush 剩余 pending_ops ``` ### 改动文件 #### 1. `crates/erp-plugin/src/host.rs` **HostState 新增字段:** ```rust pub struct HostState { // ... 现有字段保留 ... // 新增:用于实时查询的数据服务引用和数据库连接 pub(crate) db: Option, pub(crate) data_service_ready: bool, } ``` **db_query 实现变更:** ```rust fn db_query(&mut self, entity: String, filter: Vec, pagination: Vec) -> Result, String> { // 如果没有数据库连接(向后兼容预填充模式),走旧路径 if self.db.is_none() { return self.query_results .get(&entity) .cloned() .ok_or_else(|| format!("实体 '{}' 的查询结果未预填充", entity)); } // 解析 filter 和 pagination 参数 let filter_val: Option = if filter.is_empty() { None } else { serde_json::from_slice(&filter).ok() }; let pagination_val: Option = if pagination.is_empty() { None } else { serde_json::from_slice(&pagination).ok() }; // 先同步 flush pending writes(确保读后写一致性) // 注意:在 WASM 的 spawn_blocking 上下文中,需要同步执行 // 方案:将 pending_ops 暂存到临时变量,由调用方在 execute_wasm 中处理 // 使用 pre_query_ops 标记,让 engine 在 execute_wasm 中间阶段 flush self.pre_query_ops = std::mem::take(&mut self.pending_ops); self.pending_query = Some(PendingQuery { entity, filter_val, pagination_val }); // 返回占位符 — 真正的查询在 execute_wasm 的两阶段执行中完成 Ok(serde_json::to_vec(&serde_json::json!({"status": "query_pending"})).unwrap_or_default()) } ``` **实际实现策略 — 采用回调模式:** 由于 `db_query` 在 `spawn_blocking` 内执行,不能直接 await 异步数据库操作。采用两阶段执行: 1. WASM 执行期间:`db_query` 收集查询参数,设置 `needs_flush_and_query = true` 2. `execute_wasm` 的 `spawn_blocking` 结束后:检查标志,如果需要查询则: - flush pending_ops - 执行查询 - 用查询结果重新调用 WASM(继续执行后续逻辑) **更好的方案 — 分段执行:** 将 `execute_wasm` 改为分段执行模型: ```rust async fn execute_wasm(&self, ...) -> PluginResult { // 阶段 1:执行 WASM,遇到 db_query 时暂停 let (result, pending_ops, pending_queries) = tokio::task::spawn_blocking(move || { // WASM 执行中遇到 db_query 时,收集查询参数并设置标志 // 标志在 HostState 中:self.needs_query = true, self.query_params = ... // WASM 继续执行(db_query 返回空结果集作为占位) // ... }).await?; // 中间阶段:flush writes + execute queries Self::flush_ops(&self.db, plugin_id, pending_ops, ...).await?; let query_results = Self::execute_queries(&self.db, plugin_id, pending_queries, ...).await?; // 阶段 2:如果有待处理的查询,重新执行 WASM(或继续后续逻辑) // ... } ``` **最终推荐方案 — 简化版:** 实际上最简单的做法是:**让 db_query 同步执行真实查询**。在 `spawn_blocking` 中使用 `tokio::runtime::Handle` 来在阻塞线程中执行异步代码。 ```rust fn db_query(&mut self, entity: String, filter: Vec, pagination: Vec) -> Result, String> { let db = self.db.as_ref().ok_or("数据库连接不可用")?; // 先 flush pending writes(通过 tokio handle 在阻塞上下文中执行异步) let rt = tokio::runtime::Handle::current(); let ops = std::mem::take(&mut self.pending_ops); if !ops.is_empty() { rt.block_on(Self::flush_ops_static(db, &self.plugin_id, ops, self.tenant_id, self.user_id, &self.event_bus)) .map_err(|e| format!("flush 失败: {}", e))?; } // 解析 filter let filter_val: Option = if filter.is_empty() { None } else { serde_json::from_slice(&filter).ok() }; // 构建并执行查询 let table_name = DynamicTableManager::table_name(&self.plugin_id, &entity); let (sql, values) = DynamicTableManager::build_query_sql( &table_name, self.tenant_id, filter_val, pagination_val ).map_err(|e| e.to_string())?; let rows: Vec = rt.block_on(async { // 执行查询 }).map_err(|e| e.to_string())?; serde_json::to_vec(&rows).map_err(|e| e.to_string()) } ``` **改动影响:** - `HostState` 增加 `db: Option` 和 `event_bus: Option` 字段 - `execute_wasm` 创建 HostState 时传入 db 和 event_bus - `db_query` 从忽略参数改为实时查询 - `PluginEngine::new` 已持有 db 和 event_bus,无需新增依赖 #### 2. `crates/erp-plugin/src/dynamic_table.rs` 新增 `build_query_sql` 方法,复用现有 `data_service.rs` 的查询构建逻辑: ```rust pub fn build_query_sql( table_name: &str, tenant_id: Uuid, filter: Option, pagination: Option, ) -> Result<(String, Vec)> ``` ### 向后兼容 - `HostState::new()` 不传 db → `db = None` → 走旧的预填充路径 - `execute_wasm()` 传 db → 走新的实时查询路径 - 现有 WASM 插件无需修改(旧路径仍然可用) --- ## 改动 2:扩展聚合查询 ### 问题 `data_service.rs:655` 的 `aggregate` 方法只支持 `GROUP BY + COUNT(*)`,返回 `Vec<(String, i64)>`。 ### 方案 扩展聚合函数支持 SUM/AVG/MAX/MIN。 #### 改动文件 **1. `crates/erp-plugin/src/data_service.rs`** 新增多聚合函数方法: ```rust pub struct AggregateResult { pub key: String, pub metrics: HashMap, // "count" -> 10, "total_amount" -> 5000.0 } pub async fn aggregate_multi( plugin_id: Uuid, entity_name: &str, tenant_id: Uuid, db: &DatabaseConnection, group_by_field: &str, aggregations: &[AggregateDef], // [{field: "amount", func: "sum"}, ...] filter: Option, scope: Option, ) -> AppResult> pub struct AggregateDef { pub field: String, pub func: AggregateFunc, } pub enum AggregateFunc { Count, Sum, Avg, Min, Max, } ``` SQL 构建示例: ```sql SELECT _f_status as key, COUNT(*) as count, COALESCE(SUM(_f_amount), 0) as sum_amount, COALESCE(AVG(_f_price), 0) as avg_price FROM plugin_erp_crm__order WHERE tenant_id = $1 AND deleted_at IS NULL GROUP BY _f_status ``` **2. `crates/erp-plugin/src/dynamic_table.rs`** 新增 `build_aggregate_multi_sql` 方法,构建多聚合 SQL。 **3. `crates/erp-plugin/src/data_handler.rs`** 扩展聚合 API 端点,接受 `aggregations` 参数: ```json POST /api/v1/plugins/{pluginId}/data/{entityName}/aggregate { "group_by": "status", "aggregations": [ {"field": "amount", "func": "sum"}, {"field": "price", "func": "avg"} ], "filter": {"status": "active"} } ``` **4. 前端 Dashboard Widget 适配** `PluginDashboardPage.tsx` 中的 `stat_card` 和图表 widget 需要适配新的多聚合返回格式。 --- ## 改动 3:热更新原子回滚 ### 问题 `service.rs:578-585` — 升级时先 `unload(old)` 再 `load(new)`,如果 `load` 失败,旧版本已不在内存中。 ### 方案:先加载新版本,成功后原子替换 #### 改动文件 **`crates/erp-plugin/src/service.rs`** — `upgrade` 方法: ```rust // 当前(有风险): engine.unload(plugin_manifest_id).await.ok(); // 旧版本已卸载 engine.load(plugin_manifest_id, &new_wasm, manifest) // 如果这里失败 → 无回滚 .await?; // 改为(安全): // 1. 先加载新版本(用临时 key) let temp_id = format!("{}__upgrade_{}", plugin_manifest_id, Uuid::now_v7()); engine.load(&temp_id, &new_wasm, new_manifest.clone()).await .map_err(|e| { tracing::error!(error = %e, "新版本 WASM 加载失败,旧版本仍在运行"); e })?; // 2. 卸载旧版本 engine.unload(plugin_manifest_id).await.ok(); // 3. 将新版本从临时 key 改为正式 key engine.rename_plugin(&temp_id, plugin_manifest_id).await?; // 4. 更新数据库记录 ``` **`crates/erp-plugin/src/engine.rs`** — 新增 `rename_plugin` 方法: ```rust pub async fn rename_plugin(&self, old_id: &str, new_id: &str) -> PluginResult<()> { let loaded = self.plugins.remove(old_id) .ok_or_else(|| PluginError::NotFound(old_id.to_string()))?; let mut loaded = Arc::try_unwrap(loaded.1) .map_err(|_| PluginError::ExecutionError("插件仍被引用".to_string()))?; loaded.id = new_id.to_string(); self.plugins.insert(new_id.to_string(), Arc::new(loaded)); Ok(()) } ``` **改进后的安全保证:** - 新版本加载失败 → 旧版本仍在运行,零停机 - 数据库记录只在 WASM 替换成功后才更新 - 事务性:要么完全切换到新版本,要么保持旧版本 --- ## 改动 4:Schema 演进(ALTER TABLE 支持) ### 问题 `service.rs:562-575` — 升级时只处理新增实体(CREATE TABLE),不处理已有实体的字段变更。 ### 方案:利用 JSONB 特性实现轻量级 Schema 演进 由于核心数据在 JSONB 的 `data` 列中,大部分字段变更不需要 DDL: - **新增字段**:JSONB 天然支持,只需更新 manifest - **新增 filterable/sortable 字段**:需要 ALTER TABLE ADD Generated Column + 索引 - **删除字段**:JSONB 中多余字段不影响,Generated Column 可保留(无害) - **重命名字段**:添加新 Generated Column,旧的保留 - **修改字段类型**:Generated Column 需要 DROP + ADD(JSONB 数据不需要改) #### 改动文件 **`crates/erp-plugin/src/service.rs`** — `upgrade` 方法增加 schema diff 逻辑: ```rust // 对比 schema 变更 if let Some(new_schema) = &new_manifest.schema { let old_schema = old_manifest.schema.as_ref(); for new_entity in &new_schema.entities { let old_entity = old_schema .and_then(|s| s.entities.iter().find(|e| e.name == new_entity.name)); match old_entity { None => { // 全新实体 — CREATE TABLE DynamicTableManager::create_table(db, plugin_manifest_id, new_entity).await?; } Some(old) => { // 已有实体 — diff 字段 let diff = diff_entity_fields(old, new_entity); if !diff.new_filterable.is_empty() || !diff.new_sortable.is_empty() { DynamicTableManager::alter_add_generated_columns( db, plugin_manifest_id, new_entity, &diff ).await?; } } } } } ``` **`crates/erp-plugin/src/dynamic_table.rs`** — 新增: ```rust pub struct FieldDiff { pub new_filterable: Vec, // 新增的需要 Generated Column 的字段 pub new_sortable: Vec, pub new_searchable: Vec, // 新增的需要 pg_trgm 索引的字段 } pub fn diff_entity_fields(old: &PluginEntity, new: &PluginEntity) -> FieldDiff pub async fn alter_add_generated_columns( db: &DatabaseConnection, plugin_id: &str, entity: &PluginEntity, diff: &FieldDiff, ) -> PluginResult<()> ``` ALTER TABLE 示例: ```sql -- 新增 filterable 字段 ALTER TABLE plugin_erp_crm__customer ADD COLUMN IF NOT EXISTS _f_source TEXT GENERATED ALWAYS AS (data->>'source') STORED; -- 新增索引 CREATE INDEX IF NOT EXISTS idx_plugin_erp_crm__customer__f_source ON plugin_erp_crm__customer (_f_source) WHERE deleted_at IS NULL; -- 新增 searchable 字段的 pg_trgm 索引 CREATE INDEX IF NOT EXISTS idx_plugin_erp_crm__customer__f_source_trgm ON plugin_erp_crm__customer USING gin (_f_source gin_trgm_ops) WHERE deleted_at IS NULL; ``` --- ## 实施顺序 | 阶段 | 改动 | 复杂度 | 影响范围 | |------|------|--------|---------| | 1 | 热更新原子回滚 | 低 | engine.rs + service.rs | | 2 | Schema 演进(ALTER TABLE) | 中低 | service.rs + dynamic_table.rs | | 3 | 扩展聚合查询 | 中 | data_service.rs + data_handler.rs + dynamic_table.rs | | 4 | 混合执行模型(查询能力) | 高 | host.rs + engine.rs + dynamic_table.rs | 建议按复杂度从低到高实施,每个阶段独立可验证。 --- ## 验证方案 ### 阶段 1:热更新回滚 1. 准备两个版本的 CRM 插件 WASM(v1.0.0 和 v2.0.0) 2. 上传 v2.0.0 但故意让 WASM 二进制损坏 3. 验证:旧版本 v1.0.0 仍在正常运行 4. 上传正确的 v2.0.0 5. 验证:成功切换到 v2.0.0 ### 阶段 2:Schema 演进 1. 创建 CRM 插件 v1.0.0(含 customer 实体,3 个字段) 2. 升级到 v1.1.0(customer 增加 2 个 filterable 字段 + 1 个新实体 contact) 3. 验证:新字段可以过滤/排序,旧数据不受影响 4. 在已有数据上验证新 Generated Column 的值正确填充 ### 阶段 3:聚合查询 1. 创建测试数据(不同状态的订单,含 amount 字段) 2. 调用聚合 API:group_by=status, aggregations=[sum(amount), avg(amount)] 3. 验证返回结果正确 4. 前端 Dashboard stat_card 展示正确的聚合数据 ### 阶段 4:混合执行模型 1. 在插件 WASM 中调用 db_insert 后立即 db_query 2. 验证能读取到刚插入的数据(读后写一致性) 3. 验证带 filter 参数的 db_query 返回正确过滤结果 4. 验证旧插件(使用预填充模式)仍能正常工作 5. 压力测试:多次连续 db_query 不超过 Fuel 限制 --- ## 关键文件清单 | 文件 | 改动类型 | |------|---------| | `crates/erp-plugin/src/host.rs` | 重构 db_query + 新增 db/事件总线字段 | | `crates/erp-plugin/src/engine.rs` | 调整 execute_wasm + 新增 rename_plugin | | `crates/erp-plugin/src/service.rs` | 升级流程回滚安全 + schema diff | | `crates/erp-plugin/src/dynamic_table.rs` | 新增 build_query_sql + alter_add_generated_columns + diff_entity_fields | | `crates/erp-plugin/src/data_service.rs` | 新增 aggregate_multi + AggregateDef | | `crates/erp-plugin/src/data_handler.rs` | 扩展聚合 API | | `apps/web/src/pages/PluginDashboardPage.tsx` | 适配多聚合返回格式 | ### 可复用的现有函数 - `DynamicTableManager::build_query_sql` — 可复用 `data_service.rs` 中的查询构建逻辑 - `DynamicTableManager::build_insert_sql` — flush 时已有,无需改动 - `sanitize_identifier` — 已有,用于新字段名的安全检查 - `flush_ops` — 已有事务性 flush 逻辑,混合模型中复用