Files
erp/crates/erp-plugin/src/data_service.rs
iven 5ba11f985f
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled
fix(web,plugin): 前端审计修复 — 401 消除 + 统计卡片 crash + 销售漏斗 500 + antd 6 废弃 API
- API client: proactive token refresh(请求前 30s 检查过期,提前刷新避免 401)
- Plugin store: fetchPlugins promise 去重,防止 StrictMode 并发重复请求
- Home stats: 简化 useEffect 加载逻辑,修复 tagColor undefined crash
- PluginGraphPage: valueStyle → styles.content, Spin tip → description(antd 6)
- DashboardWidgets: trailColor → railColor(antd 6)
- data_service: build_scope_sql 参数索引修复(硬编码 $100 → 动态 values.len()+1)
- erp-core error: Internal 错误添加 tracing::error 日志输出
2026-04-18 20:31:49 +08:00

1118 lines
40 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, FromQueryResult, QueryFilter, Statement};
use uuid::Uuid;
use erp_core::audit::{AuditLog};
use erp_core::audit_service;
use erp_core::error::{AppError, AppResult};
use erp_core::events::EventBus;
use crate::data_dto::{BatchActionReq, PluginDataResp};
use crate::dynamic_table::{sanitize_identifier, DynamicTableManager};
use crate::entity::plugin;
use crate::entity::plugin_entity;
use crate::error::PluginError;
use crate::manifest::PluginField;
use crate::state::EntityInfo;
/// Row-level data-permission parameters, passed down to the service layer so
/// that a scope condition can be injected into the generated SQL.
///
/// All four fields are forwarded unchanged to
/// `DynamicTableManager::build_data_scope_condition_with_params` by
/// `build_scope_sql`.
pub struct DataScopeParams {
/// Scope level selector.
/// NOTE(review): the set of accepted values is not visible in this file — confirm against the condition builder.
pub scope_level: String,
/// User the scope is evaluated for (presumably the current caller — verify against the handler).
pub user_id: Uuid,
/// User IDs of department members (presumably the caller's department — TODO confirm).
pub dept_member_ids: Vec<Uuid>,
/// Name of the entity field that holds the record owner (assumed to be a key inside `data` — TODO confirm).
pub owner_field: String,
}
/// Unit type used as a namespace: its associated functions implement create,
/// list, get, update, patch, delete, batch, count, aggregate and timeseries
/// operations on plugin entity data.
pub struct PluginDataService;
impl PluginDataService {
/// 创建插件数据
pub async fn create(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
operator_id: Uuid,
data: serde_json::Value,
db: &sea_orm::DatabaseConnection,
_event_bus: &EventBus,
) -> AppResult<PluginDataResp> {
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let fields = info.fields()?;
validate_data(&data, &fields)?;
validate_ref_entities(&data, &fields, entity_name, plugin_id, tenant_id, db, true, None).await?;
let (sql, values) =
DynamicTableManager::build_insert_sql(&info.table_name, tenant_id, operator_id, &data);
#[derive(FromQueryResult)]
struct InsertResult {
id: Uuid,
data: serde_json::Value,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
version: i32,
}
let result = InsertResult::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.one(db)
.await?
.ok_or_else(|| PluginError::DatabaseError("INSERT 未返回结果".to_string()))?;
audit_service::record(
AuditLog::new(tenant_id, Some(operator_id), "plugin.data.create", entity_name)
.with_resource_id(result.id),
db,
)
.await;
Ok(PluginDataResp {
id: result.id.to_string(),
data: result.data,
created_at: Some(result.created_at),
updated_at: Some(result.updated_at),
version: Some(result.version),
})
}
/// 列表查询(支持过滤/搜索/排序/Generated Column 路由/数据权限)
pub async fn list(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
page: u64,
page_size: u64,
db: &sea_orm::DatabaseConnection,
filter: Option<serde_json::Value>,
search: Option<String>,
sort_by: Option<String>,
sort_order: Option<String>,
cache: &moka::sync::Cache<String, EntityInfo>,
scope: Option<DataScopeParams>,
) -> AppResult<(Vec<PluginDataResp>, u64)> {
let info =
resolve_entity_info_cached(plugin_id, entity_name, tenant_id, db, cache).await?;
// 获取 searchable 字段列表
let entity_fields = info.fields()?;
let search_tuple = {
let searchable: Vec<&str> = entity_fields
.iter()
.filter(|f| f.searchable == Some(true))
.map(|f| f.name.as_str())
.collect();
match (searchable.is_empty(), &search) {
(false, Some(kw)) => Some((searchable.join(","), kw.clone())),
_ => None,
}
};
// 构建数据权限条件count 查询只有 tenant_id 占 $1scope 从 $2 开始)
let count_scope = build_scope_sql(&scope, &info.generated_fields, 2);
// Count
let (count_sql, mut count_values) =
DynamicTableManager::build_count_sql(&info.table_name, tenant_id);
let count_sql = merge_scope_condition(count_sql, &count_scope);
count_values.extend(count_scope.1);
#[derive(FromQueryResult)]
struct CountResult {
count: i64,
}
let total = CountResult::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
count_sql,
count_values,
))
.one(db)
.await?
.map(|r| r.count as u64)
.unwrap_or(0);
// Query — 使用 Generated Column 路由
let offset = page.saturating_sub(1) * page_size;
let (sql, mut values) = DynamicTableManager::build_filtered_query_sql_ex(
&info.table_name,
tenant_id,
page_size,
offset,
filter,
search_tuple,
sort_by,
sort_order,
&info.generated_fields,
)
.map_err(|e| AppError::Validation(e))?;
// 注入数据权限条件scope 参数索引接在 values 之后)
let scope_condition = build_scope_sql(&scope, &info.generated_fields, values.len() + 1);
let sql = merge_scope_condition(sql, &scope_condition);
values.extend(scope_condition.1);
#[derive(FromQueryResult)]
struct DataRow {
id: Uuid,
data: serde_json::Value,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
version: i32,
}
let rows = DataRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.all(db)
.await?;
let items = rows
.into_iter()
.map(|r| PluginDataResp {
id: r.id.to_string(),
data: r.data,
created_at: Some(r.created_at),
updated_at: Some(r.updated_at),
version: Some(r.version),
})
.collect();
Ok((items, total))
}
/// 按 ID 获取
pub async fn get_by_id(
plugin_id: Uuid,
entity_name: &str,
id: Uuid,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> AppResult<PluginDataResp> {
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let (sql, values) = DynamicTableManager::build_get_by_id_sql(&info.table_name, id, tenant_id);
#[derive(FromQueryResult)]
struct DataRow {
id: Uuid,
data: serde_json::Value,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
version: i32,
}
let row = DataRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.one(db)
.await?
.ok_or_else(|| AppError::NotFound("记录不存在".to_string()))?;
Ok(PluginDataResp {
id: row.id.to_string(),
data: row.data,
created_at: Some(row.created_at),
updated_at: Some(row.updated_at),
version: Some(row.version),
})
}
/// 更新
pub async fn update(
plugin_id: Uuid,
entity_name: &str,
id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
data: serde_json::Value,
expected_version: i32,
db: &sea_orm::DatabaseConnection,
_event_bus: &EventBus,
) -> AppResult<PluginDataResp> {
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let fields = info.fields()?;
validate_data(&data, &fields)?;
validate_ref_entities(&data, &fields, entity_name, plugin_id, tenant_id, db, false, Some(id)).await?;
// 循环引用检测
for field in &fields {
if field.no_cycle == Some(true) && data.get(&field.name).is_some() {
check_no_cycle(id, field, &data, &info.table_name, tenant_id, db).await?;
}
}
let (sql, values) = DynamicTableManager::build_update_sql(
&info.table_name,
id,
tenant_id,
operator_id,
&data,
expected_version,
);
#[derive(FromQueryResult)]
struct UpdateResult {
id: Uuid,
data: serde_json::Value,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
version: i32,
}
let result = UpdateResult::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.one(db)
.await?
.ok_or_else(|| AppError::VersionMismatch)?;
audit_service::record(
AuditLog::new(tenant_id, Some(operator_id), "plugin.data.update", entity_name)
.with_resource_id(id),
db,
)
.await;
Ok(PluginDataResp {
id: result.id.to_string(),
data: result.data,
created_at: Some(result.created_at),
updated_at: Some(result.updated_at),
version: Some(result.version),
})
}
/// 部分更新PATCH— 只合并提供的字段
pub async fn partial_update(
plugin_id: Uuid,
entity_name: &str,
id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
partial_data: serde_json::Value,
expected_version: i32,
db: &sea_orm::DatabaseConnection,
) -> AppResult<PluginDataResp> {
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let fields = info.fields()?;
// 合并现有数据后校验,确保 partial update 也能触发 required/pattern/ref 校验
let existing = Self::get_by_id(plugin_id, entity_name, id, tenant_id, db).await?;
let merged = {
let mut base = existing.data.as_object().cloned().unwrap_or_default();
if let Some(patch) = partial_data.as_object() {
for (k, v) in patch {
base.insert(k.clone(), v.clone());
}
}
serde_json::Value::Object(base)
};
validate_data(&merged, &fields)?;
validate_ref_entities(&merged, &fields, entity_name, plugin_id, tenant_id, db, false, Some(id)).await?;
let (sql, values) = DynamicTableManager::build_patch_sql(
&info.table_name, id, tenant_id, operator_id, partial_data, expected_version,
);
#[derive(FromQueryResult)]
struct PatchResult {
id: Uuid,
data: serde_json::Value,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
version: i32,
}
let result = PatchResult::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres, sql, values,
)).one(db).await?.ok_or_else(|| AppError::VersionMismatch)?;
Ok(PluginDataResp {
id: result.id.to_string(),
data: result.data,
created_at: Some(result.created_at),
updated_at: Some(result.updated_at),
version: Some(result.version),
})
}
/// 删除(软删除)
pub async fn delete(
plugin_id: Uuid,
entity_name: &str,
id: Uuid,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
_event_bus: &EventBus,
) -> AppResult<()> {
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
// 解析 entity schema 获取 relations
let entity_def: crate::manifest::PluginEntity =
serde_json::from_value(info.schema_json.clone())
.map_err(|e| AppError::Internal(format!("解析 entity schema 失败: {}", e)))?;
let manifest_id = resolve_manifest_id(plugin_id, tenant_id, db).await?;
// 处理级联关系
for relation in &entity_def.relations {
let rel_table = DynamicTableManager::table_name(&manifest_id, &relation.entity);
let fk = sanitize_identifier(&relation.foreign_key);
match relation.on_delete {
crate::manifest::OnDeleteStrategy::Restrict => {
let check_sql = format!(
"SELECT 1 as chk FROM \"{}\" WHERE data->>'{}' = $1 AND tenant_id = $2 AND deleted_at IS NULL LIMIT 1",
rel_table, fk
);
#[derive(FromQueryResult)]
struct RefCheck { chk: Option<i32> }
let has_ref = RefCheck::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
check_sql,
[id.to_string().into(), tenant_id.into()],
)).one(db).await?;
if has_ref.is_some() {
return Err(AppError::Validation(format!(
"存在关联的 {} 记录,无法删除",
relation.entity
)));
}
}
crate::manifest::OnDeleteStrategy::Nullify => {
let nullify_sql = format!(
"UPDATE \"{}\" SET data = jsonb_set(data, '{{{}}}', 'null'), updated_at = NOW() WHERE data->>'{}' = $1 AND tenant_id = $2 AND deleted_at IS NULL",
rel_table, fk, fk
);
db.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
nullify_sql,
[id.to_string().into(), tenant_id.into()],
)).await?;
}
crate::manifest::OnDeleteStrategy::Cascade => {
let cascade_sql = format!(
"UPDATE \"{}\" SET deleted_at = NOW(), updated_at = NOW() WHERE data->>'{}' = $1 AND tenant_id = $2 AND deleted_at IS NULL",
rel_table, fk
);
db.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
cascade_sql,
[id.to_string().into(), tenant_id.into()],
)).await?;
}
}
}
// 软删除主记录
let (sql, values) = DynamicTableManager::build_delete_sql(&info.table_name, id, tenant_id);
db.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.await?;
audit_service::record(
AuditLog::new(tenant_id, None, "plugin.data.delete", entity_name)
.with_resource_id(id),
db,
)
.await;
Ok(())
}
/// 批量操作 — batch_delete / batch_update
pub async fn batch(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
operator_id: Uuid,
req: BatchActionReq,
db: &sea_orm::DatabaseConnection,
) -> AppResult<u64> {
if req.ids.is_empty() {
return Err(AppError::Validation("ids 不能为空".to_string()));
}
if req.ids.len() > 100 {
return Err(AppError::Validation("批量操作上限 100 条".to_string()));
}
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let ids: Vec<Uuid> = req
.ids
.iter()
.map(|s| Uuid::parse_str(s))
.collect::<Result<Vec<_>, _>>()
.map_err(|_| AppError::Validation("ids 中包含无效的 UUID".to_string()))?;
let affected = match req.action.as_str() {
"batch_delete" => {
// 批量删除前先执行级联策略(逐条,复用 delete 的级联逻辑)
let entity_def: crate::manifest::PluginEntity =
serde_json::from_value(info.schema_json.clone())
.map_err(|e| AppError::Internal(format!("解析 entity schema 失败: {}", e)))?;
let manifest_id = resolve_manifest_id(plugin_id, tenant_id, db).await?;
for &del_id in &ids {
for relation in &entity_def.relations {
let rel_table = DynamicTableManager::table_name(&manifest_id, &relation.entity);
let fk = sanitize_identifier(&relation.foreign_key);
match relation.on_delete {
crate::manifest::OnDeleteStrategy::Restrict => {
let check_sql = format!(
"SELECT 1 as chk FROM \"{}\" WHERE data->>'{}' = $1 AND tenant_id = $2 AND deleted_at IS NULL LIMIT 1",
rel_table, fk
);
#[derive(FromQueryResult)]
struct RefCheck { chk: Option<i32> }
let has_ref = RefCheck::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
check_sql,
[del_id.to_string().into(), tenant_id.into()],
)).one(db).await?;
if has_ref.is_some() {
return Err(AppError::Validation(format!(
"记录 {} 存在关联的 {} 记录,无法删除",
del_id, relation.entity
)));
}
}
crate::manifest::OnDeleteStrategy::Nullify => {
let nullify_sql = format!(
"UPDATE \"{}\" SET data = jsonb_set(data, '{{{}}}', 'null'), updated_at = NOW() WHERE data->>'{}' = $1 AND tenant_id = $2 AND deleted_at IS NULL",
rel_table, fk, fk
);
db.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
nullify_sql,
[del_id.to_string().into(), tenant_id.into()],
)).await?;
}
crate::manifest::OnDeleteStrategy::Cascade => {
let cascade_sql = format!(
"UPDATE \"{}\" SET deleted_at = NOW(), updated_at = NOW() WHERE data->>'{}' = $1 AND tenant_id = $2 AND deleted_at IS NULL",
rel_table, fk
);
db.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
cascade_sql,
[del_id.to_string().into(), tenant_id.into()],
)).await?;
}
}
}
}
let placeholders: Vec<String> = ids
.iter()
.enumerate()
.map(|(i, _)| format!("${}", i + 2))
.collect();
let sql = format!(
"UPDATE \"{}\" SET deleted_at = NOW(), updated_at = NOW() \
WHERE tenant_id = $1 AND id IN ({}) AND deleted_at IS NULL",
info.table_name,
placeholders.join(", ")
);
let mut values = vec![tenant_id.into()];
for id in &ids {
values.push((*id).into());
}
let result = db
.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.await?;
result.rows_affected()
}
"batch_update" => {
let update_data = req.data.ok_or_else(|| {
AppError::Validation("batch_update 需要 data 字段".to_string())
})?;
let mut set_expr = "data".to_string();
if let Some(obj) = update_data.as_object() {
for key in obj.keys() {
let clean_key = sanitize_identifier(key);
set_expr = format!(
"jsonb_set({}, '{{{}}}', $2::jsonb->'{}', true)",
set_expr, clean_key, clean_key
);
}
}
let placeholders: Vec<String> = ids
.iter()
.enumerate()
.map(|(i, _)| format!("${}", i + 3))
.collect();
let sql = format!(
"UPDATE \"{}\" SET data = {}, updated_at = NOW(), updated_by = $1, version = version + 1 \
WHERE tenant_id = $1 AND id IN ({}) AND deleted_at IS NULL",
info.table_name,
set_expr,
placeholders.join(", ")
);
let mut values = vec![operator_id.into()];
values.push(
serde_json::to_string(&update_data)
.unwrap_or_default()
.into(),
);
for id in &ids {
values.push((*id).into());
}
let result = db
.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.await?;
result.rows_affected()
}
_ => {
return Err(AppError::Validation(format!(
"不支持的批量操作: {}",
req.action
)))
}
};
Ok(affected)
}
/// Counts records, honouring the optional filter, search keyword and data scope.
///
/// The search keyword is only applied when the entity declares at least one
/// searchable field. Returns `0` when the count query yields no row.
///
/// # Errors
/// Returns `AppError::Validation` when the SQL builder rejects the input;
/// propagates entity-resolution and database errors.
pub async fn count(
    plugin_id: Uuid,
    entity_name: &str,
    tenant_id: Uuid,
    db: &sea_orm::DatabaseConnection,
    filter: Option<serde_json::Value>,
    search: Option<String>,
    scope: Option<DataScopeParams>,
) -> AppResult<u64> {
    /// Single-row result of the COUNT statement.
    #[derive(FromQueryResult)]
    struct CountResult {
        count: i64,
    }

    let entity = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
    let schema_fields = entity.fields()?;

    // Names of all fields flagged as searchable.
    let searchable_names: Vec<&str> = schema_fields
        .iter()
        .filter(|f| f.searchable == Some(true))
        .map(|f| f.name.as_str())
        .collect();
    let search_tuple = match &search {
        Some(keyword) if !searchable_names.is_empty() => {
            Some((searchable_names.join(","), keyword.clone()))
        }
        _ => None,
    };

    let (mut count_sql, mut bind_values) = DynamicTableManager::build_filtered_count_sql(
        &entity.table_name,
        tenant_id,
        filter,
        search_tuple,
    )
    .map_err(AppError::Validation)?;

    // Merge the data-scope condition; its placeholders follow the existing values.
    let scoped = build_scope_sql(&scope, &entity.generated_fields, bind_values.len() + 1);
    if !scoped.0.is_empty() {
        count_sql = merge_scope_condition(count_sql, &scoped);
        bind_values.extend(scoped.1);
    }

    let statement = Statement::from_sql_and_values(
        sea_orm::DatabaseBackend::Postgres,
        count_sql,
        bind_values,
    );
    let total = CountResult::find_by_statement(statement)
        .one(db)
        .await?
        .map_or(0, |row| row.count as u64);
    Ok(total)
}
/// Aggregate query — counts records grouped by one field.
///
/// Returns `(group key, count)` pairs; a NULL group key is returned as an
/// empty string.
///
/// # Errors
/// Returns `AppError::Validation` when the SQL builder rejects the input;
/// propagates entity-resolution and database errors.
pub async fn aggregate(
    plugin_id: Uuid,
    entity_name: &str,
    tenant_id: Uuid,
    db: &sea_orm::DatabaseConnection,
    group_by_field: &str,
    filter: Option<serde_json::Value>,
    scope: Option<DataScopeParams>,
) -> AppResult<Vec<(String, i64)>> {
    /// One group of the aggregate result.
    #[derive(FromQueryResult)]
    struct AggRow {
        key: Option<String>,
        count: i64,
    }

    let entity = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
    let (mut agg_sql, mut bind_values) = DynamicTableManager::build_aggregate_sql(
        &entity.table_name,
        tenant_id,
        group_by_field,
        filter,
    )
    .map_err(AppError::Validation)?;

    // Merge the data-scope condition; its placeholders follow the existing values.
    let scoped = build_scope_sql(&scope, &entity.generated_fields, bind_values.len() + 1);
    if !scoped.0.is_empty() {
        agg_sql = merge_scope_condition(agg_sql, &scoped);
        bind_values.extend(scoped.1);
    }

    let statement = Statement::from_sql_and_values(
        sea_orm::DatabaseBackend::Postgres,
        agg_sql,
        bind_values,
    );
    let groups = AggRow::find_by_statement(statement).all(db).await?;

    let mut pairs = Vec::with_capacity(groups.len());
    for group in groups {
        pairs.push((group.key.unwrap_or_default(), group.count));
    }
    Ok(pairs)
}
/// 聚合查询(预留 Redis 缓存接口)
pub async fn aggregate_cached(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
group_by_field: &str,
filter: Option<serde_json::Value>,
) -> AppResult<Vec<(String, i64)>> {
// TODO: 未来版本添加 Redis 缓存层
Self::aggregate(plugin_id, entity_name, tenant_id, db, group_by_field, filter, None).await
}
/// Time-series aggregation — counts records per time bucket of `time_field`.
///
/// `time_grain`, `start` and `end` are handed to the SQL builder unchanged
/// (the original notes describe day/week/month grains). A NULL period is
/// returned as an empty string.
///
/// # Errors
/// Returns `AppError::Validation` when the SQL builder rejects the input;
/// propagates entity-resolution and database errors.
pub async fn timeseries(
    plugin_id: Uuid,
    entity_name: &str,
    tenant_id: Uuid,
    db: &sea_orm::DatabaseConnection,
    time_field: &str,
    time_grain: &str,
    start: Option<String>,
    end: Option<String>,
    scope: Option<DataScopeParams>,
) -> AppResult<Vec<crate::data_dto::TimeseriesItem>> {
    /// One bucket of the time-series result.
    #[derive(FromQueryResult)]
    struct TsRow {
        period: Option<String>,
        count: i64,
    }

    let entity = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
    let (mut series_sql, mut bind_values) = DynamicTableManager::build_timeseries_sql(
        &entity.table_name,
        tenant_id,
        time_field,
        time_grain,
        start.as_deref(),
        end.as_deref(),
    )
    .map_err(AppError::Validation)?;

    // Merge the data-scope condition; its placeholders follow the existing values.
    let scoped = build_scope_sql(&scope, &entity.generated_fields, bind_values.len() + 1);
    if !scoped.0.is_empty() {
        series_sql = merge_scope_condition(series_sql, &scoped);
        bind_values.extend(scoped.1);
    }

    let statement = Statement::from_sql_and_values(
        sea_orm::DatabaseBackend::Postgres,
        series_sql,
        bind_values,
    );
    let buckets = TsRow::find_by_statement(statement).all(db).await?;

    let mut series = Vec::with_capacity(buckets.len());
    for bucket in buckets {
        series.push(crate::data_dto::TimeseriesItem {
            period: bucket.period.unwrap_or_default(),
            count: bucket.count,
        });
    }
    Ok(series)
}
}
/// Resolves the manifest `metadata.id` (e.g. "erp-crm") of a plugin from the
/// plugins table, restricted to the given tenant and to non-deleted rows.
///
/// # Errors
/// Returns `AppError::NotFound` when no matching plugin exists and
/// `AppError::Internal` when the stored manifest JSON cannot be parsed;
/// propagates database errors.
pub async fn resolve_manifest_id(
    plugin_id: Uuid,
    tenant_id: Uuid,
    db: &sea_orm::DatabaseConnection,
) -> AppResult<String> {
    let lookup = plugin::Entity::find()
        .filter(plugin::Column::Id.eq(plugin_id))
        .filter(plugin::Column::TenantId.eq(tenant_id))
        .filter(plugin::Column::DeletedAt.is_null())
        .one(db)
        .await?;
    let Some(plugin_row) = lookup else {
        return Err(AppError::NotFound(format!("插件 {} 不存在", plugin_id)));
    };

    let parsed =
        serde_json::from_value::<crate::manifest::PluginManifest>(plugin_row.manifest_json)
            .map_err(|e| AppError::Internal(format!("解析插件 manifest 失败: {}", e)))?;
    Ok(parsed.metadata.id)
}
/// Loads the full entity info from the plugin_entities table (tenant-isolated,
/// non-deleted rows only).
///
/// Note: this function leaves `generated_fields` empty; per the original
/// notes it is meant for non-list code paths only.
///
/// # Errors
/// Returns `AppError::NotFound` when the entity does not exist; propagates
/// database errors.
async fn resolve_entity_info(
    plugin_id: Uuid,
    entity_name: &str,
    tenant_id: Uuid,
    db: &sea_orm::DatabaseConnection,
) -> AppResult<EntityInfo> {
    let lookup = plugin_entity::Entity::find()
        .filter(plugin_entity::Column::PluginId.eq(plugin_id))
        .filter(plugin_entity::Column::TenantId.eq(tenant_id))
        .filter(plugin_entity::Column::EntityName.eq(entity_name))
        .filter(plugin_entity::Column::DeletedAt.is_null())
        .one(db)
        .await?;

    match lookup {
        Some(row) => Ok(EntityInfo {
            table_name: row.table_name,
            schema_json: row.schema_json,
            // Legacy path: generated fields are not tracked here.
            generated_fields: Vec::new(),
        }),
        None => Err(AppError::NotFound(format!(
            "插件实体 {}/{} 不存在",
            plugin_id, entity_name
        ))),
    }
}
/// Returns the entity info from the cache, or loads it from the database,
/// derives its `generated_fields` and stores it in the cache.
///
/// The cache key is `"{plugin_id}:{entity_name}:{tenant_id}"`. A field is
/// listed in `generated_fields` when its type supports generated columns and
/// it is unique, sortable or filterable.
///
/// # Errors
/// Returns `AppError::NotFound` when the entity does not exist and
/// `AppError::Internal` when its schema JSON cannot be parsed; propagates
/// database errors.
pub async fn resolve_entity_info_cached(
    plugin_id: Uuid,
    entity_name: &str,
    tenant_id: Uuid,
    db: &sea_orm::DatabaseConnection,
    cache: &moka::sync::Cache<String, EntityInfo>,
) -> AppResult<EntityInfo> {
    let cache_key = format!("{}:{}:{}", plugin_id, entity_name, tenant_id);
    if let Some(cached) = cache.get(&cache_key) {
        return Ok(cached);
    }

    let lookup = plugin_entity::Entity::find()
        .filter(plugin_entity::Column::PluginId.eq(plugin_id))
        .filter(plugin_entity::Column::TenantId.eq(tenant_id))
        .filter(plugin_entity::Column::EntityName.eq(entity_name))
        .filter(plugin_entity::Column::DeletedAt.is_null())
        .one(db)
        .await?;
    let Some(row) = lookup else {
        return Err(AppError::NotFound(format!(
            "插件实体 {}/{} 不存在",
            plugin_id, entity_name
        )));
    };

    // Derive the generated-column field names from the entity schema.
    let schema = serde_json::from_value::<crate::manifest::PluginEntity>(row.schema_json.clone())
        .map_err(|e| AppError::Internal(format!("解析 entity schema 失败: {}", e)))?;
    let mut generated_fields: Vec<String> = Vec::new();
    for field in &schema.fields {
        // A required field only qualified when it was also sortable or
        // filterable, which the two checks below already cover.
        let is_indexed = field.unique
            || field.sortable == Some(true)
            || field.filterable == Some(true);
        if field.field_type.supports_generated_column() && is_indexed {
            generated_fields.push(sanitize_identifier(&field.name));
        }
    }

    let resolved = EntityInfo {
        table_name: row.table_name,
        schema_json: row.schema_json,
        generated_fields,
    };
    cache.insert(cache_key, resolved.clone());
    Ok(resolved)
}
/// Validates a data payload against the entity's field definitions.
///
/// * Required fields must be present and must not be JSON `null` — an
///   explicit `null` carries no value and is treated like a missing key.
/// * Fields with a validation pattern are matched against it when the value
///   is a non-empty string; non-string and empty values skip the pattern.
///
/// # Errors
/// Returns `AppError::Validation` when `data` is not a JSON object, a
/// required field is missing/null, or a value does not match its pattern
/// (using the field's custom message when one is configured). Returns
/// `AppError::Internal` when a configured pattern is not a valid regex.
fn validate_data(data: &serde_json::Value, fields: &[PluginField]) -> AppResult<()> {
    let obj = data
        .as_object()
        .ok_or_else(|| AppError::Validation("data 必须是 JSON 对象".to_string()))?;

    for field in fields {
        let label = field.display_name.as_deref().unwrap_or(&field.name);
        let value = obj.get(&field.name);

        // Required check: absent key and explicit null are both "empty".
        if field.required && value.map_or(true, serde_json::Value::is_null) {
            return Err(AppError::Validation(format!("字段 '{}' 不能为空", label)));
        }

        // Pattern check — only for non-empty string values.
        let Some(validation) = &field.validation else { continue };
        let Some(pattern) = &validation.pattern else { continue };
        let Some(str_val) = value.and_then(serde_json::Value::as_str) else { continue };
        if str_val.is_empty() {
            continue;
        }

        let re = regex::Regex::new(pattern)
            .map_err(|e| AppError::Internal(format!("正则表达式编译失败: {}", e)))?;
        if !re.is_match(str_val) {
            let msg = match validation.message.as_deref() {
                Some(custom) => custom.to_string(),
                None => format!("字段 '{}' 格式不正确", label),
            };
            return Err(AppError::Validation(msg));
        }
    }
    Ok(())
}
/// Validates foreign-key style references: for every field with a
/// `ref_entity`, checks that the referenced record exists for this tenant
/// and is not soft-deleted.
///
/// Skipped cases:
/// * the field is absent from `data`, or its value is not a non-empty string;
/// * a self-reference on create (the record does not exist yet);
/// * a self-reference on update that points at the record itself.
///
/// The plugin's manifest ID is resolved at most once per call, and only when
/// a reference actually has to be looked up.
///
/// # Errors
/// Returns `AppError::Validation` when `data` is not a JSON object, a
/// reference value is not a valid UUID, or the referenced record does not
/// exist; propagates manifest-resolution and database errors.
async fn validate_ref_entities(
    data: &serde_json::Value,
    fields: &[PluginField],
    current_entity: &str,
    plugin_id: Uuid,
    tenant_id: Uuid,
    db: &sea_orm::DatabaseConnection,
    is_create: bool,
    record_id: Option<Uuid>,
) -> AppResult<()> {
    #[derive(FromQueryResult)]
    struct ExistsCheck {
        check_result: Option<i32>,
    }

    let obj = data
        .as_object()
        .ok_or_else(|| AppError::Validation("data 必须是 JSON 对象".to_string()))?;

    // Resolved lazily on first use, then reused for the remaining fields.
    let mut manifest_id: Option<String> = None;

    for field in fields {
        let Some(ref_entity_name) = &field.ref_entity else { continue };
        let Some(val) = obj.get(&field.name) else { continue };
        let str_val = val.as_str().unwrap_or("").trim().to_string();
        // Empty (or non-string) values carry no reference to verify.
        if str_val.is_empty() {
            continue;
        }
        let ref_id = Uuid::parse_str(&str_val).map_err(|_| {
            AppError::Validation(format!(
                "字段 '{}' 的值 '{}' 不是有效的 UUID",
                field.display_name.as_deref().unwrap_or(&field.name),
                str_val
            ))
        })?;

        // Self-reference: skip on create (record not yet stored) and skip on
        // update when the record points at itself.
        let is_self_ref = ref_entity_name == current_entity;
        if is_self_ref && (is_create || record_id == Some(ref_id)) {
            continue;
        }

        // Look up the referenced record.
        let resolved = match manifest_id.take() {
            Some(known) => known,
            None => resolve_manifest_id(plugin_id, tenant_id, db).await?,
        };
        let ref_table = DynamicTableManager::table_name(&resolved, ref_entity_name);
        manifest_id = Some(resolved);

        let check_sql = format!(
            "SELECT 1 as check_result FROM \"{}\" WHERE id = $1 AND tenant_id = $2 AND deleted_at IS NULL LIMIT 1",
            ref_table
        );
        let result = ExistsCheck::find_by_statement(Statement::from_sql_and_values(
            sea_orm::DatabaseBackend::Postgres,
            check_sql,
            [ref_id.into(), tenant_id.into()],
        ))
        .one(db)
        .await?;
        if result.is_none() {
            return Err(AppError::Validation(format!(
                "引用的 {} 记录不存在ID: {}",
                ref_entity_name, ref_id
            )));
        }
    }
    Ok(())
}
/// Cycle detection for `no_cycle` fields.
///
/// Starting from the new parent ID found in `data`, follows the same field
/// from record to record for at most 100 hops. Fails as soon as an ID is
/// encountered twice (the record being updated counts as already visited).
/// The walk ends successfully when a record has no parent, a record is not
/// found, or the hop limit is reached.
///
/// # Errors
/// Returns `AppError::Validation` when the new parent is not a valid UUID or
/// a cycle is detected, `AppError::Internal` when a stored parent value is
/// not a valid UUID; propagates database errors.
async fn check_no_cycle(
    record_id: Uuid,
    field: &PluginField,
    data: &serde_json::Value,
    table_name: &str,
    tenant_id: Uuid,
    db: &sea_orm::DatabaseConnection,
) -> AppResult<()> {
    /// Parent reference read from one record.
    #[derive(FromQueryResult)]
    struct ParentRow {
        parent: Option<String>,
    }

    let Some(raw_value) = data.get(&field.name) else {
        return Ok(());
    };
    let requested_parent = raw_value.as_str().unwrap_or("").trim();
    if requested_parent.is_empty() {
        return Ok(());
    }
    let mut cursor = Uuid::parse_str(requested_parent)
        .map_err(|_| AppError::Validation("parent_id 不是有效的 UUID".to_string()))?;

    // The lookup statement is the same for every hop; only the bound ID changes.
    let parent_lookup_sql = format!(
        "SELECT data->>'{}' as parent FROM \"{}\" WHERE id = $1 AND tenant_id = $2 AND deleted_at IS NULL",
        sanitize_identifier(&field.name),
        table_name
    );

    let mut seen = vec![record_id];
    for _hop in 0..100 {
        if seen.contains(&cursor) {
            let label = field.display_name.as_deref().unwrap_or(&field.name);
            return Err(AppError::Validation(format!(
                "字段 '{}' 形成循环引用", label
            )));
        }
        seen.push(cursor);

        let fetched = ParentRow::find_by_statement(Statement::from_sql_and_values(
            sea_orm::DatabaseBackend::Postgres,
            parent_lookup_sql.clone(),
            [cursor.into(), tenant_id.into()],
        ))
        .one(db)
        .await?;

        // No row, or no parent value: the chain ends here.
        let Some(next_parent) = fetched.and_then(|row| row.parent) else {
            break;
        };
        let next_parent = next_parent.trim();
        if next_parent.is_empty() {
            break;
        }
        cursor = Uuid::parse_str(next_parent)
            .map_err(|_| AppError::Internal("parent_id 不是有效的 UUID".to_string()))?;
    }
    Ok(())
}
/// Builds the data-scope SQL fragment and its bind parameters from optional
/// [`DataScopeParams`].
///
/// `next_param_idx` and `generated_fields` are passed through to
/// `DynamicTableManager::build_data_scope_condition_with_params`. Without
/// scope parameters an empty fragment and an empty parameter list are
/// returned.
fn build_scope_sql(
    scope: &Option<DataScopeParams>,
    generated_fields: &[String],
    next_param_idx: usize,
) -> (String, Vec<sea_orm::Value>) {
    let Some(params) = scope else {
        return (String::new(), Vec::new());
    };
    DynamicTableManager::build_data_scope_condition_with_params(
        &params.scope_level,
        &params.user_id,
        &params.owner_field,
        &params.dept_member_ids,
        next_param_idx,
        generated_fields,
    )
}
/// Merges a data-scope condition into an existing SQL string.
///
/// `scope_condition` is a `(sql_fragment, params)` tuple; only the fragment
/// is used here. The fragment is appended as `AND (fragment)` directly after
/// every occurrence of the `deleted_at IS NULL` predicate — the quoted form
/// `"deleted_at" IS NULL` is preferred, the unquoted form is used otherwise.
///
/// The SQL is returned unchanged when the fragment is empty or when neither
/// form of the predicate is present.
/// NOTE(review): in the latter case the scope condition is not applied at
/// all — confirm every query builder emits one of the two predicate forms.
fn merge_scope_condition(sql: String, scope_condition: &(String, Vec<sea_orm::Value>)) -> String {
    const ANCHORS: [&str; 2] = ["\"deleted_at\" IS NULL", "deleted_at IS NULL"];

    let (fragment, _params) = scope_condition;
    if fragment.is_empty() {
        return sql;
    }

    match ANCHORS.iter().find(|anchor| sql.contains(**anchor)) {
        Some(anchor) => sql.replace(*anchor, &format!("{} AND ({})", anchor, fragment)),
        None => sql,
    }
}
/// Unit tests for the pattern validation performed by `validate_data`.
#[cfg(test)]
mod validate_tests {
    use super::*;
    use crate::manifest::{FieldValidation, PluginField, PluginFieldType};

    /// Builds an optional string field, with a validation pattern (and
    /// optional custom message) when `pattern` is given.
    fn make_field(name: &str, pattern: Option<&str>, message: Option<&str>) -> PluginField {
        let validation = match pattern {
            Some(p) => Some(FieldValidation {
                pattern: Some(p.to_string()),
                message: message.map(str::to_string),
            }),
            None => None,
        };
        PluginField {
            name: name.to_string(),
            field_type: PluginFieldType::String,
            required: false,
            validation,
            ..PluginField::default_for_field()
        }
    }

    #[test]
    fn validate_phone_pattern_rejects_invalid() {
        let phone = make_field("phone", Some("^1[3-9]\\d{9}$"), Some("手机号格式不正确"));
        let payload = serde_json::json!({"phone": "1234"});
        assert!(validate_data(&payload, &[phone]).is_err());
    }

    #[test]
    fn validate_phone_pattern_accepts_valid() {
        let phone = make_field("phone", Some("^1[3-9]\\d{9}$"), Some("手机号格式不正确"));
        let payload = serde_json::json!({"phone": "13812345678"});
        assert!(validate_data(&payload, &[phone]).is_ok());
    }

    #[test]
    fn validate_empty_optional_field_skips_pattern() {
        let phone = make_field("phone", Some("^1[3-9]\\d{9}$"), None);
        let payload = serde_json::json!({"phone": ""});
        assert!(validate_data(&payload, &[phone]).is_ok());
    }
}