489 lines
16 KiB
Markdown
489 lines
16 KiB
Markdown
# 插件系统增强设计规格
|
||
|
||
## Context
|
||
|
||
插件系统是 ERP 平台的核心差异化能力,当前声明式层面(manifest schema、动态表、前端页面)已达 90% 成熟度。但 WASM 逻辑层存在根本性限制:
|
||
|
||
1. **插件无法自主查询数据** — `db_query` 的 filter/pagination 参数被忽略,只能使用预填充结果
|
||
2. **无读后写一致性** — 延迟刷新模型导致插件在一次调用中无法读取自己刚写入的数据
|
||
3. **聚合只有 COUNT** — 缺少 SUM/AVG/MAX/MIN,无法支撑财务、统计类场景
|
||
4. **热更新无原子回滚** — 旧版本先卸载再加载新版本,中间失败无保障
|
||
5. **Schema 变更只支持新增实体** — 不支持已有实体的字段演进
|
||
|
||
这些限制使插件系统只能支撑"数据管理+展示"型轻量场景(CRM、简单进销存),无法支撑需要复杂业务逻辑的行业(财务、制造、电商)。
|
||
|
||
本次增强的目标:**让插件逻辑层从 40% 提升到 80%+,使系统能真正承载不同行业的定制化需求。**
|
||
|
||
---
|
||
|
||
## 改动 1:混合执行模型(解决查询和读后写一致性)
|
||
|
||
### 问题
|
||
|
||
`host.rs:99-109` — `db_query` 忽略 `_filter` 和 `_pagination` 参数,只从 `query_results` 预填充缓存取数据。插件无法自主构造查询。
|
||
|
||
### 方案:读操作走实时 SQL + 写操作保持延迟批量 + 读前自动 flush
|
||
|
||
**核心流程变更:**
|
||
|
||
```
|
||
当前:
|
||
WASM 调用 db_insert() → 入队 pending_ops
|
||
WASM 调用 db_query() → 从预填充缓存读(忽略 filter/pagination)
|
||
WASM 结束 → flush 全部 pending_ops
|
||
|
||
改为:
|
||
WASM 调用 db_insert() → 入队 pending_ops
|
||
WASM 调用 db_query() → 先 flush pending_ops → 执行真实 SQL 查询 → 返回结果
|
||
WASM 结束 → flush 剩余 pending_ops
|
||
```
|
||
|
||
### 改动文件
|
||
|
||
#### 1. `crates/erp-plugin/src/host.rs`
|
||
|
||
**HostState 新增字段:**
|
||
|
||
```rust
|
||
pub struct HostState {
|
||
// ... 现有字段保留 ...
|
||
// 新增:用于实时查询的数据服务引用和数据库连接
|
||
pub(crate) db: Option<DatabaseConnection>,
|
||
pub(crate) data_service_ready: bool,
|
||
}
|
||
```
|
||
|
||
**db_query 实现变更:**
|
||
|
||
```rust
|
||
fn db_query(&mut self, entity: String, filter: Vec<u8>, pagination: Vec<u8>)
|
||
-> Result<Vec<u8>, String>
|
||
{
|
||
// 如果没有数据库连接(向后兼容预填充模式),走旧路径
|
||
if self.db.is_none() {
|
||
return self.query_results
|
||
.get(&entity)
|
||
.cloned()
|
||
.ok_or_else(|| format!("实体 '{}' 的查询结果未预填充", entity));
|
||
}
|
||
|
||
// 解析 filter 和 pagination 参数
|
||
let filter_val: Option<serde_json::Value> = if filter.is_empty() {
|
||
None
|
||
} else {
|
||
serde_json::from_slice(&filter).ok()
|
||
};
|
||
|
||
let pagination_val: Option<serde_json::Value> = if pagination.is_empty() {
|
||
None
|
||
} else {
|
||
serde_json::from_slice(&pagination).ok()
|
||
};
|
||
|
||
// 先同步 flush pending writes(确保读后写一致性)
|
||
// 注意:在 WASM 的 spawn_blocking 上下文中,需要同步执行
|
||
// 方案:将 pending_ops 暂存到临时变量,由调用方在 execute_wasm 中处理
|
||
|
||
// 使用 pre_query_ops 标记,让 engine 在 execute_wasm 中间阶段 flush
|
||
self.pre_query_ops = std::mem::take(&mut self.pending_ops);
|
||
self.pending_query = Some(PendingQuery { entity, filter_val, pagination_val });
|
||
|
||
// 返回占位符 — 真正的查询在 execute_wasm 的两阶段执行中完成
|
||
Ok(serde_json::to_vec(&serde_json::json!({"status": "query_pending"})).unwrap_or_default())
|
||
}
|
||
```
|
||
|
||
**实际实现策略 — 采用回调模式:**
|
||
|
||
由于 `db_query` 在 `spawn_blocking` 内执行,不能直接 await 异步数据库操作。采用两阶段执行:
|
||
|
||
1. WASM 执行期间:`db_query` 收集查询参数,设置 `needs_flush_and_query = true`
|
||
2. `execute_wasm` 的 `spawn_blocking` 结束后:检查标志,如果需要查询则:
|
||
- flush pending_ops
|
||
- 执行查询
|
||
- 用查询结果重新调用 WASM(继续执行后续逻辑)
|
||
|
||
**更好的方案 — 分段执行:**
|
||
|
||
将 `execute_wasm` 改为分段执行模型:
|
||
|
||
```rust
|
||
async fn execute_wasm(&self, ...) -> PluginResult<R> {
|
||
// 阶段 1:执行 WASM,遇到 db_query 时暂停
|
||
let (result, pending_ops, pending_queries) = tokio::task::spawn_blocking(move || {
|
||
// WASM 执行中遇到 db_query 时,收集查询参数并设置标志
|
||
// 标志在 HostState 中:self.needs_query = true, self.query_params = ...
|
||
// WASM 继续执行(db_query 返回空结果集作为占位)
|
||
// ...
|
||
}).await?;
|
||
|
||
// 中间阶段:flush writes + execute queries
|
||
Self::flush_ops(&self.db, plugin_id, pending_ops, ...).await?;
|
||
let query_results = Self::execute_queries(&self.db, plugin_id, pending_queries, ...).await?;
|
||
|
||
// 阶段 2:如果有待处理的查询,重新执行 WASM(或继续后续逻辑)
|
||
// ...
|
||
}
|
||
```
|
||
|
||
**最终推荐方案 — 简化版:**
|
||
|
||
实际上最简单的做法是:**让 db_query 同步执行真实查询**。在 `spawn_blocking` 中使用 `tokio::runtime::Handle` 来在阻塞线程中执行异步代码。
|
||
|
||
```rust
|
||
fn db_query(&mut self, entity: String, filter: Vec<u8>, pagination: Vec<u8>)
|
||
-> Result<Vec<u8>, String>
|
||
{
|
||
let db = self.db.as_ref().ok_or("数据库连接不可用")?;
|
||
|
||
// 先 flush pending writes(通过 tokio handle 在阻塞上下文中执行异步)
|
||
let rt = tokio::runtime::Handle::current();
|
||
let ops = std::mem::take(&mut self.pending_ops);
|
||
if !ops.is_empty() {
|
||
rt.block_on(Self::flush_ops_static(db, &self.plugin_id, ops,
|
||
self.tenant_id, self.user_id, &self.event_bus))
|
||
.map_err(|e| format!("flush 失败: {}", e))?;
|
||
}
|
||
|
||
// 解析 filter
|
||
let filter_val: Option<serde_json::Value> = if filter.is_empty() {
|
||
None
|
||
} else {
|
||
serde_json::from_slice(&filter).ok()
|
||
};
|
||
|
||
// 构建并执行查询
|
||
let table_name = DynamicTableManager::table_name(&self.plugin_id, &entity);
|
||
let (sql, values) = DynamicTableManager::build_query_sql(
|
||
&table_name, self.tenant_id, filter_val, pagination_val
|
||
).map_err(|e| e.to_string())?;
|
||
|
||
let rows: Vec<serde_json::Value> = rt.block_on(async {
|
||
// 执行查询
|
||
}).map_err(|e| e.to_string())?;
|
||
|
||
serde_json::to_vec(&rows).map_err(|e| e.to_string())
|
||
}
|
||
```
|
||
|
||
**改动影响:**
|
||
- `HostState` 增加 `db: Option<DatabaseConnection>` 和 `event_bus: Option<EventBus>` 字段
|
||
- `execute_wasm` 创建 HostState 时传入 db 和 event_bus
|
||
- `db_query` 从忽略参数改为实时查询
|
||
- `PluginEngine::new` 已持有 db 和 event_bus,无需新增依赖
|
||
|
||
#### 2. `crates/erp-plugin/src/dynamic_table.rs`
|
||
|
||
新增 `build_query_sql` 方法,复用现有 `data_service.rs` 的查询构建逻辑:
|
||
|
||
```rust
|
||
pub fn build_query_sql(
|
||
table_name: &str,
|
||
tenant_id: Uuid,
|
||
filter: Option<serde_json::Value>,
|
||
pagination: Option<serde_json::Value>,
|
||
) -> Result<(String, Vec<sea_orm::Value>)>
|
||
```
|
||
|
||
### 向后兼容
|
||
|
||
- `HostState::new()` 不传 db → `db = None` → 走旧的预填充路径
|
||
- `execute_wasm()` 传 db → 走新的实时查询路径
|
||
- 现有 WASM 插件无需修改(旧路径仍然可用)
|
||
|
||
---
|
||
|
||
## 改动 2:扩展聚合查询
|
||
|
||
### 问题
|
||
|
||
`data_service.rs:655` 的 `aggregate` 方法只支持 `GROUP BY + COUNT(*)`,返回 `Vec<(String, i64)>`。
|
||
|
||
### 方案
|
||
|
||
扩展聚合函数支持 SUM/AVG/MAX/MIN。
|
||
|
||
#### 改动文件
|
||
|
||
**1. `crates/erp-plugin/src/data_service.rs`**
|
||
|
||
新增多聚合函数方法:
|
||
|
||
```rust
|
||
pub struct AggregateResult {
|
||
pub key: String,
|
||
pub metrics: HashMap<String, f64>, // "count" -> 10, "total_amount" -> 5000.0
|
||
}
|
||
|
||
pub async fn aggregate_multi(
|
||
plugin_id: Uuid,
|
||
entity_name: &str,
|
||
tenant_id: Uuid,
|
||
db: &DatabaseConnection,
|
||
group_by_field: &str,
|
||
aggregations: &[AggregateDef], // [{field: "amount", func: "sum"}, ...]
|
||
filter: Option<serde_json::Value>,
|
||
scope: Option<DataScopeParams>,
|
||
) -> AppResult<Vec<AggregateResult>>
|
||
|
||
pub struct AggregateDef {
|
||
pub field: String,
|
||
pub func: AggregateFunc,
|
||
}
|
||
|
||
pub enum AggregateFunc {
|
||
Count,
|
||
Sum,
|
||
Avg,
|
||
Min,
|
||
Max,
|
||
}
|
||
```
|
||
|
||
SQL 构建示例:
|
||
|
||
```sql
|
||
SELECT _f_status as key,
|
||
COUNT(*) as count,
|
||
COALESCE(SUM(_f_amount), 0) as sum_amount,
|
||
COALESCE(AVG(_f_price), 0) as avg_price
|
||
FROM plugin_erp_crm__order
|
||
WHERE tenant_id = $1 AND deleted_at IS NULL
|
||
GROUP BY _f_status
|
||
```
|
||
|
||
**2. `crates/erp-plugin/src/dynamic_table.rs`**
|
||
|
||
新增 `build_aggregate_multi_sql` 方法,构建多聚合 SQL。
|
||
|
||
**3. `crates/erp-plugin/src/data_handler.rs`**
|
||
|
||
扩展聚合 API 端点,接受 `aggregations` 参数:
|
||
|
||
```json
|
||
POST /api/v1/plugins/{pluginId}/data/{entityName}/aggregate
|
||
{
|
||
"group_by": "status",
|
||
"aggregations": [
|
||
{"field": "amount", "func": "sum"},
|
||
{"field": "price", "func": "avg"}
|
||
],
|
||
"filter": {"status": "active"}
|
||
}
|
||
```
|
||
|
||
**4. 前端 Dashboard Widget 适配**
|
||
|
||
`PluginDashboardPage.tsx` 中的 `stat_card` 和图表 widget 需要适配新的多聚合返回格式。
|
||
|
||
---
|
||
|
||
## 改动 3:热更新原子回滚
|
||
|
||
### 问题
|
||
|
||
`service.rs:578-585` — 升级时先 `unload(old)` 再 `load(new)`,如果 `load` 失败,旧版本已不在内存中。
|
||
|
||
### 方案:先加载新版本,成功后原子替换
|
||
|
||
#### 改动文件
|
||
|
||
**`crates/erp-plugin/src/service.rs`** — `upgrade` 方法:
|
||
|
||
```rust
|
||
// 当前(有风险):
|
||
engine.unload(plugin_manifest_id).await.ok(); // 旧版本已卸载
|
||
engine.load(plugin_manifest_id, &new_wasm, manifest) // 如果这里失败 → 无回滚
|
||
.await?;
|
||
|
||
// 改为(安全):
|
||
// 1. 先加载新版本(用临时 key)
|
||
let temp_id = format!("{}__upgrade_{}", plugin_manifest_id, Uuid::now_v7());
|
||
engine.load(&temp_id, &new_wasm, new_manifest.clone()).await
|
||
.map_err(|e| {
|
||
tracing::error!(error = %e, "新版本 WASM 加载失败,旧版本仍在运行");
|
||
e
|
||
})?;
|
||
|
||
// 2. 卸载旧版本
|
||
engine.unload(plugin_manifest_id).await.ok();
|
||
|
||
// 3. 将新版本从临时 key 改为正式 key
|
||
engine.rename_plugin(&temp_id, plugin_manifest_id).await?;
|
||
|
||
// 4. 更新数据库记录
|
||
```
|
||
|
||
**`crates/erp-plugin/src/engine.rs`** — 新增 `rename_plugin` 方法:
|
||
|
||
```rust
|
||
pub async fn rename_plugin(&self, old_id: &str, new_id: &str) -> PluginResult<()> {
|
||
let loaded = self.plugins.remove(old_id)
|
||
.ok_or_else(|| PluginError::NotFound(old_id.to_string()))?;
|
||
let mut loaded = Arc::try_unwrap(loaded.1)
|
||
.map_err(|_| PluginError::ExecutionError("插件仍被引用".to_string()))?;
|
||
loaded.id = new_id.to_string();
|
||
self.plugins.insert(new_id.to_string(), Arc::new(loaded));
|
||
Ok(())
|
||
}
|
||
```
|
||
|
||
**改进后的安全保证:**
|
||
- 新版本加载失败 → 旧版本仍在运行,零停机
|
||
- 数据库记录只在 WASM 替换成功后才更新
|
||
- 事务性:要么完全切换到新版本,要么保持旧版本
|
||
|
||
---
|
||
|
||
## 改动 4:Schema 演进(ALTER TABLE 支持)
|
||
|
||
### 问题
|
||
|
||
`service.rs:562-575` — 升级时只处理新增实体(CREATE TABLE),不处理已有实体的字段变更。
|
||
|
||
### 方案:利用 JSONB 特性实现轻量级 Schema 演进
|
||
|
||
由于核心数据在 JSONB 的 `data` 列中,大部分字段变更不需要 DDL:
|
||
|
||
- **新增字段**:JSONB 天然支持,只需更新 manifest
|
||
- **新增 filterable/sortable 字段**:需要 ALTER TABLE ADD Generated Column + 索引
|
||
- **删除字段**:JSONB 中多余字段不影响,Generated Column 可保留(无害)
|
||
- **重命名字段**:添加新 Generated Column,旧的保留
|
||
- **修改字段类型**:Generated Column 需要 DROP + ADD(JSONB 数据不需要改)
|
||
|
||
#### 改动文件
|
||
|
||
**`crates/erp-plugin/src/service.rs`** — `upgrade` 方法增加 schema diff 逻辑:
|
||
|
||
```rust
|
||
// 对比 schema 变更
|
||
if let Some(new_schema) = &new_manifest.schema {
|
||
let old_schema = old_manifest.schema.as_ref();
|
||
|
||
for new_entity in &new_schema.entities {
|
||
let old_entity = old_schema
|
||
.and_then(|s| s.entities.iter().find(|e| e.name == new_entity.name));
|
||
|
||
match old_entity {
|
||
None => {
|
||
// 全新实体 — CREATE TABLE
|
||
DynamicTableManager::create_table(db, plugin_manifest_id, new_entity).await?;
|
||
}
|
||
Some(old) => {
|
||
// 已有实体 — diff 字段
|
||
let diff = diff_entity_fields(old, new_entity);
|
||
if !diff.new_filterable.is_empty() || !diff.new_sortable.is_empty() {
|
||
DynamicTableManager::alter_add_generated_columns(
|
||
db, plugin_manifest_id, new_entity, &diff
|
||
).await?;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
**`crates/erp-plugin/src/dynamic_table.rs`** — 新增:
|
||
|
||
```rust
|
||
pub struct FieldDiff {
|
||
pub new_filterable: Vec<PluginField>, // 新增的需要 Generated Column 的字段
|
||
pub new_sortable: Vec<PluginField>,
|
||
pub new_searchable: Vec<PluginField>, // 新增的需要 pg_trgm 索引的字段
|
||
}
|
||
|
||
pub fn diff_entity_fields(old: &PluginEntity, new: &PluginEntity) -> FieldDiff
|
||
|
||
pub async fn alter_add_generated_columns(
|
||
db: &DatabaseConnection,
|
||
plugin_id: &str,
|
||
entity: &PluginEntity,
|
||
diff: &FieldDiff,
|
||
) -> PluginResult<()>
|
||
```
|
||
|
||
ALTER TABLE 示例:
|
||
|
||
```sql
|
||
-- 新增 filterable 字段
|
||
ALTER TABLE plugin_erp_crm__customer
|
||
ADD COLUMN IF NOT EXISTS _f_source TEXT GENERATED ALWAYS AS (data->>'source') STORED;
|
||
|
||
-- 新增索引
|
||
CREATE INDEX IF NOT EXISTS idx_plugin_erp_crm__customer__f_source
|
||
ON plugin_erp_crm__customer (_f_source) WHERE deleted_at IS NULL;
|
||
|
||
-- 新增 searchable 字段的 pg_trgm 索引
|
||
CREATE INDEX IF NOT EXISTS idx_plugin_erp_crm__customer__f_source_trgm
|
||
ON plugin_erp_crm__customer USING gin (_f_source gin_trgm_ops)
|
||
WHERE deleted_at IS NULL;
|
||
```
|
||
|
||
---
|
||
|
||
## 实施顺序
|
||
|
||
| 阶段 | 改动 | 复杂度 | 影响范围 |
|
||
|------|------|--------|---------|
|
||
| 1 | 热更新原子回滚 | 低 | engine.rs + service.rs |
|
||
| 2 | Schema 演进(ALTER TABLE) | 中低 | service.rs + dynamic_table.rs |
|
||
| 3 | 扩展聚合查询 | 中 | data_service.rs + data_handler.rs + dynamic_table.rs |
|
||
| 4 | 混合执行模型(查询能力) | 高 | host.rs + engine.rs + dynamic_table.rs |
|
||
|
||
建议按复杂度从低到高实施,每个阶段独立可验证。
|
||
|
||
---
|
||
|
||
## 验证方案
|
||
|
||
### 阶段 1:热更新回滚
|
||
|
||
1. 准备两个版本的 CRM 插件 WASM(v1.0.0 和 v2.0.0)
|
||
2. 上传 v2.0.0 但故意让 WASM 二进制损坏
|
||
3. 验证:旧版本 v1.0.0 仍在正常运行
|
||
4. 上传正确的 v2.0.0
|
||
5. 验证:成功切换到 v2.0.0
|
||
|
||
### 阶段 2:Schema 演进
|
||
|
||
1. 创建 CRM 插件 v1.0.0(含 customer 实体,3 个字段)
|
||
2. 升级到 v1.1.0(customer 增加 2 个 filterable 字段 + 1 个新实体 contact)
|
||
3. 验证:新字段可以过滤/排序,旧数据不受影响
|
||
4. 在已有数据上验证新 Generated Column 的值正确填充
|
||
|
||
### 阶段 3:聚合查询
|
||
|
||
1. 创建测试数据(不同状态的订单,含 amount 字段)
|
||
2. 调用聚合 API:group_by=status, aggregations=[sum(amount), avg(amount)]
|
||
3. 验证返回结果正确
|
||
4. 前端 Dashboard stat_card 展示正确的聚合数据
|
||
|
||
### 阶段 4:混合执行模型
|
||
|
||
1. 在插件 WASM 中调用 db_insert 后立即 db_query
|
||
2. 验证能读取到刚插入的数据(读后写一致性)
|
||
3. 验证带 filter 参数的 db_query 返回正确过滤结果
|
||
4. 验证旧插件(使用预填充模式)仍能正常工作
|
||
5. 压力测试:多次连续 db_query 不超过 Fuel 限制
|
||
|
||
---
|
||
|
||
## 关键文件清单
|
||
|
||
| 文件 | 改动类型 |
|
||
|------|---------|
|
||
| `crates/erp-plugin/src/host.rs` | 重构 db_query + 新增 db/事件总线字段 |
|
||
| `crates/erp-plugin/src/engine.rs` | 调整 execute_wasm + 新增 rename_plugin |
|
||
| `crates/erp-plugin/src/service.rs` | 升级流程回滚安全 + schema diff |
|
||
| `crates/erp-plugin/src/dynamic_table.rs` | 新增 build_query_sql + alter_add_generated_columns + diff_entity_fields |
|
||
| `crates/erp-plugin/src/data_service.rs` | 新增 aggregate_multi + AggregateDef |
|
||
| `crates/erp-plugin/src/data_handler.rs` | 扩展聚合 API |
|
||
| `apps/web/src/pages/PluginDashboardPage.tsx` | 适配多聚合返回格式 |
|
||
|
||
### 可复用的现有函数
|
||
|
||
- `DynamicTableManager::build_query_sql` — 可复用 `data_service.rs` 中的查询构建逻辑
|
||
- `DynamicTableManager::build_insert_sql` — flush 时已有,无需改动
|
||
- `sanitize_identifier` — 已有,用于新字段名的安全检查
|
||
- `flush_ops` — 已有事务性 flush 逻辑,混合模型中复用
|