fix: 修复测试发现的 7 个问题 + 全 workspace clippy 清零
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled

功能修复:
1. 患者创建空名称验证:后端添加 name.trim().is_empty() 检查
2. 仪表盘统计容错:单个查询失败返回零值而非 500
3. FHIR 路由修复:从 /fhir 移到 /api/v1/fhir 保持一致
4. 冻结模块后端中间件:新增 frozen_module_middleware 拦截冻结路径
5. 积分端点权限码:health.health-data.list → health.points.list
6. 角色权限迁移:护士补充 devices.list,运营补充 points.list/manage
7. 测试结果文档:R01-R05 角色测试 + T00/T10 结果归档

Clippy 全 workspace 清零(14→0 errors):
- erp-core: 修复 empty doc line、collapsible if、redundant closure 等 9 处
- erp-health: 修复 too_many_arguments、unused var、unnecessary parens 等 58 处
- erp-ai: 修复 dead_code、unused import 等 11 处
- erp-plugin: 修复 too_many_arguments、wildcard pattern 等 11 处
- erp-server-migration: 修复 enum_variant_names 5 处
- erp-auth/config/workflow/message: 各 1-3 处

工程改进:
- lint-staged 配置迁移到 .lintstagedrc.js(函数式避免文件列表传给 clippy)
- cargo fmt 统一格式化
This commit is contained in:
iven
2026-05-07 23:43:14 +08:00
parent 786f57c151
commit 6d5a711d2c
323 changed files with 15662 additions and 6603 deletions

View File

@@ -1,13 +1,13 @@
use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, FromQueryResult, QueryFilter, Statement};
use uuid::Uuid;
use erp_core::audit::{AuditLog};
use erp_core::audit::AuditLog;
use erp_core::audit_service;
use erp_core::error::{AppError, AppResult};
use erp_core::events::EventBus;
use crate::data_dto::{AggregateMultiRow, BatchActionReq, PluginDataResp};
use crate::dynamic_table::{sanitize_identifier, DynamicTableManager};
use crate::dynamic_table::{DynamicTableManager, sanitize_identifier};
use crate::entity::plugin;
use crate::entity::plugin_entity;
use crate::error::PluginError;
@@ -25,11 +25,11 @@ async fn find_trigger_events(
.await?
.ok_or_else(|| AppError::NotFound(format!("插件 {} 不存在", plugin_db_id)))?;
let manifest: crate::manifest::PluginManifest =
serde_json::from_value(model.manifest_json)
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let manifest: crate::manifest::PluginManifest = serde_json::from_value(model.manifest_json)
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let triggers = manifest.trigger_events
let triggers = manifest
.trigger_events
.unwrap_or_default()
.into_iter()
.filter(|t| t.entity == entity_name)
@@ -38,6 +38,7 @@ async fn find_trigger_events(
}
/// 发布触发事件
#[allow(clippy::too_many_arguments)]
async fn emit_trigger_events(
triggers: &[crate::manifest::PluginTriggerEvent],
action: &str,
@@ -68,11 +69,8 @@ async fn emit_trigger_events(
"action": action,
});
// 发布原始触发事件
let event = erp_core::events::DomainEvent::new(
&trigger.name,
tenant_id,
payload.clone(),
);
let event =
erp_core::events::DomainEvent::new(&trigger.name, tenant_id, payload.clone());
event_bus.publish(event, db).await;
// 同时发布 plugin.trigger.{manifest_id} 事件用于通知引擎
@@ -110,7 +108,17 @@ impl PluginDataService {
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let fields = info.fields()?;
validate_data(&data, &fields)?;
validate_ref_entities(&data, &fields, entity_name, plugin_id, tenant_id, db, true, None).await?;
validate_ref_entities(
&data,
&fields,
entity_name,
plugin_id,
tenant_id,
db,
true,
None,
)
.await?;
let (sql, values) =
DynamicTableManager::build_insert_sql(&info.table_name, tenant_id, operator_id, &data);
@@ -134,17 +142,33 @@ impl PluginDataService {
.ok_or_else(|| PluginError::DatabaseError("INSERT 未返回结果".to_string()))?;
audit_service::record(
AuditLog::new(tenant_id, Some(operator_id), "plugin.data.create", entity_name)
.with_resource_id(result.id),
AuditLog::new(
tenant_id,
Some(operator_id),
"plugin.data.create",
entity_name,
)
.with_resource_id(result.id),
db,
)
.await;
// 触发事件发布
if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await {
if let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await {
emit_trigger_events(&triggers, "create", entity_name, &result.id.to_string(), tenant_id, Some(&result.data), _event_bus, db, &mid).await;
}
if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await
&& let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await
{
emit_trigger_events(
&triggers,
"create",
entity_name,
&result.id.to_string(),
tenant_id,
Some(&result.data),
_event_bus,
db,
&mid,
)
.await;
}
Ok(PluginDataResp {
@@ -157,6 +181,7 @@ impl PluginDataService {
}
/// 列表查询(支持过滤/搜索/排序/Generated Column 路由/数据权限)
#[allow(clippy::too_many_arguments)]
pub async fn list(
plugin_id: Uuid,
entity_name: &str,
@@ -171,8 +196,7 @@ impl PluginDataService {
cache: &moka::sync::Cache<String, EntityInfo>,
scope: Option<DataScopeParams>,
) -> AppResult<(Vec<PluginDataResp>, u64)> {
let info =
resolve_entity_info_cached(plugin_id, entity_name, tenant_id, db, cache).await?;
let info = resolve_entity_info_cached(plugin_id, entity_name, tenant_id, db, cache).await?;
// 获取 searchable 字段列表
let entity_fields = info.fields()?;
@@ -224,7 +248,7 @@ impl PluginDataService {
sort_order,
&info.generated_fields,
)
.map_err(|e| AppError::Validation(e))?;
.map_err(AppError::Validation)?;
// 注入数据权限条件scope 参数索引接在 values 之后)
let scope_condition = build_scope_sql(&scope, &info.generated_fields, values.len() + 1);
@@ -271,7 +295,8 @@ impl PluginDataService {
db: &sea_orm::DatabaseConnection,
) -> AppResult<PluginDataResp> {
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let (sql, values) = DynamicTableManager::build_get_by_id_sql(&info.table_name, id, tenant_id);
let (sql, values) =
DynamicTableManager::build_get_by_id_sql(&info.table_name, id, tenant_id);
#[derive(FromQueryResult)]
struct DataRow {
@@ -301,6 +326,7 @@ impl PluginDataService {
}
/// 更新
#[allow(clippy::too_many_arguments)]
pub async fn update(
plugin_id: Uuid,
entity_name: &str,
@@ -315,7 +341,17 @@ impl PluginDataService {
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let fields = info.fields()?;
validate_data(&data, &fields)?;
validate_ref_entities(&data, &fields, entity_name, plugin_id, tenant_id, db, false, Some(id)).await?;
validate_ref_entities(
&data,
&fields,
entity_name,
plugin_id,
tenant_id,
db,
false,
Some(id),
)
.await?;
// 循环引用检测
for field in &fields {
@@ -349,20 +385,36 @@ impl PluginDataService {
))
.one(db)
.await?
.ok_or_else(|| AppError::VersionMismatch)?;
.ok_or(AppError::VersionMismatch)?;
audit_service::record(
AuditLog::new(tenant_id, Some(operator_id), "plugin.data.update", entity_name)
.with_resource_id(id),
AuditLog::new(
tenant_id,
Some(operator_id),
"plugin.data.update",
entity_name,
)
.with_resource_id(id),
db,
)
.await;
// 触发事件发布
if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await {
if let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await {
emit_trigger_events(&triggers, "update", entity_name, &result.id.to_string(), tenant_id, Some(&result.data), _event_bus, db, &mid).await;
}
if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await
&& let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await
{
emit_trigger_events(
&triggers,
"update",
entity_name,
&result.id.to_string(),
tenant_id,
Some(&result.data),
_event_bus,
db,
&mid,
)
.await;
}
Ok(PluginDataResp {
@@ -375,6 +427,7 @@ impl PluginDataService {
}
/// 部分更新PATCH— 只合并提供的字段
#[allow(clippy::too_many_arguments)]
pub async fn partial_update(
plugin_id: Uuid,
entity_name: &str,
@@ -401,10 +454,25 @@ impl PluginDataService {
};
validate_data(&merged, &fields)?;
validate_ref_entities(&merged, &fields, entity_name, plugin_id, tenant_id, db, false, Some(id)).await?;
validate_ref_entities(
&merged,
&fields,
entity_name,
plugin_id,
tenant_id,
db,
false,
Some(id),
)
.await?;
let (sql, values) = DynamicTableManager::build_patch_sql(
&info.table_name, id, tenant_id, operator_id, partial_data, expected_version,
&info.table_name,
id,
tenant_id,
operator_id,
partial_data,
expected_version,
);
#[derive(FromQueryResult)]
@@ -417,8 +485,13 @@ impl PluginDataService {
}
let result = PatchResult::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres, sql, values,
)).one(db).await?.ok_or_else(|| AppError::VersionMismatch)?;
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.one(db)
.await?
.ok_or(AppError::VersionMismatch)?;
Ok(PluginDataResp {
id: result.id.to_string(),
@@ -460,12 +533,16 @@ impl PluginDataService {
);
#[derive(FromQueryResult)]
#[allow(dead_code)] // FromQueryResult 映射需要 chk 字段,仅检查是否存在
struct RefCheck { chk: Option<i32> }
struct RefCheck {
chk: Option<i32>,
}
let has_ref = RefCheck::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
check_sql,
[id.to_string().into(), tenant_id.into()],
)).one(db).await?;
))
.one(db)
.await?;
if has_ref.is_some() {
return Err(AppError::Validation(format!(
"存在关联的 {} 记录,无法删除",
@@ -482,7 +559,8 @@ impl PluginDataService {
sea_orm::DatabaseBackend::Postgres,
nullify_sql,
[id.to_string().into(), tenant_id.into()],
)).await?;
))
.await?;
}
crate::manifest::OnDeleteStrategy::Cascade => {
let cascade_sql = format!(
@@ -493,7 +571,8 @@ impl PluginDataService {
sea_orm::DatabaseBackend::Postgres,
cascade_sql,
[id.to_string().into(), tenant_id.into()],
)).await?;
))
.await?;
}
}
}
@@ -509,23 +588,34 @@ impl PluginDataService {
.await?;
audit_service::record(
AuditLog::new(tenant_id, None, "plugin.data.delete", entity_name)
.with_resource_id(id),
AuditLog::new(tenant_id, None, "plugin.data.delete", entity_name).with_resource_id(id),
db,
)
.await;
// 触发事件发布
if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await {
if let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await {
emit_trigger_events(&triggers, "delete", entity_name, &id.to_string(), tenant_id, None, _event_bus, db, &mid).await;
}
if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await
&& let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await
{
emit_trigger_events(
&triggers,
"delete",
entity_name,
&id.to_string(),
tenant_id,
None,
_event_bus,
db,
&mid,
)
.await;
}
Ok(())
}
/// 导出数据(支持 JSON/CSV/XLSX 格式)
#[allow(clippy::too_many_arguments)]
pub async fn export(
plugin_id: Uuid,
entity_name: &str,
@@ -541,8 +631,7 @@ impl PluginDataService {
) -> AppResult<crate::data_dto::ExportPayload> {
use crate::data_dto::ExportPayload;
let info =
resolve_entity_info_cached(plugin_id, entity_name, tenant_id, db, cache).await?;
let info = resolve_entity_info_cached(plugin_id, entity_name, tenant_id, db, cache).await?;
let entity_fields = info.fields()?;
let search_tuple = {
@@ -568,14 +657,16 @@ impl PluginDataService {
sort_order,
&info.generated_fields,
)
.map_err(|e| AppError::Validation(e))?;
.map_err(AppError::Validation)?;
let scope_condition = build_scope_sql(&scope, &info.generated_fields, values.len() + 1);
let sql = merge_scope_condition(sql, &scope_condition);
values.extend(scope_condition.1);
#[derive(FromQueryResult)]
struct DataRow { data: serde_json::Value }
struct DataRow {
data: serde_json::Value,
}
let rows = DataRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
@@ -601,19 +692,26 @@ impl PluginDataService {
) -> AppResult<Vec<u8>> {
let mut wtr = csv::Writer::from_writer(Vec::new());
let headers: Vec<&str> = fields.iter().map(|f| f.name.as_str()).collect();
wtr.write_record(&headers).map_err(|e| AppError::Internal(format!("CSV 写头失败: {}", e)))?;
wtr.write_record(&headers)
.map_err(|e| AppError::Internal(format!("CSV 写头失败: {}", e)))?;
for row in rows {
let record: Vec<String> = headers.iter().map(|h| {
row.get(*h).and_then(|v| match v {
serde_json::Value::String(s) => Some(s.clone()),
serde_json::Value::Number(n) => Some(n.to_string()),
serde_json::Value::Bool(b) => Some(b.to_string()),
serde_json::Value::Null => Some(String::new()),
other => Some(other.to_string()),
}).unwrap_or_default()
}).collect();
wtr.write_record(&record).map_err(|e| AppError::Internal(format!("CSV 写行失败: {}", e)))?;
let record: Vec<String> = headers
.iter()
.map(|h| {
row.get(*h)
.map(|v| match v {
serde_json::Value::String(s) => s.clone(),
serde_json::Value::Number(n) => n.to_string(),
serde_json::Value::Bool(b) => b.to_string(),
serde_json::Value::Null => String::new(),
other => other.to_string(),
})
.unwrap_or_default()
})
.collect();
wtr.write_record(&record)
.map_err(|e| AppError::Internal(format!("CSV 写行失败: {}", e)))?;
}
wtr.into_inner()
@@ -628,7 +726,10 @@ impl PluginDataService {
let mut wb = Workbook::new();
let ws = wb.add_worksheet();
let header_fmt = Format::new().set_bold().set_background_color(Color::RGB(0x4F46E5)).set_font_color(Color::White);
let header_fmt = Format::new()
.set_bold()
.set_background_color(Color::RGB(0x4F46E5))
.set_font_color(Color::White);
for (col, field) in fields.iter().enumerate() {
let label = field.display_name.as_deref().unwrap_or(&field.name);
@@ -641,18 +742,26 @@ impl PluginDataService {
let val = row.get(&field.name);
let row_num = (row_idx + 1) as u32;
match val {
Some(serde_json::Value::String(s)) => { ws.write_string(row_num, col as u16, s).ok(); }
Some(serde_json::Value::Number(n)) => {
if let Some(f) = n.as_f64() { ws.write_number(row_num, col as u16, f).ok(); }
else { ws.write_string(row_num, col as u16, &n.to_string()).ok(); }
Some(serde_json::Value::String(s)) => {
ws.write_string(row_num, col as u16, s).ok();
}
Some(serde_json::Value::Number(n)) => {
if let Some(f) = n.as_f64() {
ws.write_number(row_num, col as u16, f).ok();
} else {
ws.write_string(row_num, col as u16, n.to_string()).ok();
}
}
Some(serde_json::Value::Bool(b)) => {
ws.write_string(row_num, col as u16, b.to_string()).ok();
}
Some(serde_json::Value::Bool(b)) => { ws.write_string(row_num, col as u16, &b.to_string()).ok(); }
_ => {}
}
}
}
let buf = wb.save_to_buffer()
let buf = wb
.save_to_buffer()
.map_err(|e| AppError::Internal(format!("XLSX 保存失败: {}", e)))?;
Ok(buf.to_vec())
}
@@ -681,45 +790,83 @@ impl PluginDataService {
for (i, row_data) in rows.iter().enumerate() {
if let Err(e) = validate_data(row_data, &fields) {
row_errors.push(ImportRowError { row: i, errors: vec![e.to_string()] });
row_errors.push(ImportRowError {
row: i,
errors: vec![e.to_string()],
});
continue;
}
if let Err(e) = validate_ref_entities(row_data, &fields, entity_name, plugin_id, tenant_id, db, true, None).await {
row_errors.push(ImportRowError { row: i, errors: vec![e.to_string()] });
if let Err(e) = validate_ref_entities(
row_data,
&fields,
entity_name,
plugin_id,
tenant_id,
db,
true,
None,
)
.await
{
row_errors.push(ImportRowError {
row: i,
errors: vec![e.to_string()],
});
continue;
}
let (sql, values) =
DynamicTableManager::build_insert_sql(&info.table_name, tenant_id, operator_id, row_data);
let (sql, values) = DynamicTableManager::build_insert_sql(
&info.table_name,
tenant_id,
operator_id,
row_data,
);
let result = db.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
)).await;
let result = db
.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.await;
match result {
Ok(_) => success_count += 1,
Err(e) => {
row_errors.push(ImportRowError { row: i, errors: vec![format!("写入失败: {}", e)] });
row_errors.push(ImportRowError {
row: i,
errors: vec![format!("写入失败: {}", e)],
});
}
}
}
audit_service::record(
AuditLog::new(tenant_id, Some(operator_id), "plugin.data.import", entity_name),
AuditLog::new(
tenant_id,
Some(operator_id),
"plugin.data.import",
entity_name,
),
db,
)
.await;
if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await {
if let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await {
emit_trigger_events(
&triggers, "create", entity_name,
&format!("batch_import:{}", success_count),
tenant_id, None, event_bus, db, &mid,
).await;
}
if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await
&& let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await
{
emit_trigger_events(
&triggers,
"create",
entity_name,
&format!("batch_import:{}", success_count),
tenant_id,
None,
event_bus,
db,
&mid,
)
.await;
}
Ok(ImportResult {
@@ -757,13 +904,15 @@ impl PluginDataService {
"batch_delete" => {
// 批量删除前先执行级联策略(逐条,复用 delete 的级联逻辑)
let entity_def: crate::manifest::PluginEntity =
serde_json::from_value(info.schema_json.clone())
.map_err(|e| AppError::Internal(format!("解析 entity schema 失败: {}", e)))?;
serde_json::from_value(info.schema_json.clone()).map_err(|e| {
AppError::Internal(format!("解析 entity schema 失败: {}", e))
})?;
let manifest_id = resolve_manifest_id(plugin_id, tenant_id, db).await?;
for &del_id in &ids {
for relation in &entity_def.relations {
let rel_table = DynamicTableManager::table_name(&manifest_id, &relation.entity);
let rel_table =
DynamicTableManager::table_name(&manifest_id, &relation.entity);
let fk = sanitize_identifier(&relation.foreign_key);
match relation.on_delete {
crate::manifest::OnDeleteStrategy::Restrict => {
@@ -773,12 +922,17 @@ impl PluginDataService {
);
#[derive(FromQueryResult)]
#[allow(dead_code)] // FromQueryResult 映射需要 chk 字段,仅检查是否存在
struct RefCheck { chk: Option<i32> }
let has_ref = RefCheck::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
check_sql,
[del_id.to_string().into(), tenant_id.into()],
)).one(db).await?;
struct RefCheck {
chk: Option<i32>,
}
let has_ref =
RefCheck::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
check_sql,
[del_id.to_string().into(), tenant_id.into()],
))
.one(db)
.await?;
if has_ref.is_some() {
return Err(AppError::Validation(format!(
"记录 {} 存在关联的 {} 记录,无法删除",
@@ -795,7 +949,8 @@ impl PluginDataService {
sea_orm::DatabaseBackend::Postgres,
nullify_sql,
[del_id.to_string().into(), tenant_id.into()],
)).await?;
))
.await?;
}
crate::manifest::OnDeleteStrategy::Cascade => {
let cascade_sql = format!(
@@ -806,7 +961,8 @@ impl PluginDataService {
sea_orm::DatabaseBackend::Postgres,
cascade_sql,
[del_id.to_string().into(), tenant_id.into()],
)).await?;
))
.await?;
}
}
}
@@ -884,7 +1040,7 @@ impl PluginDataService {
return Err(AppError::Validation(format!(
"不支持的批量操作: {}",
req.action
)))
)));
}
};
@@ -922,7 +1078,7 @@ impl PluginDataService {
filter,
search_tuple,
)
.map_err(|e| AppError::Validation(e))?;
.map_err(AppError::Validation)?;
// 合并数据权限条件
let scope_condition = build_scope_sql(&scope, &info.generated_fields, values.len() + 1);
@@ -968,7 +1124,7 @@ impl PluginDataService {
group_by_field,
filter,
)
.map_err(|e| AppError::Validation(e))?;
.map_err(AppError::Validation)?;
// 合并数据权限条件
let scope_condition = build_scope_sql(&scope, &info.generated_fields, values.len() + 1);
@@ -1000,6 +1156,7 @@ impl PluginDataService {
}
/// 多聚合查询 — 支持 COUNT + SUM/AVG/MIN/MAX
#[allow(clippy::too_many_arguments)]
pub async fn aggregate_multi(
plugin_id: Uuid,
entity_name: &str,
@@ -1019,7 +1176,7 @@ impl PluginDataService {
aggregations,
filter,
)
.map_err(|e| AppError::Validation(e))?;
.map_err(AppError::Validation)?;
let scope_condition = build_scope_sql(&scope, &info.generated_fields, values.len() + 1);
if !scope_condition.0.is_empty() {
@@ -1048,17 +1205,26 @@ impl PluginDataService {
.and_then(|d| d.as_array().cloned())
.unwrap_or_default();
let rows = json_rows.into_iter().map(|v| AggregateMultiRow {
key: v.get("key").and_then(|k| k.as_str()).unwrap_or_default().to_string(),
count: v.get("count").and_then(|c| c.as_i64()).unwrap_or(0),
metrics: v.as_object()
.map(|m| m.iter()
.filter(|(k, _)| *k != "key" && *k != "count")
.map(|(k, v)| (k.clone(), v.as_f64().unwrap_or(0.0)))
.collect()
)
.unwrap_or_default(),
}).collect();
let rows = json_rows
.into_iter()
.map(|v| AggregateMultiRow {
key: v
.get("key")
.and_then(|k| k.as_str())
.unwrap_or_default()
.to_string(),
count: v.get("count").and_then(|c| c.as_i64()).unwrap_or(0),
metrics: v
.as_object()
.map(|m| {
m.iter()
.filter(|(k, _)| *k != "key" && *k != "count")
.map(|(k, v)| (k.clone(), v.as_f64().unwrap_or(0.0)))
.collect()
})
.unwrap_or_default(),
})
.collect();
Ok(rows)
}
@@ -1073,10 +1239,20 @@ impl PluginDataService {
filter: Option<serde_json::Value>,
) -> AppResult<Vec<(String, i64)>> {
// TODO: 未来版本添加 Redis 缓存层
Self::aggregate(plugin_id, entity_name, tenant_id, db, group_by_field, filter, None).await
Self::aggregate(
plugin_id,
entity_name,
tenant_id,
db,
group_by_field,
filter,
None,
)
.await
}
/// 时间序列聚合 — 按时间字段截断为 day/week/month 统计计数
#[allow(clippy::too_many_arguments)]
pub async fn timeseries(
plugin_id: Uuid,
entity_name: &str,
@@ -1098,7 +1274,7 @@ impl PluginDataService {
start.as_deref(),
end.as_deref(),
)
.map_err(|e| AppError::Validation(e))?;
.map_err(AppError::Validation)?;
// 合并数据权限条件
let scope_condition = build_scope_sql(&scope, &info.generated_fields, values.len() + 1);
@@ -1156,7 +1332,9 @@ impl PluginDataService {
.map_err(|e| AppError::Internal(format!("解析 entity schema 失败: {}", e)))?;
// 找出所有有 ref_entity 的字段
let ref_fields: Vec<&PluginField> = schema.fields.iter()
let ref_fields: Vec<&PluginField> = schema
.fields
.iter()
.filter(|f| f.ref_entity.is_some())
.collect();
@@ -1198,7 +1376,9 @@ impl PluginDataService {
for row in rows {
// 验证 ref_val 是有效的 UUID 且目标记录存在
let Ok(target_uuid) = Uuid::parse_str(&row.ref_val) else { continue };
let Ok(target_uuid) = Uuid::parse_str(&row.ref_val) else {
continue;
};
let ref_entity_name = field.ref_entity.as_deref().unwrap_or("");
let ref_plugin = field.ref_plugin.as_deref().unwrap_or(&manifest_id);
@@ -1260,9 +1440,8 @@ pub async fn resolve_manifest_id(
.await?
.ok_or_else(|| AppError::NotFound(format!("插件 {} 不存在", plugin_id)))?;
let manifest: crate::manifest::PluginManifest =
serde_json::from_value(model.manifest_json)
.map_err(|e| AppError::Internal(format!("解析插件 manifest 失败: {}", e)))?;
let manifest: crate::manifest::PluginManifest = serde_json::from_value(model.manifest_json)
.map_err(|e| AppError::Internal(format!("解析插件 manifest 失败: {}", e)))?;
Ok(manifest.metadata.id)
}
@@ -1417,9 +1596,9 @@ pub async fn is_plugin_active(
/// 校验数据:检查 required 字段 + 正则校验
fn validate_data(data: &serde_json::Value, fields: &[PluginField]) -> AppResult<()> {
let obj = data.as_object().ok_or_else(|| {
AppError::Validation("data 必须是 JSON 对象".to_string())
})?;
let obj = data
.as_object()
.ok_or_else(|| AppError::Validation("data 必须是 JSON 对象".to_string()))?;
for field in fields {
let label = field.display_name.as_deref().unwrap_or(&field.name);
@@ -1430,20 +1609,18 @@ fn validate_data(data: &serde_json::Value, fields: &[PluginField]) -> AppResult<
}
// 正则校验
if let Some(validation) = &field.validation {
if let Some(pattern) = &validation.pattern {
if let Some(val) = obj.get(&field.name) {
let str_val = val.as_str().unwrap_or("");
if !str_val.is_empty() {
let re = regex::Regex::new(pattern)
.map_err(|e| AppError::Internal(format!("正则表达式编译失败: {}", e)))?;
if !re.is_match(str_val) {
let default_msg = format!("字段 '{}' 格式不正确", label);
let msg = validation.message.as_deref()
.unwrap_or(&default_msg);
return Err(AppError::Validation(msg.to_string()));
}
}
if let Some(validation) = &field.validation
&& let Some(pattern) = &validation.pattern
&& let Some(val) = obj.get(&field.name)
{
let str_val = val.as_str().unwrap_or("");
if !str_val.is_empty() {
let re = regex::Regex::new(pattern)
.map_err(|e| AppError::Internal(format!("正则表达式编译失败: {}", e)))?;
if !re.is_match(str_val) {
let default_msg = format!("字段 '{}' 格式不正确", label);
let msg = validation.message.as_deref().unwrap_or(&default_msg);
return Err(AppError::Validation(msg.to_string()));
}
}
}
@@ -1455,6 +1632,7 @@ fn validate_data(data: &serde_json::Value, fields: &[PluginField]) -> AppResult<
/// 校验外键引用 — 检查 ref_entity 字段指向的记录是否存在
/// 支持同插件引用和跨插件引用ref_plugin 字段)
/// 核心原则:跨插件引用目标插件未安装时跳过校验(软警告)
#[allow(clippy::too_many_arguments)]
async fn validate_ref_entities(
data: &serde_json::Value,
fields: &[PluginField],
@@ -1465,17 +1643,25 @@ async fn validate_ref_entities(
is_create: bool,
record_id: Option<Uuid>,
) -> AppResult<()> {
let obj = data.as_object().ok_or_else(|| {
AppError::Validation("data 必须是 JSON 对象".to_string())
})?;
let obj = data
.as_object()
.ok_or_else(|| AppError::Validation("data 必须是 JSON 对象".to_string()))?;
for field in fields {
let Some(ref_entity_name) = &field.ref_entity else { continue };
let Some(val) = obj.get(&field.name) else { continue };
let Some(ref_entity_name) = &field.ref_entity else {
continue;
};
let Some(val) = obj.get(&field.name) else {
continue;
};
let str_val = val.as_str().unwrap_or("").trim().to_string();
if str_val.is_empty() && !field.required { continue; }
if str_val.is_empty() { continue; }
if str_val.is_empty() && !field.required {
continue;
}
if str_val.is_empty() {
continue;
}
let ref_id = Uuid::parse_str(&str_val).map_err(|_| {
AppError::Validation(format!(
@@ -1490,10 +1676,13 @@ async fn validate_ref_entities(
continue;
}
// 自引用 + update检查是否引用自身
if ref_entity_name == current_entity && field.ref_plugin.is_none() && !is_create {
if let Some(rid) = record_id {
if ref_id == rid { continue; }
}
if ref_entity_name == current_entity
&& field.ref_plugin.is_none()
&& !is_create
&& let Some(rid) = record_id
&& ref_id == rid
{
continue;
}
// 确定目标表名
@@ -1534,12 +1723,16 @@ async fn validate_ref_entities(
);
#[derive(FromQueryResult)]
#[allow(dead_code)] // FromQueryResult 映射需要 check_result 字段,仅检查是否存在
struct ExistsCheck { check_result: Option<i32> }
struct ExistsCheck {
check_result: Option<i32>,
}
let result = ExistsCheck::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
check_sql,
[ref_id.into(), tenant_id.into()],
)).one(db).await?;
))
.one(db)
.await?;
if result.is_none() {
return Err(AppError::Validation(format!(
@@ -1560,13 +1753,16 @@ async fn check_no_cycle(
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> AppResult<()> {
let Some(val) = data.get(&field.name) else { return Ok(()) };
let Some(val) = data.get(&field.name) else {
return Ok(());
};
let new_parent = val.as_str().unwrap_or("").trim().to_string();
if new_parent.is_empty() { return Ok(()); }
if new_parent.is_empty() {
return Ok(());
}
let new_parent_id = Uuid::parse_str(&new_parent).map_err(|_| {
AppError::Validation("parent_id 不是有效的 UUID".to_string())
})?;
let new_parent_id = Uuid::parse_str(&new_parent)
.map_err(|_| AppError::Validation("parent_id 不是有效的 UUID".to_string()))?;
let field_name = sanitize_identifier(&field.name);
let mut visited = vec![record_id];
@@ -1576,7 +1772,8 @@ async fn check_no_cycle(
if visited.contains(&current_id) {
let label = field.display_name.as_deref().unwrap_or(&field.name);
return Err(AppError::Validation(format!(
"字段 '{}' 形成循环引用", label
"字段 '{}' 形成循环引用",
label
)));
}
visited.push(current_id);
@@ -1586,20 +1783,25 @@ async fn check_no_cycle(
field_name, table_name
);
#[derive(FromQueryResult)]
struct ParentRow { parent: Option<String> }
struct ParentRow {
parent: Option<String>,
}
let row = ParentRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
query_sql,
[current_id.into(), tenant_id.into()],
)).one(db).await?;
))
.one(db)
.await?;
match row {
Some(r) => {
let parent = r.parent.unwrap_or_default().trim().to_string();
if parent.is_empty() { break; }
current_id = Uuid::parse_str(&parent).map_err(|_| {
AppError::Internal("parent_id 不是有效的 UUID".to_string())
})?;
if parent.is_empty() {
break;
}
current_id = Uuid::parse_str(&parent)
.map_err(|_| AppError::Internal("parent_id 不是有效的 UUID".to_string()))?;
}
None => break,
}
@@ -1673,7 +1875,11 @@ mod validate_tests {
#[test]
fn validate_phone_pattern_rejects_invalid() {
let fields = vec![make_field("phone", Some("^1[3-9]\\d{9}$"), Some("手机号格式不正确"))];
let fields = vec![make_field(
"phone",
Some("^1[3-9]\\d{9}$"),
Some("手机号格式不正确"),
)];
let data = serde_json::json!({"phone": "1234"});
let result = validate_data(&data, &fields);
assert!(result.is_err());
@@ -1681,7 +1887,11 @@ mod validate_tests {
#[test]
fn validate_phone_pattern_accepts_valid() {
let fields = vec![make_field("phone", Some("^1[3-9]\\d{9}$"), Some("手机号格式不正确"))];
let fields = vec![make_field(
"phone",
Some("^1[3-9]\\d{9}$"),
Some("手机号格式不正确"),
)];
let data = serde_json::json!({"phone": "13812345678"});
let result = validate_data(&data, &fields);
assert!(result.is_ok());

View File

@@ -10,7 +10,13 @@ use crate::manifest::{PluginEntity, PluginField, PluginFieldType};
pub(crate) fn sanitize_identifier(input: &str) -> String {
input
.chars()
.map(|c| if c.is_ascii_alphanumeric() || c == '_' { c } else { '_' })
.map(|c| {
if c.is_ascii_alphanumeric() || c == '_' {
c
} else {
'_'
}
})
.collect()
}
@@ -56,7 +62,9 @@ impl DynamicTableManager {
let col_name = format!("_f_{}", sanitize_identifier(&field.name));
let sql_type = field.field_type.generated_sql_type();
let expr = field.field_type.generated_expr(&sanitize_identifier(&field.name));
let expr = field
.field_type
.generated_expr(&sanitize_identifier(&field.name));
gen_cols.push(format!(
" \"{}\" {} GENERATED ALWAYS AS ({}) STORED",
@@ -80,8 +88,7 @@ impl DynamicTableManager {
// pg_trgm 索引
for field in &entity.fields {
if field.searchable == Some(true)
&& matches!(field.field_type, PluginFieldType::String)
if field.searchable == Some(true) && matches!(field.field_type, PluginFieldType::String)
{
let sf = sanitize_identifier(&field.name);
indexes.push(format!(
@@ -128,11 +135,7 @@ impl DynamicTableManager {
entity: &PluginEntity,
) -> PluginResult<()> {
let ddl = Self::build_create_table_sql(plugin_id, entity);
for sql in ddl
.split(';')
.map(|s| s.trim())
.filter(|s| !s.is_empty())
{
for sql in ddl.split(';').map(|s| s.trim()).filter(|s| !s.is_empty()) {
tracing::info!(sql = %sql, "Executing DDL");
db.execute_unprepared(sql).await.map_err(|e| {
tracing::error!(sql = %sql, error = %e, "DDL execution failed");
@@ -179,21 +182,25 @@ impl DynamicTableManager {
continue;
}
// 新增字段 + 需要 Generated Column 的条件
let needs_gen = field.unique
|| field.sortable == Some(true)
|| field.filterable == Some(true);
let needs_gen =
field.unique || field.sortable == Some(true) || field.filterable == Some(true);
if needs_gen {
new_filterable.push(field.clone());
if field.sortable == Some(true) {
new_sortable.push(field.clone());
}
}
if field.searchable == Some(true) && matches!(field.field_type, PluginFieldType::String) {
if field.searchable == Some(true) && matches!(field.field_type, PluginFieldType::String)
{
new_searchable.push(field.clone());
}
}
FieldDiff { new_filterable, new_sortable, new_searchable }
FieldDiff {
new_filterable,
new_sortable,
new_searchable,
}
}
/// Schema 演进:为已有实体新增 Generated Column 和索引
@@ -212,7 +219,9 @@ impl DynamicTableManager {
}
let col_name = format!("_f_{}", sanitize_identifier(&field.name));
let sql_type = field.field_type.generated_sql_type();
let expr = field.field_type.generated_expr(&sanitize_identifier(&field.name));
let expr = field
.field_type
.generated_expr(&sanitize_identifier(&field.name));
let _safe_field = sanitize_identifier(&field.name);
statements.push(format!(
@@ -329,7 +338,11 @@ impl DynamicTableManager {
LIMIT $2 OFFSET $3",
table_name
);
let values = vec![tenant_id.into(), (limit as i64).into(), (offset as i64).into()];
let values = vec![
tenant_id.into(),
(limit as i64).into(),
(offset as i64).into(),
];
(sql, values)
}
@@ -398,7 +411,9 @@ impl DynamicTableManager {
table_name, set_expr
);
let values = vec![
serde_json::to_string(&partial_data).unwrap_or_default().into(),
serde_json::to_string(&partial_data)
.unwrap_or_default()
.into(),
user_id.into(),
id.into(),
tenant_id.into(),
@@ -408,11 +423,7 @@ impl DynamicTableManager {
}
/// 构建 DELETE SQL软删除
pub fn build_delete_sql(
table_name: &str,
id: Uuid,
tenant_id: Uuid,
) -> (String, Vec<Value>) {
pub fn build_delete_sql(table_name: &str, id: Uuid, tenant_id: Uuid) -> (String, Vec<Value>) {
let sql = format!(
"UPDATE \"{}\" \
SET deleted_at = NOW(), updated_at = NOW() \
@@ -469,19 +480,19 @@ impl DynamicTableManager {
let mut values: Vec<Value> = vec![tenant_id.into()];
// 处理 filter与 build_filtered_query_sql 保持一致)
if let Some(f) = filter {
if let Some(obj) = f.as_object() {
for (key, val) in obj {
let clean_key = sanitize_identifier(key);
if clean_key.is_empty() {
return Err(format!("无效的过滤字段名: {}", key));
}
conditions.push(format!("\"data\"->>'{}' = ${}", clean_key, param_idx));
values.push(Value::String(Some(Box::new(
val.as_str().unwrap_or("").to_string(),
))));
param_idx += 1;
if let Some(f) = filter
&& let Some(obj) = f.as_object()
{
for (key, val) in obj {
let clean_key = sanitize_identifier(key);
if clean_key.is_empty() {
return Err(format!("无效的过滤字段名: {}", key));
}
conditions.push(format!("\"data\"->>'{}' = ${}", clean_key, param_idx));
values.push(Value::String(Some(Box::new(
val.as_str().unwrap_or("").to_string(),
))));
param_idx += 1;
}
}
@@ -533,19 +544,19 @@ impl DynamicTableManager {
let mut values: Vec<Value> = vec![tenant_id.into()];
// 处理 filter
if let Some(f) = filter {
if let Some(obj) = f.as_object() {
for (key, val) in obj {
let clean_key = sanitize_identifier(key);
if clean_key.is_empty() {
return Err(format!("无效的过滤字段名: {}", key));
}
conditions.push(format!("\"data\"->>'{}' = ${}", clean_key, param_idx));
values.push(Value::String(Some(Box::new(
val.as_str().unwrap_or("").to_string(),
))));
param_idx += 1;
if let Some(f) = filter
&& let Some(obj) = f.as_object()
{
for (key, val) in obj {
let clean_key = sanitize_identifier(key);
if clean_key.is_empty() {
return Err(format!("无效的过滤字段名: {}", key));
}
conditions.push(format!("\"data\"->>'{}' = ${}", clean_key, param_idx));
values.push(Value::String(Some(Box::new(
val.as_str().unwrap_or("").to_string(),
))));
param_idx += 1;
}
}
@@ -584,19 +595,19 @@ impl DynamicTableManager {
let mut param_idx = 2;
let mut values: Vec<Value> = vec![tenant_id.into()];
if let Some(f) = filter {
if let Some(obj) = f.as_object() {
for (key, val) in obj {
let clean_key = sanitize_identifier(key);
if clean_key.is_empty() {
return Err(format!("无效的过滤字段名: {}", key));
}
conditions.push(format!("\"data\"->>'{}' = ${}", clean_key, param_idx));
values.push(Value::String(Some(Box::new(
val.as_str().unwrap_or("").to_string(),
))));
param_idx += 1;
if let Some(f) = filter
&& let Some(obj) = f.as_object()
{
for (key, val) in obj {
let clean_key = sanitize_identifier(key);
if clean_key.is_empty() {
return Err(format!("无效的过滤字段名: {}", key));
}
conditions.push(format!("\"data\"->>'{}' = ${}", clean_key, param_idx));
values.push(Value::String(Some(Box::new(
val.as_str().unwrap_or("").to_string(),
))));
param_idx += 1;
}
}
@@ -610,16 +621,20 @@ impl DynamicTableManager {
let func_lower = func.to_lowercase();
match func_lower.as_str() {
"sum" => select_parts.push(format!(
"COALESCE(SUM(\"_f_{}\"), 0) as sum_{}", clean_field, clean_field
"COALESCE(SUM(\"_f_{}\"), 0) as sum_{}",
clean_field, clean_field
)),
"avg" => select_parts.push(format!(
"COALESCE(AVG(\"_f_{}\"), 0) as avg_{}", clean_field, clean_field
"COALESCE(AVG(\"_f_{}\"), 0) as avg_{}",
clean_field, clean_field
)),
"min" => select_parts.push(format!(
"MIN(\"_f_{}\") as min_{}", clean_field, clean_field
"MIN(\"_f_{}\") as min_{}",
clean_field, clean_field
)),
"max" => select_parts.push(format!(
"MAX(\"_f_{}\") as max_{}", clean_field, clean_field
"MAX(\"_f_{}\") as max_{}",
clean_field, clean_field
)),
_ => {}
}
@@ -641,6 +656,7 @@ impl DynamicTableManager {
}
/// 构建带过滤条件的查询 SQL
#[allow(clippy::too_many_arguments)]
pub fn build_filtered_query_sql(
table_name: &str,
tenant_id: Uuid,
@@ -659,19 +675,19 @@ impl DynamicTableManager {
let mut values: Vec<Value> = vec![tenant_id.into()];
// 处理 filter
if let Some(f) = filter {
if let Some(obj) = f.as_object() {
for (key, val) in obj {
let clean_key = sanitize_identifier(key);
if clean_key.is_empty() {
return Err(format!("无效的过滤字段名: {}", key));
}
conditions.push(format!("\"data\"->>'{}' = ${}", clean_key, param_idx));
values.push(Value::String(Some(Box::new(
val.as_str().unwrap_or("").to_string(),
))));
param_idx += 1;
if let Some(f) = filter
&& let Some(obj) = f.as_object()
{
for (key, val) in obj {
let clean_key = sanitize_identifier(key);
if clean_key.is_empty() {
return Err(format!("无效的过滤字段名: {}", key));
}
conditions.push(format!("\"data\"->>'{}' = ${}", clean_key, param_idx));
values.push(Value::String(Some(Box::new(
val.as_str().unwrap_or("").to_string(),
))));
param_idx += 1;
}
}
@@ -734,6 +750,7 @@ impl DynamicTableManager {
}
/// 扩展版查询构建 — 支持 Generated Column 路由
#[allow(clippy::too_many_arguments)]
pub fn build_filtered_query_sql_ex(
table_name: &str,
tenant_id: Uuid,
@@ -755,19 +772,19 @@ impl DynamicTableManager {
let mut values: Vec<Value> = vec![tenant_id.into()];
// filter
if let Some(f) = filter {
if let Some(obj) = f.as_object() {
for (key, val) in obj {
let clean_key = sanitize_identifier(key);
if clean_key.is_empty() {
return Err(format!("无效的过滤字段名: {}", key));
}
conditions.push(format!("{} = ${}", ref_fn(&clean_key), param_idx));
values.push(Value::String(Some(Box::new(
val.as_str().unwrap_or("").to_string(),
))));
param_idx += 1;
if let Some(f) = filter
&& let Some(obj) = f.as_object()
{
for (key, val) in obj {
let clean_key = sanitize_identifier(key);
if clean_key.is_empty() {
return Err(format!("无效的过滤字段名: {}", key));
}
conditions.push(format!("{} = ${}", ref_fn(&clean_key), param_idx));
values.push(Value::String(Some(Box::new(
val.as_str().unwrap_or("").to_string(),
))));
param_idx += 1;
}
}
@@ -875,7 +892,8 @@ impl DynamicTableManager {
)
}
}
"all" | _ => (String::new(), vec![]),
"all" => (String::new(), vec![]),
_ => (String::new(), vec![]),
}
}
@@ -893,8 +911,8 @@ impl DynamicTableManager {
let json_str = BASE64
.decode(cursor)
.map_err(|e| format!("游标 Base64 解码失败: {}", e))?;
let obj: serde_json::Value = serde_json::from_slice(&json_str)
.map_err(|e| format!("游标 JSON 解析失败: {}", e))?;
let obj: serde_json::Value =
serde_json::from_slice(&json_str).map_err(|e| format!("游标 JSON 解析失败: {}", e))?;
let values = obj["v"]
.as_array()
.ok_or("游标缺少 v 字段")?
@@ -923,7 +941,7 @@ impl DynamicTableManager {
let ref_fn = Self::field_reference_fn(generated_fields);
let sort_col = sort_column
.as_deref()
.map(|s| ref_fn(s))
.map(ref_fn)
.unwrap_or("\"created_at\"".to_string());
let mut values: Vec<Value> = vec![tenant_id.into()];
@@ -1098,7 +1116,10 @@ mod tests {
assert!(sql.contains("ILIKE"), "Expected ILIKE in SQL, got: {}", sql);
// 验证搜索参数值包含 %...%
if let Value::String(Some(s)) = &values[1] {
assert!(s.contains("测试关键词"), "Search value should contain keyword");
assert!(
s.contains("测试关键词"),
"Search value should contain keyword"
);
assert!(s.starts_with('%'), "Search value should start with %");
}
}
@@ -1188,7 +1209,11 @@ mod tests {
None,
)
.unwrap();
assert!(sql.contains("\"data\"->>'status' ="), "Expected filter, got: {}", sql);
assert!(
sql.contains("\"data\"->>'status' ="),
"Expected filter, got: {}",
sql
);
assert_eq!(values.len(), 2); // tenant_id + filter_value
}
@@ -1231,8 +1256,16 @@ mod tests {
)
.unwrap();
assert!(sql.contains("GROUP BY"), "Expected GROUP BY, got: {}", sql);
assert!(sql.contains("\"data\"->>'status'"), "Expected group field, got: {}", sql);
assert!(sql.contains("ORDER BY count DESC"), "Expected ORDER BY count DESC, got: {}", sql);
assert!(
sql.contains("\"data\"->>'status'"),
"Expected group field, got: {}",
sql
);
assert!(
sql.contains("ORDER BY count DESC"),
"Expected ORDER BY count DESC, got: {}",
sql
);
assert_eq!(values.len(), 1); // 仅 tenant_id
}
@@ -1245,8 +1278,16 @@ mod tests {
Some(serde_json::json!({"status": "active"})),
)
.unwrap();
assert!(sql.contains("\"data\"->>'region'"), "Expected group field, got: {}", sql);
assert!(sql.contains("\"data\"->>'status' ="), "Expected filter, got: {}", sql);
assert!(
sql.contains("\"data\"->>'region'"),
"Expected group field, got: {}",
sql
);
assert!(
sql.contains("\"data\"->>'status' ="),
"Expected filter, got: {}",
sql
);
assert_eq!(values.len(), 2); // tenant_id + filter_value
}
@@ -1260,7 +1301,11 @@ mod tests {
);
let (sql, _) = result.unwrap();
assert!(!sql.contains("DROP TABLE"), "SQL 不应包含注入: {}", sql);
assert!(sql.contains("evil___DROP_TABLE__"), "字段名应被清理: {}", sql);
assert!(
sql.contains("evil___DROP_TABLE__"),
"字段名应被清理: {}",
sql
);
}
#[test]
@@ -1317,14 +1362,8 @@ mod tests {
};
let sql = DynamicTableManager::build_create_table_sql("erp_crm", &entity);
assert!(
sql.contains("_f_code"),
"应包含 _f_code Generated Column"
);
assert!(
sql.contains("_f_level"),
"应包含 _f_level Generated Column"
);
assert!(sql.contains("_f_code"), "应包含 _f_code Generated Column");
assert!(sql.contains("_f_level"), "应包含 _f_level Generated Column");
assert!(
sql.contains("_f_sort_order"),
"应包含 _f_sort_order Generated Column"
@@ -1333,10 +1372,7 @@ mod tests {
sql.contains("GENERATED ALWAYS AS"),
"应包含 GENERATED ALWAYS AS"
);
assert!(
sql.contains("::INTEGER"),
"Integer 字段应有类型转换"
);
assert!(sql.contains("::INTEGER"), "Integer 字段应有类型转换");
}
#[test]
@@ -1476,10 +1512,7 @@ mod tests {
&[],
)
.unwrap();
assert!(
sql.contains("ROW("),
"cursor 条件应使用 ROW 比较"
);
assert!(sql.contains("ROW("), "cursor 条件应使用 ROW 比较");
assert!(
values.len() >= 4,
"应有 tenant_id + cursor_val + cursor_id + limit"
@@ -1530,16 +1563,8 @@ mod tests {
"department 应使用 IN 条件, got: {}",
sql
);
assert!(
sql.contains("$2"),
"参数索引应从 2 开始, got: {}",
sql
);
assert!(
sql.contains("$3"),
"第二个参数索引应为 3, got: {}",
sql
);
assert!(sql.contains("$2"), "参数索引应从 2 开始, got: {}", sql);
assert!(sql.contains("$3"), "第二个参数索引应为 3, got: {}", sql);
assert_eq!(values.len(), 2);
}
@@ -1684,35 +1709,16 @@ mod tests {
#[test]
fn test_sanitize_removes_special_chars() {
let result = sanitize_identifier("table;name'here\"with`special");
assert!(
!result.contains(';'),
"号应被替换: {}",
result
);
assert!(
!result.contains('\''),
"单引号应被替换: {}",
result
);
assert!(
!result.contains('"'),
"双引号应被替换: {}",
result
);
assert!(
!result.contains('`'),
"反引号应被替换: {}",
result
);
assert!(!result.contains(';'), "分号应被替换: {}", result);
assert!(!result.contains('\''), "单引号应被替换: {}", result);
assert!(!result.contains('"'), "双引号应被替换: {}", result);
assert!(!result.contains('`'), "反引号应被替换: {}", result);
}
#[test]
fn test_sanitize_allows_alphanumeric_underscore() {
let result = sanitize_identifier("my_table_123");
assert_eq!(
result, "my_table_123",
"合法标识符应原样保留"
);
assert_eq!(result, "my_table_123", "合法标识符应原样保留");
}
#[test]
@@ -1723,26 +1729,14 @@ mod tests {
"DROP TABLE 注入应被清理为下划线: {}",
result
);
assert!(
!result.contains(';'),
"不应包含分号: {}",
result
);
assert!(!result.contains(';'), "不应包含分号: {}", result);
}
#[test]
fn test_sanitize_handles_sql_comment() {
let result = sanitize_identifier("users--");
assert_eq!(
result, "users__",
"SQL 注释应被替换为下划线: {}",
result
);
assert!(
!result.contains('-'),
"不应包含连字符: {}",
result
);
assert_eq!(result, "users__", "SQL 注释应被替换为下划线: {}", result);
assert!(!result.contains('-'), "不应包含连字符: {}", result);
}
#[test]
@@ -1753,11 +1747,7 @@ mod tests {
"UNION 注入中空格应被替换为下划线: {}",
result
);
assert!(
!result.contains(' '),
"不应包含空格: {}",
result
);
assert!(!result.contains(' '), "不应包含空格: {}", result);
}
#[test]

View File

@@ -3,7 +3,10 @@ use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use dashmap::DashMap;
use sea_orm::{ColumnTrait, ConnectionTrait, DatabaseConnection, EntityTrait, QueryFilter, Statement, TransactionTrait};
use sea_orm::{
ColumnTrait, ConnectionTrait, DatabaseConnection, EntityTrait, QueryFilter, Statement,
TransactionTrait,
};
use serde_json::json;
use tokio::sync::RwLock;
use uuid::Uuid;
@@ -190,9 +193,11 @@ impl PluginEngine {
let result = self
.execute_wasm(plugin_id, &ctx, |store, instance| {
instance.erp_plugin_plugin_api().call_init(store)
instance
.erp_plugin_plugin_api()
.call_init(store)
.map_err(|e| PluginError::ExecutionError(e.to_string()))?
.map_err(|e| PluginError::ExecutionError(e))?;
.map_err(PluginError::ExecutionError)?;
Ok(())
})
.await;
@@ -296,7 +301,7 @@ impl PluginEngine {
.erp_plugin_plugin_api()
.call_handle_event(store, &event_type, &payload_bytes)
.map_err(|e| PluginError::ExecutionError(e.to_string()))?
.map_err(|e| PluginError::ExecutionError(e))?;
.map_err(PluginError::ExecutionError)?;
Ok(())
})
.await
@@ -317,7 +322,7 @@ impl PluginEngine {
.erp_plugin_plugin_api()
.call_on_tenant_created(store, &tenant_id_str)
.map_err(|e| PluginError::ExecutionError(e.to_string()))?
.map_err(|e| PluginError::ExecutionError(e))?;
.map_err(PluginError::ExecutionError)?;
Ok(())
})
.await
@@ -351,7 +356,9 @@ impl PluginEngine {
/// 将插件从一个 key 重命名为另一个 key用于热更新的原子替换
pub async fn rename_plugin(&self, old_id: &str, new_id: &str) -> PluginResult<()> {
let (_, loaded) = self.plugins.remove(old_id)
let (_, loaded) = self
.plugins
.remove(old_id)
.ok_or_else(|| PluginError::NotFound(old_id.to_string()))?;
let mut loaded = Arc::try_unwrap(loaded)
.map_err(|_| PluginError::ExecutionError("插件仍被引用,无法重命名".to_string()))?;
@@ -419,7 +426,10 @@ impl PluginEngine {
if entry.value().id == plugin_id {
// 配置会在下次 execute_wasm 时从数据库自动重新加载
// 这里只清理可能缓存的旧配置
tracing::info!(plugin_id, "Plugin config refresh scheduled (loaded on next invocation)");
tracing::info!(
plugin_id,
"Plugin config refresh scheduled (loaded on next invocation)"
);
return Ok(());
}
}
@@ -438,12 +448,9 @@ impl PluginEngine {
/// 恢复数据库中状态为 running/enabled 的插件。
///
/// 服务器重启后调用此方法,重新加载 WASM 到内存并启动事件监听。
pub async fn recover_plugins(
&self,
db: &DatabaseConnection,
) -> PluginResult<Vec<String>> {
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
pub async fn recover_plugins(&self, db: &DatabaseConnection) -> PluginResult<Vec<String>> {
use crate::entity::plugin;
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
// 查询所有运行中的插件
let running_plugins = plugin::Entity::find()
@@ -472,7 +479,10 @@ impl PluginEngine {
}
// 加载 WASM 到内存
if let Err(e) = self.load(plugin_id_str, &model.wasm_binary, manifest.clone()).await {
if let Err(e) = self
.load(plugin_id_str, &model.wasm_binary, manifest.clone())
.await
{
tracing::error!(
plugin_id = %plugin_id_str,
tenant_id = %tenant_id,
@@ -543,7 +553,8 @@ impl PluginEngine {
let loaded = self.get_loaded(plugin_id)?;
// 构建跨插件实体映射(从 manifest 的 ref_plugin 字段提取)
let cross_plugin_entities = Self::build_cross_plugin_map(&loaded.manifest, &self.db, exec_ctx.tenant_id).await;
let cross_plugin_entities =
Self::build_cross_plugin_map(&loaded.manifest, &self.db, exec_ctx.tenant_id).await;
// 加载插件配置(从数据库)
let plugin_config = Self::load_plugin_config(plugin_id, exec_ctx.tenant_id, &self.db).await;
@@ -569,43 +580,41 @@ impl PluginEngine {
store.limiter(|state| &mut state.limits);
// 实例化
let instance = PluginWorld::instantiate_async(&mut store, &loaded.component, &loaded.linker)
.await
.map_err(|e| PluginError::InstantiationError(e.to_string()))?;
let instance =
PluginWorld::instantiate_async(&mut store, &loaded.component, &loaded.linker)
.await
.map_err(|e| PluginError::InstantiationError(e.to_string()))?;
let timeout_secs = self.config.execution_timeout_secs;
let pid_owned = plugin_id.to_owned();
let start = std::time::Instant::now();
// spawn_blocking 闭包执行 WASM正常完成时收集 pending_ops
let (result, pending_ops): (PluginResult<R>, Vec<PendingOp>) =
tokio::time::timeout(
std::time::Duration::from_secs(timeout_secs),
tokio::task::spawn_blocking(move || {
match std::panic::catch_unwind(AssertUnwindSafe(|| {
let r = operation(&mut store, &instance);
// catch_unwind 内部不能调用 into_data需要 &mut self
// 但这里 operation 已完成store 仍可用
let ops = std::mem::take(&mut store.data_mut().pending_ops);
(r, ops)
})) {
Ok((r, ops)) => (r, ops),
Err(_) => {
// panic 后丢弃所有 pending_ops,避免半完成状态写入数据库
tracing::warn!(plugin = %pid_owned, "WASM panic, discarding pending ops");
(
Err(PluginError::ExecutionError("WASM panic".to_string())),
Vec::new(),
)
}
let (result, pending_ops): (PluginResult<R>, Vec<PendingOp>) = tokio::time::timeout(
std::time::Duration::from_secs(timeout_secs),
tokio::task::spawn_blocking(move || {
match std::panic::catch_unwind(AssertUnwindSafe(|| {
let r = operation(&mut store, &instance);
// catch_unwind 内部不能调用 into_data需要 &mut self
// 但这里 operation 已完成store 仍可用
let ops = std::mem::take(&mut store.data_mut().pending_ops);
(r, ops)
})) {
Ok((r, ops)) => (r, ops),
Err(_) => {
// panic 后丢弃所有 pending_ops避免半完成状态写入数据库
tracing::warn!(plugin = %pid_owned, "WASM panic, discarding pending ops");
(
Err(PluginError::ExecutionError("WASM panic".to_string())),
Vec::new(),
)
}
}),
)
.await
.map_err(|_| {
PluginError::ExecutionError(format!("插件执行超时 ({}s)", timeout_secs))
})?
.map_err(|e| PluginError::ExecutionError(e.to_string()))?;
}
}),
)
.await
.map_err(|_| PluginError::ExecutionError(format!("插件执行超时 ({}s)", timeout_secs)))?
.map_err(|e| PluginError::ExecutionError(e.to_string()))?;
// 更新运行时指标
let elapsed_ms = start.elapsed().as_millis() as f64;
@@ -639,13 +648,16 @@ impl PluginEngine {
plugin_id: &str,
tenant_id: Uuid,
db: &DatabaseConnection,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = serde_json::Value> + Send + 'static>> {
) -> std::pin::Pin<Box<dyn std::future::Future<Output = serde_json::Value> + Send + 'static>>
{
let db = db.clone();
let pid = plugin_id.to_string();
Box::pin(async move {
use sea_orm::FromQueryResult;
#[derive(Debug, FromQueryResult)]
struct ConfigRow { config_json: serde_json::Value }
struct ConfigRow {
config_json: serde_json::Value,
}
ConfigRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
"SELECT config_json FROM plugins WHERE tenant_id = $1\n\
@@ -671,16 +683,26 @@ impl PluginEngine {
tenant_id: Uuid,
) -> HashMap<String, String> {
let mut map = HashMap::new();
let Some(schema) = &manifest.schema else { return map };
let Some(schema) = &manifest.schema else {
return map;
};
for entity in &schema.entities {
for field in &entity.fields {
if let (Some(target_plugin), Some(ref_entity)) = (&field.ref_plugin, &field.ref_entity) {
if let (Some(target_plugin), Some(ref_entity)) =
(&field.ref_plugin, &field.ref_entity)
{
let key = format!("{}.{}", target_plugin, ref_entity);
// 从 plugin_entities 表查找目标表名
let table_name = crate::entity::plugin_entity::Entity::find()
.filter(crate::entity::plugin_entity::Column::ManifestId.eq(target_plugin.as_str()))
.filter(crate::entity::plugin_entity::Column::EntityName.eq(ref_entity.as_str()))
.filter(
crate::entity::plugin_entity::Column::ManifestId
.eq(target_plugin.as_str()),
)
.filter(
crate::entity::plugin_entity::Column::EntityName
.eq(ref_entity.as_str()),
)
.filter(crate::entity::plugin_entity::Column::TenantId.eq(tenant_id))
.filter(crate::entity::plugin_entity::Column::DeletedAt.is_null())
.one(db)
@@ -716,7 +738,10 @@ impl PluginEngine {
}
// 使用事务确保所有数据库操作的原子性
let txn = db.begin().await.map_err(|e| PluginError::DatabaseError(e.to_string()))?;
let txn = db
.begin()
.await
.map_err(|e| PluginError::DatabaseError(e.to_string()))?;
for op in &ops {
match op {
@@ -724,11 +749,16 @@ impl PluginEngine {
let table_name = DynamicTableManager::table_name(plugin_id, entity);
let parsed_data: serde_json::Value =
serde_json::from_slice(data).unwrap_or_default();
let id_uuid = id.parse::<Uuid>().map_err(|e| {
PluginError::ExecutionError(format!("无效的 ID: {}", e))
})?;
let (sql, values) =
DynamicTableManager::build_insert_sql_with_id(&table_name, id_uuid, tenant_id, user_id, &parsed_data);
let id_uuid = id
.parse::<Uuid>()
.map_err(|e| PluginError::ExecutionError(format!("无效的 ID: {}", e)))?;
let (sql, values) = DynamicTableManager::build_insert_sql_with_id(
&table_name,
id_uuid,
tenant_id,
user_id,
&parsed_data,
);
txn.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
@@ -752,9 +782,9 @@ impl PluginEngine {
let table_name = DynamicTableManager::table_name(plugin_id, entity);
let parsed_data: serde_json::Value =
serde_json::from_slice(data).unwrap_or_default();
let id_uuid = id.parse::<Uuid>().map_err(|e| {
PluginError::ExecutionError(format!("无效的 ID: {}", e))
})?;
let id_uuid = id
.parse::<Uuid>()
.map_err(|e| PluginError::ExecutionError(format!("无效的 ID: {}", e)))?;
let (sql, values) = DynamicTableManager::build_update_sql(
&table_name,
id_uuid,
@@ -780,9 +810,9 @@ impl PluginEngine {
}
PendingOp::Delete { entity, id } => {
let table_name = DynamicTableManager::table_name(plugin_id, entity);
let id_uuid = id.parse::<Uuid>().map_err(|e| {
PluginError::ExecutionError(format!("无效的 ID: {}", e))
})?;
let id_uuid = id
.parse::<Uuid>()
.map_err(|e| PluginError::ExecutionError(format!("无效的 ID: {}", e)))?;
let (sql, values) =
DynamicTableManager::build_delete_sql(&table_name, id_uuid, tenant_id);
txn.execute(Statement::from_sql_and_values(
@@ -807,18 +837,21 @@ impl PluginEngine {
}
// 提交事务
txn.commit().await.map_err(|e| PluginError::DatabaseError(e.to_string()))?;
txn.commit()
.await
.map_err(|e| PluginError::DatabaseError(e.to_string()))?;
// 事务提交成功后发布事件best-effort不阻塞主流程
for op in ops {
if let PendingOp::PublishEvent { event_type, payload } = op {
if let PendingOp::PublishEvent {
event_type,
payload,
} = op
{
let parsed_payload: serde_json::Value =
serde_json::from_slice(&payload).unwrap_or_default();
let event = erp_core::events::DomainEvent::new(
&event_type,
tenant_id,
parsed_payload,
);
let event =
erp_core::events::DomainEvent::new(&event_type, tenant_id, parsed_payload);
event_bus.publish(event, db).await;
tracing::debug!(

View File

@@ -10,13 +10,13 @@ use erp_core::types::{ApiResponse, PaginatedResponse, TenantContext};
use crate::data_dto::{
AggregateItem, AggregateMultiReq, AggregateMultiRow, AggregateQueryParams, BatchActionReq,
CountQueryParams, CreatePluginDataReq, ExportParams, ImportReq, ImportResult,
PatchPluginDataReq, PluginDataListParams,
PluginDataResp, PublicEntityResp, ReconciliationReport, ResolveLabelsReq, ResolveLabelsResp,
TimeseriesItem, TimeseriesParams, UpdatePluginDataReq, UserViewReq, UserViewResp,
PatchPluginDataReq, PluginDataListParams, PluginDataResp, PublicEntityResp,
ReconciliationReport, ResolveLabelsReq, ResolveLabelsResp, TimeseriesItem, TimeseriesParams,
UpdatePluginDataReq, UserViewReq, UserViewResp,
};
use sea_orm::{ConnectionTrait, Statement};
use crate::data_service::{DataScopeParams, PluginDataService, resolve_manifest_id};
use crate::state::PluginState;
use sea_orm::{ConnectionTrait, Statement};
/// 获取当前用户对指定权限的 data_scope 等级
///
@@ -61,10 +61,7 @@ async fn get_data_scope(
///
/// 当前返回 TenantContext 中的 department_ids。
/// 未来实现递归查询部门树时将支持 include_sub_depts 参数。
async fn get_dept_members(
ctx: &TenantContext,
_include_sub_depts: bool,
) -> Vec<Uuid> {
async fn get_dept_members(ctx: &TenantContext, _include_sub_depts: bool) -> Vec<Uuid> {
// 当前 department_ids 为空时返回空列表
// 未来实现递归查询部门树
if ctx.department_ids.is_empty() {
@@ -109,9 +106,7 @@ where
require_permission(&ctx, &fine_perm)?;
// 解析数据权限范围
let scope = resolve_data_scope(
&ctx, &manifest_id, &entity, &fine_perm, &state.db,
).await?;
let scope = resolve_data_scope(&ctx, &manifest_id, &entity, &fine_perm, &state.db).await?;
let page = params.page.unwrap_or(1);
let page_size = params.page_size.unwrap_or(20);
@@ -282,9 +277,16 @@ where
require_permission(&ctx, &fine_perm)?;
let result = PluginDataService::partial_update(
plugin_id, &entity, id, ctx.tenant_id, ctx.user_id,
req.data, req.version, &state.db,
).await?;
plugin_id,
&entity,
id,
ctx.tenant_id,
ctx.user_id,
req.data,
req.version,
&state.db,
)
.await?;
Ok(Json(ApiResponse::ok(result)))
}
@@ -394,9 +396,7 @@ where
require_permission(&ctx, &fine_perm)?;
// 解析数据权限范围
let scope = resolve_data_scope(
&ctx, &manifest_id, &entity, &fine_perm, &state.db,
).await?;
let scope = resolve_data_scope(&ctx, &manifest_id, &entity, &fine_perm, &state.db).await?;
// 解析 filter JSON
let filter: Option<serde_json::Value> = params
@@ -444,9 +444,7 @@ where
require_permission(&ctx, &fine_perm)?;
// 解析数据权限范围
let scope = resolve_data_scope(
&ctx, &manifest_id, &entity, &fine_perm, &state.db,
).await?;
let scope = resolve_data_scope(&ctx, &manifest_id, &entity, &fine_perm, &state.db).await?;
// 解析 filter JSON
let filter: Option<serde_json::Value> = params
@@ -499,9 +497,7 @@ where
require_permission(&ctx, &fine_perm)?;
// 解析数据权限范围
let scope = resolve_data_scope(
&ctx, &manifest_id, &entity, &fine_perm, &state.db,
).await?;
let scope = resolve_data_scope(&ctx, &manifest_id, &entity, &fine_perm, &state.db).await?;
let result = PluginDataService::timeseries(
plugin_id,
@@ -563,9 +559,8 @@ async fn check_entity_data_scope(
let Some(e) = entity else { return Ok(false) };
let schema: crate::manifest::PluginEntity =
serde_json::from_value(e.schema_json)
.map_err(|e| AppError::Internal(format!("解析 entity schema 失败: {}", e)))?;
let schema: crate::manifest::PluginEntity = serde_json::from_value(e.schema_json)
.map_err(|e| AppError::Internal(format!("解析 entity schema 失败: {}", e)))?;
Ok(schema.data_scope.unwrap_or(false))
}
@@ -595,11 +590,10 @@ where
let fine_perm = compute_permission_code(&manifest_id, &entity, "list");
require_permission(&ctx, &fine_perm)?;
let scope = resolve_data_scope(
&ctx, &manifest_id, &entity, &fine_perm, &state.db,
).await?;
let scope = resolve_data_scope(&ctx, &manifest_id, &entity, &fine_perm, &state.db).await?;
let aggregations: Vec<(String, String)> = body.aggregations
let aggregations: Vec<(String, String)> = body
.aggregations
.iter()
.map(|a| (a.func.clone(), a.field.clone()))
.collect();
@@ -633,9 +627,9 @@ pub async fn resolve_ref_labels<S>(
where
PluginState: FromRef<S>,
{
use sea_orm::{FromQueryResult, Statement};
use crate::data_service::{resolve_cross_plugin_entity, is_plugin_active};
use crate::data_service::{is_plugin_active, resolve_cross_plugin_entity};
use crate::manifest::PluginEntity;
use sea_orm::{FromQueryResult, Statement};
let manifest_id = resolve_manifest_id(plugin_id, ctx.tenant_id, &state.db).await?;
let fine_perm = compute_permission_code(&manifest_id, &entity, "list");
@@ -643,12 +637,15 @@ where
// 获取当前实体的 schema
let entity_info = crate::data_service::resolve_entity_info_cached(
plugin_id, &entity, ctx.tenant_id, &state.db, &state.entity_cache,
).await?;
let entity_def: PluginEntity =
serde_json::from_value(entity_info.schema_json).map_err(|e|
AppError::Internal(format!("解析 entity schema 失败: {}", e))
)?;
plugin_id,
&entity,
ctx.tenant_id,
&state.db,
&state.entity_cache,
)
.await?;
let entity_def: PluginEntity = serde_json::from_value(entity_info.schema_json)
.map_err(|e| AppError::Internal(format!("解析 entity schema 失败: {}", e)))?;
let mut labels = serde_json::Map::<String, serde_json::Value>::new();
let mut meta = serde_json::Map::<String, serde_json::Value>::new();
@@ -657,7 +654,9 @@ where
// 查找字段定义
let field_def = entity_def.fields.iter().find(|f| &f.name == field_name);
let Some(field_def) = field_def else { continue };
let Some(ref_entity_name) = &field_def.ref_entity else { continue };
let Some(ref_entity_name) = &field_def.ref_entity else {
continue;
};
let target_plugin = field_def.ref_plugin.as_deref().unwrap_or(&manifest_id);
let label_field = field_def.ref_label_field.as_deref().unwrap_or("name");
@@ -665,16 +664,20 @@ where
let installed = is_plugin_active(target_plugin, ctx.tenant_id, &state.db).await;
// meta 信息
meta.insert(field_name.clone(), serde_json::json!({
"target_plugin": target_plugin,
"target_entity": ref_entity_name,
"label_field": label_field,
"plugin_installed": installed,
}));
meta.insert(
field_name.clone(),
serde_json::json!({
"target_plugin": target_plugin,
"target_entity": ref_entity_name,
"label_field": label_field,
"plugin_installed": installed,
}),
);
if !installed {
// 目标插件未安装 → 所有 UUID 返回 null
let nulls: serde_json::Map<String, serde_json::Value> = uuids.iter()
let nulls: serde_json::Map<String, serde_json::Value> = uuids
.iter()
.map(|u| (u.clone(), serde_json::Value::Null))
.collect();
labels.insert(field_name.clone(), serde_json::Value::Object(nulls));
@@ -683,10 +686,18 @@ where
// 解析目标表名
let target_table = if field_def.ref_plugin.is_some() {
match resolve_cross_plugin_entity(target_plugin, ref_entity_name, ctx.tenant_id, &state.db).await {
match resolve_cross_plugin_entity(
target_plugin,
ref_entity_name,
ctx.tenant_id,
&state.db,
)
.await
{
Ok(info) => info.table_name,
Err(_) => {
let nulls: serde_json::Map<String, serde_json::Value> = uuids.iter()
let nulls: serde_json::Map<String, serde_json::Value> = uuids
.iter()
.map(|u| (u.clone(), serde_json::Value::Null))
.collect();
labels.insert(field_name.clone(), serde_json::Value::Object(nulls));
@@ -698,33 +709,48 @@ where
};
// 批量查询标签
let uuid_strs: Vec<String> = uuids.iter().filter_map(|u| Uuid::parse_str(u).ok()).map(|u| u.to_string()).collect();
let uuid_strs: Vec<String> = uuids
.iter()
.filter_map(|u| Uuid::parse_str(u).ok())
.map(|u| u.to_string())
.collect();
if uuid_strs.is_empty() {
labels.insert(field_name.clone(), serde_json::json!({}));
continue;
}
// 构建 IN 子句参数
let placeholders: Vec<String> = (2..uuid_strs.len() + 2).map(|i| format!("${}", i)).collect();
let placeholders: Vec<String> = (2..uuid_strs.len() + 2)
.map(|i| format!("${}", i))
.collect();
let sql = format!(
"SELECT id::text, data->>'{}' as label FROM \"{}\" WHERE id IN ({}) AND tenant_id = $1 AND deleted_at IS NULL",
label_field, target_table, placeholders.join(", ")
label_field,
target_table,
placeholders.join(", ")
);
let mut values: Vec<sea_orm::Value> = vec![ctx.tenant_id.into()];
for u in &uuid_strs {
let uuid: Uuid = u.parse().map_err(|e| AppError::Internal(format!("invalid uuid: {}", e)))?;
let uuid: Uuid = u
.parse()
.map_err(|e| AppError::Internal(format!("invalid uuid: {}", e)))?;
values.push(uuid.into());
}
#[derive(FromQueryResult)]
struct LabelRow { id: String, label: Option<String> }
struct LabelRow {
id: String,
label: Option<String>,
}
let rows = LabelRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
)).all(&state.db).await?;
))
.all(&state.db)
.await?;
let mut field_labels: serde_json::Map<String, serde_json::Value> = serde_json::Map::new();
// 初始化所有请求的 UUID 为 null
@@ -733,7 +759,10 @@ where
}
// 用查询结果填充
for row in rows {
field_labels.insert(row.id, serde_json::Value::String(row.label.unwrap_or_default()));
field_labels.insert(
row.id,
serde_json::Value::String(row.label.unwrap_or_default()),
);
}
labels.insert(field_name.clone(), serde_json::Value::Object(field_labels));
@@ -758,7 +787,7 @@ where
PluginState: FromRef<S>,
{
use crate::entity::plugin_entity;
use sea_orm::{EntityTrait, QueryFilter, ColumnTrait};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
let entities = plugin_entity::Entity::find()
.filter(plugin_entity::Column::TenantId.eq(ctx.tenant_id))
@@ -767,18 +796,23 @@ where
.all(&state.db)
.await?;
let result: Vec<PublicEntityResp> = entities.iter().map(|e| {
let display_name = e.schema_json.get("display_name")
.and_then(|v| v.as_str())
.unwrap_or(&e.entity_name)
.to_string();
PublicEntityResp {
manifest_id: e.manifest_id.clone(),
plugin_id: e.plugin_id.to_string(),
entity_name: e.entity_name.clone(),
display_name,
}
}).collect();
let result: Vec<PublicEntityResp> = entities
.iter()
.map(|e| {
let display_name = e
.schema_json
.get("display_name")
.and_then(|v| v.as_str())
.unwrap_or(&e.entity_name)
.to_string();
PublicEntityResp {
manifest_id: e.manifest_id.clone(),
plugin_id: e.plugin_id.to_string(),
entity_name: e.entity_name.clone(),
display_name,
}
})
.collect();
Ok(Json(ApiResponse::ok(result)))
}
@@ -807,16 +841,14 @@ where
S: Clone + Send + Sync + 'static,
{
use crate::data_dto::ExportPayload;
use axum::http::{header, StatusCode};
use axum::body::Body;
use axum::http::{StatusCode, header};
let manifest_id = resolve_manifest_id(plugin_id, ctx.tenant_id, &state.db).await?;
let fine_perm = compute_permission_code(&manifest_id, &entity, "list");
require_permission(&ctx, &fine_perm)?;
let scope = resolve_data_scope(
&ctx, &manifest_id, &entity, &fine_perm, &state.db,
).await?;
let scope = resolve_data_scope(&ctx, &manifest_id, &entity, &fine_perm, &state.db).await?;
let filter: Option<serde_json::Value> = params
.filter
@@ -838,7 +870,11 @@ where
)
.await?;
let filename = format!("{}_export_{}", entity, chrono::Utc::now().format("%Y%m%d%H%M%S"));
let filename = format!(
"{}_export_{}",
entity,
chrono::Utc::now().format("%Y%m%d%H%M%S")
);
match payload {
ExportPayload::Json(data) => {
let body = serde_json::to_string(&ApiResponse::ok(data))
@@ -849,22 +885,27 @@ where
.body(Body::from(body))
.unwrap())
}
ExportPayload::Csv(bytes) => {
Ok(axum::response::Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "text/csv; charset=utf-8")
.header(header::CONTENT_DISPOSITION, format!("attachment; filename=\"{}.csv\"", filename))
.body(Body::from(bytes))
.unwrap())
}
ExportPayload::Xlsx(bytes) => {
Ok(axum::response::Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
.header(header::CONTENT_DISPOSITION, format!("attachment; filename=\"{}.xlsx\"", filename))
.body(Body::from(bytes))
.unwrap())
}
ExportPayload::Csv(bytes) => Ok(axum::response::Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "text/csv; charset=utf-8")
.header(
header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{}.csv\"", filename),
)
.body(Body::from(bytes))
.unwrap()),
ExportPayload::Xlsx(bytes) => Ok(axum::response::Response::builder()
.status(StatusCode::OK)
.header(
header::CONTENT_TYPE,
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
)
.header(
header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{}.xlsx\"", filename),
)
.body(Body::from(bytes))
.unwrap()),
}
}
@@ -927,12 +968,8 @@ where
{
require_permission(&ctx, "plugin.admin")?;
let report = PluginDataService::reconcile_references(
plugin_id,
ctx.tenant_id,
&state.db,
)
.await?;
let report =
PluginDataService::reconcile_references(plugin_id, ctx.tenant_id, &state.db).await?;
Ok(Json(ApiResponse::ok(report)))
}
@@ -982,16 +1019,19 @@ where
let mid = manifest_id.clone();
let ent = entity.clone();
let items = rows.into_iter().map(|r| UserViewResp {
id: r.id.to_string(),
plugin_id: mid.clone(),
entity_name: ent.clone(),
view_name: r.view_name,
view_config: r.view_config,
is_default: r.is_default,
created_at: r.created_at,
updated_at: r.updated_at,
}).collect();
let items = rows
.into_iter()
.map(|r| UserViewResp {
id: r.id.to_string(),
plugin_id: mid.clone(),
entity_name: ent.clone(),
view_name: r.view_name,
view_config: r.view_config,
is_default: r.is_default,
created_at: r.created_at,
updated_at: r.updated_at,
})
.collect();
Ok(Json(ApiResponse::ok(items)))
}
@@ -1067,11 +1107,15 @@ where
PluginState: FromRef<S>,
S: Clone + Send + Sync + 'static,
{
state.db.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
"DELETE FROM plugin_user_views WHERE id = $1 AND tenant_id = $2 AND user_id = $3",
[view_id.into(), ctx.tenant_id.into(), ctx.user_id.into()],
)).await.map_err(|e| AppError::Internal(e.to_string()))?;
state
.db
.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
"DELETE FROM plugin_user_views WHERE id = $1 AND tenant_id = $2 AND user_id = $3",
[view_id.into(), ctx.tenant_id.into(), ctx.user_id.into()],
))
.await
.map_err(|e| AppError::Internal(e.to_string()))?;
Ok(Json(ApiResponse::ok(())))
}

View File

@@ -1,8 +1,11 @@
use axum::Extension;
use axum::extract::{FromRef, Path, Query, State};
use axum::response::Json;
use axum::Extension;
use chrono::Utc;
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, Set, prelude::Decimal};
use sea_orm::{
ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, Set,
prelude::Decimal,
};
use uuid::Uuid;
use erp_core::error::AppError;
@@ -64,8 +67,8 @@ where
let page = params.page.unwrap_or(1);
let page_size = params.page_size.unwrap_or(20).min(100);
let mut query = market_entry::Entity::find()
.filter(market_entry::Column::Status.eq("published"));
let mut query =
market_entry::Entity::find().filter(market_entry::Column::Status.eq("published"));
if let Some(ref category) = params.category {
query = query.filter(market_entry::Column::Category.eq(category.as_str()));
@@ -82,7 +85,11 @@ where
query = query.order_by_desc(market_entry::Column::DownloadCount);
let total = query.clone().count(db).await.map_err(|e| AppError::Internal(e.to_string()))?;
let total = query
.clone()
.count(db)
.await
.map_err(|e| AppError::Internal(e.to_string()))?;
let total_pages = ((total as f64) / (page_size as f64)).ceil() as u64;
let models = query
@@ -194,7 +201,9 @@ where
.map_err(|e| AppError::Internal(e.to_string()))?;
if existing.is_some() {
return Err(AppError::Validation("该插件已安装,如需更新请使用升级功能".to_string()));
return Err(AppError::Validation(
"该插件已安装,如需更新请使用升级功能".to_string(),
));
}
// upload → install → enable 一条龙
@@ -207,30 +216,26 @@ where
wasm_binary,
&manifest_toml,
db,
).await?;
)
.await?;
let plugin_id = plugin_resp.id;
let _plugin_resp = crate::service::PluginService::install(
plugin_id,
ctx.tenant_id,
ctx.user_id,
db,
engine,
).await?;
let _plugin_resp =
crate::service::PluginService::install(plugin_id, ctx.tenant_id, ctx.user_id, db, engine)
.await?;
let plugin_resp = crate::service::PluginService::enable(
plugin_id,
ctx.tenant_id,
ctx.user_id,
db,
engine,
).await?;
let plugin_resp =
crate::service::PluginService::enable(plugin_id, ctx.tenant_id, ctx.user_id, db, engine)
.await?;
// 递增下载计数
let mut active: market_entry::ActiveModel = market_model.into();
let current = active.download_count.take().unwrap_or(0);
active.download_count = Set(current + 1);
let _ = active.update(db).await.map_err(|e| AppError::Internal(e.to_string()))?;
let _ = active
.update(db)
.await
.map_err(|e| AppError::Internal(e.to_string()))?;
Ok(Json(ApiResponse::ok(plugin_resp)))
}
@@ -263,14 +268,17 @@ where
.await
.map_err(|e| AppError::Internal(e.to_string()))?;
let items = reviews.iter().map(|r| MarketReviewResp {
id: r.id.to_string(),
user_id: r.user_id.to_string(),
market_entry_id: r.market_entry_id.to_string(),
rating: r.rating,
review_text: r.review_text.clone(),
created_at: Some(r.created_at),
}).collect();
let items = reviews
.iter()
.map(|r| MarketReviewResp {
id: r.id.to_string(),
user_id: r.user_id.to_string(),
market_entry_id: r.market_entry_id.to_string(),
rating: r.rating,
review_text: r.review_text.clone(),
created_at: Some(r.created_at),
})
.collect();
Ok(Json(ApiResponse::ok(items)))
}
@@ -322,7 +330,10 @@ where
let mut active: market_review::ActiveModel = existing.into();
active.rating = Set(body.rating);
active.review_text = Set(body.review_text);
active.update(db).await.map_err(|e| AppError::Internal(e.to_string()))?
active
.update(db)
.await
.map_err(|e| AppError::Internal(e.to_string()))?
} else {
let review_id = Uuid::now_v7();
let now = Utc::now();
@@ -335,7 +346,10 @@ where
review_text: Set(body.review_text),
created_at: Set(now),
};
model.insert(db).await.map_err(|e| AppError::Internal(e.to_string()))?
model
.insert(db)
.await
.map_err(|e| AppError::Internal(e.to_string()))?
};
// 重新计算平均评分
@@ -356,7 +370,10 @@ where
let avg_decimal = Decimal::from_f64_retain(avg).unwrap_or_default();
entry_active.rating_avg = Set(avg_decimal);
entry_active.rating_count = Set(count);
let _ = entry_active.update(db).await.map_err(|e| AppError::Internal(e.to_string()))?;
let _ = entry_active
.update(db)
.await
.map_err(|e| AppError::Internal(e.to_string()))?;
Ok(Json(ApiResponse::ok(MarketReviewResp {
id: review_model.id.to_string(),

View File

@@ -7,9 +7,7 @@ use erp_core::error::AppError;
use erp_core::rbac::require_permission;
use erp_core::types::{ApiResponse, PaginatedResponse, Pagination, TenantContext};
use crate::dto::{
PluginHealthResp, PluginListParams, PluginResp, UpdatePluginConfigReq,
};
use crate::dto::{PluginHealthResp, PluginListParams, PluginResp, UpdatePluginConfigReq};
use crate::service::PluginService;
use crate::state::PluginState;
@@ -39,20 +37,27 @@ where
let mut wasm_binary: Option<Vec<u8>> = None;
let mut manifest_toml: Option<String> = None;
while let Some(field) = multipart.next_field().await.map_err(|e| {
AppError::Validation(format!("Multipart 解析失败: {}", e))
})? {
while let Some(field) = multipart
.next_field()
.await
.map_err(|e| AppError::Validation(format!("Multipart 解析失败: {}", e)))?
{
let name = field.name().unwrap_or("");
match name {
"wasm" => {
wasm_binary = Some(field.bytes().await.map_err(|e| {
AppError::Validation(format!("读取 WASM 文件失败: {}", e))
})?.to_vec());
wasm_binary = Some(
field
.bytes()
.await
.map_err(|e| AppError::Validation(format!("读取 WASM 文件失败: {}", e)))?
.to_vec(),
);
}
"manifest" => {
let bytes = field.bytes().await.map_err(|e| {
AppError::Validation(format!("读取 Manifest 失败: {}", e))
})?;
let bytes = field
.bytes()
.await
.map_err(|e| AppError::Validation(format!("读取 Manifest 失败: {}", e)))?;
let text = String::from_utf8(bytes.to_vec()).map_err(|e| {
AppError::Validation(format!("Manifest 不是有效的 UTF-8: {}", e))
})?;
@@ -62,21 +67,12 @@ where
}
}
let wasm = wasm_binary.ok_or_else(|| {
AppError::Validation("缺少 wasm 文件".to_string())
})?;
let manifest = manifest_toml.ok_or_else(|| {
AppError::Validation("缺少 manifest 文件".to_string())
})?;
let wasm = wasm_binary.ok_or_else(|| AppError::Validation("缺少 wasm 文件".to_string()))?;
let manifest =
manifest_toml.ok_or_else(|| AppError::Validation("缺少 manifest 文件".to_string()))?;
let result = PluginService::upload(
ctx.tenant_id,
ctx.user_id,
wasm,
&manifest,
&state.db,
)
.await?;
let result =
PluginService::upload(ctx.tenant_id, ctx.user_id, wasm, &manifest, &state.db).await?;
Ok(Json(ApiResponse::ok(result)))
}
@@ -195,18 +191,12 @@ where
S: Clone + Send + Sync + 'static,
{
require_permission(&ctx, "plugin.admin")?;
let result = PluginService::install(
id,
ctx.tenant_id,
ctx.user_id,
&state.db,
&state.engine,
)
.await
.map_err(|e| {
tracing::error!(error = %e, "Install failed");
e
})?;
let result = PluginService::install(id, ctx.tenant_id, ctx.user_id, &state.db, &state.engine)
.await
.map_err(|e| {
tracing::error!(error = %e, "Install failed");
e
})?;
Ok(Json(ApiResponse::ok(result)))
}
@@ -230,14 +220,8 @@ where
S: Clone + Send + Sync + 'static,
{
require_permission(&ctx, "plugin.admin")?;
let result = PluginService::enable(
id,
ctx.tenant_id,
ctx.user_id,
&state.db,
&state.engine,
)
.await?;
let result =
PluginService::enable(id, ctx.tenant_id, ctx.user_id, &state.db, &state.engine).await?;
Ok(Json(ApiResponse::ok(result)))
}
@@ -261,14 +245,8 @@ where
S: Clone + Send + Sync + 'static,
{
require_permission(&ctx, "plugin.admin")?;
let result = PluginService::disable(
id,
ctx.tenant_id,
ctx.user_id,
&state.db,
&state.engine,
)
.await?;
let result =
PluginService::disable(id, ctx.tenant_id, ctx.user_id, &state.db, &state.engine).await?;
Ok(Json(ApiResponse::ok(result)))
}
@@ -292,14 +270,8 @@ where
S: Clone + Send + Sync + 'static,
{
require_permission(&ctx, "plugin.admin")?;
let result = PluginService::uninstall(
id,
ctx.tenant_id,
ctx.user_id,
&state.db,
&state.engine,
)
.await?;
let result =
PluginService::uninstall(id, ctx.tenant_id, ctx.user_id, &state.db, &state.engine).await?;
Ok(Json(ApiResponse::ok(result)))
}
@@ -373,8 +345,12 @@ where
require_permission(&ctx, "plugin.list")?;
// 通过 plugin_id 找到 manifest_id再查询 metrics
let manifest_id = crate::data_service::resolve_manifest_id(id, ctx.tenant_id, &state.db).await?;
let metrics = state.engine.get_metrics(&manifest_id).await
let manifest_id =
crate::data_service::resolve_manifest_id(id, ctx.tenant_id, &state.db).await?;
let metrics = state
.engine
.get_metrics(&manifest_id)
.await
.map_err(|e| AppError::Internal(e.to_string()))?;
let avg_ms = if metrics.total_invocations > 0 {
@@ -457,20 +433,27 @@ where
let mut wasm_binary: Option<Vec<u8>> = None;
let mut manifest_toml: Option<String> = None;
while let Some(field) = multipart.next_field().await.map_err(|e| {
AppError::Validation(format!("Multipart 解析失败: {}", e))
})? {
while let Some(field) = multipart
.next_field()
.await
.map_err(|e| AppError::Validation(format!("Multipart 解析失败: {}", e)))?
{
let name = field.name().unwrap_or("");
match name {
"wasm" => {
wasm_binary = Some(field.bytes().await.map_err(|e| {
AppError::Validation(format!("读取 WASM 文件失败: {}", e))
})?.to_vec());
wasm_binary = Some(
field
.bytes()
.await
.map_err(|e| AppError::Validation(format!("读取 WASM 文件失败: {}", e)))?
.to_vec(),
);
}
"manifest" => {
let bytes = field.bytes().await.map_err(|e| {
AppError::Validation(format!("读取 Manifest 失败: {}", e))
})?;
let bytes = field
.bytes()
.await
.map_err(|e| AppError::Validation(format!("读取 Manifest 失败: {}", e)))?;
manifest_toml = Some(String::from_utf8(bytes.to_vec()).map_err(|e| {
AppError::Validation(format!("Manifest 不是有效的 UTF-8: {}", e))
})?);
@@ -479,12 +462,9 @@ where
}
}
let wasm = wasm_binary.ok_or_else(|| {
AppError::Validation("缺少 wasm 文件".to_string())
})?;
let manifest = manifest_toml.ok_or_else(|| {
AppError::Validation("缺少 manifest 文件".to_string())
})?;
let wasm = wasm_binary.ok_or_else(|| AppError::Validation("缺少 wasm 文件".to_string()))?;
let manifest =
manifest_toml.ok_or_else(|| AppError::Validation("缺少 manifest 文件".to_string()))?;
let result = PluginService::upgrade(
id,
@@ -525,6 +505,7 @@ where
serde_json::from_value(model.manifest_json.clone())
.map_err(|e| AppError::Validation(format!("manifest 解析失败: {}", e)))?;
let report = crate::plugin_validator::validate_plugin_security(&manifest, model.wasm_binary.len())?;
let report =
crate::plugin_validator::validate_plugin_security(&manifest, model.wasm_binary.len())?;
Ok(Json(ApiResponse::ok(report)))
}

View File

@@ -4,9 +4,9 @@ use sea_orm::DatabaseConnection;
use uuid::Uuid;
use wasmtime::StoreLimits;
use crate::erp::plugin::host_api;
use crate::dynamic_table::DynamicTableManager;
use crate::engine::PluginEngine;
use crate::erp::plugin::host_api;
/// 待刷新的写操作
#[derive(Debug)]
@@ -144,15 +144,15 @@ impl host_api::Host for HostState {
) -> Result<Vec<u8>, String> {
// 预填充模式(向后兼容)
if self.db.is_none() {
return self.query_results
return self
.query_results
.get(&entity)
.cloned()
.ok_or_else(|| format!("实体 '{}' 的查询结果未预填充", entity));
}
let db = self.db.clone().ok_or("数据库连接不可用")?;
let event_bus = self.event_bus.clone()
.ok_or("事件总线不可用")?;
let event_bus = self.event_bus.clone().ok_or("事件总线不可用")?;
// 先 flush pending writes确保读后写一致性
let ops = std::mem::take(&mut self.pending_ops);
@@ -217,30 +217,28 @@ impl host_api::Host for HostState {
// 执行查询
let rt = tokio::runtime::Handle::current();
let rows = rt.block_on(async {
use sea_orm::{FromQueryResult, Statement};
#[derive(Debug, FromQueryResult)]
struct QueryRow {
data: serde_json::Value,
}
let rows = rt
.block_on(async {
use sea_orm::{FromQueryResult, Statement};
#[derive(Debug, FromQueryResult)]
struct QueryRow {
data: serde_json::Value,
}
let results = QueryRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.all(&db)
.await
.map_err(|e| format!("查询执行失败: {}", e))?;
let results = QueryRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.all(&db)
.await
.map_err(|e| format!("查询执行失败: {}", e))?;
let items: Vec<serde_json::Value> = results
.into_iter()
.map(|r| r.data)
.collect();
let items: Vec<serde_json::Value> = results.into_iter().map(|r| r.data).collect();
Ok::<Vec<serde_json::Value>, String>(items)
})
.map_err(|e: String| e)?;
Ok::<Vec<serde_json::Value>, String>(items)
})
.map_err(|e: String| e)?;
serde_json::to_vec(&rows).map_err(|e| e.to_string())
}
@@ -306,13 +304,13 @@ impl host_api::Host for HostState {
}
fn numbering_generate(&mut self, rule_key: String) -> Result<String, String> {
let rule = self.numbering_rules
let rule = self
.numbering_rules
.get(&rule_key)
.ok_or_else(|| format!("编号规则 '{}' 未声明", rule_key))?
.clone();
let db = self.db.clone()
.ok_or("编号生成需要数据库连接")?;
let db = self.db.clone().ok_or("编号生成需要数据库连接")?;
let _tenant_id = self.tenant_id;
let plugin_id = self.plugin_id.clone();
@@ -320,7 +318,7 @@ impl host_api::Host for HostState {
let rt = tokio::runtime::Handle::current();
rt.block_on(async {
use sea_orm::{Statement, FromQueryResult, ConnectionTrait};
use sea_orm::{ConnectionTrait, FromQueryResult, Statement};
let now = chrono::Utc::now();
let year = now.format("%Y").to_string();
@@ -354,7 +352,9 @@ impl host_api::Host for HostState {
db.execute(Statement::from_string(
sea_orm::DatabaseBackend::Postgres,
create_sql,
)).await.map_err(|e| format!("创建序列表失败: {}", e))?;
))
.await
.map_err(|e| format!("创建序列表失败: {}", e))?;
// 使用 advisory lock 保证并发安全
// lock_id 基于规则名哈希
@@ -369,11 +369,15 @@ impl host_api::Host for HostState {
db.execute(Statement::from_string(
sea_orm::DatabaseBackend::Postgres,
lock_sql,
)).await.map_err(|e| format!("获取锁失败: {}", e))?;
))
.await
.map_err(|e| format!("获取锁失败: {}", e))?;
// 读取当前值
#[derive(Debug, FromQueryResult)]
struct SeqRow { current_val: i64 }
struct SeqRow {
current_val: i64,
}
let read_sql = format!(
"SELECT current_val FROM {} WHERE rule_key = $1 AND period_key = $2",
@@ -383,7 +387,10 @@ impl host_api::Host for HostState {
sea_orm::DatabaseBackend::Postgres,
read_sql,
[rule_key.clone().into(), period_key.clone().into()],
)).one(&db).await.map_err(|e| format!("读取序列失败: {}", e))?;
))
.one(&db)
.await
.map_err(|e| format!("读取序列失败: {}", e))?;
let next_val = current.map(|r| r.current_val + 1).unwrap_or(1);
@@ -396,12 +403,19 @@ impl host_api::Host for HostState {
db.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
upsert_sql,
[rule_key.clone().into(), period_key.clone().into(), next_val.into()],
)).await.map_err(|e| format!("更新序列失败: {}", e))?;
[
rule_key.clone().into(),
period_key.clone().into(),
next_val.into(),
],
))
.await
.map_err(|e| format!("更新序列失败: {}", e))?;
let seq_str = format!("{:0>width$}", next_val, width = rule.seq_length as usize);
let number = rule.format
let number = rule
.format
.replace("{PREFIX}", &rule.prefix)
.replace("{YEAR}", &year)
.replace("{MONTH}", &month)
@@ -414,11 +428,11 @@ impl host_api::Host for HostState {
}
fn setting_get(&mut self, key: String) -> Result<Vec<u8>, String> {
let config = self.plugin_config.as_object()
let config = self
.plugin_config
.as_object()
.ok_or("插件配置不是有效对象")?;
let value = config.get(&key)
.cloned()
.unwrap_or(serde_json::Value::Null);
let value = config.get(&key).cloned().unwrap_or(serde_json::Value::Null);
serde_json::to_vec(&value).map_err(|e| e.to_string())
}
}

View File

@@ -11,8 +11,8 @@ wasmtime::component::bindgen!({
pub mod data_dto;
pub mod data_service;
pub mod dynamic_table;
pub mod dto;
pub mod dynamic_table;
pub mod engine;
pub mod entity;
pub mod error;

View File

@@ -58,20 +58,20 @@ pub struct PluginEntity {
#[serde(default)]
pub relations: Vec<PluginRelation>,
#[serde(default)]
pub data_scope: Option<bool>, // 是否启用行级数据权限
pub data_scope: Option<bool>, // 是否启用行级数据权限
#[serde(default)]
pub is_public: Option<bool>, // 是否可被其他插件引用
pub is_public: Option<bool>, // 是否可被其他插件引用
#[serde(default)]
pub importable: Option<bool>, // 是否支持数据导入
pub importable: Option<bool>, // 是否支持数据导入
#[serde(default)]
pub exportable: Option<bool>, // 是否支持数据导出
pub exportable: Option<bool>, // 是否支持数据导出
}
/// 字段校验规则
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FieldValidation {
pub pattern: Option<String>, // 正则表达式
pub message: Option<String>, // 校验失败提示
pub pattern: Option<String>, // 正则表达式
pub message: Option<String>, // 校验失败提示
}
/// 插件字段定义
@@ -95,18 +95,18 @@ pub struct PluginField {
pub sortable: Option<bool>,
#[serde(default)]
pub visible_when: Option<String>,
pub ref_entity: Option<String>, // 外键引用的实体名
pub ref_label_field: Option<String>, // entity_select 下拉显示的字段名
pub ref_entity: Option<String>, // 外键引用的实体名
pub ref_label_field: Option<String>, // entity_select 下拉显示的字段名
pub ref_search_fields: Option<Vec<String>>, // entity_select 搜索匹配的字段列表
pub cascade_from: Option<String>, // 级联过滤的来源字段(当前实体)
pub cascade_filter: Option<String>, // 级联过滤的目标字段(引用实体的字段)
pub validation: Option<FieldValidation>, // 字段校验规则
pub cascade_from: Option<String>, // 级联过滤的来源字段(当前实体)
pub cascade_filter: Option<String>, // 级联过滤的目标字段(引用实体的字段)
pub validation: Option<FieldValidation>, // 字段校验规则
#[serde(default)]
pub no_cycle: Option<bool>, // 禁止循环引用
pub no_cycle: Option<bool>, // 禁止循环引用
#[serde(default)]
pub scope_role: Option<String>, // 标记为数据权限的"所有者"字段
pub ref_plugin: Option<String>, // 跨插件引用的目标插件 manifest ID如 "erp-crm"
pub ref_fallback_label: Option<String>, // 目标插件未安装时的降级显示文本
pub scope_role: Option<String>, // 标记为数据权限的"所有者"字段
pub ref_plugin: Option<String>, // 跨插件引用的目标插件 manifest ID如 "erp-crm"
pub ref_fallback_label: Option<String>, // 目标插件未安装时的降级显示文本
}
/// 字段类型
@@ -198,9 +198,9 @@ pub struct PluginIndex {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum OnDeleteStrategy {
Nullify, // 置空外键字段
Cascade, // 级联软删除
Restrict, // 存在关联时拒绝删除
Nullify, // 置空外键字段
Cascade, // 级联软删除
Restrict, // 存在关联时拒绝删除
}
/// 实体关联关系声明
@@ -210,11 +210,11 @@ pub struct PluginRelation {
pub foreign_key: String,
pub on_delete: OnDeleteStrategy,
#[serde(default)]
pub name: Option<String>, // 关联名称UI 显示用)
pub name: Option<String>, // 关联名称UI 显示用)
#[serde(default, alias = "type")]
pub relation_type: Option<String>, // "one_to_many" | "many_to_one" | "many_to_many"
pub relation_type: Option<String>, // "one_to_many" | "many_to_one" | "many_to_many"
#[serde(default)]
pub display_field: Option<String>, // 关联记录的显示字段
pub display_field: Option<String>, // 关联记录的显示字段
}
/// 事件订阅配置
@@ -311,10 +311,7 @@ pub enum PluginPageType {
#[serde(tag = "type", rename_all = "snake_case")]
pub enum PluginWidget {
#[serde(rename = "stat_cards")]
StatCards {
label: String,
cards: Vec<StatCard>,
},
StatCards { label: String, cards: Vec<StatCard> },
#[serde(rename = "action_list")]
ActionList {
label: String,
@@ -385,10 +382,7 @@ pub struct ActionQuery {
#[serde(tag = "type")]
pub enum PluginSection {
#[serde(rename = "fields")]
Fields {
label: String,
fields: Vec<String>,
},
Fields { label: String, fields: Vec<String> },
#[serde(rename = "crud")]
Crud {
label: String,
@@ -408,7 +402,7 @@ pub struct PluginPermission {
#[serde(default)]
pub description: String,
#[serde(default)]
pub data_scope_levels: Option<Vec<String>>, // 支持的数据范围等级
pub data_scope_levels: Option<Vec<String>>, // 支持的数据范围等级
}
// ============================================================
@@ -545,7 +539,9 @@ pub fn parse_manifest(toml_str: &str) -> PluginResult<PluginManifest> {
// 验证必填字段
if manifest.metadata.id.is_empty() {
return Err(PluginError::InvalidManifest("metadata.id 不能为空".to_string()));
return Err(PluginError::InvalidManifest(
"metadata.id 不能为空".to_string(),
));
}
if manifest.metadata.name.is_empty() {
return Err(PluginError::InvalidManifest(
@@ -642,7 +638,9 @@ fn validate_pages(pages: &[PluginPageType]) -> PluginResult<()> {
));
}
}
PluginPageType::Detail { entity, sections, .. } => {
PluginPageType::Detail {
entity, sections, ..
} => {
if entity.is_empty() {
return Err(PluginError::InvalidManifest(
"detail page 的 entity 不能为空".into(),
@@ -937,7 +935,10 @@ label = "空标签页"
fn field_type_to_sql_mapping() {
assert_eq!(PluginFieldType::String.generated_sql_type(), "TEXT");
assert_eq!(PluginFieldType::Integer.generated_sql_type(), "INTEGER");
assert_eq!(PluginFieldType::Float.generated_sql_type(), "DOUBLE PRECISION");
assert_eq!(
PluginFieldType::Float.generated_sql_type(),
"DOUBLE PRECISION"
);
assert_eq!(PluginFieldType::Decimal.generated_sql_type(), "NUMERIC");
assert_eq!(PluginFieldType::Boolean.generated_sql_type(), "BOOLEAN");
assert_eq!(PluginFieldType::Date.generated_sql_type(), "DATE");
@@ -948,9 +949,18 @@ label = "空标签页"
#[test]
fn field_type_generated_expression() {
assert_eq!(PluginFieldType::String.generated_expr("name"), "data->>'name'");
assert_eq!(PluginFieldType::Integer.generated_expr("age"), "(data->>'age')::INTEGER");
assert_eq!(PluginFieldType::Uuid.generated_expr("ref_id"), "(data->>'ref_id')::UUID");
assert_eq!(
PluginFieldType::String.generated_expr("name"),
"data->>'name'"
);
assert_eq!(
PluginFieldType::Integer.generated_expr("age"),
"(data->>'age')::INTEGER"
);
assert_eq!(
PluginFieldType::Uuid.generated_expr("ref_id"),
"(data->>'ref_id')::UUID"
);
}
#[test]
@@ -1064,7 +1074,10 @@ on_delete = "cascade"
assert_eq!(entity.relations.len(), 2);
assert_eq!(entity.relations[0].entity, "contact");
assert_eq!(entity.relations[0].foreign_key, "customer_id");
assert!(matches!(entity.relations[0].on_delete, OnDeleteStrategy::Cascade));
assert!(matches!(
entity.relations[0].on_delete,
OnDeleteStrategy::Cascade
));
}
#[test]
@@ -1139,9 +1152,7 @@ ref_search_fields = ["name", "code"]
assert_eq!(field.ref_label_field.as_deref(), Some("name"));
assert_eq!(
field.ref_search_fields.as_deref(),
Some(
&["name".to_string(), "code".to_string()][..]
)
Some(&["name".to_string(), "code".to_string()][..])
);
}
@@ -1406,7 +1417,10 @@ description = "发票创建后是否自动发送通知"
assert_eq!(settings.fields[0].group.as_deref(), Some("财务"));
assert_eq!(settings.fields[1].name, "invoice_prefix");
assert_eq!(settings.fields[2].name, "auto_notify");
assert!(matches!(settings.fields[2].field_type, PluginSettingType::Boolean));
assert!(matches!(
settings.fields[2].field_type,
PluginSettingType::Boolean
));
}
#[test]
@@ -1436,7 +1450,10 @@ seq_length = 4
assert_eq!(numbering[0].entity, "invoice");
assert_eq!(numbering[0].field, "invoice_no");
assert_eq!(numbering[0].prefix, "INV");
assert!(matches!(numbering[0].reset_rule, PluginNumberingReset::Yearly));
assert!(matches!(
numbering[0].reset_rule,
PluginNumberingReset::Yearly
));
}
#[test]
@@ -1716,7 +1733,9 @@ tags = ["status"]
assert_eq!(ui.pages.len(), 1);
match &ui.pages[0] {
PluginPageType::Dashboard {
label, icon, widgets,
label,
icon,
widgets,
} => {
assert_eq!(label, "工作台");
assert_eq!(icon.as_deref(), Some("DashboardOutlined"));
@@ -1738,7 +1757,9 @@ tags = ["status"]
// action_list
match &widgets[1] {
PluginWidget::ActionList {
label, max_items, queries,
label,
max_items,
queries,
} => {
assert_eq!(label, "紧急待办");
assert_eq!(*max_items, Some(5));
@@ -1752,7 +1773,11 @@ tags = ["status"]
// funnel
match &widgets[2] {
PluginWidget::Funnel {
label, entity, lane_field, value_field, lane_order,
label,
entity,
lane_field,
value_field,
lane_order,
} => {
assert_eq!(label, "商机漏斗");
assert_eq!(entity, "invoice");
@@ -1766,7 +1791,10 @@ tags = ["status"]
// card_list
match &widgets[3] {
PluginWidget::CardList {
label, entity, title_field, ..
label,
entity,
title_field,
..
} => {
assert_eq!(label, "活跃项目");
assert_eq!(entity, "invoice");

View File

@@ -17,8 +17,18 @@ impl ErpModule for PluginModule {
fn permissions(&self) -> Vec<PermissionDescriptor> {
vec![
PermissionDescriptor { code: "plugin.admin".into(), name: "插件管理".into(), description: "管理插件全生命周期".into(), module: "plugin".into() },
PermissionDescriptor { code: "plugin.list".into(), name: "查看插件".into(), description: "查看插件列表".into(), module: "plugin".into() },
PermissionDescriptor {
code: "plugin.admin".into(),
name: "插件管理".into(),
description: "管理插件全生命周期".into(),
module: "plugin".into(),
},
PermissionDescriptor {
code: "plugin.list".into(),
name: "查看插件".into(),
description: "查看插件列表".into(),
module: "plugin".into(),
},
]
}
@@ -35,8 +45,14 @@ impl PluginModule {
S: Clone + Send + Sync + 'static,
{
let admin_routes = Router::new()
.route("/admin/plugins/upload", post(crate::handler::plugin_handler::upload_plugin::<S>))
.route("/admin/plugins", get(crate::handler::plugin_handler::list_plugins::<S>))
.route(
"/admin/plugins/upload",
post(crate::handler::plugin_handler::upload_plugin::<S>),
)
.route(
"/admin/plugins",
get(crate::handler::plugin_handler::list_plugins::<S>),
)
.route(
"/admin/plugins/{id}",
get(crate::handler::plugin_handler::get_plugin::<S>)
@@ -151,11 +167,10 @@ impl PluginModule {
);
// 实体注册表路由
let registry_routes = Router::new()
.route(
"/plugin-registry/entities",
get(crate::handler::data_handler::list_public_entities::<S>),
);
let registry_routes = Router::new().route(
"/plugin-registry/entities",
get(crate::handler::data_handler::list_public_entities::<S>),
);
// 市场路由
let market_routes = Router::new()
@@ -177,6 +192,9 @@ impl PluginModule {
.post(crate::handler::market_handler::submit_market_review::<S>),
);
admin_routes.merge(data_routes).merge(registry_routes).merge(market_routes)
admin_routes
.merge(data_routes)
.merge(registry_routes)
.merge(market_routes)
}
}

View File

@@ -1,9 +1,9 @@
use sea_orm::{ConnectionTrait, Statement, FromQueryResult};
use uuid::Uuid;
use chrono::Utc;
use sea_orm::{ConnectionTrait, FromQueryResult, Statement};
use uuid::Uuid;
use erp_core::events::{DomainEvent, EventBus};
use erp_core::error::AppResult;
use erp_core::events::{DomainEvent, EventBus};
/// 启动插件通知监听器 — 订阅 plugin.trigger.* 事件
pub fn start_notification_listener(db: sea_orm::DatabaseConnection, event_bus: EventBus) {
@@ -23,17 +23,28 @@ pub fn start_notification_listener(db: sea_orm::DatabaseConnection, event_bus: E
});
}
async fn handle_trigger_event(event: &DomainEvent, db: &sea_orm::DatabaseConnection) -> AppResult<()> {
let plugin_id = event.payload.get("plugin_id")
async fn handle_trigger_event(
event: &DomainEvent,
db: &sea_orm::DatabaseConnection,
) -> AppResult<()> {
let plugin_id = event
.payload
.get("plugin_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let trigger_name = event.payload.get("trigger_name")
let trigger_name = event
.payload
.get("trigger_name")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let entity = event.payload.get("entity")
let entity = event
.payload
.get("entity")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let action = event.payload.get("action")
let action = event
.payload
.get("action")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
@@ -45,7 +56,9 @@ async fn handle_trigger_event(event: &DomainEvent, db: &sea_orm::DatabaseConnect
// 查询所有管理员用户
#[derive(FromQueryResult)]
struct AdminUser { id: Uuid }
struct AdminUser {
id: Uuid,
}
let admins = AdminUser::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,

View File

@@ -48,7 +48,10 @@ impl RuntimeMetrics {
}
/// 上传时安全扫描
pub fn validate_plugin_security(manifest: &PluginManifest, wasm_size: usize) -> PluginResult<ValidationReport> {
pub fn validate_plugin_security(
manifest: &PluginManifest,
wasm_size: usize,
) -> PluginResult<ValidationReport> {
let mut errors = Vec::new();
let mut warnings = Vec::new();
@@ -70,7 +73,8 @@ pub fn validate_plugin_security(manifest: &PluginManifest, wasm_size: usize) ->
if entity.fields.len() > 50 {
errors.push(format!(
"实体 '{}' 字段数量过多: {} (上限 50)",
entity.name, entity.fields.len()
entity.name,
entity.fields.len()
));
}
@@ -78,7 +82,8 @@ pub fn validate_plugin_security(manifest: &PluginManifest, wasm_size: usize) ->
if entity.indexes.len() > 10 {
warnings.push(format!(
"实体 '{}' 索引数量较多: {} (>10 可能影响写入性能)",
entity.name, entity.indexes.len()
entity.name,
entity.indexes.len()
));
}
@@ -90,7 +95,11 @@ pub fn validate_plugin_security(manifest: &PluginManifest, wasm_size: usize) ->
entity.name, field.name
));
}
if !field.name.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
if !field
.name
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '_')
{
errors.push(format!(
"字段名包含非法字符: '{}.{}' (只允许字母、数字、下划线)",
entity.name, field.name
@@ -167,8 +176,11 @@ fn collect_metrics(manifest: &PluginManifest, wasm_size: usize) -> PluginMetrics
}
metrics.has_settings = manifest.settings.is_some();
metrics.has_numbering = manifest.numbering.as_ref().map_or(false, |n| !n.is_empty());
metrics.has_trigger_events = manifest.trigger_events.as_ref().map_or(false, |t| !t.is_empty());
metrics.has_numbering = manifest.numbering.as_ref().is_some_and(|n| !n.is_empty());
metrics.has_trigger_events = manifest
.trigger_events
.as_ref()
.is_some_and(|t| !t.is_empty());
metrics
}

View File

@@ -1,21 +1,21 @@
use chrono::Utc;
use sea_orm::{ActiveModelTrait, ColumnTrait, ConnectionTrait, EntityTrait, PaginatorTrait, QueryFilter, Set};
use sea_orm::{
ActiveModelTrait, ColumnTrait, ConnectionTrait, EntityTrait, PaginatorTrait, QueryFilter, Set,
};
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use uuid::Uuid;
use sha2::{Sha256, Digest};
use erp_core::sea_orm_ext::bump_version;
use erp_core::error::AppResult;
use crate::dto::{
PluginEntityResp, PluginHealthResp, PluginPermissionResp, PluginResp,
};
use crate::dto::{PluginEntityResp, PluginHealthResp, PluginPermissionResp, PluginResp};
use crate::dynamic_table::DynamicTableManager;
use crate::engine::PluginEngine;
use crate::entity::{plugin, plugin_entity, plugin_event_subscription};
use crate::error::PluginError;
use crate::manifest::{parse_manifest, PluginManifest};
use crate::manifest::{PluginManifest, parse_manifest};
pub struct PluginService;
@@ -32,11 +32,14 @@ impl PluginService {
let manifest = parse_manifest(manifest_toml)?;
// 安全扫描
let validation = crate::plugin_validator::validate_plugin_security(&manifest, wasm_binary.len())?;
let validation =
crate::plugin_validator::validate_plugin_security(&manifest, wasm_binary.len())?;
if !validation.valid {
return Err(PluginError::ValidationError(format!(
"插件安全校验失败: {}", validation.errors.join("; ")
)).into());
"插件安全校验失败: {}",
validation.errors.join("; ")
))
.into());
}
// 计算 WASM hash
@@ -48,8 +51,8 @@ impl PluginService {
let plugin_id = Uuid::now_v7();
// 序列化 manifest 为 JSON
let manifest_json =
serde_json::to_value(&manifest).map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let manifest_json = serde_json::to_value(&manifest)
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let model = plugin::ActiveModel {
id: Set(plugin_id),
@@ -98,9 +101,8 @@ impl PluginService {
let model = find_plugin(plugin_id, tenant_id, db).await?;
validate_status(&model.status, "uploaded")?;
let manifest: PluginManifest =
serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let manifest: PluginManifest = serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let now = Utc::now();
@@ -108,7 +110,8 @@ impl PluginService {
let mut entity_resps = Vec::new();
if let Some(schema) = &manifest.schema {
for (i, entity_def) in schema.entities.iter().enumerate() {
let table_name = DynamicTableManager::table_name(&manifest.metadata.id, &entity_def.name);
let table_name =
DynamicTableManager::table_name(&manifest.metadata.id, &entity_def.name);
tracing::info!(step = i, entity = %entity_def.name, table = %table_name, "Creating dynamic table");
// 创建动态表
@@ -185,11 +188,7 @@ impl PluginService {
// 加载到内存
tracing::info!(manifest_id = %manifest.metadata.id, "Loading plugin into engine");
engine
.load(
&manifest.metadata.id,
&model.wasm_binary,
manifest.clone(),
)
.load(&manifest.metadata.id, &model.wasm_binary, manifest.clone())
.await?;
// 更新状态
@@ -214,9 +213,8 @@ impl PluginService {
let model = find_plugin(plugin_id, tenant_id, db).await?;
validate_status_any(&model.status, &["installed", "disabled"])?;
let manifest: PluginManifest =
serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let manifest: PluginManifest = serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let plugin_manifest_id = &manifest.metadata.id;
@@ -270,9 +268,8 @@ impl PluginService {
let model = find_plugin(plugin_id, tenant_id, db).await?;
validate_status_any(&model.status, &["running", "enabled"])?;
let manifest: PluginManifest =
serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let manifest: PluginManifest = serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
// 禁用引擎
engine.disable(&manifest.metadata.id).await?;
@@ -299,9 +296,8 @@ impl PluginService {
let model = find_plugin(plugin_id, tenant_id, db).await?;
validate_status_any(&model.status, &["installed", "disabled"])?;
let manifest: PluginManifest =
serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let manifest: PluginManifest = serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
// 卸载(如果 disabled 状态engine 可能仍在内存中)
engine.unload(&manifest.metadata.id).await.ok();
@@ -376,19 +372,16 @@ impl PluginService {
}
if let Some(q) = search {
query = query.filter(
plugin::Column::Name.contains(q)
plugin::Column::Name
.contains(q)
.or(plugin::Column::Description.contains(q)),
);
}
let paginator = query
.clone()
.paginate(db, page_size);
let paginator = query.clone().paginate(db, page_size);
let total = paginator.num_items().await?;
let models = paginator
.fetch_page(page.saturating_sub(1))
.await?;
let models = paginator.fetch_page(page.saturating_sub(1)).await?;
let mut resps = Vec::with_capacity(models.len());
@@ -397,27 +390,25 @@ impl PluginService {
let entities_map = find_batch_plugin_entities(&plugin_ids, tenant_id, db).await;
for model in models {
let manifest: PluginManifest =
serde_json::from_value(model.manifest_json.clone()).unwrap_or_else(|_| {
PluginManifest {
metadata: crate::manifest::PluginMetadata {
id: String::new(),
name: String::new(),
version: String::new(),
description: String::new(),
author: String::new(),
min_platform_version: None,
dependencies: vec![],
},
schema: None,
events: None,
ui: None,
permissions: None,
settings: None,
numbering: None,
templates: None,
trigger_events: None,
}
let manifest: PluginManifest = serde_json::from_value(model.manifest_json.clone())
.unwrap_or_else(|_| PluginManifest {
metadata: crate::manifest::PluginMetadata {
id: String::new(),
name: String::new(),
version: String::new(),
description: String::new(),
author: String::new(),
min_platform_version: None,
dependencies: vec![],
},
schema: None,
events: None,
ui: None,
permissions: None,
settings: None,
numbering: None,
templates: None,
trigger_events: None,
});
let entities = entities_map.get(&model.id).cloned().unwrap_or_default();
resps.push(plugin_model_to_resp(&model, &manifest, entities));
@@ -433,9 +424,8 @@ impl PluginService {
db: &sea_orm::DatabaseConnection,
) -> AppResult<PluginResp> {
let model = find_plugin(plugin_id, tenant_id, db).await?;
let manifest: PluginManifest =
serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let manifest: PluginManifest = serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let entities = find_plugin_entities(plugin_id, tenant_id, db).await?;
Ok(plugin_model_to_resp(&model, &manifest, entities))
}
@@ -455,13 +445,15 @@ impl PluginService {
erp_core::error::check_version(expected_version, model.version)?;
// 校验配置值是否符合 manifest settings 声明
let manifest: PluginManifest =
serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let manifest: PluginManifest = serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
if let Some(settings) = &manifest.settings {
validate_plugin_settings(config.as_object().ok_or_else(|| {
PluginError::ValidationError("config 必须是 JSON 对象".to_string())
})?, &settings.fields)?;
validate_plugin_settings(
config.as_object().ok_or_else(|| {
PluginError::ValidationError("config 必须是 JSON 对象".to_string())
})?,
&settings.fields,
)?;
}
let now = Utc::now();
@@ -485,7 +477,9 @@ impl PluginService {
bus.publish(event, db).await;
}
let entities = find_plugin_entities(plugin_id, tenant_id, db).await.unwrap_or_default();
let entities = find_plugin_entities(plugin_id, tenant_id, db)
.await
.unwrap_or_default();
Ok(plugin_model_to_resp(&model, &manifest, entities))
}
@@ -497,9 +491,8 @@ impl PluginService {
engine: &PluginEngine,
) -> AppResult<PluginHealthResp> {
let model = find_plugin(plugin_id, tenant_id, db).await?;
let manifest: PluginManifest =
serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let manifest: PluginManifest = serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let details = engine.health_check(&manifest.metadata.id).await?;
@@ -521,9 +514,8 @@ impl PluginService {
db: &sea_orm::DatabaseConnection,
) -> AppResult<serde_json::Value> {
let model = find_plugin(plugin_id, tenant_id, db).await?;
let manifest: PluginManifest =
serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let manifest: PluginManifest = serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
// 构建 schema 响应entities + ui 页面配置 + settings + numbering + trigger_events
let mut result = serde_json::Map::new();
@@ -599,17 +591,18 @@ impl PluginService {
let new_manifest = parse_manifest(new_manifest_toml)?;
let model = find_plugin(plugin_id, tenant_id, db).await?;
let old_manifest: PluginManifest =
serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let old_manifest: PluginManifest = serde_json::from_value(model.manifest_json.clone())
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let old_version = old_manifest.metadata.version.clone();
let new_version = new_manifest.metadata.version.clone();
if old_manifest.metadata.id != new_manifest.metadata.id {
return Err(PluginError::InvalidManifest(
format!("插件 ID 不匹配: 旧={}, 新={}", old_manifest.metadata.id, new_manifest.metadata.id)
).into());
return Err(PluginError::InvalidManifest(format!(
"插件 ID 不匹配: 旧={}, 新={}",
old_manifest.metadata.id, new_manifest.metadata.id
))
.into());
}
let plugin_manifest_id = &new_manifest.metadata.id;
@@ -619,8 +612,8 @@ impl PluginService {
let old_schema = old_manifest.schema.as_ref();
for entity in &new_schema.entities {
let old_entity = old_schema
.and_then(|s| s.entities.iter().find(|e| e.name == entity.name));
let old_entity =
old_schema.and_then(|s| s.entities.iter().find(|e| e.name == entity.name));
match old_entity {
None => {
@@ -637,8 +630,12 @@ impl PluginService {
"Schema 演进:新增 Generated Column"
);
DynamicTableManager::alter_add_generated_columns(
db, plugin_manifest_id, entity, &diff
).await?;
db,
plugin_manifest_id,
entity,
&diff,
)
.await?;
}
}
}
@@ -700,7 +697,10 @@ impl PluginService {
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?);
active.updated_at = Set(now);
active.updated_by = Set(Some(operator_id));
active.update(db).await.map_err(|e| PluginError::DatabaseError(e.to_string()))?;
active
.update(db)
.await
.map_err(|e| PluginError::DatabaseError(e.to_string()))?;
}
}
}
@@ -719,20 +719,16 @@ impl PluginService {
// ---- 内部辅助 ----
fn find_plugin(
async fn find_plugin(
plugin_id: Uuid,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> impl std::future::Future<Output = AppResult<plugin::Model>> + Send {
async move {
plugin::Entity::find_by_id(plugin_id)
.one(db)
.await?
.filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none())
.ok_or_else(|| {
erp_core::error::AppError::NotFound(format!("插件 {} 不存在", plugin_id))
})
}
) -> AppResult<plugin::Model> {
plugin::Entity::find_by_id(plugin_id)
.one(db)
.await?
.filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none())
.ok_or_else(|| erp_core::error::AppError::NotFound(format!("插件 {} 不存在", plugin_id)))
}
/// 公开的插件查询 — 供 handler 使用
@@ -764,11 +760,14 @@ async fn find_batch_plugin_entities(
let mut result: HashMap<Uuid, Vec<PluginEntityResp>> = HashMap::new();
for e in entities {
result.entry(e.plugin_id).or_default().push(PluginEntityResp {
name: e.entity_name.clone(),
display_name: e.entity_name,
table_name: e.table_name,
});
result
.entry(e.plugin_id)
.or_default()
.push(PluginEntityResp {
name: e.entity_name.clone(),
display_name: e.entity_name,
table_name: e.table_name,
});
}
result
}
@@ -849,39 +848,38 @@ fn validate_plugin_settings(
}
// 类型校验
if let Some(val) = value {
if !val.is_null() {
let type_ok = match field.field_type {
PluginSettingType::Text => val.is_string(),
PluginSettingType::Number => val.is_number(),
PluginSettingType::Boolean => val.is_boolean(),
PluginSettingType::Select => val.is_string(),
PluginSettingType::Multiselect => val.is_array(),
PluginSettingType::Color => val.is_string(),
PluginSettingType::Date => val.is_string(),
PluginSettingType::Datetime => val.is_string(),
PluginSettingType::Json => true,
};
if !type_ok {
return Err(PluginError::ValidationError(format!(
"配置项 '{}' 类型错误,期望 {:?}",
field.name, field.field_type
))
.into());
}
if let Some(val) = value
&& !val.is_null()
{
let type_ok = match field.field_type {
PluginSettingType::Text => val.is_string(),
PluginSettingType::Number => val.is_number(),
PluginSettingType::Boolean => val.is_boolean(),
PluginSettingType::Select => val.is_string(),
PluginSettingType::Multiselect => val.is_array(),
PluginSettingType::Color => val.is_string(),
PluginSettingType::Date => val.is_string(),
PluginSettingType::Datetime => val.is_string(),
PluginSettingType::Json => true,
};
if !type_ok {
return Err(PluginError::ValidationError(format!(
"配置项 '{}' 类型错误,期望 {:?}",
field.name, field.field_type
))
.into());
}
// 数值范围校验
if let Some((min, max)) = field.range {
if let Some(n) = val.as_f64() {
if n < min || n > max {
return Err(PluginError::ValidationError(format!(
"配置项 '{}' ({}) 的值 {} 超出范围 [{}, {}]",
field.name, field.display_name, n, min, max
))
.into());
}
}
}
// 数值范围校验
if let Some((min, max)) = field.range
&& let Some(n) = val.as_f64()
&& (n < min || n > max)
{
return Err(PluginError::ValidationError(format!(
"配置项 '{}' ({}) 的值 {} 超出范围 [{}, {}]",
field.name, field.display_name, n, min, max
))
.into());
}
}
}
@@ -959,7 +957,7 @@ async fn register_plugin_permissions(
sea_orm::Value::from(resource),
sea_orm::Value::from(action),
sea_orm::Value::from(description),
sea_orm::Value::from(now.clone()),
sea_orm::Value::from(*now),
sea_orm::Value::from(operator_id),
],
))
@@ -1038,10 +1036,7 @@ pub async fn grant_permissions_to_admin(
error = %e,
"分配插件权限给 admin 角色失败"
);
PluginError::DatabaseError(format!(
"分配插件权限给 admin 角色失败: {}",
e
))
PluginError::DatabaseError(format!("分配插件权限给 admin 角色失败: {}", e))
})?;
let rows = result.rows_affected();
@@ -1082,7 +1077,7 @@ async fn unregister_plugin_permissions(
sea_orm::DatabaseBackend::Postgres,
rp_sql,
vec![
sea_orm::Value::from(now.clone()),
sea_orm::Value::from(now),
sea_orm::Value::from(tenant_id),
sea_orm::Value::from(prefix.clone()),
],