fix(用户管理): 修复用户列表页面加载失败问题
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled

修复用户列表页面加载失败导致测试超时的问题,确保页面元素正确渲染
This commit is contained in:
iven
2026-04-19 08:46:28 +08:00
parent 0ee9d22634
commit 841766b168
174 changed files with 26366 additions and 675 deletions

View File

@@ -0,0 +1,488 @@
# 插件系统增强设计规格
## Context
插件系统是 ERP 平台的核心差异化能力当前声明式层面manifest schema、动态表、前端页面已达 90% 成熟度。但 WASM 逻辑层存在根本性限制:
1. **插件无法自主查询数据**`db_query` 的 filter/pagination 参数被忽略,只能使用预填充结果
2. **无读后写一致性** — 延迟刷新模型导致插件在一次调用中无法读取自己刚写入的数据
3. **聚合只有 COUNT** — 缺少 SUM/AVG/MAX/MIN无法支撑财务、统计类场景
4. **热更新无原子回滚** — 旧版本先卸载再加载新版本,中间失败无保障
5. **Schema 变更只支持新增实体** — 不支持已有实体的字段演进
这些限制使插件系统只能支撑"数据管理+展示"型轻量场景CRM、简单进销存无法支撑需要复杂业务逻辑的行业财务、制造、电商
本次增强的目标:**让插件逻辑层从 40% 提升到 80%+,使系统能真正承载不同行业的定制化需求。**
---
## 改动 1混合执行模型解决查询和读后写一致性
### 问题
`host.rs:99-109``db_query` 忽略 `_filter``_pagination` 参数,只从 `query_results` 预填充缓存取数据。插件无法自主构造查询。
### 方案:读操作走实时 SQL + 写操作保持延迟批量 + 读前自动 flush
**核心流程变更:**
```
当前:
WASM 调用 db_insert() → 入队 pending_ops
WASM 调用 db_query() → 从预填充缓存读(忽略 filter/pagination
WASM 结束 → flush 全部 pending_ops
改为:
WASM 调用 db_insert() → 入队 pending_ops
WASM 调用 db_query() → 先 flush pending_ops → 执行真实 SQL 查询 → 返回结果
WASM 结束 → flush 剩余 pending_ops
```
### 改动文件
#### 1. `crates/erp-plugin/src/host.rs`
**HostState 新增字段:**
```rust
pub struct HostState {
// ... 现有字段保留 ...
// 新增:用于实时查询的数据服务引用和数据库连接
pub(crate) db: Option<DatabaseConnection>,
pub(crate) data_service_ready: bool,
}
```
**db_query 实现变更:**
```rust
fn db_query(&mut self, entity: String, filter: Vec<u8>, pagination: Vec<u8>)
-> Result<Vec<u8>, String>
{
// 如果没有数据库连接(向后兼容预填充模式),走旧路径
if self.db.is_none() {
return self.query_results
.get(&entity)
.cloned()
.ok_or_else(|| format!("实体 '{}' 的查询结果未预填充", entity));
}
// 解析 filter 和 pagination 参数
let filter_val: Option<serde_json::Value> = if filter.is_empty() {
None
} else {
serde_json::from_slice(&filter).ok()
};
let pagination_val: Option<serde_json::Value> = if pagination.is_empty() {
None
} else {
serde_json::from_slice(&pagination).ok()
};
// 先同步 flush pending writes确保读后写一致性
// 注意:在 WASM 的 spawn_blocking 上下文中,需要同步执行
// 方案:将 pending_ops 暂存到临时变量,由调用方在 execute_wasm 中处理
// 使用 pre_query_ops 标记,让 engine 在 execute_wasm 中间阶段 flush
self.pre_query_ops = std::mem::take(&mut self.pending_ops);
self.pending_query = Some(PendingQuery { entity, filter_val, pagination_val });
// 返回占位符 — 真正的查询在 execute_wasm 的两阶段执行中完成
Ok(serde_json::to_vec(&serde_json::json!({"status": "query_pending"})).unwrap_or_default())
}
```
**实际实现策略 — 采用回调模式:**
由于 `db_query``spawn_blocking` 内执行,不能直接 await 异步数据库操作。采用两阶段执行:
1. WASM 执行期间:`db_query` 收集查询参数,设置 `needs_flush_and_query = true`
2. `execute_wasm``spawn_blocking` 结束后:检查标志,如果需要查询则:
- flush pending_ops
- 执行查询
- 用查询结果重新调用 WASM继续执行后续逻辑
**更好的方案 — 分段执行:**
`execute_wasm` 改为分段执行模型:
```rust
async fn execute_wasm(&self, ...) -> PluginResult<R> {
// 阶段 1执行 WASM遇到 db_query 时暂停
let (result, pending_ops, pending_queries) = tokio::task::spawn_blocking(move || {
// WASM 执行中遇到 db_query 时,收集查询参数并设置标志
// 标志在 HostState 中self.needs_query = true, self.query_params = ...
// WASM 继续执行db_query 返回空结果集作为占位)
// ...
}).await?;
// 中间阶段flush writes + execute queries
Self::flush_ops(&self.db, plugin_id, pending_ops, ...).await?;
let query_results = Self::execute_queries(&self.db, plugin_id, pending_queries, ...).await?;
// 阶段 2如果有待处理的查询重新执行 WASM或继续后续逻辑
// ...
}
```
**最终推荐方案 — 简化版:**
实际上最简单的做法是:**让 db_query 同步执行真实查询**。在 `spawn_blocking` 中使用 `tokio::runtime::Handle` 来在阻塞线程中执行异步代码。
```rust
fn db_query(&mut self, entity: String, filter: Vec<u8>, pagination: Vec<u8>)
-> Result<Vec<u8>, String>
{
let db = self.db.as_ref().ok_or("数据库连接不可用")?;
// 先 flush pending writes通过 tokio handle 在阻塞上下文中执行异步)
let rt = tokio::runtime::Handle::current();
let ops = std::mem::take(&mut self.pending_ops);
if !ops.is_empty() {
rt.block_on(Self::flush_ops_static(db, &self.plugin_id, ops,
self.tenant_id, self.user_id, &self.event_bus))
.map_err(|e| format!("flush 失败: {}", e))?;
}
// 解析 filter
let filter_val: Option<serde_json::Value> = if filter.is_empty() {
None
} else {
serde_json::from_slice(&filter).ok()
};
// 构建并执行查询
let table_name = DynamicTableManager::table_name(&self.plugin_id, &entity);
let (sql, values) = DynamicTableManager::build_query_sql(
&table_name, self.tenant_id, filter_val, pagination_val
).map_err(|e| e.to_string())?;
let rows: Vec<serde_json::Value> = rt.block_on(async {
// 执行查询
}).map_err(|e| e.to_string())?;
serde_json::to_vec(&rows).map_err(|e| e.to_string())
}
```
**改动影响:**
- `HostState` 增加 `db: Option<DatabaseConnection>``event_bus: Option<EventBus>` 字段
- `execute_wasm` 创建 HostState 时传入 db 和 event_bus
- `db_query` 从忽略参数改为实时查询
- `PluginEngine::new` 已持有 db 和 event_bus无需新增依赖
#### 2. `crates/erp-plugin/src/dynamic_table.rs`
新增 `build_query_sql` 方法,复用现有 `data_service.rs` 的查询构建逻辑:
```rust
pub fn build_query_sql(
table_name: &str,
tenant_id: Uuid,
filter: Option<serde_json::Value>,
pagination: Option<serde_json::Value>,
) -> Result<(String, Vec<sea_orm::Value>)>
```
### 向后兼容
- `HostState::new()` 不传 db → `db = None` → 走旧的预填充路径
- `execute_wasm()` 传 db → 走新的实时查询路径
- 现有 WASM 插件无需修改(旧路径仍然可用)
---
## 改动 2扩展聚合查询
### 问题
`data_service.rs:655``aggregate` 方法只支持 `GROUP BY + COUNT(*)`,返回 `Vec<(String, i64)>`
### 方案
扩展聚合函数支持 SUM/AVG/MAX/MIN。
#### 改动文件
**1. `crates/erp-plugin/src/data_service.rs`**
新增多聚合函数方法:
```rust
pub struct AggregateResult {
pub key: String,
pub metrics: HashMap<String, f64>, // "count" -> 10, "total_amount" -> 5000.0
}
pub async fn aggregate_multi(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
db: &DatabaseConnection,
group_by_field: &str,
aggregations: &[AggregateDef], // [{field: "amount", func: "sum"}, ...]
filter: Option<serde_json::Value>,
scope: Option<DataScopeParams>,
) -> AppResult<Vec<AggregateResult>>
pub struct AggregateDef {
pub field: String,
pub func: AggregateFunc,
}
pub enum AggregateFunc {
Count,
Sum,
Avg,
Min,
Max,
}
```
SQL 构建示例:
```sql
SELECT _f_status as key,
COUNT(*) as count,
COALESCE(SUM(_f_amount), 0) as sum_amount,
COALESCE(AVG(_f_price), 0) as avg_price
FROM plugin_erp_crm__order
WHERE tenant_id = $1 AND deleted_at IS NULL
GROUP BY _f_status
```
**2. `crates/erp-plugin/src/dynamic_table.rs`**
新增 `build_aggregate_multi_sql` 方法,构建多聚合 SQL。
**3. `crates/erp-plugin/src/data_handler.rs`**
扩展聚合 API 端点,接受 `aggregations` 参数:
```json
POST /api/v1/plugins/{pluginId}/data/{entityName}/aggregate
{
"group_by": "status",
"aggregations": [
{"field": "amount", "func": "sum"},
{"field": "price", "func": "avg"}
],
"filter": {"status": "active"}
}
```
**4. 前端 Dashboard Widget 适配**
`PluginDashboardPage.tsx` 中的 `stat_card` 和图表 widget 需要适配新的多聚合返回格式。
---
## 改动 3热更新原子回滚
### 问题
`service.rs:578-585` — 升级时先 `unload(old)``load(new)`,如果 `load` 失败,旧版本已不在内存中。
### 方案:先加载新版本,成功后原子替换
#### 改动文件
**`crates/erp-plugin/src/service.rs`** — `upgrade` 方法:
```rust
// 当前(有风险):
engine.unload(plugin_manifest_id).await.ok(); // 旧版本已卸载
engine.load(plugin_manifest_id, &new_wasm, manifest) // 如果这里失败 → 无回滚
.await?;
// 改为(安全):
// 1. 先加载新版本(用临时 key
let temp_id = format!("{}__upgrade_{}", plugin_manifest_id, Uuid::now_v7());
engine.load(&temp_id, &new_wasm, new_manifest.clone()).await
.map_err(|e| {
tracing::error!(error = %e, "新版本 WASM 加载失败,旧版本仍在运行");
e
})?;
// 2. 卸载旧版本
engine.unload(plugin_manifest_id).await.ok();
// 3. 将新版本从临时 key 改为正式 key
engine.rename_plugin(&temp_id, plugin_manifest_id).await?;
// 4. 更新数据库记录
```
**`crates/erp-plugin/src/engine.rs`** — 新增 `rename_plugin` 方法:
```rust
pub async fn rename_plugin(&self, old_id: &str, new_id: &str) -> PluginResult<()> {
let loaded = self.plugins.remove(old_id)
.ok_or_else(|| PluginError::NotFound(old_id.to_string()))?;
let mut loaded = Arc::try_unwrap(loaded.1)
.map_err(|_| PluginError::ExecutionError("插件仍被引用".to_string()))?;
loaded.id = new_id.to_string();
self.plugins.insert(new_id.to_string(), Arc::new(loaded));
Ok(())
}
```
**改进后的安全保证:**
- 新版本加载失败 → 旧版本仍在运行,零停机
- 数据库记录只在 WASM 替换成功后才更新
- 事务性:要么完全切换到新版本,要么保持旧版本
---
## 改动 4Schema 演进ALTER TABLE 支持)
### 问题
`service.rs:562-575` — 升级时只处理新增实体CREATE TABLE不处理已有实体的字段变更。
### 方案:利用 JSONB 特性实现轻量级 Schema 演进
由于核心数据在 JSONB 的 `data` 列中,大部分字段变更不需要 DDL
- **新增字段**JSONB 天然支持,只需更新 manifest
- **新增 filterable/sortable 字段**:需要 ALTER TABLE ADD Generated Column + 索引
- **删除字段**JSONB 中多余字段不影响Generated Column 可保留(无害)
- **重命名字段**:添加新 Generated Column旧的保留
- **修改字段类型**Generated Column 需要 DROP + ADDJSONB 数据不需要改)
#### 改动文件
**`crates/erp-plugin/src/service.rs`** — `upgrade` 方法增加 schema diff 逻辑:
```rust
// 对比 schema 变更
if let Some(new_schema) = &new_manifest.schema {
let old_schema = old_manifest.schema.as_ref();
for new_entity in &new_schema.entities {
let old_entity = old_schema
.and_then(|s| s.entities.iter().find(|e| e.name == new_entity.name));
match old_entity {
None => {
// 全新实体 — CREATE TABLE
DynamicTableManager::create_table(db, plugin_manifest_id, new_entity).await?;
}
Some(old) => {
// 已有实体 — diff 字段
let diff = diff_entity_fields(old, new_entity);
if !diff.new_filterable.is_empty() || !diff.new_sortable.is_empty() {
DynamicTableManager::alter_add_generated_columns(
db, plugin_manifest_id, new_entity, &diff
).await?;
}
}
}
}
}
```
**`crates/erp-plugin/src/dynamic_table.rs`** — 新增:
```rust
pub struct FieldDiff {
pub new_filterable: Vec<PluginField>, // 新增的需要 Generated Column 的字段
pub new_sortable: Vec<PluginField>,
pub new_searchable: Vec<PluginField>, // 新增的需要 pg_trgm 索引的字段
}
pub fn diff_entity_fields(old: &PluginEntity, new: &PluginEntity) -> FieldDiff
pub async fn alter_add_generated_columns(
db: &DatabaseConnection,
plugin_id: &str,
entity: &PluginEntity,
diff: &FieldDiff,
) -> PluginResult<()>
```
ALTER TABLE 示例:
```sql
-- 新增 filterable 字段
ALTER TABLE plugin_erp_crm__customer
ADD COLUMN IF NOT EXISTS _f_source TEXT GENERATED ALWAYS AS (data->>'source') STORED;
-- 新增索引
CREATE INDEX IF NOT EXISTS idx_plugin_erp_crm__customer__f_source
ON plugin_erp_crm__customer (_f_source) WHERE deleted_at IS NULL;
-- 新增 searchable 字段的 pg_trgm 索引
CREATE INDEX IF NOT EXISTS idx_plugin_erp_crm__customer__f_source_trgm
ON plugin_erp_crm__customer USING gin (_f_source gin_trgm_ops)
WHERE deleted_at IS NULL;
```
---
## 实施顺序
| 阶段 | 改动 | 复杂度 | 影响范围 |
|------|------|--------|---------|
| 1 | 热更新原子回滚 | 低 | engine.rs + service.rs |
| 2 | Schema 演进ALTER TABLE | 中低 | service.rs + dynamic_table.rs |
| 3 | 扩展聚合查询 | 中 | data_service.rs + data_handler.rs + dynamic_table.rs |
| 4 | 混合执行模型(查询能力) | 高 | host.rs + engine.rs + dynamic_table.rs |
建议按复杂度从低到高实施,每个阶段独立可验证。
---
## 验证方案
### 阶段 1热更新回滚
1. 准备两个版本的 CRM 插件 WASMv1.0.0 和 v2.0.0
2. 上传 v2.0.0 但故意让 WASM 二进制损坏
3. 验证:旧版本 v1.0.0 仍在正常运行
4. 上传正确的 v2.0.0
5. 验证:成功切换到 v2.0.0
### 阶段 2Schema 演进
1. 创建 CRM 插件 v1.0.0(含 customer 实体3 个字段)
2. 升级到 v1.1.0customer 增加 2 个 filterable 字段 + 1 个新实体 contact
3. 验证:新字段可以过滤/排序,旧数据不受影响
4. 在已有数据上验证新 Generated Column 的值正确填充
### 阶段 3聚合查询
1. 创建测试数据(不同状态的订单,含 amount 字段)
2. 调用聚合 APIgroup_by=status, aggregations=[sum(amount), avg(amount)]
3. 验证返回结果正确
4. 前端 Dashboard stat_card 展示正确的聚合数据
### 阶段 4混合执行模型
1. 在插件 WASM 中调用 db_insert 后立即 db_query
2. 验证能读取到刚插入的数据(读后写一致性)
3. 验证带 filter 参数的 db_query 返回正确过滤结果
4. 验证旧插件(使用预填充模式)仍能正常工作
5. 压力测试:多次连续 db_query 不超过 Fuel 限制
---
## 关键文件清单
| 文件 | 改动类型 |
|------|---------|
| `crates/erp-plugin/src/host.rs` | 重构 db_query + 新增 db/事件总线字段 |
| `crates/erp-plugin/src/engine.rs` | 调整 execute_wasm + 新增 rename_plugin |
| `crates/erp-plugin/src/service.rs` | 升级流程回滚安全 + schema diff |
| `crates/erp-plugin/src/dynamic_table.rs` | 新增 build_query_sql + alter_add_generated_columns + diff_entity_fields |
| `crates/erp-plugin/src/data_service.rs` | 新增 aggregate_multi + AggregateDef |
| `crates/erp-plugin/src/data_handler.rs` | 扩展聚合 API |
| `apps/web/src/pages/PluginDashboardPage.tsx` | 适配多聚合返回格式 |
### 可复用的现有函数
- `DynamicTableManager::build_query_sql` — 可复用 `data_service.rs` 中的查询构建逻辑
- `DynamicTableManager::build_insert_sql` — flush 时已有,无需改动
- `sanitize_identifier` — 已有,用于新字段名的安全检查
- `flush_ops` — 已有事务性 flush 逻辑,混合模型中复用