Files
hms/crates/erp-plugin/src/data_service.rs
iven 0a041c3d22 feat(plugin): P1-P4 审计修复 — 第二批 (运行时监控 + 通知引擎 + 编号reset)
2.1 运行时监控:
- LoadedPlugin 新增 RuntimeMetrics (调用次数/错误/响应时间/燃料消耗)
- execute_wasm 自动采集每次调用的耗时和状态
- GET /admin/plugins/{id}/metrics 端点

2.2 通知规则引擎:
- notification.rs: 订阅 plugin.trigger.* 事件
- 触发时自动给管理员发送消息通知
- emit_trigger_events 增加 manifest_id 到 payload

2.3 编号 reset_rule:
- 替换 PostgreSQL SEQUENCE 为表行 + pg_advisory_xact_lock
- 支持 daily/monthly/yearly/never 重置周期
- 每个周期独立计数,切换时自动重置为 1
2026-04-19 14:41:17 +08:00

1694 lines
62 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, FromQueryResult, QueryFilter, Statement};
use uuid::Uuid;
use erp_core::audit::{AuditLog};
use erp_core::audit_service;
use erp_core::error::{AppError, AppResult};
use erp_core::events::EventBus;
use crate::data_dto::{AggregateMultiRow, BatchActionReq, PluginDataResp};
use crate::dynamic_table::{sanitize_identifier, DynamicTableManager};
use crate::entity::plugin;
use crate::entity::plugin_entity;
use crate::error::PluginError;
use crate::manifest::PluginField;
use crate::state::EntityInfo;
/// 根据 plugin 数据库 ID 查找 manifest 中匹配 entity 的触发事件
async fn find_trigger_events(
plugin_db_id: Uuid,
entity_name: &str,
db: &sea_orm::DatabaseConnection,
) -> AppResult<Vec<crate::manifest::PluginTriggerEvent>> {
let model = plugin::Entity::find_by_id(plugin_db_id)
.one(db)
.await?
.ok_or_else(|| AppError::NotFound(format!("插件 {} 不存在", plugin_db_id)))?;
let manifest: crate::manifest::PluginManifest =
serde_json::from_value(model.manifest_json)
.map_err(|e| PluginError::InvalidManifest(e.to_string()))?;
let triggers = manifest.trigger_events
.unwrap_or_default()
.into_iter()
.filter(|t| t.entity == entity_name)
.collect();
Ok(triggers)
}
/// 发布触发事件
async fn emit_trigger_events(
triggers: &[crate::manifest::PluginTriggerEvent],
action: &str,
entity_name: &str,
record_id: &str,
tenant_id: Uuid,
data: Option<&serde_json::Value>,
event_bus: &EventBus,
db: &sea_orm::DatabaseConnection,
manifest_id: &str,
) {
use crate::manifest::PluginTriggerOn;
for trigger in triggers {
let should_fire = match &trigger.on {
PluginTriggerOn::Create => action == "create",
PluginTriggerOn::Update => action == "update",
PluginTriggerOn::Delete => action == "delete",
PluginTriggerOn::CreateOrUpdate => action == "create" || action == "update",
};
if should_fire {
let payload = serde_json::json!({
"event": trigger.name,
"entity": entity_name,
"record_id": record_id,
"data": data,
"plugin_id": manifest_id,
"trigger_name": trigger.name,
"action": action,
});
// 发布原始触发事件
let event = erp_core::events::DomainEvent::new(
&trigger.name,
tenant_id,
payload.clone(),
);
event_bus.publish(event, db).await;
// 同时发布 plugin.trigger.{manifest_id} 事件用于通知引擎
let notify_event = erp_core::events::DomainEvent::new(
format!("plugin.trigger.{}.{}", manifest_id, trigger.name),
tenant_id,
payload,
);
event_bus.publish(notify_event, db).await;
}
}
}
/// 行级数据权限参数 — 传递到 service 层注入 SQL 条件
pub struct DataScopeParams {
pub scope_level: String,
pub user_id: Uuid,
pub dept_member_ids: Vec<Uuid>,
pub owner_field: String,
}
pub struct PluginDataService;
impl PluginDataService {
/// 创建插件数据
pub async fn create(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
operator_id: Uuid,
data: serde_json::Value,
db: &sea_orm::DatabaseConnection,
_event_bus: &EventBus,
) -> AppResult<PluginDataResp> {
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let fields = info.fields()?;
validate_data(&data, &fields)?;
validate_ref_entities(&data, &fields, entity_name, plugin_id, tenant_id, db, true, None).await?;
let (sql, values) =
DynamicTableManager::build_insert_sql(&info.table_name, tenant_id, operator_id, &data);
#[derive(FromQueryResult)]
struct InsertResult {
id: Uuid,
data: serde_json::Value,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
version: i32,
}
let result = InsertResult::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.one(db)
.await?
.ok_or_else(|| PluginError::DatabaseError("INSERT 未返回结果".to_string()))?;
audit_service::record(
AuditLog::new(tenant_id, Some(operator_id), "plugin.data.create", entity_name)
.with_resource_id(result.id),
db,
)
.await;
// 触发事件发布
if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await {
if let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await {
emit_trigger_events(&triggers, "create", entity_name, &result.id.to_string(), tenant_id, Some(&result.data), _event_bus, db, &mid).await;
}
}
Ok(PluginDataResp {
id: result.id.to_string(),
data: result.data,
created_at: Some(result.created_at),
updated_at: Some(result.updated_at),
version: Some(result.version),
})
}
/// 列表查询(支持过滤/搜索/排序/Generated Column 路由/数据权限)
pub async fn list(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
page: u64,
page_size: u64,
db: &sea_orm::DatabaseConnection,
filter: Option<serde_json::Value>,
search: Option<String>,
sort_by: Option<String>,
sort_order: Option<String>,
cache: &moka::sync::Cache<String, EntityInfo>,
scope: Option<DataScopeParams>,
) -> AppResult<(Vec<PluginDataResp>, u64)> {
let info =
resolve_entity_info_cached(plugin_id, entity_name, tenant_id, db, cache).await?;
// 获取 searchable 字段列表
let entity_fields = info.fields()?;
let search_tuple = {
let searchable: Vec<&str> = entity_fields
.iter()
.filter(|f| f.searchable == Some(true))
.map(|f| f.name.as_str())
.collect();
match (searchable.is_empty(), &search) {
(false, Some(kw)) => Some((searchable.join(","), kw.clone())),
_ => None,
}
};
// 构建数据权限条件count 查询只有 tenant_id 占 $1scope 从 $2 开始)
let count_scope = build_scope_sql(&scope, &info.generated_fields, 2);
// Count
let (count_sql, mut count_values) =
DynamicTableManager::build_count_sql(&info.table_name, tenant_id);
let count_sql = merge_scope_condition(count_sql, &count_scope);
count_values.extend(count_scope.1);
#[derive(FromQueryResult)]
struct CountResult {
count: i64,
}
let total = CountResult::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
count_sql,
count_values,
))
.one(db)
.await?
.map(|r| r.count as u64)
.unwrap_or(0);
// Query — 使用 Generated Column 路由
let offset = page.saturating_sub(1) * page_size;
let (sql, mut values) = DynamicTableManager::build_filtered_query_sql_ex(
&info.table_name,
tenant_id,
page_size,
offset,
filter,
search_tuple,
sort_by,
sort_order,
&info.generated_fields,
)
.map_err(|e| AppError::Validation(e))?;
// 注入数据权限条件scope 参数索引接在 values 之后)
let scope_condition = build_scope_sql(&scope, &info.generated_fields, values.len() + 1);
let sql = merge_scope_condition(sql, &scope_condition);
values.extend(scope_condition.1);
#[derive(FromQueryResult)]
struct DataRow {
id: Uuid,
data: serde_json::Value,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
version: i32,
}
let rows = DataRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.all(db)
.await?;
let items = rows
.into_iter()
.map(|r| PluginDataResp {
id: r.id.to_string(),
data: r.data,
created_at: Some(r.created_at),
updated_at: Some(r.updated_at),
version: Some(r.version),
})
.collect();
Ok((items, total))
}
/// 按 ID 获取
pub async fn get_by_id(
plugin_id: Uuid,
entity_name: &str,
id: Uuid,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> AppResult<PluginDataResp> {
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let (sql, values) = DynamicTableManager::build_get_by_id_sql(&info.table_name, id, tenant_id);
#[derive(FromQueryResult)]
struct DataRow {
id: Uuid,
data: serde_json::Value,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
version: i32,
}
let row = DataRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.one(db)
.await?
.ok_or_else(|| AppError::NotFound("记录不存在".to_string()))?;
Ok(PluginDataResp {
id: row.id.to_string(),
data: row.data,
created_at: Some(row.created_at),
updated_at: Some(row.updated_at),
version: Some(row.version),
})
}
/// 更新
pub async fn update(
plugin_id: Uuid,
entity_name: &str,
id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
data: serde_json::Value,
expected_version: i32,
db: &sea_orm::DatabaseConnection,
_event_bus: &EventBus,
) -> AppResult<PluginDataResp> {
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let fields = info.fields()?;
validate_data(&data, &fields)?;
validate_ref_entities(&data, &fields, entity_name, plugin_id, tenant_id, db, false, Some(id)).await?;
// 循环引用检测
for field in &fields {
if field.no_cycle == Some(true) && data.get(&field.name).is_some() {
check_no_cycle(id, field, &data, &info.table_name, tenant_id, db).await?;
}
}
let (sql, values) = DynamicTableManager::build_update_sql(
&info.table_name,
id,
tenant_id,
operator_id,
&data,
expected_version,
);
#[derive(FromQueryResult)]
struct UpdateResult {
id: Uuid,
data: serde_json::Value,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
version: i32,
}
let result = UpdateResult::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.one(db)
.await?
.ok_or_else(|| AppError::VersionMismatch)?;
audit_service::record(
AuditLog::new(tenant_id, Some(operator_id), "plugin.data.update", entity_name)
.with_resource_id(id),
db,
)
.await;
// 触发事件发布
if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await {
if let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await {
emit_trigger_events(&triggers, "update", entity_name, &result.id.to_string(), tenant_id, Some(&result.data), _event_bus, db, &mid).await;
}
}
Ok(PluginDataResp {
id: result.id.to_string(),
data: result.data,
created_at: Some(result.created_at),
updated_at: Some(result.updated_at),
version: Some(result.version),
})
}
/// 部分更新PATCH— 只合并提供的字段
pub async fn partial_update(
plugin_id: Uuid,
entity_name: &str,
id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
partial_data: serde_json::Value,
expected_version: i32,
db: &sea_orm::DatabaseConnection,
) -> AppResult<PluginDataResp> {
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let fields = info.fields()?;
// 合并现有数据后校验,确保 partial update 也能触发 required/pattern/ref 校验
let existing = Self::get_by_id(plugin_id, entity_name, id, tenant_id, db).await?;
let merged = {
let mut base = existing.data.as_object().cloned().unwrap_or_default();
if let Some(patch) = partial_data.as_object() {
for (k, v) in patch {
base.insert(k.clone(), v.clone());
}
}
serde_json::Value::Object(base)
};
validate_data(&merged, &fields)?;
validate_ref_entities(&merged, &fields, entity_name, plugin_id, tenant_id, db, false, Some(id)).await?;
let (sql, values) = DynamicTableManager::build_patch_sql(
&info.table_name, id, tenant_id, operator_id, partial_data, expected_version,
);
#[derive(FromQueryResult)]
struct PatchResult {
id: Uuid,
data: serde_json::Value,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
version: i32,
}
let result = PatchResult::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres, sql, values,
)).one(db).await?.ok_or_else(|| AppError::VersionMismatch)?;
Ok(PluginDataResp {
id: result.id.to_string(),
data: result.data,
created_at: Some(result.created_at),
updated_at: Some(result.updated_at),
version: Some(result.version),
})
}
/// 删除(软删除)
pub async fn delete(
plugin_id: Uuid,
entity_name: &str,
id: Uuid,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
_event_bus: &EventBus,
) -> AppResult<()> {
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
// 解析 entity schema 获取 relations
let entity_def: crate::manifest::PluginEntity =
serde_json::from_value(info.schema_json.clone())
.map_err(|e| AppError::Internal(format!("解析 entity schema 失败: {}", e)))?;
let manifest_id = resolve_manifest_id(plugin_id, tenant_id, db).await?;
// 处理级联关系
for relation in &entity_def.relations {
let rel_table = DynamicTableManager::table_name(&manifest_id, &relation.entity);
let fk = sanitize_identifier(&relation.foreign_key);
match relation.on_delete {
crate::manifest::OnDeleteStrategy::Restrict => {
let check_sql = format!(
"SELECT 1 as chk FROM \"{}\" WHERE data->>'{}' = $1 AND tenant_id = $2 AND deleted_at IS NULL LIMIT 1",
rel_table, fk
);
#[derive(FromQueryResult)]
struct RefCheck { chk: Option<i32> }
let has_ref = RefCheck::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
check_sql,
[id.to_string().into(), tenant_id.into()],
)).one(db).await?;
if has_ref.is_some() {
return Err(AppError::Validation(format!(
"存在关联的 {} 记录,无法删除",
relation.entity
)));
}
}
crate::manifest::OnDeleteStrategy::Nullify => {
let nullify_sql = format!(
"UPDATE \"{}\" SET data = jsonb_set(data, '{{{}}}', 'null'), updated_at = NOW() WHERE data->>'{}' = $1 AND tenant_id = $2 AND deleted_at IS NULL",
rel_table, fk, fk
);
db.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
nullify_sql,
[id.to_string().into(), tenant_id.into()],
)).await?;
}
crate::manifest::OnDeleteStrategy::Cascade => {
let cascade_sql = format!(
"UPDATE \"{}\" SET deleted_at = NOW(), updated_at = NOW() WHERE data->>'{}' = $1 AND tenant_id = $2 AND deleted_at IS NULL",
rel_table, fk
);
db.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
cascade_sql,
[id.to_string().into(), tenant_id.into()],
)).await?;
}
}
}
// 软删除主记录
let (sql, values) = DynamicTableManager::build_delete_sql(&info.table_name, id, tenant_id);
db.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.await?;
audit_service::record(
AuditLog::new(tenant_id, None, "plugin.data.delete", entity_name)
.with_resource_id(id),
db,
)
.await;
// 触发事件发布
if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await {
if let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await {
emit_trigger_events(&triggers, "delete", entity_name, &id.to_string(), tenant_id, None, _event_bus, db, &mid).await;
}
}
Ok(())
}
/// 导出数据(支持 JSON/CSV/XLSX 格式)
pub async fn export(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
filter: Option<serde_json::Value>,
search: Option<String>,
sort_by: Option<String>,
sort_order: Option<String>,
format: Option<String>,
cache: &moka::sync::Cache<String, EntityInfo>,
scope: Option<DataScopeParams>,
) -> AppResult<crate::data_dto::ExportPayload> {
use crate::data_dto::ExportPayload;
let info =
resolve_entity_info_cached(plugin_id, entity_name, tenant_id, db, cache).await?;
let entity_fields = info.fields()?;
let search_tuple = {
let searchable: Vec<&str> = entity_fields
.iter()
.filter(|f| f.searchable == Some(true))
.map(|f| f.name.as_str())
.collect();
match (searchable.is_empty(), &search) {
(false, Some(kw)) => Some((searchable.join(","), kw.clone())),
_ => None,
}
};
let (sql, mut values) = DynamicTableManager::build_filtered_query_sql_ex(
&info.table_name,
tenant_id,
10000,
0,
filter,
search_tuple,
sort_by,
sort_order,
&info.generated_fields,
)
.map_err(|e| AppError::Validation(e))?;
let scope_condition = build_scope_sql(&scope, &info.generated_fields, values.len() + 1);
let sql = merge_scope_condition(sql, &scope_condition);
values.extend(scope_condition.1);
#[derive(FromQueryResult)]
struct DataRow { data: serde_json::Value }
let rows = DataRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.all(db)
.await?;
let data: Vec<serde_json::Value> = rows.into_iter().map(|r| r.data).collect();
let fmt = format.as_deref().unwrap_or("json").to_lowercase();
match fmt.as_str() {
"csv" => Ok(ExportPayload::Csv(Self::to_csv(&data, &entity_fields)?)),
"xlsx" => Ok(ExportPayload::Xlsx(Self::to_xlsx(&data, &entity_fields)?)),
_ => Ok(ExportPayload::Json(data)),
}
}
fn to_csv(
rows: &[serde_json::Value],
fields: &[crate::manifest::PluginField],
) -> AppResult<Vec<u8>> {
let mut wtr = csv::Writer::from_writer(Vec::new());
let headers: Vec<&str> = fields.iter().map(|f| f.name.as_str()).collect();
wtr.write_record(&headers).map_err(|e| AppError::Internal(format!("CSV 写头失败: {}", e)))?;
for row in rows {
let record: Vec<String> = headers.iter().map(|h| {
row.get(*h).and_then(|v| match v {
serde_json::Value::String(s) => Some(s.clone()),
serde_json::Value::Number(n) => Some(n.to_string()),
serde_json::Value::Bool(b) => Some(b.to_string()),
serde_json::Value::Null => Some(String::new()),
other => Some(other.to_string()),
}).unwrap_or_default()
}).collect();
wtr.write_record(&record).map_err(|e| AppError::Internal(format!("CSV 写行失败: {}", e)))?;
}
wtr.into_inner()
.map_err(|e| AppError::Internal(format!("CSV 刷新失败: {}", e)))
}
fn to_xlsx(
rows: &[serde_json::Value],
fields: &[crate::manifest::PluginField],
) -> AppResult<Vec<u8>> {
use rust_xlsxwriter::*;
let mut wb = Workbook::new();
let ws = wb.add_worksheet();
let header_fmt = Format::new().set_bold().set_background_color(Color::RGB(0x4F46E5)).set_font_color(Color::White);
for (col, field) in fields.iter().enumerate() {
let label = field.display_name.as_deref().unwrap_or(&field.name);
ws.write_string_with_format(0, col as u16, label, &header_fmt)
.map_err(|e| AppError::Internal(format!("XLSX 写头失败: {}", e)))?;
}
for (row_idx, row) in rows.iter().enumerate() {
for (col, field) in fields.iter().enumerate() {
let val = row.get(&field.name);
let row_num = (row_idx + 1) as u32;
match val {
Some(serde_json::Value::String(s)) => { ws.write_string(row_num, col as u16, s).ok(); }
Some(serde_json::Value::Number(n)) => {
if let Some(f) = n.as_f64() { ws.write_number(row_num, col as u16, f).ok(); }
else { ws.write_string(row_num, col as u16, &n.to_string()).ok(); }
}
Some(serde_json::Value::Bool(b)) => { ws.write_string(row_num, col as u16, &b.to_string()).ok(); }
_ => {}
}
}
}
let buf = wb.save_to_buffer()
.map_err(|e| AppError::Internal(format!("XLSX 保存失败: {}", e)))?;
Ok(buf.to_vec())
}
/// 批量导入数据(逐行校验 + 逐行插入)
pub async fn import(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
operator_id: Uuid,
rows: Vec<serde_json::Value>,
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
) -> AppResult<crate::data_dto::ImportResult> {
use crate::data_dto::{ImportResult, ImportRowError};
if rows.len() > 1000 {
return Err(AppError::Validation("单次导入上限 1000 行".to_string()));
}
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let fields = info.fields()?;
let mut success_count = 0usize;
let mut row_errors: Vec<ImportRowError> = Vec::new();
for (i, row_data) in rows.iter().enumerate() {
if let Err(e) = validate_data(row_data, &fields) {
row_errors.push(ImportRowError { row: i, errors: vec![e.to_string()] });
continue;
}
if let Err(e) = validate_ref_entities(row_data, &fields, entity_name, plugin_id, tenant_id, db, true, None).await {
row_errors.push(ImportRowError { row: i, errors: vec![e.to_string()] });
continue;
}
let (sql, values) =
DynamicTableManager::build_insert_sql(&info.table_name, tenant_id, operator_id, row_data);
let result = db.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
)).await;
match result {
Ok(_) => success_count += 1,
Err(e) => {
row_errors.push(ImportRowError { row: i, errors: vec![format!("写入失败: {}", e)] });
}
}
}
audit_service::record(
AuditLog::new(tenant_id, Some(operator_id), "plugin.data.import", entity_name),
db,
)
.await;
if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await {
if let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await {
emit_trigger_events(
&triggers, "create", entity_name,
&format!("batch_import:{}", success_count),
tenant_id, None, event_bus, db, &mid,
).await;
}
}
Ok(ImportResult {
success_count,
error_count: row_errors.len(),
errors: row_errors,
})
}
/// 批量操作 — batch_delete / batch_update
pub async fn batch(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
operator_id: Uuid,
req: BatchActionReq,
db: &sea_orm::DatabaseConnection,
) -> AppResult<u64> {
if req.ids.is_empty() {
return Err(AppError::Validation("ids 不能为空".to_string()));
}
if req.ids.len() > 100 {
return Err(AppError::Validation("批量操作上限 100 条".to_string()));
}
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let ids: Vec<Uuid> = req
.ids
.iter()
.map(|s| Uuid::parse_str(s))
.collect::<Result<Vec<_>, _>>()
.map_err(|_| AppError::Validation("ids 中包含无效的 UUID".to_string()))?;
let affected = match req.action.as_str() {
"batch_delete" => {
// 批量删除前先执行级联策略(逐条,复用 delete 的级联逻辑)
let entity_def: crate::manifest::PluginEntity =
serde_json::from_value(info.schema_json.clone())
.map_err(|e| AppError::Internal(format!("解析 entity schema 失败: {}", e)))?;
let manifest_id = resolve_manifest_id(plugin_id, tenant_id, db).await?;
for &del_id in &ids {
for relation in &entity_def.relations {
let rel_table = DynamicTableManager::table_name(&manifest_id, &relation.entity);
let fk = sanitize_identifier(&relation.foreign_key);
match relation.on_delete {
crate::manifest::OnDeleteStrategy::Restrict => {
let check_sql = format!(
"SELECT 1 as chk FROM \"{}\" WHERE data->>'{}' = $1 AND tenant_id = $2 AND deleted_at IS NULL LIMIT 1",
rel_table, fk
);
#[derive(FromQueryResult)]
struct RefCheck { chk: Option<i32> }
let has_ref = RefCheck::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
check_sql,
[del_id.to_string().into(), tenant_id.into()],
)).one(db).await?;
if has_ref.is_some() {
return Err(AppError::Validation(format!(
"记录 {} 存在关联的 {} 记录,无法删除",
del_id, relation.entity
)));
}
}
crate::manifest::OnDeleteStrategy::Nullify => {
let nullify_sql = format!(
"UPDATE \"{}\" SET data = jsonb_set(data, '{{{}}}', 'null'), updated_at = NOW() WHERE data->>'{}' = $1 AND tenant_id = $2 AND deleted_at IS NULL",
rel_table, fk, fk
);
db.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
nullify_sql,
[del_id.to_string().into(), tenant_id.into()],
)).await?;
}
crate::manifest::OnDeleteStrategy::Cascade => {
let cascade_sql = format!(
"UPDATE \"{}\" SET deleted_at = NOW(), updated_at = NOW() WHERE data->>'{}' = $1 AND tenant_id = $2 AND deleted_at IS NULL",
rel_table, fk
);
db.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
cascade_sql,
[del_id.to_string().into(), tenant_id.into()],
)).await?;
}
}
}
}
let placeholders: Vec<String> = ids
.iter()
.enumerate()
.map(|(i, _)| format!("${}", i + 2))
.collect();
let sql = format!(
"UPDATE \"{}\" SET deleted_at = NOW(), updated_at = NOW() \
WHERE tenant_id = $1 AND id IN ({}) AND deleted_at IS NULL",
info.table_name,
placeholders.join(", ")
);
let mut values = vec![tenant_id.into()];
for id in &ids {
values.push((*id).into());
}
let result = db
.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.await?;
result.rows_affected()
}
"batch_update" => {
let update_data = req.data.ok_or_else(|| {
AppError::Validation("batch_update 需要 data 字段".to_string())
})?;
let mut set_expr = "data".to_string();
if let Some(obj) = update_data.as_object() {
for key in obj.keys() {
let clean_key = sanitize_identifier(key);
set_expr = format!(
"jsonb_set({}, '{{{}}}', $2::jsonb->'{}', true)",
set_expr, clean_key, clean_key
);
}
}
let placeholders: Vec<String> = ids
.iter()
.enumerate()
.map(|(i, _)| format!("${}", i + 3))
.collect();
let sql = format!(
"UPDATE \"{}\" SET data = {}, updated_at = NOW(), updated_by = $1, version = version + 1 \
WHERE tenant_id = $1 AND id IN ({}) AND deleted_at IS NULL",
info.table_name,
set_expr,
placeholders.join(", ")
);
let mut values = vec![operator_id.into()];
values.push(
serde_json::to_string(&update_data)
.unwrap_or_default()
.into(),
);
for id in &ids {
values.push((*id).into());
}
let result = db
.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.await?;
result.rows_affected()
}
_ => {
return Err(AppError::Validation(format!(
"不支持的批量操作: {}",
req.action
)))
}
};
Ok(affected)
}
/// 统计记录数(支持过滤和搜索)
pub async fn count(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
filter: Option<serde_json::Value>,
search: Option<String>,
scope: Option<DataScopeParams>,
) -> AppResult<u64> {
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let entity_fields = info.fields()?;
let search_tuple = {
let searchable: Vec<&str> = entity_fields
.iter()
.filter(|f| f.searchable == Some(true))
.map(|f| f.name.as_str())
.collect();
match (searchable.is_empty(), &search) {
(false, Some(kw)) => Some((searchable.join(","), kw.clone())),
_ => None,
}
};
let (mut sql, mut values) = DynamicTableManager::build_filtered_count_sql(
&info.table_name,
tenant_id,
filter,
search_tuple,
)
.map_err(|e| AppError::Validation(e))?;
// 合并数据权限条件
let scope_condition = build_scope_sql(&scope, &info.generated_fields, values.len() + 1);
if !scope_condition.0.is_empty() {
sql = merge_scope_condition(sql, &scope_condition);
values.extend(scope_condition.1);
}
#[derive(FromQueryResult)]
struct CountResult {
count: i64,
}
let result = CountResult::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.one(db)
.await?
.map(|r| r.count as u64)
.unwrap_or(0);
Ok(result)
}
/// 聚合查询 — 按字段分组计数
/// 返回 [(分组键, 计数), ...]
pub async fn aggregate(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
group_by_field: &str,
filter: Option<serde_json::Value>,
scope: Option<DataScopeParams>,
) -> AppResult<Vec<(String, i64)>> {
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let (mut sql, mut values) = DynamicTableManager::build_aggregate_sql(
&info.table_name,
tenant_id,
group_by_field,
filter,
)
.map_err(|e| AppError::Validation(e))?;
// 合并数据权限条件
let scope_condition = build_scope_sql(&scope, &info.generated_fields, values.len() + 1);
if !scope_condition.0.is_empty() {
sql = merge_scope_condition(sql, &scope_condition);
values.extend(scope_condition.1);
}
#[derive(FromQueryResult)]
struct AggRow {
key: Option<String>,
count: i64,
}
let rows = AggRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.all(db)
.await?;
let result = rows
.into_iter()
.map(|r| (r.key.unwrap_or_default(), r.count))
.collect();
Ok(result)
}
/// 多聚合查询 — 支持 COUNT + SUM/AVG/MIN/MAX
pub async fn aggregate_multi(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
group_by_field: &str,
aggregations: &[(String, String)],
filter: Option<serde_json::Value>,
scope: Option<DataScopeParams>,
) -> AppResult<Vec<AggregateMultiRow>> {
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let (mut sql, mut values) = DynamicTableManager::build_aggregate_multi_sql(
&info.table_name,
tenant_id,
group_by_field,
aggregations,
filter,
)
.map_err(|e| AppError::Validation(e))?;
let scope_condition = build_scope_sql(&scope, &info.generated_fields, values.len() + 1);
if !scope_condition.0.is_empty() {
sql = merge_scope_condition(sql, &scope_condition);
values.extend(scope_condition.1);
}
// 使用 json_agg 包装整行,返回 JSON 数组
let json_sql = format!("SELECT json_agg(row_to_json(t)) as data FROM ({}) t", sql);
#[derive(Debug, FromQueryResult)]
struct JsonResult {
data: Option<serde_json::Value>,
}
let result = JsonResult::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
json_sql,
values,
))
.one(db)
.await?;
let json_rows: Vec<serde_json::Value> = result
.and_then(|r| r.data)
.and_then(|d| d.as_array().cloned())
.unwrap_or_default();
let rows = json_rows.into_iter().map(|v| AggregateMultiRow {
key: v.get("key").and_then(|k| k.as_str()).unwrap_or_default().to_string(),
count: v.get("count").and_then(|c| c.as_i64()).unwrap_or(0),
metrics: v.as_object()
.map(|m| m.iter()
.filter(|(k, _)| *k != "key" && *k != "count")
.map(|(k, v)| (k.clone(), v.as_f64().unwrap_or(0.0)))
.collect()
)
.unwrap_or_default(),
}).collect();
Ok(rows)
}
/// 聚合查询(预留 Redis 缓存接口)
pub async fn aggregate_cached(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
group_by_field: &str,
filter: Option<serde_json::Value>,
) -> AppResult<Vec<(String, i64)>> {
// TODO: 未来版本添加 Redis 缓存层
Self::aggregate(plugin_id, entity_name, tenant_id, db, group_by_field, filter, None).await
}
/// 时间序列聚合 — 按时间字段截断为 day/week/month 统计计数
pub async fn timeseries(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
time_field: &str,
time_grain: &str,
start: Option<String>,
end: Option<String>,
scope: Option<DataScopeParams>,
) -> AppResult<Vec<crate::data_dto::TimeseriesItem>> {
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
let (mut sql, mut values) = DynamicTableManager::build_timeseries_sql(
&info.table_name,
tenant_id,
time_field,
time_grain,
start.as_deref(),
end.as_deref(),
)
.map_err(|e| AppError::Validation(e))?;
// 合并数据权限条件
let scope_condition = build_scope_sql(&scope, &info.generated_fields, values.len() + 1);
if !scope_condition.0.is_empty() {
sql = merge_scope_condition(sql, &scope_condition);
values.extend(scope_condition.1);
}
#[derive(FromQueryResult)]
struct TsRow {
period: Option<String>,
count: i64,
}
let rows = TsRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.all(db)
.await?;
Ok(rows
.into_iter()
.map(|r| crate::data_dto::TimeseriesItem {
period: r.period.unwrap_or_default(),
count: r.count,
})
.collect())
}
/// 对账扫描: 检查指定插件所有实体的跨插件引用是否有悬空引用
pub async fn reconcile_references(
plugin_id: Uuid,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> AppResult<crate::data_dto::ReconciliationReport> {
let manifest_id = resolve_manifest_id(plugin_id, tenant_id, db).await?;
// 获取该插件所有实体
let entities = plugin_entity::Entity::find()
.filter(plugin_entity::Column::PluginId.eq(plugin_id))
.filter(plugin_entity::Column::TenantId.eq(tenant_id))
.filter(plugin_entity::Column::DeletedAt.is_null())
.all(db)
.await?;
let mut valid_count: i64 = 0;
let mut dangling_count: i64 = 0;
let mut details = Vec::new();
for entity_rec in &entities {
let schema: crate::manifest::PluginEntity =
serde_json::from_value(entity_rec.schema_json.clone())
.map_err(|e| AppError::Internal(format!("解析 entity schema 失败: {}", e)))?;
// 找出所有有 ref_entity 的字段
let ref_fields: Vec<&PluginField> = schema.fields.iter()
.filter(|f| f.ref_entity.is_some())
.collect();
if ref_fields.is_empty() {
continue;
}
let table_name = DynamicTableManager::table_name(&manifest_id, &entity_rec.entity_name);
for field in &ref_fields {
let col = sanitize_identifier(&field.name);
#[derive(FromQueryResult)]
struct RefRow {
id: Uuid,
// 动态列 — SeaORM 无法直接映射,用 JSON 构建
}
// 查询所有有 ref 值的记录
let ref_sql = format!(
"SELECT id, {} as ref_val FROM {} WHERE tenant_id = $1 AND deleted_at IS NULL AND {} IS NOT NULL",
col, table_name, col,
);
#[derive(FromQueryResult)]
struct RefValRow {
id: Uuid,
ref_val: String,
}
let rows = RefValRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
ref_sql,
[tenant_id.into()],
))
.all(db)
.await?;
for row in rows {
// 验证 ref_val 是有效的 UUID 且目标记录存在
let Ok(target_uuid) = Uuid::parse_str(&row.ref_val) else { continue };
let ref_entity_name = field.ref_entity.as_deref().unwrap_or("");
let ref_plugin = field.ref_plugin.as_deref().unwrap_or(&manifest_id);
let target_table = DynamicTableManager::table_name(ref_plugin, ref_entity_name);
let check_sql = format!(
"SELECT COUNT(*) as cnt FROM {} WHERE id = $1 AND tenant_id = $2 AND deleted_at IS NULL",
target_table,
);
#[derive(FromQueryResult)]
struct CountRow {
cnt: i64,
}
let count_row = CountRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
check_sql,
[target_uuid.into(), tenant_id.into()],
))
.one(db)
.await?
.unwrap_or(CountRow { cnt: 0 });
if count_row.cnt > 0 {
valid_count += 1;
} else {
dangling_count += 1;
details.push(crate::data_dto::DanglingRef {
entity: entity_rec.entity_name.clone(),
field: field.name.clone(),
record_id: row.id.to_string(),
dangling_value: row.ref_val,
});
}
}
}
}
Ok(crate::data_dto::ReconciliationReport {
valid_count,
dangling_count,
details,
})
}
}
/// 从 plugins 表解析 manifest metadata.id如 "erp-crm"
pub async fn resolve_manifest_id(
plugin_id: Uuid,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> AppResult<String> {
let model = plugin::Entity::find()
.filter(plugin::Column::Id.eq(plugin_id))
.filter(plugin::Column::TenantId.eq(tenant_id))
.filter(plugin::Column::DeletedAt.is_null())
.one(db)
.await?
.ok_or_else(|| AppError::NotFound(format!("插件 {} 不存在", plugin_id)))?;
let manifest: crate::manifest::PluginManifest =
serde_json::from_value(model.manifest_json)
.map_err(|e| AppError::Internal(format!("解析插件 manifest 失败: {}", e)))?;
Ok(manifest.metadata.id)
}
/// 从 plugin_entities 表获取实体完整信息(带租户隔离)
/// 注意:此函数不填充 generated_fields仅用于非 list 场景
async fn resolve_entity_info(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> AppResult<EntityInfo> {
let entity = plugin_entity::Entity::find()
.filter(plugin_entity::Column::PluginId.eq(plugin_id))
.filter(plugin_entity::Column::TenantId.eq(tenant_id))
.filter(plugin_entity::Column::EntityName.eq(entity_name))
.filter(plugin_entity::Column::DeletedAt.is_null())
.one(db)
.await?
.ok_or_else(|| {
AppError::NotFound(format!("插件实体 {}/{} 不存在", plugin_id, entity_name))
})?;
Ok(EntityInfo {
table_name: entity.table_name,
schema_json: entity.schema_json,
generated_fields: vec![], // 旧路径,不追踪 generated_fields
})
}
/// 从缓存或数据库获取实体信息(带 generated_fields 解析)
pub async fn resolve_entity_info_cached(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
cache: &moka::sync::Cache<String, EntityInfo>,
) -> AppResult<EntityInfo> {
let cache_key = format!("{}:{}:{}", plugin_id, entity_name, tenant_id);
if let Some(info) = cache.get(&cache_key) {
return Ok(info);
}
let entity = plugin_entity::Entity::find()
.filter(plugin_entity::Column::PluginId.eq(plugin_id))
.filter(plugin_entity::Column::TenantId.eq(tenant_id))
.filter(plugin_entity::Column::EntityName.eq(entity_name))
.filter(plugin_entity::Column::DeletedAt.is_null())
.one(db)
.await?
.ok_or_else(|| {
AppError::NotFound(format!("插件实体 {}/{} 不存在", plugin_id, entity_name))
})?;
// 解析 generated_fields
let entity_def: crate::manifest::PluginEntity =
serde_json::from_value(entity.schema_json.clone())
.map_err(|e| AppError::Internal(format!("解析 entity schema 失败: {}", e)))?;
let generated_fields: Vec<String> = entity_def
.fields
.iter()
.filter(|f| f.field_type.supports_generated_column())
.filter(|f| {
f.unique
|| f.sortable == Some(true)
|| f.filterable == Some(true)
|| (f.required && (f.sortable == Some(true) || f.filterable == Some(true)))
})
.map(|f| sanitize_identifier(&f.name))
.collect();
let info = EntityInfo {
table_name: entity.table_name,
schema_json: entity.schema_json,
generated_fields,
};
cache.insert(cache_key, info.clone());
Ok(info)
}
/// 跨插件实体解析 — 按 manifest_id + entity_name 查找目标插件的实体信息
pub async fn resolve_cross_plugin_entity(
target_manifest_id: &str,
entity_name: &str,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> AppResult<EntityInfo> {
let entity = plugin_entity::Entity::find()
.filter(plugin_entity::Column::ManifestId.eq(target_manifest_id))
.filter(plugin_entity::Column::EntityName.eq(entity_name))
.filter(plugin_entity::Column::TenantId.eq(tenant_id))
.filter(plugin_entity::Column::DeletedAt.is_null())
.one(db)
.await?
.ok_or_else(|| {
AppError::NotFound(format!(
"跨插件实体 {}/{} 不存在或未公开",
target_manifest_id, entity_name
))
})?;
let entity_def: crate::manifest::PluginEntity =
serde_json::from_value(entity.schema_json.clone())
.map_err(|e| AppError::Internal(format!("解析 entity schema 失败: {}", e)))?;
let generated_fields: Vec<String> = entity_def
.fields
.iter()
.filter(|f| f.field_type.supports_generated_column())
.filter(|f| {
f.unique
|| f.sortable == Some(true)
|| f.filterable == Some(true)
|| (f.required && (f.sortable == Some(true) || f.filterable == Some(true)))
})
.map(|f| sanitize_identifier(&f.name))
.collect();
Ok(EntityInfo {
table_name: entity.table_name,
schema_json: entity.schema_json,
generated_fields,
})
}
/// 检查目标插件是否安装且活跃
pub async fn is_plugin_active(
target_manifest_id: &str,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> bool {
// 通过 plugin_entities 的 manifest_id 找到 plugin_id再检查 plugins 表状态
let entity = plugin_entity::Entity::find()
.filter(plugin_entity::Column::ManifestId.eq(target_manifest_id))
.filter(plugin_entity::Column::TenantId.eq(tenant_id))
.filter(plugin_entity::Column::DeletedAt.is_null())
.one(db)
.await;
let Some(entity) = entity.ok().flatten() else {
return false;
};
let plugin = plugin::Entity::find_by_id(entity.plugin_id)
.filter(plugin::Column::TenantId.eq(tenant_id))
.filter(plugin::Column::DeletedAt.is_null())
.one(db)
.await;
matches!(plugin.ok().flatten(), Some(p) if p.status == "running" || p.status == "installed")
}
/// 校验数据:检查 required 字段 + 正则校验
fn validate_data(data: &serde_json::Value, fields: &[PluginField]) -> AppResult<()> {
let obj = data.as_object().ok_or_else(|| {
AppError::Validation("data 必须是 JSON 对象".to_string())
})?;
for field in fields {
let label = field.display_name.as_deref().unwrap_or(&field.name);
// required 检查
if field.required && !obj.contains_key(&field.name) {
return Err(AppError::Validation(format!("字段 '{}' 不能为空", label)));
}
// 正则校验
if let Some(validation) = &field.validation {
if let Some(pattern) = &validation.pattern {
if let Some(val) = obj.get(&field.name) {
let str_val = val.as_str().unwrap_or("");
if !str_val.is_empty() {
let re = regex::Regex::new(pattern)
.map_err(|e| AppError::Internal(format!("正则表达式编译失败: {}", e)))?;
if !re.is_match(str_val) {
let default_msg = format!("字段 '{}' 格式不正确", label);
let msg = validation.message.as_deref()
.unwrap_or(&default_msg);
return Err(AppError::Validation(msg.to_string()));
}
}
}
}
}
}
Ok(())
}
/// 校验外键引用 — 检查 ref_entity 字段指向的记录是否存在
/// 支持同插件引用和跨插件引用ref_plugin 字段)
/// 核心原则:跨插件引用目标插件未安装时跳过校验(软警告)
async fn validate_ref_entities(
data: &serde_json::Value,
fields: &[PluginField],
current_entity: &str,
plugin_id: Uuid,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
is_create: bool,
record_id: Option<Uuid>,
) -> AppResult<()> {
let obj = data.as_object().ok_or_else(|| {
AppError::Validation("data 必须是 JSON 对象".to_string())
})?;
for field in fields {
let Some(ref_entity_name) = &field.ref_entity else { continue };
let Some(val) = obj.get(&field.name) else { continue };
let str_val = val.as_str().unwrap_or("").trim().to_string();
if str_val.is_empty() && !field.required { continue; }
if str_val.is_empty() { continue; }
let ref_id = Uuid::parse_str(&str_val).map_err(|_| {
AppError::Validation(format!(
"字段 '{}' 的值 '{}' 不是有效的 UUID",
field.display_name.as_deref().unwrap_or(&field.name),
str_val
))
})?;
// 自引用 + create跳过记录尚未存在
if ref_entity_name == current_entity && field.ref_plugin.is_none() && is_create {
continue;
}
// 自引用 + update检查是否引用自身
if ref_entity_name == current_entity && field.ref_plugin.is_none() && !is_create {
if let Some(rid) = record_id {
if ref_id == rid { continue; }
}
}
// 确定目标表名
let ref_table = if let Some(target_plugin) = &field.ref_plugin {
// 跨插件引用 — 检查目标插件是否活跃
if !is_plugin_active(target_plugin, tenant_id, db).await {
// 目标插件未安装/禁用 → 跳过校验(软警告,不阻塞)
tracing::debug!(
field = %field.name,
target_plugin = %target_plugin,
"跨插件引用目标插件未活跃,跳过校验"
);
continue;
}
// 目标插件活跃 → 解析目标表名
match resolve_cross_plugin_entity(target_plugin, ref_entity_name, tenant_id, db).await {
Ok(info) => info.table_name,
Err(e) => {
tracing::warn!(
field = %field.name,
target_plugin = %target_plugin,
entity = %ref_entity_name,
error = %e,
"跨插件实体解析失败,跳过校验"
);
continue;
}
}
} else {
// 同插件引用 — 使用原有逻辑
let manifest_id = resolve_manifest_id(plugin_id, tenant_id, db).await?;
DynamicTableManager::table_name(&manifest_id, ref_entity_name)
};
let check_sql = format!(
"SELECT 1 as check_result FROM \"{}\" WHERE id = $1 AND tenant_id = $2 AND deleted_at IS NULL LIMIT 1",
ref_table
);
#[derive(FromQueryResult)]
struct ExistsCheck { check_result: Option<i32> }
let result = ExistsCheck::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
check_sql,
[ref_id.into(), tenant_id.into()],
)).one(db).await?;
if result.is_none() {
return Err(AppError::Validation(format!(
"引用的 {} 记录不存在ID: {}",
ref_entity_name, ref_id
)));
}
}
Ok(())
}
/// 循环引用检测 — 用于 no_cycle 字段
async fn check_no_cycle(
record_id: Uuid,
field: &PluginField,
data: &serde_json::Value,
table_name: &str,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> AppResult<()> {
let Some(val) = data.get(&field.name) else { return Ok(()) };
let new_parent = val.as_str().unwrap_or("").trim().to_string();
if new_parent.is_empty() { return Ok(()); }
let new_parent_id = Uuid::parse_str(&new_parent).map_err(|_| {
AppError::Validation("parent_id 不是有效的 UUID".to_string())
})?;
let field_name = sanitize_identifier(&field.name);
let mut visited = vec![record_id];
let mut current_id = new_parent_id;
for _ in 0..100 {
if visited.contains(&current_id) {
let label = field.display_name.as_deref().unwrap_or(&field.name);
return Err(AppError::Validation(format!(
"字段 '{}' 形成循环引用", label
)));
}
visited.push(current_id);
let query_sql = format!(
"SELECT data->>'{}' as parent FROM \"{}\" WHERE id = $1 AND tenant_id = $2 AND deleted_at IS NULL",
field_name, table_name
);
#[derive(FromQueryResult)]
struct ParentRow { parent: Option<String> }
let row = ParentRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
query_sql,
[current_id.into(), tenant_id.into()],
)).one(db).await?;
match row {
Some(r) => {
let parent = r.parent.unwrap_or_default().trim().to_string();
if parent.is_empty() { break; }
current_id = Uuid::parse_str(&parent).map_err(|_| {
AppError::Internal("parent_id 不是有效的 UUID".to_string())
})?;
}
None => break,
}
}
Ok(())
}
/// 从 DataScopeParams 构建 SQL 条件片段和参数
fn build_scope_sql(
scope: &Option<DataScopeParams>,
generated_fields: &[String],
next_param_idx: usize,
) -> (String, Vec<sea_orm::Value>) {
match scope {
Some(s) => DynamicTableManager::build_data_scope_condition_with_params(
&s.scope_level,
&s.user_id,
&s.owner_field,
&s.dept_member_ids,
next_param_idx,
generated_fields,
),
None => (String::new(), vec![]),
}
}
/// 将数据权限条件合并到现有 SQL 中
///
/// `scope_condition` 是 `(sql_fragment, params)` 元组。
/// sql_fragment 格式为 `"field = $N OR ..."`,可直接拼接到 WHERE 子句。
fn merge_scope_condition(sql: String, scope_condition: &(String, Vec<sea_orm::Value>)) -> String {
if scope_condition.0.is_empty() {
return sql;
}
// 在 "deleted_at IS NULL" 之后追加 scope 条件
// 因为所有查询都包含 WHERE ... AND "deleted_at" IS NULL ...
// 我们在合适的位置追加 AND (scope_condition)
if sql.contains("\"deleted_at\" IS NULL") {
sql.replace(
"\"deleted_at\" IS NULL",
&format!("\"deleted_at\" IS NULL AND ({})", scope_condition.0),
)
} else if sql.contains("deleted_at IS NULL") {
sql.replace(
"deleted_at IS NULL",
&format!("deleted_at IS NULL AND ({})", scope_condition.0),
)
} else {
// 回退:直接追加到 WHERE 子句末尾
sql
}
}
#[cfg(test)]
mod validate_tests {
use super::*;
use crate::manifest::{FieldValidation, PluginField, PluginFieldType};
fn make_field(name: &str, pattern: Option<&str>, message: Option<&str>) -> PluginField {
PluginField {
name: name.to_string(),
field_type: PluginFieldType::String,
required: false,
validation: pattern.map(|p| FieldValidation {
pattern: Some(p.to_string()),
message: message.map(|m| m.to_string()),
}),
..PluginField::default_for_field()
}
}
#[test]
fn validate_phone_pattern_rejects_invalid() {
let fields = vec![make_field("phone", Some("^1[3-9]\\d{9}$"), Some("手机号格式不正确"))];
let data = serde_json::json!({"phone": "1234"});
let result = validate_data(&data, &fields);
assert!(result.is_err());
}
#[test]
fn validate_phone_pattern_accepts_valid() {
let fields = vec![make_field("phone", Some("^1[3-9]\\d{9}$"), Some("手机号格式不正确"))];
let data = serde_json::json!({"phone": "13812345678"});
let result = validate_data(&data, &fields);
assert!(result.is_ok());
}
#[test]
fn validate_empty_optional_field_skips_pattern() {
let fields = vec![make_field("phone", Some("^1[3-9]\\d{9}$"), None)];
let data = serde_json::json!({"phone": ""});
let result = validate_data(&data, &fields);
assert!(result.is_ok());
}
}