Files
erp/crates/erp-plugin/src/data_service.rs
iven e24b820d80 feat(plugin): 循环引用检测 — no_cycle 字段支持
新增 check_no_cycle 异步函数,通过沿 parent 链上溯检测
是否存在循环引用。在 update 方法中集成,对声明 no_cycle
的字段执行检测,最多遍历 100 层防止无限循环。
2026-04-17 10:38:41 +08:00

684 lines
22 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, FromQueryResult, QueryFilter, Statement};
use uuid::Uuid;
use erp_core::error::{AppError, AppResult};
use erp_core::events::EventBus;
use crate::data_dto::PluginDataResp;
use crate::dynamic_table::{sanitize_identifier, DynamicTableManager};
use crate::entity::plugin;
use crate::entity::plugin_entity;
use crate::error::PluginError;
use crate::manifest::PluginField;
use crate::state::EntityInfo;
/// Stateless entry point for plugin entity data operations (create, list,
/// get, update, delete, count, aggregate).
///
/// The type carries no fields; every operation is an associated function that
/// receives the database connection and tenant/plugin identifiers explicitly.
pub struct PluginDataService;
impl PluginDataService {
/// 创建插件数据
pub async fn create(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
operator_id: Uuid,
data: serde_json::Value,
db: &sea_orm::DatabaseConnection,
_event_bus: &EventBus,
) -> AppResult<PluginDataResp> {
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let fields = info.fields()?;
validate_data(&data, &fields)?;
validate_ref_entities(&data, &fields, entity_name, plugin_id, tenant_id, db, true, None).await?;
let (sql, values) =
DynamicTableManager::build_insert_sql(&info.table_name, tenant_id, operator_id, &data);
#[derive(FromQueryResult)]
struct InsertResult {
id: Uuid,
data: serde_json::Value,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
version: i32,
}
let result = InsertResult::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.one(db)
.await?
.ok_or_else(|| PluginError::DatabaseError("INSERT 未返回结果".to_string()))?;
Ok(PluginDataResp {
id: result.id.to_string(),
data: result.data,
created_at: Some(result.created_at),
updated_at: Some(result.updated_at),
version: Some(result.version),
})
}
/// Paginated list query with filtering, keyword search, sorting and
/// generated-column routing.
///
/// * `page` is 1-based; `0` is treated like `1` because the offset is computed
///   with a saturating subtraction.
/// * `search` is only applied when the entity declares at least one field
///   with `searchable == Some(true)`.
///
/// Returns the requested page together with the total number of records that
/// match the same `filter` / `search` criteria, so callers can paginate a
/// filtered result set correctly.
///
/// # Errors
/// Propagates lookup and database errors; SQL-builder failures are reported as
/// `AppError::Validation`.
pub async fn list(
    plugin_id: Uuid,
    entity_name: &str,
    tenant_id: Uuid,
    page: u64,
    page_size: u64,
    db: &sea_orm::DatabaseConnection,
    filter: Option<serde_json::Value>,
    search: Option<String>,
    sort_by: Option<String>,
    sort_order: Option<String>,
    cache: &moka::sync::Cache<String, EntityInfo>,
) -> AppResult<(Vec<PluginDataResp>, u64)> {
    #[derive(FromQueryResult)]
    struct CountResult {
        count: i64,
    }

    #[derive(FromQueryResult)]
    struct DataRow {
        id: Uuid,
        data: serde_json::Value,
        created_at: chrono::DateTime<chrono::Utc>,
        updated_at: chrono::DateTime<chrono::Utc>,
        version: i32,
    }

    let info =
        resolve_entity_info_cached(plugin_id, entity_name, tenant_id, db, cache).await?;

    // Collect the searchable field names; the keyword is ignored when the
    // entity has none.
    let entity_fields = info.fields()?;
    let searchable: Vec<&str> = entity_fields
        .iter()
        .filter(|f| f.searchable == Some(true))
        .map(|f| f.name.as_str())
        .collect();
    let search_tuple = match &search {
        Some(kw) if !searchable.is_empty() => Some((searchable.join(","), kw.clone())),
        _ => None,
    };

    // Count with the SAME filter/search as the page query (mirrors `count`).
    // An unfiltered total would disagree with the returned items and break
    // pagination for filtered views.
    let (count_sql, count_values) = DynamicTableManager::build_filtered_count_sql(
        &info.table_name,
        tenant_id,
        filter.clone(),
        search_tuple.clone(),
    )
    .map_err(AppError::Validation)?;
    let total = CountResult::find_by_statement(Statement::from_sql_and_values(
        sea_orm::DatabaseBackend::Postgres,
        count_sql,
        count_values,
    ))
    .one(db)
    .await?
    // COUNT(*) is never negative; clamp defensively before widening.
    .map(|r| r.count.max(0) as u64)
    .unwrap_or(0);

    // Page query, routed through generated columns where available.
    // Saturating arithmetic keeps absurd page numbers from overflowing.
    let offset = page.saturating_sub(1).saturating_mul(page_size);
    let (sql, values) = DynamicTableManager::build_filtered_query_sql_ex(
        &info.table_name,
        tenant_id,
        page_size,
        offset,
        filter,
        search_tuple,
        sort_by,
        sort_order,
        &info.generated_fields,
    )
    .map_err(AppError::Validation)?;

    let rows = DataRow::find_by_statement(Statement::from_sql_and_values(
        sea_orm::DatabaseBackend::Postgres,
        sql,
        values,
    ))
    .all(db)
    .await?;

    let items = rows
        .into_iter()
        .map(|r| PluginDataResp {
            id: r.id.to_string(),
            data: r.data,
            created_at: Some(r.created_at),
            updated_at: Some(r.updated_at),
            version: Some(r.version),
        })
        .collect();

    Ok((items, total))
}
/// 按 ID 获取
pub async fn get_by_id(
plugin_id: Uuid,
entity_name: &str,
id: Uuid,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> AppResult<PluginDataResp> {
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let (sql, values) = DynamicTableManager::build_get_by_id_sql(&info.table_name, id, tenant_id);
#[derive(FromQueryResult)]
struct DataRow {
id: Uuid,
data: serde_json::Value,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
version: i32,
}
let row = DataRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.one(db)
.await?
.ok_or_else(|| AppError::NotFound("记录不存在".to_string()))?;
Ok(PluginDataResp {
id: row.id.to_string(),
data: row.data,
created_at: Some(row.created_at),
updated_at: Some(row.updated_at),
version: Some(row.version),
})
}
/// 更新
pub async fn update(
plugin_id: Uuid,
entity_name: &str,
id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
data: serde_json::Value,
expected_version: i32,
db: &sea_orm::DatabaseConnection,
_event_bus: &EventBus,
) -> AppResult<PluginDataResp> {
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let fields = info.fields()?;
validate_data(&data, &fields)?;
validate_ref_entities(&data, &fields, entity_name, plugin_id, tenant_id, db, false, Some(id)).await?;
// 循环引用检测
for field in &fields {
if field.no_cycle == Some(true) && data.get(&field.name).is_some() {
check_no_cycle(id, field, &data, &info.table_name, tenant_id, db).await?;
}
}
let (sql, values) = DynamicTableManager::build_update_sql(
&info.table_name,
id,
tenant_id,
operator_id,
&data,
expected_version,
);
#[derive(FromQueryResult)]
struct UpdateResult {
id: Uuid,
data: serde_json::Value,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
version: i32,
}
let result = UpdateResult::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.one(db)
.await?
.ok_or_else(|| AppError::VersionMismatch)?;
Ok(PluginDataResp {
id: result.id.to_string(),
data: result.data,
created_at: Some(result.created_at),
updated_at: Some(result.updated_at),
version: Some(result.version),
})
}
/// Delete a record (described as a soft delete by the original note; the
/// statement itself is produced by `DynamicTableManager::build_delete_sql`).
///
/// The number of affected rows is not inspected, so the call succeeds even if
/// no row matched.
///
/// `_event_bus` is accepted for interface compatibility and is not used here.
pub async fn delete(
    plugin_id: Uuid,
    entity_name: &str,
    id: Uuid,
    tenant_id: Uuid,
    db: &sea_orm::DatabaseConnection,
    _event_bus: &EventBus,
) -> AppResult<()> {
    let entity = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
    let (delete_sql, params) =
        DynamicTableManager::build_delete_sql(&entity.table_name, id, tenant_id);
    let stmt = Statement::from_sql_and_values(
        sea_orm::DatabaseBackend::Postgres,
        delete_sql,
        params,
    );

    let _exec_result = db.execute(stmt).await?;
    Ok(())
}
/// 统计记录数(支持过滤和搜索)
pub async fn count(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
filter: Option<serde_json::Value>,
search: Option<String>,
) -> AppResult<u64> {
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let entity_fields = info.fields()?;
let search_tuple = {
let searchable: Vec<&str> = entity_fields
.iter()
.filter(|f| f.searchable == Some(true))
.map(|f| f.name.as_str())
.collect();
match (searchable.is_empty(), &search) {
(false, Some(kw)) => Some((searchable.join(","), kw.clone())),
_ => None,
}
};
let (sql, values) = DynamicTableManager::build_filtered_count_sql(
&info.table_name,
tenant_id,
filter,
search_tuple,
)
.map_err(|e| AppError::Validation(e))?;
#[derive(FromQueryResult)]
struct CountResult {
count: i64,
}
let result = CountResult::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.one(db)
.await?
.map(|r| r.count as u64)
.unwrap_or(0);
Ok(result)
}
/// 聚合查询 — 按字段分组计数
/// 返回 [(分组键, 计数), ...]
pub async fn aggregate(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
group_by_field: &str,
filter: Option<serde_json::Value>,
) -> AppResult<Vec<(String, i64)>> {
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let (sql, values) = DynamicTableManager::build_aggregate_sql(
&info.table_name,
tenant_id,
group_by_field,
filter,
)
.map_err(|e| AppError::Validation(e))?;
#[derive(FromQueryResult)]
struct AggRow {
key: Option<String>,
count: i64,
}
let rows = AggRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.all(db)
.await?;
let result = rows
.into_iter()
.map(|r| (r.key.unwrap_or_default(), r.count))
.collect();
Ok(result)
}
/// Aggregate query with a reserved hook for a Redis cache layer.
///
/// Currently a pass-through to [`Self::aggregate`]; no caching is performed.
pub async fn aggregate_cached(
    plugin_id: Uuid,
    entity_name: &str,
    tenant_id: Uuid,
    db: &sea_orm::DatabaseConnection,
    group_by_field: &str,
    filter: Option<serde_json::Value>,
) -> AppResult<Vec<(String, i64)>> {
    // TODO: add a Redis cache layer in a future version.
    let grouped =
        Self::aggregate(plugin_id, entity_name, tenant_id, db, group_by_field, filter).await?;
    Ok(grouped)
}
}
/// Resolve the manifest `metadata.id` (e.g. "erp-crm") of a plugin from the
/// plugins table, scoped to the tenant and ignoring soft-deleted rows.
///
/// # Errors
/// `AppError::NotFound` when no matching plugin row exists;
/// `AppError::Internal` when the stored manifest JSON cannot be deserialized.
pub async fn resolve_manifest_id(
    plugin_id: Uuid,
    tenant_id: Uuid,
    db: &sea_orm::DatabaseConnection,
) -> AppResult<String> {
    let lookup = plugin::Entity::find()
        .filter(plugin::Column::Id.eq(plugin_id))
        .filter(plugin::Column::TenantId.eq(tenant_id))
        .filter(plugin::Column::DeletedAt.is_null());

    let Some(record) = lookup.one(db).await? else {
        return Err(AppError::NotFound(format!("插件 {} 不存在", plugin_id)));
    };

    match serde_json::from_value::<crate::manifest::PluginManifest>(record.manifest_json) {
        Ok(manifest) => Ok(manifest.metadata.id),
        Err(e) => Err(AppError::Internal(format!("解析插件 manifest 失败: {}", e))),
    }
}
/// Load an entity's info from the plugin_entities table (tenant-isolated,
/// soft-deleted rows excluded).
///
/// Note: this path does not populate `generated_fields` (it is always empty),
/// so it is meant for non-list scenarios only.
///
/// # Errors
/// `AppError::NotFound` when the entity row does not exist.
async fn resolve_entity_info(
    plugin_id: Uuid,
    entity_name: &str,
    tenant_id: Uuid,
    db: &sea_orm::DatabaseConnection,
) -> AppResult<EntityInfo> {
    let lookup = plugin_entity::Entity::find()
        .filter(plugin_entity::Column::PluginId.eq(plugin_id))
        .filter(plugin_entity::Column::TenantId.eq(tenant_id))
        .filter(plugin_entity::Column::EntityName.eq(entity_name))
        .filter(plugin_entity::Column::DeletedAt.is_null());

    let Some(record) = lookup.one(db).await? else {
        return Err(AppError::NotFound(format!(
            "插件实体 {}/{} 不存在",
            plugin_id, entity_name
        )));
    };

    // Legacy path: generated_fields are not tracked here.
    let generated_fields = Vec::new();
    Ok(EntityInfo {
        table_name: record.table_name,
        schema_json: record.schema_json,
        generated_fields,
    })
}
/// Load an entity's info from the cache, falling back to the database, and
/// resolve its `generated_fields`.
///
/// The cache key is `"{plugin_id}:{entity_name}:{tenant_id}"`. On a miss the
/// entity schema is deserialized, the generated-column field names are
/// derived, and the result is stored in the cache before being returned.
///
/// # Errors
/// `AppError::NotFound` when the entity row does not exist;
/// `AppError::Internal` when the stored schema JSON cannot be deserialized.
pub async fn resolve_entity_info_cached(
    plugin_id: Uuid,
    entity_name: &str,
    tenant_id: Uuid,
    db: &sea_orm::DatabaseConnection,
    cache: &moka::sync::Cache<String, EntityInfo>,
) -> AppResult<EntityInfo> {
    let cache_key = format!("{}:{}:{}", plugin_id, entity_name, tenant_id);
    if let Some(hit) = cache.get(&cache_key) {
        return Ok(hit);
    }

    let lookup = plugin_entity::Entity::find()
        .filter(plugin_entity::Column::PluginId.eq(plugin_id))
        .filter(plugin_entity::Column::TenantId.eq(tenant_id))
        .filter(plugin_entity::Column::EntityName.eq(entity_name))
        .filter(plugin_entity::Column::DeletedAt.is_null());
    let Some(record) = lookup.one(db).await? else {
        return Err(AppError::NotFound(format!(
            "插件实体 {}/{} 不存在",
            plugin_id, entity_name
        )));
    };

    // Derive generated_fields from the stored schema.
    let entity_def: crate::manifest::PluginEntity =
        match serde_json::from_value(record.schema_json.clone()) {
            Ok(def) => def,
            Err(e) => {
                return Err(AppError::Internal(format!(
                    "解析 entity schema 失败: {}",
                    e
                )))
            }
        };

    let mut generated_fields: Vec<String> = Vec::new();
    for f in entity_def.fields.iter() {
        if !f.field_type.supports_generated_column() {
            continue;
        }
        // A field qualifies when it is unique, sortable or filterable. (The
        // "required and sortable/filterable" case is already covered by the
        // sortable/filterable terms.)
        let qualifies = f.unique || f.sortable == Some(true) || f.filterable == Some(true);
        if qualifies {
            generated_fields.push(sanitize_identifier(&f.name));
        }
    }

    let resolved = EntityInfo {
        table_name: record.table_name,
        schema_json: record.schema_json,
        generated_fields,
    };
    cache.insert(cache_key, resolved.clone());
    Ok(resolved)
}
/// Validate a payload against the entity's field definitions.
///
/// Checks performed per field:
/// * **required** — the field must be present and carry a value. A missing
///   key, a JSON `null`, or a blank (whitespace-only) string are all rejected,
///   matching the "must not be empty" error message. Checking key presence
///   alone would let `{"name": null}` or `{"name": ""}` satisfy a required
///   field.
/// * **pattern** — when the field declares a validation regex and the value is
///   a non-empty string, the string must match it. Non-string and empty values
///   skip the pattern check.
///
/// # Errors
/// * `AppError::Validation` when `data` is not a JSON object, a required field
///   is empty, or a pattern does not match (using the field's custom message
///   when one is configured).
/// * `AppError::Internal` when a configured pattern fails to compile.
fn validate_data(data: &serde_json::Value, fields: &[PluginField]) -> AppResult<()> {
    let obj = data.as_object().ok_or_else(|| {
        AppError::Validation("data 必须是 JSON 对象".to_string())
    })?;

    for field in fields {
        let label = field.display_name.as_deref().unwrap_or(&field.name);
        let value = obj.get(&field.name);

        // Required check: absent, null and blank strings all count as empty.
        if field.required {
            let is_empty = match value {
                None | Some(serde_json::Value::Null) => true,
                Some(serde_json::Value::String(s)) => s.trim().is_empty(),
                Some(_) => false,
            };
            if is_empty {
                return Err(AppError::Validation(format!("字段 '{}' 不能为空", label)));
            }
        }

        // Pattern check: only for non-empty string values of fields that
        // declare a pattern.
        let Some(rule) = &field.validation else { continue };
        let Some(pattern) = &rule.pattern else { continue };
        let Some(text) = value.and_then(|v| v.as_str()) else { continue };
        if text.is_empty() {
            continue;
        }

        let re = regex::Regex::new(pattern)
            .map_err(|e| AppError::Internal(format!("正则表达式编译失败: {}", e)))?;
        if !re.is_match(text) {
            // Prefer the manifest's custom message; build the default lazily.
            let msg = rule
                .message
                .clone()
                .unwrap_or_else(|| format!("字段 '{}' 格式不正确", label));
            return Err(AppError::Validation(msg));
        }
    }

    Ok(())
}
/// Validate reference fields: every `ref_entity` field present in the payload
/// must point at an existing, non-deleted record of the referenced entity
/// within the same tenant.
///
/// Skipped cases:
/// * fields without `ref_entity`, fields absent from the payload, and values
///   that are empty or not strings;
/// * self-references on create (the record does not exist yet);
/// * self-references on update where the value equals `record_id`.
///
/// The plugin's manifest id is resolved at most once per call, and only if at
/// least one reference actually needs a database check.
///
/// # Errors
/// `AppError::Validation` when `data` is not a JSON object, a reference value
/// is not a valid UUID, or the referenced record does not exist. Lookup and
/// database errors are propagated.
async fn validate_ref_entities(
    data: &serde_json::Value,
    fields: &[PluginField],
    current_entity: &str,
    plugin_id: Uuid,
    tenant_id: Uuid,
    db: &sea_orm::DatabaseConnection,
    is_create: bool,
    record_id: Option<Uuid>,
) -> AppResult<()> {
    #[derive(FromQueryResult)]
    struct ExistsCheck { check_result: Option<i32> }

    let obj = data.as_object().ok_or_else(|| {
        AppError::Validation("data 必须是 JSON 对象".to_string())
    })?;

    // Loop-invariant lookup: resolved lazily on first use, then reused for
    // every further reference field instead of querying the plugins table
    // once per field.
    let mut manifest_id: Option<String> = None;

    for field in fields {
        let Some(ref_entity_name) = &field.ref_entity else { continue };
        let Some(val) = obj.get(&field.name) else { continue };

        // Empty (or non-string) values carry no reference to verify.
        let str_val = val.as_str().unwrap_or("").trim();
        if str_val.is_empty() {
            continue;
        }

        let ref_id = Uuid::parse_str(str_val).map_err(|_| {
            AppError::Validation(format!(
                "字段 '{}' 的值 '{}' 不是有效的 UUID",
                field.display_name.as_deref().unwrap_or(&field.name),
                str_val
            ))
        })?;

        // Self-reference: on create the record does not exist yet; on update
        // a record pointing at itself needs no existence check.
        let is_self_ref = ref_entity_name == current_entity;
        if is_self_ref && (is_create || record_id == Some(ref_id)) {
            continue;
        }

        // Check that the referenced record exists.
        let manifest = match manifest_id.take() {
            Some(cached) => cached,
            None => resolve_manifest_id(plugin_id, tenant_id, db).await?,
        };
        let ref_table = DynamicTableManager::table_name(&manifest, ref_entity_name);
        manifest_id = Some(manifest);

        let check_sql = format!(
            "SELECT 1 as check_result FROM \"{}\" WHERE id = $1 AND tenant_id = $2 AND deleted_at IS NULL LIMIT 1",
            ref_table
        );
        let result = ExistsCheck::find_by_statement(Statement::from_sql_and_values(
            sea_orm::DatabaseBackend::Postgres,
            check_sql,
            [ref_id.into(), tenant_id.into()],
        )).one(db).await?;
        if result.is_none() {
            return Err(AppError::Validation(format!(
                "引用的 {} 记录不存在ID: {}",
                ref_entity_name, ref_id
            )));
        }
    }

    Ok(())
}
/// 循环引用检测 — 用于 no_cycle 字段
async fn check_no_cycle(
record_id: Uuid,
field: &PluginField,
data: &serde_json::Value,
table_name: &str,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> AppResult<()> {
let Some(val) = data.get(&field.name) else { return Ok(()) };
let new_parent = val.as_str().unwrap_or("").trim().to_string();
if new_parent.is_empty() { return Ok(()); }
let new_parent_id = Uuid::parse_str(&new_parent).map_err(|_| {
AppError::Validation("parent_id 不是有效的 UUID".to_string())
})?;
let field_name = sanitize_identifier(&field.name);
let mut visited = vec![record_id];
let mut current_id = new_parent_id;
for _ in 0..100 {
if visited.contains(&current_id) {
let label = field.display_name.as_deref().unwrap_or(&field.name);
return Err(AppError::Validation(format!(
"字段 '{}' 形成循环引用", label
)));
}
visited.push(current_id);
let query_sql = format!(
"SELECT data->>'{}' as parent FROM \"{}\" WHERE id = $1 AND tenant_id = $2 AND deleted_at IS NULL",
field_name, table_name
);
#[derive(FromQueryResult)]
struct ParentRow { parent: Option<String> }
let row = ParentRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
query_sql,
[current_id.into(), tenant_id.into()],
)).one(db).await?;
match row {
Some(r) => {
let parent = r.parent.unwrap_or_default().trim().to_string();
if parent.is_empty() { break; }
current_id = Uuid::parse_str(&parent).map_err(|_| {
AppError::Internal("parent_id 不是有效的 UUID".to_string())
})?;
}
None => break,
}
}
Ok(())
}
#[cfg(test)]
mod validate_tests {
    //! Unit tests for the regex-pattern branch of `validate_data`.

    use super::*;
    use crate::manifest::{FieldValidation, PluginField, PluginFieldType};

    /// Mainland-China mobile number pattern shared by all cases below.
    const PHONE_PATTERN: &str = "^1[3-9]\\d{9}$";

    /// Build an optional string field, attaching a validation rule only when
    /// a pattern is supplied.
    fn make_field(name: &str, pattern: Option<&str>, message: Option<&str>) -> PluginField {
        let validation = pattern.map(|p| FieldValidation {
            pattern: Some(p.to_string()),
            message: message.map(String::from),
        });
        PluginField {
            name: name.to_string(),
            field_type: PluginFieldType::String,
            required: false,
            validation,
            ..PluginField::default_for_field()
        }
    }

    #[test]
    fn validate_phone_pattern_rejects_invalid() {
        let schema = vec![make_field("phone", Some(PHONE_PATTERN), Some("手机号格式不正确"))];
        let payload = serde_json::json!({"phone": "1234"});
        assert!(validate_data(&payload, &schema).is_err());
    }

    #[test]
    fn validate_phone_pattern_accepts_valid() {
        let schema = vec![make_field("phone", Some(PHONE_PATTERN), Some("手机号格式不正确"))];
        let payload = serde_json::json!({"phone": "13812345678"});
        assert!(validate_data(&payload, &schema).is_ok());
    }

    #[test]
    fn validate_empty_optional_field_skips_pattern() {
        // An empty value on an optional field must not be run through the regex.
        let schema = vec![make_field("phone", Some(PHONE_PATTERN), None)];
        let payload = serde_json::json!({"phone": ""});
        assert!(validate_data(&payload, &schema).is_ok());
    }
}