feat(plugin): P1 跨插件数据引用系统 — 后端 Phase 1-3

实现跨插件实体引用的基础后端能力:

Phase 1 — Manifest 扩展 + Entity Registry 数据层:
- PluginField 新增 ref_plugin/ref_fallback_label 支持跨插件引用声明
- PluginRelation 新增 name/relation_type/display_field(CRM 已在用的字段)
- PluginEntity 新增 is_public 标记可被其他插件引用的实体
- 数据库迁移:plugin_entities 新增 manifest_id + is_public 列 + 索引
- SeaORM Entity 和 install 流程同步更新

Phase 2 — 后端跨插件引用解析 + 校验:
- data_service: 新增 resolve_cross_plugin_entity/is_plugin_active 函数
- validate_ref_entities: 支持 ref_plugin 字段,目标插件未安装时跳过校验(软警告)
- host.rs: HostState 新增 cross_plugin_entities 映射,db_query 支持点分记号
- engine.rs: execute_wasm 自动构建跨插件实体映射

Phase 3 — API 端点:
- POST /plugins/{id}/{entity}/resolve-labels 批量标签解析
- GET /plugin-registry/entities 公开实体注册表查询
This commit is contained in:
iven
2026-04-19 00:49:00 +08:00
parent 1dbda4c1e8
commit ef89ed38a1
12 changed files with 1425 additions and 24 deletions

View File

@@ -1,9 +1,12 @@
use std::collections::HashMap;
use sea_orm::DatabaseConnection;
use uuid::Uuid;
use wasmtime::StoreLimits;
use crate::erp::plugin::host_api;
use crate::dynamic_table::DynamicTableManager;
use crate::engine::PluginEngine;
/// 待刷新的写操作
#[derive(Debug)]
@@ -31,10 +34,9 @@ pub enum PendingOp {
/// Host 端状态 — 绑定到每个 WASM Store 实例
///
/// 采用延迟执行模式:
/// - 读操作 (db_query, config_get, current_user) → 调用前预填充
/// - 写操作 (db_insert, db_update, db_delete, event_publish) → 入队 pending_ops
/// - WASM 调用结束后由 engine 刷新 pending_ops 执行真实 DB 操作
/// 支持两种执行模式
/// - **预填充模式**db = None读操作从预填充缓存取向后兼容
/// - **混合执行模式**db = Some读操作走实时 SQL + 写操作保持延迟批量
pub struct HostState {
pub(crate) limits: StoreLimits,
#[allow(dead_code)]
@@ -43,7 +45,7 @@ pub struct HostState {
pub(crate) user_id: Uuid,
pub(crate) permissions: Vec<String>,
pub(crate) plugin_id: String,
// 预填充的读取缓存
// 预填充的读取缓存(向后兼容)
pub(crate) query_results: HashMap<String, Vec<u8>>,
pub(crate) config_cache: HashMap<String, Vec<u8>>,
pub(crate) current_user_json: Vec<u8>,
@@ -51,6 +53,11 @@ pub struct HostState {
pub(crate) pending_ops: Vec<PendingOp>,
// 日志
pub(crate) logs: Vec<(String, String)>,
// 混合执行模式:数据库连接和事件总线
pub(crate) db: Option<DatabaseConnection>,
pub(crate) event_bus: Option<erp_core::events::EventBus>,
// 跨插件实体映射:"erp-crm.customer" → "plugin_erp_crm__customer"
pub(crate) cross_plugin_entities: HashMap<String, String>,
}
impl HostState {
@@ -75,8 +82,26 @@ impl HostState {
current_user_json: serde_json::to_vec(&current_user).unwrap_or_default(),
pending_ops: Vec::new(),
logs: Vec::new(),
db: None,
event_bus: None,
cross_plugin_entities: HashMap::new(),
}
}
/// 创建带数据库连接的 HostState混合执行模式
pub fn new_with_db(
plugin_id: String,
tenant_id: Uuid,
user_id: Uuid,
permissions: Vec<String>,
db: DatabaseConnection,
event_bus: erp_core::events::EventBus,
) -> Self {
let mut state = Self::new(plugin_id, tenant_id, user_id, permissions);
state.db = Some(db);
state.event_bus = Some(event_bus);
state
}
}
// 实现 bindgen 生成的 Host trait — 插件调用 Host API 的入口
@@ -99,13 +124,110 @@ impl host_api::Host for HostState {
fn db_query(
&mut self,
entity: String,
_filter: Vec<u8>,
_pagination: Vec<u8>,
filter: Vec<u8>,
pagination: Vec<u8>,
) -> Result<Vec<u8>, String> {
self.query_results
.get(&entity)
.cloned()
.ok_or_else(|| format!("实体 '{}' 的查询结果未预填充", entity))
// 预填充模式(向后兼容)
if self.db.is_none() {
return self.query_results
.get(&entity)
.cloned()
.ok_or_else(|| format!("实体 '{}' 的查询结果未预填充", entity));
}
let db = self.db.clone().unwrap();
let event_bus = self.event_bus.clone()
.ok_or("事件总线不可用")?;
// 先 flush pending writes确保读后写一致性
let ops = std::mem::take(&mut self.pending_ops);
if !ops.is_empty() {
let rt = tokio::runtime::Handle::current();
rt.block_on(PluginEngine::flush_ops(
&db,
&self.plugin_id,
ops,
self.tenant_id,
self.user_id,
&event_bus,
))
.map_err(|e| format!("flush pending ops 失败: {}", e))?;
}
// 解析 filter 和 pagination
let filter_val: Option<serde_json::Value> = if filter.is_empty() {
None
} else {
serde_json::from_slice(&filter).ok()
};
let pagination_val: Option<serde_json::Value> = if pagination.is_empty() {
None
} else {
serde_json::from_slice(&pagination).ok()
};
// 构建查询 — 支持点分记号跨插件查询(如 "erp-crm.customer"
let table_name = if entity.contains('.') {
self.cross_plugin_entities
.get(&entity)
.cloned()
.ok_or_else(|| format!("跨插件实体 '{}' 未注册", entity))?
} else {
DynamicTableManager::table_name(&self.plugin_id, &entity)
};
let limit = pagination_val
.as_ref()
.and_then(|p| p.get("limit"))
.and_then(|v| v.as_u64())
.unwrap_or(50);
let offset = pagination_val
.as_ref()
.and_then(|p| p.get("offset"))
.and_then(|v| v.as_u64())
.unwrap_or(0);
let (sql, values) = DynamicTableManager::build_filtered_query_sql(
&table_name,
self.tenant_id,
limit,
offset,
filter_val,
None,
None,
None,
)
.map_err(|e| format!("查询构建失败: {}", e))?;
// 执行查询
let rt = tokio::runtime::Handle::current();
let rows = rt.block_on(async {
use sea_orm::{FromQueryResult, Statement};
#[derive(Debug, FromQueryResult)]
struct QueryRow {
data: serde_json::Value,
}
let results = QueryRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.all(&db)
.await
.map_err(|e| format!("查询执行失败: {}", e))?;
let items: Vec<serde_json::Value> = results
.into_iter()
.map(|r| r.data)
.collect();
Ok::<Vec<serde_json::Value>, String>(items)
})
.map_err(|e: String| e)?;
serde_json::to_vec(&rows).map_err(|e| e.to_string())
}
fn db_update(