Files
hms/crates/erp-plugin/src/service.rs
iven 3483395f5e fix(plugin): 修复插件 schema API、动态表 JSONB 和 SQL 注入防护
- get_schema 端点同时返回 entities 和 ui 页面配置,修复前端无法生成动态菜单的问题
- 动态表 INSERT/UPDATE 添加 ::jsonb 类型转换,修复 PostgreSQL 类型推断错误
- JSONB 索引创建改为非致命(warn 跳过),避免索引冲突阻断安装流程
- 权限注册/注销改用参数化查询,消除 SQL 注入风险
- DDL 语句改用 execute_unprepared,避免不必要的安全检查开销
- clear_plugin 支持已上传状态的清理
- 添加关键步骤 tracing 日志便于排查安装问题
2026-04-16 23:42:40 +08:00

740 lines
26 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use chrono::Utc;
use sea_orm::{ActiveModelTrait, ColumnTrait, ConnectionTrait, EntityTrait, PaginatorTrait, QueryFilter, Set};
use uuid::Uuid;
use sha2::{Sha256, Digest};
use erp_core::error::AppResult;
use crate::dto::{
PluginEntityResp, PluginHealthResp, PluginPermissionResp, PluginResp,
};
use crate::dynamic_table::DynamicTableManager;
use crate::engine::PluginEngine;
use crate::entity::{plugin, plugin_entity, plugin_event_subscription};
use crate::error::PluginError;
use crate::manifest::{parse_manifest, PluginManifest};
pub struct PluginService;
impl PluginService {
/// 上传插件: 解析 manifest + 存储 wasm_binary + status=uploaded
pub async fn upload(
tenant_id: Uuid,
operator_id: Uuid,
wasm_binary: Vec<u8>,
manifest_toml: &str,
db: &sea_orm::DatabaseConnection,
) -> AppResult<PluginResp> {
// 解析 manifest
let manifest = parse_manifest(manifest_toml)?;
// 计算 WASM hash
let mut hasher = Sha256::new();
hasher.update(&wasm_binary);
let wasm_hash = format!("{:x}", hasher.finalize());
let now = Utc::now();
let plugin_id = Uuid::now_v7();
// 序列化 manifest 为 JSON
let manifest_json =
serde_json::to_value(&manifest).map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let model = plugin::ActiveModel {
id: Set(plugin_id),
tenant_id: Set(tenant_id),
name: Set(manifest.metadata.name.clone()),
plugin_version: Set(manifest.metadata.version.clone()),
description: Set(if manifest.metadata.description.is_empty() {
None
} else {
Some(manifest.metadata.description.clone())
}),
author: Set(if manifest.metadata.author.is_empty() {
None
} else {
Some(manifest.metadata.author.clone())
}),
status: Set("uploaded".to_string()),
manifest_json: Set(manifest_json),
wasm_binary: Set(wasm_binary),
wasm_hash: Set(wasm_hash),
config_json: Set(serde_json::json!({})),
error_message: Set(None),
installed_at: Set(None),
enabled_at: Set(None),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(Some(operator_id)),
updated_by: Set(Some(operator_id)),
deleted_at: Set(None),
version: Set(1),
};
let model = model.insert(db).await?;
Ok(plugin_model_to_resp(&model, &manifest, vec![]))
}
/// 安装插件: 创建动态表 + 注册 entity 记录 + 注册事件订阅 + 注册权限 + status=installed
pub async fn install(
plugin_id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
db: &sea_orm::DatabaseConnection,
engine: &PluginEngine,
) -> AppResult<PluginResp> {
let model = find_plugin(plugin_id, tenant_id, db).await?;
validate_status(&model.status, "uploaded")?;
let manifest: PluginManifest =
serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let now = Utc::now();
// 创建动态表 + 注册 entity 记录
let mut entity_resps = Vec::new();
if let Some(schema) = &manifest.schema {
for (i, entity_def) in schema.entities.iter().enumerate() {
let table_name = DynamicTableManager::table_name(&manifest.metadata.id, &entity_def.name);
tracing::info!(step = i, entity = %entity_def.name, table = %table_name, "Creating dynamic table");
// 创建动态表
DynamicTableManager::create_table(db, &manifest.metadata.id, entity_def).await
.map_err(|e| {
tracing::error!(entity = %entity_def.name, table = %table_name, error = %e, "Failed to create dynamic table");
e
})?;
// 注册 entity 记录
let entity_id = Uuid::now_v7();
let entity_model = plugin_entity::ActiveModel {
id: Set(entity_id),
tenant_id: Set(tenant_id),
plugin_id: Set(plugin_id),
entity_name: Set(entity_def.name.clone()),
table_name: Set(table_name.clone()),
schema_json: Set(serde_json::to_value(entity_def).unwrap_or_default()),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(Some(operator_id)),
updated_by: Set(Some(operator_id)),
deleted_at: Set(None),
version: Set(1),
};
entity_model.insert(db).await?;
entity_resps.push(PluginEntityResp {
name: entity_def.name.clone(),
display_name: entity_def.display_name.clone(),
table_name,
});
}
}
// 注册事件订阅
if let Some(events) = &manifest.events {
for pattern in &events.subscribe {
let sub_id = Uuid::now_v7();
let sub_model = plugin_event_subscription::ActiveModel {
id: Set(sub_id),
plugin_id: Set(plugin_id),
event_pattern: Set(pattern.clone()),
created_at: Set(now),
};
sub_model.insert(db).await?;
}
}
// 注册插件声明的权限到 permissions 表
tracing::info!("Registering plugin permissions");
if let Some(perms) = &manifest.permissions {
register_plugin_permissions(
db,
tenant_id,
operator_id,
&manifest.metadata.id,
perms,
&now,
)
.await
.map_err(|e| {
tracing::error!(error = %e, "Failed to register permissions");
e
})?;
}
// 加载到内存
tracing::info!(manifest_id = %manifest.metadata.id, "Loading plugin into engine");
engine
.load(
&manifest.metadata.id,
&model.wasm_binary,
manifest.clone(),
)
.await?;
// 更新状态
let mut active: plugin::ActiveModel = model.into();
active.status = Set("installed".to_string());
active.installed_at = Set(Some(now));
active.updated_at = Set(now);
active.updated_by = Set(Some(operator_id));
let model = active.update(db).await?;
Ok(plugin_model_to_resp(&model, &manifest, entity_resps))
}
/// 启用插件: engine.initialize + start_event_listener + status=running
pub async fn enable(
plugin_id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
db: &sea_orm::DatabaseConnection,
engine: &PluginEngine,
) -> AppResult<PluginResp> {
let model = find_plugin(plugin_id, tenant_id, db).await?;
validate_status_any(&model.status, &["installed", "disabled"])?;
let manifest: PluginManifest =
serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let plugin_manifest_id = &manifest.metadata.id;
// 如果之前是 disabled 状态,需要先卸载再重新加载到内存
// disable 只改内存状态但不从 DashMap 移除)
if model.status == "disabled" {
engine.unload(plugin_manifest_id).await.ok();
engine
.load(plugin_manifest_id, &model.wasm_binary, manifest.clone())
.await?;
}
// 初始化
engine.initialize(plugin_manifest_id).await?;
// 启动事件监听
engine.start_event_listener(plugin_manifest_id).await?;
let now = Utc::now();
let mut active: plugin::ActiveModel = model.into();
active.status = Set("running".to_string());
active.enabled_at = Set(Some(now));
active.updated_at = Set(now);
active.updated_by = Set(Some(operator_id));
active.error_message = Set(None);
let model = active.update(db).await?;
let entity_resps = find_plugin_entities(plugin_id, tenant_id, db).await?;
Ok(plugin_model_to_resp(&model, &manifest, entity_resps))
}
/// 禁用插件: engine.disable + cancel 事件订阅 + status=disabled
pub async fn disable(
plugin_id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
db: &sea_orm::DatabaseConnection,
engine: &PluginEngine,
) -> AppResult<PluginResp> {
let model = find_plugin(plugin_id, tenant_id, db).await?;
validate_status_any(&model.status, &["running", "enabled"])?;
let manifest: PluginManifest =
serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
// 禁用引擎
engine.disable(&manifest.metadata.id).await?;
let now = Utc::now();
let mut active: plugin::ActiveModel = model.into();
active.status = Set("disabled".to_string());
active.updated_at = Set(now);
active.updated_by = Set(Some(operator_id));
let model = active.update(db).await?;
let entity_resps = find_plugin_entities(plugin_id, tenant_id, db).await?;
Ok(plugin_model_to_resp(&model, &manifest, entity_resps))
}
/// 卸载插件: unload + 有条件地 drop 动态表 + 清理权限 + status=uninstalled
pub async fn uninstall(
plugin_id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
db: &sea_orm::DatabaseConnection,
engine: &PluginEngine,
) -> AppResult<PluginResp> {
let model = find_plugin(plugin_id, tenant_id, db).await?;
validate_status_any(&model.status, &["installed", "disabled"])?;
let manifest: PluginManifest =
serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
// 卸载(如果 disabled 状态engine 可能仍在内存中)
engine.unload(&manifest.metadata.id).await.ok();
// 软删除当前租户的 entity 记录
let now = Utc::now();
let tenant_entities = plugin_entity::Entity::find()
.filter(plugin_entity::Column::PluginId.eq(plugin_id))
.filter(plugin_entity::Column::TenantId.eq(tenant_id))
.filter(plugin_entity::Column::DeletedAt.is_null())
.all(db)
.await?;
for entity in &tenant_entities {
let mut active: plugin_entity::ActiveModel = entity.clone().into();
active.deleted_at = Set(Some(now));
active.updated_at = Set(now);
active.updated_by = Set(Some(operator_id));
active.update(db).await?;
}
// 仅当没有其他租户的活跃 entity 记录引用相同的 table_name 时才 drop 表
if let Some(schema) = &manifest.schema {
for entity_def in &schema.entities {
let table_name =
DynamicTableManager::table_name(&manifest.metadata.id, &entity_def.name);
// 检查是否还有其他租户的活跃 entity 记录引用此表
let other_tenants_count = plugin_entity::Entity::find()
.filter(plugin_entity::Column::TableName.eq(&table_name))
.filter(plugin_entity::Column::TenantId.ne(tenant_id))
.filter(plugin_entity::Column::DeletedAt.is_null())
.count(db)
.await?;
if other_tenants_count == 0 {
// 没有其他租户使用,安全删除
DynamicTableManager::drop_table(db, &manifest.metadata.id, &entity_def.name)
.await
.ok();
}
}
}
// 清理此插件注册的权限
unregister_plugin_permissions(db, tenant_id, &manifest.metadata.id).await?;
let mut active: plugin::ActiveModel = model.into();
active.status = Set("uninstalled".to_string());
active.updated_at = Set(now);
active.updated_by = Set(Some(operator_id));
let model = active.update(db).await?;
Ok(plugin_model_to_resp(&model, &manifest, vec![]))
}
/// 列表查询
pub async fn list(
tenant_id: Uuid,
page: u64,
page_size: u64,
status: Option<&str>,
search: Option<&str>,
db: &sea_orm::DatabaseConnection,
) -> AppResult<(Vec<PluginResp>, u64)> {
let mut query = plugin::Entity::find()
.filter(plugin::Column::TenantId.eq(tenant_id))
.filter(plugin::Column::DeletedAt.is_null());
if let Some(s) = status {
query = query.filter(plugin::Column::Status.eq(s));
}
if let Some(q) = search {
query = query.filter(
plugin::Column::Name.contains(q)
.or(plugin::Column::Description.contains(q)),
);
}
let paginator = query
.clone()
.paginate(db, page_size);
let total = paginator.num_items().await?;
let models = paginator
.fetch_page(page.saturating_sub(1))
.await?;
let mut resps = Vec::with_capacity(models.len());
for model in models {
let manifest: PluginManifest =
serde_json::from_value(model.manifest_json.clone()).unwrap_or_else(|_| {
PluginManifest {
metadata: crate::manifest::PluginMetadata {
id: String::new(),
name: String::new(),
version: String::new(),
description: String::new(),
author: String::new(),
min_platform_version: None,
dependencies: vec![],
},
schema: None,
events: None,
ui: None,
permissions: None,
}
});
let entities = find_plugin_entities(model.id, tenant_id, db).await.unwrap_or_default();
resps.push(plugin_model_to_resp(&model, &manifest, entities));
}
Ok((resps, total))
}
/// 按 ID 获取详情
pub async fn get_by_id(
plugin_id: Uuid,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> AppResult<PluginResp> {
let model = find_plugin(plugin_id, tenant_id, db).await?;
let manifest: PluginManifest =
serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let entities = find_plugin_entities(plugin_id, tenant_id, db).await?;
Ok(plugin_model_to_resp(&model, &manifest, entities))
}
/// 更新配置
pub async fn update_config(
plugin_id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
config: serde_json::Value,
expected_version: i32,
db: &sea_orm::DatabaseConnection,
) -> AppResult<PluginResp> {
let model = find_plugin(plugin_id, tenant_id, db).await?;
erp_core::error::check_version(expected_version, model.version)?;
let now = Utc::now();
let mut active: plugin::ActiveModel = model.into();
active.config_json = Set(config);
active.updated_at = Set(now);
active.updated_by = Set(Some(operator_id));
let model = active.update(db).await?;
let manifest: PluginManifest =
serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let entities = find_plugin_entities(plugin_id, tenant_id, db).await.unwrap_or_default();
Ok(plugin_model_to_resp(&model, &manifest, entities))
}
/// 健康检查
pub async fn health_check(
plugin_id: Uuid,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
engine: &PluginEngine,
) -> AppResult<PluginHealthResp> {
let model = find_plugin(plugin_id, tenant_id, db).await?;
let manifest: PluginManifest =
serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let details = engine.health_check(&manifest.metadata.id).await?;
Ok(PluginHealthResp {
plugin_id,
status: details
.get("status")
.and_then(|v| v.as_str())
.unwrap_or("unknown")
.to_string(),
details,
})
}
/// 获取插件 Schema
pub async fn get_schema(
plugin_id: Uuid,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> AppResult<serde_json::Value> {
let model = find_plugin(plugin_id, tenant_id, db).await?;
let manifest: PluginManifest =
serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
// 构建 schema 响应entities + ui 页面配置
let mut result = serde_json::Map::new();
if let Some(schema) = &manifest.schema {
result.insert(
"entities".to_string(),
serde_json::to_value(&schema.entities).unwrap_or_default(),
);
}
if let Some(ui) = &manifest.ui {
result.insert(
"ui".to_string(),
serde_json::to_value(ui).unwrap_or_default(),
);
}
Ok(serde_json::Value::Object(result))
}
/// 清除插件记录(软删除,仅限已卸载状态)
pub async fn purge(
plugin_id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> AppResult<()> {
let model = find_plugin(plugin_id, tenant_id, db).await?;
validate_status_any(&model.status, &["uninstalled", "uploaded"])?;
let now = Utc::now();
let mut active: plugin::ActiveModel = model.into();
active.deleted_at = Set(Some(now));
active.updated_at = Set(now);
active.updated_by = Set(Some(operator_id));
active.update(db).await?;
Ok(())
}
}
// ---- 内部辅助 ----
fn find_plugin(
plugin_id: Uuid,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> impl std::future::Future<Output = AppResult<plugin::Model>> + Send {
async move {
plugin::Entity::find_by_id(plugin_id)
.one(db)
.await?
.filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none())
.ok_or_else(|| {
erp_core::error::AppError::NotFound(format!("插件 {} 不存在", plugin_id))
})
}
}
async fn find_plugin_entities(
plugin_id: Uuid,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> AppResult<Vec<PluginEntityResp>> {
let entities = plugin_entity::Entity::find()
.filter(plugin_entity::Column::PluginId.eq(plugin_id))
.filter(plugin_entity::Column::TenantId.eq(tenant_id))
.filter(plugin_entity::Column::DeletedAt.is_null())
.all(db)
.await?;
Ok(entities
.into_iter()
.map(|e| PluginEntityResp {
name: e.entity_name.clone(),
display_name: e.entity_name,
table_name: e.table_name,
})
.collect())
}
fn validate_status(actual: &str, expected: &str) -> AppResult<()> {
if actual != expected {
return Err(PluginError::InvalidState {
expected: expected.to_string(),
actual: actual.to_string(),
}
.into());
}
Ok(())
}
fn validate_status_any(actual: &str, expected: &[&str]) -> AppResult<()> {
if !expected.contains(&actual) {
return Err(PluginError::InvalidState {
expected: expected.join(""),
actual: actual.to_string(),
}
.into());
}
Ok(())
}
fn plugin_model_to_resp(
model: &plugin::Model,
manifest: &PluginManifest,
entities: Vec<PluginEntityResp>,
) -> PluginResp {
let permissions = manifest.permissions.as_ref().map(|perms| {
perms
.iter()
.map(|p| PluginPermissionResp {
code: p.code.clone(),
name: p.name.clone(),
description: p.description.clone(),
})
.collect()
});
PluginResp {
id: model.id,
name: model.name.clone(),
version: model.plugin_version.clone(),
description: model.description.clone(),
author: model.author.clone(),
status: model.status.clone(),
config: model.config_json.clone(),
installed_at: model.installed_at,
enabled_at: model.enabled_at,
entities,
permissions,
record_version: model.version,
}
}
/// 将插件声明的权限注册到 permissions 表。
///
/// 使用 raw SQL 避免依赖 erp-auth 的 entity 类型。
/// 权限码格式:`{plugin_manifest_id}.{code}`(如 `erp-crm.customer.list`)。
/// 使用 `ON CONFLICT DO NOTHING` 保证幂等。
async fn register_plugin_permissions(
db: &sea_orm::DatabaseConnection,
tenant_id: Uuid,
operator_id: Uuid,
plugin_manifest_id: &str,
perms: &[crate::manifest::PluginPermission],
now: &chrono::DateTime<chrono::Utc>,
) -> AppResult<()> {
for perm in perms {
let full_code = format!("{}.{}", plugin_manifest_id, perm.code);
let resource = plugin_manifest_id.to_string();
let action = perm.code.clone();
let description: Option<String> = if perm.description.is_empty() {
None
} else {
Some(perm.description.clone())
};
let sql = r#"
INSERT INTO permissions (id, tenant_id, code, name, resource, action, description, created_at, updated_at, created_by, updated_by, deleted_at, version)
VALUES (gen_random_uuid(), $1, $2, $3, $4, $5, $6, $7, $7, $8, $8, NULL, 1)
ON CONFLICT (tenant_id, code) WHERE deleted_at IS NULL DO NOTHING
"#;
db.execute(sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
vec![
sea_orm::Value::from(tenant_id),
sea_orm::Value::from(full_code.clone()),
sea_orm::Value::from(perm.name.clone()),
sea_orm::Value::from(resource),
sea_orm::Value::from(action),
sea_orm::Value::from(description),
sea_orm::Value::from(now.clone()),
sea_orm::Value::from(operator_id),
],
))
.await
.map_err(|e| {
tracing::error!(
plugin = plugin_manifest_id,
permission = %full_code,
error = %e,
"注册插件权限失败"
);
PluginError::DatabaseError(format!("注册插件权限 {} 失败: {}", full_code, e))
})?;
}
tracing::info!(
plugin = plugin_manifest_id,
count = perms.len(),
tenant_id = %tenant_id,
"插件权限注册完成"
);
Ok(())
}
/// 清理插件注册的权限(软删除)。
///
/// 使用 raw SQL 按前缀匹配清理:`{plugin_manifest_id}.%`。
/// 同时清理 role_permissions 中对这些权限的关联。
async fn unregister_plugin_permissions(
db: &sea_orm::DatabaseConnection,
tenant_id: Uuid,
plugin_manifest_id: &str,
) -> AppResult<()> {
let prefix = format!("{}.%", plugin_manifest_id);
let now = chrono::Utc::now();
// 先软删除 role_permissions 中的关联
let rp_sql = r#"
UPDATE role_permissions
SET deleted_at = $1, updated_at = $1
WHERE permission_id IN (
SELECT id FROM permissions
WHERE tenant_id = $2
AND code LIKE $3
AND deleted_at IS NULL
)
AND deleted_at IS NULL
"#;
db.execute(sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
rp_sql,
vec![
sea_orm::Value::from(now.clone()),
sea_orm::Value::from(tenant_id),
sea_orm::Value::from(prefix.clone()),
],
))
.await
.map_err(|e| {
tracing::error!(
plugin = plugin_manifest_id,
error = %e,
"清理插件权限角色关联失败"
);
PluginError::DatabaseError(format!("清理插件权限角色关联失败: {}", e))
})?;
// 再软删除 permissions
let perm_sql = r#"
UPDATE permissions
SET deleted_at = $1, updated_at = $1
WHERE tenant_id = $2
AND code LIKE $3
AND deleted_at IS NULL
"#;
db.execute(sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
perm_sql,
vec![
sea_orm::Value::from(now),
sea_orm::Value::from(tenant_id),
sea_orm::Value::from(prefix),
],
))
.await
.map_err(|e| {
tracing::error!(
plugin = plugin_manifest_id,
error = %e,
"清理插件权限失败"
);
PluginError::DatabaseError(format!("清理插件权限失败: {}", e))
})?;
tracing::info!(
plugin = plugin_manifest_id,
tenant_id = %tenant_id,
"插件权限清理完成"
);
Ok(())
}