use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, FromQueryResult, QueryFilter, Statement}; use uuid::Uuid; use erp_core::audit::AuditLog; use erp_core::audit_service; use erp_core::error::{AppError, AppResult}; use erp_core::events::EventBus; use crate::data_dto::{AggregateMultiRow, BatchActionReq, PluginDataResp}; use crate::dynamic_table::{DynamicTableManager, sanitize_identifier}; use crate::entity::plugin; use crate::entity::plugin_entity; use crate::error::PluginError; use crate::manifest::PluginField; use crate::state::EntityInfo; /// 根据 plugin 数据库 ID 查找 manifest 中匹配 entity 的触发事件 async fn find_trigger_events( plugin_db_id: Uuid, entity_name: &str, db: &sea_orm::DatabaseConnection, ) -> AppResult> { let model = plugin::Entity::find_by_id(plugin_db_id) .one(db) .await? .ok_or_else(|| AppError::NotFound(format!("插件 {} 不存在", plugin_db_id)))?; let manifest: crate::manifest::PluginManifest = serde_json::from_value(model.manifest_json) .map_err(|e| PluginError::InvalidManifest(e.to_string()))?; let triggers = manifest .trigger_events .unwrap_or_default() .into_iter() .filter(|t| t.entity == entity_name) .collect(); Ok(triggers) } /// 发布触发事件 #[allow(clippy::too_many_arguments)] async fn emit_trigger_events( triggers: &[crate::manifest::PluginTriggerEvent], action: &str, entity_name: &str, record_id: &str, tenant_id: Uuid, data: Option<&serde_json::Value>, event_bus: &EventBus, db: &sea_orm::DatabaseConnection, manifest_id: &str, ) { use crate::manifest::PluginTriggerOn; for trigger in triggers { let should_fire = match &trigger.on { PluginTriggerOn::Create => action == "create", PluginTriggerOn::Update => action == "update", PluginTriggerOn::Delete => action == "delete", PluginTriggerOn::CreateOrUpdate => action == "create" || action == "update", }; if should_fire { let payload = serde_json::json!({ "event": trigger.name, "entity": entity_name, "record_id": record_id, "data": data, "plugin_id": manifest_id, "trigger_name": trigger.name, "action": action, }); // 发布原始触发事件 let event = erp_core::events::DomainEvent::new(&trigger.name, tenant_id, payload.clone()); event_bus.publish(event, db).await; // 同时发布 plugin.trigger.{manifest_id} 事件用于通知引擎 let notify_event = erp_core::events::DomainEvent::new( format!("plugin.trigger.{}.{}", manifest_id, trigger.name), tenant_id, payload, ); event_bus.publish(notify_event, db).await; } } } /// 行级数据权限参数 — 传递到 service 层注入 SQL 条件 pub struct DataScopeParams { pub scope_level: String, pub user_id: Uuid, pub dept_member_ids: Vec, pub owner_field: String, } pub struct PluginDataService; impl PluginDataService { /// 创建插件数据 pub async fn create( plugin_id: Uuid, entity_name: &str, tenant_id: Uuid, operator_id: Uuid, data: serde_json::Value, db: &sea_orm::DatabaseConnection, _event_bus: &EventBus, ) -> AppResult { let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?; let fields = info.fields()?; validate_data(&data, &fields)?; validate_ref_entities( &data, &fields, entity_name, plugin_id, tenant_id, db, true, None, ) .await?; let (sql, values) = DynamicTableManager::build_insert_sql(&info.table_name, tenant_id, operator_id, &data); #[derive(FromQueryResult)] struct InsertResult { id: Uuid, data: serde_json::Value, created_at: chrono::DateTime, updated_at: chrono::DateTime, version: i32, } let result = InsertResult::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .one(db) .await? .ok_or_else(|| PluginError::DatabaseError("INSERT 未返回结果".to_string()))?; audit_service::record( AuditLog::new( tenant_id, Some(operator_id), "plugin.data.create", entity_name, ) .with_resource_id(result.id), db, ) .await; // 触发事件发布 if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await && let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await { emit_trigger_events( &triggers, "create", entity_name, &result.id.to_string(), tenant_id, Some(&result.data), _event_bus, db, &mid, ) .await; } Ok(PluginDataResp { id: result.id.to_string(), data: result.data, created_at: Some(result.created_at), updated_at: Some(result.updated_at), version: Some(result.version), }) } /// 列表查询(支持过滤/搜索/排序/Generated Column 路由/数据权限) #[allow(clippy::too_many_arguments)] pub async fn list( plugin_id: Uuid, entity_name: &str, tenant_id: Uuid, page: u64, page_size: u64, db: &sea_orm::DatabaseConnection, filter: Option, search: Option, sort_by: Option, sort_order: Option, cache: &moka::sync::Cache, scope: Option, ) -> AppResult<(Vec, u64)> { let info = resolve_entity_info_cached(plugin_id, entity_name, tenant_id, db, cache).await?; // 获取 searchable 字段列表 let entity_fields = info.fields()?; let search_tuple = { let searchable: Vec<&str> = entity_fields .iter() .filter(|f| f.searchable == Some(true)) .map(|f| f.name.as_str()) .collect(); match (searchable.is_empty(), &search) { (false, Some(kw)) => Some((searchable.join(","), kw.clone())), _ => None, } }; // 构建数据权限条件(count 查询只有 tenant_id 占 $1,scope 从 $2 开始) let count_scope = build_scope_sql(&scope, &info.generated_fields, 2); // Count let (count_sql, mut count_values) = DynamicTableManager::build_count_sql(&info.table_name, tenant_id); let count_sql = merge_scope_condition(count_sql, &count_scope); count_values.extend(count_scope.1); #[derive(FromQueryResult)] struct CountResult { count: i64, } let total = CountResult::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, count_sql, count_values, )) .one(db) .await? .map(|r| r.count as u64) .unwrap_or(0); // Query — 使用 Generated Column 路由 let offset = page.saturating_sub(1) * page_size; let (sql, mut values) = DynamicTableManager::build_filtered_query_sql_ex( &info.table_name, tenant_id, page_size, offset, filter, search_tuple, sort_by, sort_order, &info.generated_fields, ) .map_err(AppError::Validation)?; // 注入数据权限条件(scope 参数索引接在 values 之后) let scope_condition = build_scope_sql(&scope, &info.generated_fields, values.len() + 1); let sql = merge_scope_condition(sql, &scope_condition); values.extend(scope_condition.1); #[derive(FromQueryResult)] struct DataRow { id: Uuid, data: serde_json::Value, created_at: chrono::DateTime, updated_at: chrono::DateTime, version: i32, } let rows = DataRow::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .all(db) .await?; let items = rows .into_iter() .map(|r| PluginDataResp { id: r.id.to_string(), data: r.data, created_at: Some(r.created_at), updated_at: Some(r.updated_at), version: Some(r.version), }) .collect(); Ok((items, total)) } /// 按 ID 获取 pub async fn get_by_id( plugin_id: Uuid, entity_name: &str, id: Uuid, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> AppResult { let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?; let (sql, values) = DynamicTableManager::build_get_by_id_sql(&info.table_name, id, tenant_id); #[derive(FromQueryResult)] struct DataRow { id: Uuid, data: serde_json::Value, created_at: chrono::DateTime, updated_at: chrono::DateTime, version: i32, } let row = DataRow::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .one(db) .await? .ok_or_else(|| AppError::NotFound("记录不存在".to_string()))?; Ok(PluginDataResp { id: row.id.to_string(), data: row.data, created_at: Some(row.created_at), updated_at: Some(row.updated_at), version: Some(row.version), }) } /// 更新 #[allow(clippy::too_many_arguments)] pub async fn update( plugin_id: Uuid, entity_name: &str, id: Uuid, tenant_id: Uuid, operator_id: Uuid, data: serde_json::Value, expected_version: i32, db: &sea_orm::DatabaseConnection, _event_bus: &EventBus, ) -> AppResult { let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?; let fields = info.fields()?; validate_data(&data, &fields)?; validate_ref_entities( &data, &fields, entity_name, plugin_id, tenant_id, db, false, Some(id), ) .await?; // 循环引用检测 for field in &fields { if field.no_cycle == Some(true) && data.get(&field.name).is_some() { check_no_cycle(id, field, &data, &info.table_name, tenant_id, db).await?; } } let (sql, values) = DynamicTableManager::build_update_sql( &info.table_name, id, tenant_id, operator_id, &data, expected_version, ); #[derive(FromQueryResult)] struct UpdateResult { id: Uuid, data: serde_json::Value, created_at: chrono::DateTime, updated_at: chrono::DateTime, version: i32, } let result = UpdateResult::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .one(db) .await? .ok_or(AppError::VersionMismatch)?; audit_service::record( AuditLog::new( tenant_id, Some(operator_id), "plugin.data.update", entity_name, ) .with_resource_id(id), db, ) .await; // 触发事件发布 if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await && let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await { emit_trigger_events( &triggers, "update", entity_name, &result.id.to_string(), tenant_id, Some(&result.data), _event_bus, db, &mid, ) .await; } Ok(PluginDataResp { id: result.id.to_string(), data: result.data, created_at: Some(result.created_at), updated_at: Some(result.updated_at), version: Some(result.version), }) } /// 部分更新(PATCH)— 只合并提供的字段 #[allow(clippy::too_many_arguments)] pub async fn partial_update( plugin_id: Uuid, entity_name: &str, id: Uuid, tenant_id: Uuid, operator_id: Uuid, partial_data: serde_json::Value, expected_version: i32, db: &sea_orm::DatabaseConnection, ) -> AppResult { let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?; let fields = info.fields()?; // 合并现有数据后校验,确保 partial update 也能触发 required/pattern/ref 校验 let existing = Self::get_by_id(plugin_id, entity_name, id, tenant_id, db).await?; let merged = { let mut base = existing.data.as_object().cloned().unwrap_or_default(); if let Some(patch) = partial_data.as_object() { for (k, v) in patch { base.insert(k.clone(), v.clone()); } } serde_json::Value::Object(base) }; validate_data(&merged, &fields)?; validate_ref_entities( &merged, &fields, entity_name, plugin_id, tenant_id, db, false, Some(id), ) .await?; let (sql, values) = DynamicTableManager::build_patch_sql( &info.table_name, id, tenant_id, operator_id, partial_data, expected_version, ); #[derive(FromQueryResult)] struct PatchResult { id: Uuid, data: serde_json::Value, created_at: chrono::DateTime, updated_at: chrono::DateTime, version: i32, } let result = PatchResult::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .one(db) .await? .ok_or(AppError::VersionMismatch)?; Ok(PluginDataResp { id: result.id.to_string(), data: result.data, created_at: Some(result.created_at), updated_at: Some(result.updated_at), version: Some(result.version), }) } /// 删除(软删除) pub async fn delete( plugin_id: Uuid, entity_name: &str, id: Uuid, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, _event_bus: &EventBus, ) -> AppResult<()> { let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?; // 解析 entity schema 获取 relations let entity_def: crate::manifest::PluginEntity = serde_json::from_value(info.schema_json.clone()) .map_err(|e| AppError::Internal(format!("解析 entity schema 失败: {}", e)))?; let manifest_id = resolve_manifest_id(plugin_id, tenant_id, db).await?; // 处理级联关系 for relation in &entity_def.relations { let rel_table = DynamicTableManager::table_name(&manifest_id, &relation.entity); let fk = sanitize_identifier(&relation.foreign_key); match relation.on_delete { crate::manifest::OnDeleteStrategy::Restrict => { let check_sql = format!( "SELECT 1 as chk FROM \"{}\" WHERE data->>'{}' = $1 AND tenant_id = $2 AND deleted_at IS NULL LIMIT 1", rel_table, fk ); #[derive(FromQueryResult)] #[allow(dead_code)] // FromQueryResult 映射需要 chk 字段,仅检查是否存在 struct RefCheck { chk: Option, } let has_ref = RefCheck::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, check_sql, [id.to_string().into(), tenant_id.into()], )) .one(db) .await?; if has_ref.is_some() { return Err(AppError::Validation(format!( "存在关联的 {} 记录,无法删除", relation.entity ))); } } crate::manifest::OnDeleteStrategy::Nullify => { let nullify_sql = format!( "UPDATE \"{}\" SET data = jsonb_set(data, '{{{}}}', 'null'), updated_at = NOW() WHERE data->>'{}' = $1 AND tenant_id = $2 AND deleted_at IS NULL", rel_table, fk, fk ); db.execute(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, nullify_sql, [id.to_string().into(), tenant_id.into()], )) .await?; } crate::manifest::OnDeleteStrategy::Cascade => { let cascade_sql = format!( "UPDATE \"{}\" SET deleted_at = NOW(), updated_at = NOW() WHERE data->>'{}' = $1 AND tenant_id = $2 AND deleted_at IS NULL", rel_table, fk ); db.execute(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, cascade_sql, [id.to_string().into(), tenant_id.into()], )) .await?; } } } // 软删除主记录 let (sql, values) = DynamicTableManager::build_delete_sql(&info.table_name, id, tenant_id); db.execute(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .await?; audit_service::record( AuditLog::new(tenant_id, None, "plugin.data.delete", entity_name).with_resource_id(id), db, ) .await; // 触发事件发布 if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await && let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await { emit_trigger_events( &triggers, "delete", entity_name, &id.to_string(), tenant_id, None, _event_bus, db, &mid, ) .await; } Ok(()) } /// 导出数据(支持 JSON/CSV/XLSX 格式) #[allow(clippy::too_many_arguments)] pub async fn export( plugin_id: Uuid, entity_name: &str, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, filter: Option, search: Option, sort_by: Option, sort_order: Option, format: Option, cache: &moka::sync::Cache, scope: Option, ) -> AppResult { use crate::data_dto::ExportPayload; let info = resolve_entity_info_cached(plugin_id, entity_name, tenant_id, db, cache).await?; let entity_fields = info.fields()?; let search_tuple = { let searchable: Vec<&str> = entity_fields .iter() .filter(|f| f.searchable == Some(true)) .map(|f| f.name.as_str()) .collect(); match (searchable.is_empty(), &search) { (false, Some(kw)) => Some((searchable.join(","), kw.clone())), _ => None, } }; let (sql, mut values) = DynamicTableManager::build_filtered_query_sql_ex( &info.table_name, tenant_id, 10000, 0, filter, search_tuple, sort_by, sort_order, &info.generated_fields, ) .map_err(AppError::Validation)?; let scope_condition = build_scope_sql(&scope, &info.generated_fields, values.len() + 1); let sql = merge_scope_condition(sql, &scope_condition); values.extend(scope_condition.1); #[derive(FromQueryResult)] struct DataRow { data: serde_json::Value, } let rows = DataRow::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .all(db) .await?; let data: Vec = rows.into_iter().map(|r| r.data).collect(); let fmt = format.as_deref().unwrap_or("json").to_lowercase(); match fmt.as_str() { "csv" => Ok(ExportPayload::Csv(Self::to_csv(&data, &entity_fields)?)), "xlsx" => Ok(ExportPayload::Xlsx(Self::to_xlsx(&data, &entity_fields)?)), _ => Ok(ExportPayload::Json(data)), } } fn to_csv( rows: &[serde_json::Value], fields: &[crate::manifest::PluginField], ) -> AppResult> { let mut wtr = csv::Writer::from_writer(Vec::new()); let headers: Vec<&str> = fields.iter().map(|f| f.name.as_str()).collect(); wtr.write_record(&headers) .map_err(|e| AppError::Internal(format!("CSV 写头失败: {}", e)))?; for row in rows { let record: Vec = headers .iter() .map(|h| { row.get(*h) .map(|v| match v { serde_json::Value::String(s) => s.clone(), serde_json::Value::Number(n) => n.to_string(), serde_json::Value::Bool(b) => b.to_string(), serde_json::Value::Null => String::new(), other => other.to_string(), }) .unwrap_or_default() }) .collect(); wtr.write_record(&record) .map_err(|e| AppError::Internal(format!("CSV 写行失败: {}", e)))?; } wtr.into_inner() .map_err(|e| AppError::Internal(format!("CSV 刷新失败: {}", e))) } fn to_xlsx( rows: &[serde_json::Value], fields: &[crate::manifest::PluginField], ) -> AppResult> { use rust_xlsxwriter::*; let mut wb = Workbook::new(); let ws = wb.add_worksheet(); let header_fmt = Format::new() .set_bold() .set_background_color(Color::RGB(0x4F46E5)) .set_font_color(Color::White); for (col, field) in fields.iter().enumerate() { let label = field.display_name.as_deref().unwrap_or(&field.name); ws.write_string_with_format(0, col as u16, label, &header_fmt) .map_err(|e| AppError::Internal(format!("XLSX 写头失败: {}", e)))?; } for (row_idx, row) in rows.iter().enumerate() { for (col, field) in fields.iter().enumerate() { let val = row.get(&field.name); let row_num = (row_idx + 1) as u32; match val { Some(serde_json::Value::String(s)) => { ws.write_string(row_num, col as u16, s).ok(); } Some(serde_json::Value::Number(n)) => { if let Some(f) = n.as_f64() { ws.write_number(row_num, col as u16, f).ok(); } else { ws.write_string(row_num, col as u16, n.to_string()).ok(); } } Some(serde_json::Value::Bool(b)) => { ws.write_string(row_num, col as u16, b.to_string()).ok(); } _ => {} } } } let buf = wb .save_to_buffer() .map_err(|e| AppError::Internal(format!("XLSX 保存失败: {}", e)))?; Ok(buf.to_vec()) } /// 批量导入数据(逐行校验 + 逐行插入) pub async fn import( plugin_id: Uuid, entity_name: &str, tenant_id: Uuid, operator_id: Uuid, rows: Vec, db: &sea_orm::DatabaseConnection, event_bus: &EventBus, ) -> AppResult { use crate::data_dto::{ImportResult, ImportRowError}; if rows.len() > 1000 { return Err(AppError::Validation("单次导入上限 1000 行".to_string())); } let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?; let fields = info.fields()?; let mut success_count = 0usize; let mut row_errors: Vec = Vec::new(); for (i, row_data) in rows.iter().enumerate() { if let Err(e) = validate_data(row_data, &fields) { row_errors.push(ImportRowError { row: i, errors: vec![e.to_string()], }); continue; } if let Err(e) = validate_ref_entities( row_data, &fields, entity_name, plugin_id, tenant_id, db, true, None, ) .await { row_errors.push(ImportRowError { row: i, errors: vec![e.to_string()], }); continue; } let (sql, values) = DynamicTableManager::build_insert_sql( &info.table_name, tenant_id, operator_id, row_data, ); let result = db .execute(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .await; match result { Ok(_) => success_count += 1, Err(e) => { row_errors.push(ImportRowError { row: i, errors: vec![format!("写入失败: {}", e)], }); } } } audit_service::record( AuditLog::new( tenant_id, Some(operator_id), "plugin.data.import", entity_name, ), db, ) .await; if let Ok(triggers) = find_trigger_events(plugin_id, entity_name, db).await && let Ok(mid) = resolve_manifest_id(plugin_id, tenant_id, db).await { emit_trigger_events( &triggers, "create", entity_name, &format!("batch_import:{}", success_count), tenant_id, None, event_bus, db, &mid, ) .await; } Ok(ImportResult { success_count, error_count: row_errors.len(), errors: row_errors, }) } /// 批量操作 — batch_delete / batch_update pub async fn batch( plugin_id: Uuid, entity_name: &str, tenant_id: Uuid, operator_id: Uuid, req: BatchActionReq, db: &sea_orm::DatabaseConnection, ) -> AppResult { if req.ids.is_empty() { return Err(AppError::Validation("ids 不能为空".to_string())); } if req.ids.len() > 100 { return Err(AppError::Validation("批量操作上限 100 条".to_string())); } let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?; let ids: Vec = req .ids .iter() .map(|s| Uuid::parse_str(s)) .collect::, _>>() .map_err(|_| AppError::Validation("ids 中包含无效的 UUID".to_string()))?; let affected = match req.action.as_str() { "batch_delete" => { // 批量删除前先执行级联策略(逐条,复用 delete 的级联逻辑) let entity_def: crate::manifest::PluginEntity = serde_json::from_value(info.schema_json.clone()).map_err(|e| { AppError::Internal(format!("解析 entity schema 失败: {}", e)) })?; let manifest_id = resolve_manifest_id(plugin_id, tenant_id, db).await?; for &del_id in &ids { for relation in &entity_def.relations { let rel_table = DynamicTableManager::table_name(&manifest_id, &relation.entity); let fk = sanitize_identifier(&relation.foreign_key); match relation.on_delete { crate::manifest::OnDeleteStrategy::Restrict => { let check_sql = format!( "SELECT 1 as chk FROM \"{}\" WHERE data->>'{}' = $1 AND tenant_id = $2 AND deleted_at IS NULL LIMIT 1", rel_table, fk ); #[derive(FromQueryResult)] #[allow(dead_code)] // FromQueryResult 映射需要 chk 字段,仅检查是否存在 struct RefCheck { chk: Option, } let has_ref = RefCheck::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, check_sql, [del_id.to_string().into(), tenant_id.into()], )) .one(db) .await?; if has_ref.is_some() { return Err(AppError::Validation(format!( "记录 {} 存在关联的 {} 记录,无法删除", del_id, relation.entity ))); } } crate::manifest::OnDeleteStrategy::Nullify => { let nullify_sql = format!( "UPDATE \"{}\" SET data = jsonb_set(data, '{{{}}}', 'null'), updated_at = NOW() WHERE data->>'{}' = $1 AND tenant_id = $2 AND deleted_at IS NULL", rel_table, fk, fk ); db.execute(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, nullify_sql, [del_id.to_string().into(), tenant_id.into()], )) .await?; } crate::manifest::OnDeleteStrategy::Cascade => { let cascade_sql = format!( "UPDATE \"{}\" SET deleted_at = NOW(), updated_at = NOW() WHERE data->>'{}' = $1 AND tenant_id = $2 AND deleted_at IS NULL", rel_table, fk ); db.execute(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, cascade_sql, [del_id.to_string().into(), tenant_id.into()], )) .await?; } } } } let placeholders: Vec = ids .iter() .enumerate() .map(|(i, _)| format!("${}", i + 2)) .collect(); let sql = format!( "UPDATE \"{}\" SET deleted_at = NOW(), updated_at = NOW() \ WHERE tenant_id = $1 AND id IN ({}) AND deleted_at IS NULL", info.table_name, placeholders.join(", ") ); let mut values = vec![tenant_id.into()]; for id in &ids { values.push((*id).into()); } let result = db .execute(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .await?; result.rows_affected() } "batch_update" => { let update_data = req.data.ok_or_else(|| { AppError::Validation("batch_update 需要 data 字段".to_string()) })?; let mut set_expr = "data".to_string(); if let Some(obj) = update_data.as_object() { for key in obj.keys() { let clean_key = sanitize_identifier(key); set_expr = format!( "jsonb_set({}, '{{{}}}', $2::jsonb->'{}', true)", set_expr, clean_key, clean_key ); } } let placeholders: Vec = ids .iter() .enumerate() .map(|(i, _)| format!("${}", i + 3)) .collect(); let sql = format!( "UPDATE \"{}\" SET data = {}, updated_at = NOW(), updated_by = $1, version = version + 1 \ WHERE tenant_id = $1 AND id IN ({}) AND deleted_at IS NULL", info.table_name, set_expr, placeholders.join(", ") ); let mut values = vec![operator_id.into()]; values.push( serde_json::to_string(&update_data) .unwrap_or_default() .into(), ); for id in &ids { values.push((*id).into()); } let result = db .execute(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .await?; result.rows_affected() } _ => { return Err(AppError::Validation(format!( "不支持的批量操作: {}", req.action ))); } }; Ok(affected) } /// 统计记录数(支持过滤和搜索) pub async fn count( plugin_id: Uuid, entity_name: &str, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, filter: Option, search: Option, scope: Option, ) -> AppResult { let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?; let entity_fields = info.fields()?; let search_tuple = { let searchable: Vec<&str> = entity_fields .iter() .filter(|f| f.searchable == Some(true)) .map(|f| f.name.as_str()) .collect(); match (searchable.is_empty(), &search) { (false, Some(kw)) => Some((searchable.join(","), kw.clone())), _ => None, } }; let (mut sql, mut values) = DynamicTableManager::build_filtered_count_sql( &info.table_name, tenant_id, filter, search_tuple, ) .map_err(AppError::Validation)?; // 合并数据权限条件 let scope_condition = build_scope_sql(&scope, &info.generated_fields, values.len() + 1); if !scope_condition.0.is_empty() { sql = merge_scope_condition(sql, &scope_condition); values.extend(scope_condition.1); } #[derive(FromQueryResult)] struct CountResult { count: i64, } let result = CountResult::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .one(db) .await? .map(|r| r.count as u64) .unwrap_or(0); Ok(result) } /// 聚合查询 — 按字段分组计数 /// 返回 [(分组键, 计数), ...] pub async fn aggregate( plugin_id: Uuid, entity_name: &str, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, group_by_field: &str, filter: Option, scope: Option, ) -> AppResult> { let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?; let (mut sql, mut values) = DynamicTableManager::build_aggregate_sql( &info.table_name, tenant_id, group_by_field, filter, ) .map_err(AppError::Validation)?; // 合并数据权限条件 let scope_condition = build_scope_sql(&scope, &info.generated_fields, values.len() + 1); if !scope_condition.0.is_empty() { sql = merge_scope_condition(sql, &scope_condition); values.extend(scope_condition.1); } #[derive(FromQueryResult)] struct AggRow { key: Option, count: i64, } let rows = AggRow::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .all(db) .await?; let result = rows .into_iter() .map(|r| (r.key.unwrap_or_default(), r.count)) .collect(); Ok(result) } /// 多聚合查询 — 支持 COUNT + SUM/AVG/MIN/MAX #[allow(clippy::too_many_arguments)] pub async fn aggregate_multi( plugin_id: Uuid, entity_name: &str, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, group_by_field: &str, aggregations: &[(String, String)], filter: Option, scope: Option, ) -> AppResult> { let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?; let (mut sql, mut values) = DynamicTableManager::build_aggregate_multi_sql( &info.table_name, tenant_id, group_by_field, aggregations, filter, ) .map_err(AppError::Validation)?; let scope_condition = build_scope_sql(&scope, &info.generated_fields, values.len() + 1); if !scope_condition.0.is_empty() { sql = merge_scope_condition(sql, &scope_condition); values.extend(scope_condition.1); } // 使用 json_agg 包装整行,返回 JSON 数组 let json_sql = format!("SELECT json_agg(row_to_json(t)) as data FROM ({}) t", sql); #[derive(Debug, FromQueryResult)] struct JsonResult { data: Option, } let result = JsonResult::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, json_sql, values, )) .one(db) .await?; let json_rows: Vec = result .and_then(|r| r.data) .and_then(|d| d.as_array().cloned()) .unwrap_or_default(); let rows = json_rows .into_iter() .map(|v| AggregateMultiRow { key: v .get("key") .and_then(|k| k.as_str()) .unwrap_or_default() .to_string(), count: v.get("count").and_then(|c| c.as_i64()).unwrap_or(0), metrics: v .as_object() .map(|m| { m.iter() .filter(|(k, _)| *k != "key" && *k != "count") .map(|(k, v)| (k.clone(), v.as_f64().unwrap_or(0.0))) .collect() }) .unwrap_or_default(), }) .collect(); Ok(rows) } /// 聚合查询(预留 Redis 缓存接口) pub async fn aggregate_cached( plugin_id: Uuid, entity_name: &str, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, group_by_field: &str, filter: Option, ) -> AppResult> { // TODO: 未来版本添加 Redis 缓存层 Self::aggregate( plugin_id, entity_name, tenant_id, db, group_by_field, filter, None, ) .await } /// 时间序列聚合 — 按时间字段截断为 day/week/month 统计计数 #[allow(clippy::too_many_arguments)] pub async fn timeseries( plugin_id: Uuid, entity_name: &str, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, time_field: &str, time_grain: &str, start: Option, end: Option, scope: Option, ) -> AppResult> { let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?; let (mut sql, mut values) = DynamicTableManager::build_timeseries_sql( &info.table_name, tenant_id, time_field, time_grain, start.as_deref(), end.as_deref(), ) .map_err(AppError::Validation)?; // 合并数据权限条件 let scope_condition = build_scope_sql(&scope, &info.generated_fields, values.len() + 1); if !scope_condition.0.is_empty() { sql = merge_scope_condition(sql, &scope_condition); values.extend(scope_condition.1); } #[derive(FromQueryResult)] struct TsRow { period: Option, count: i64, } let rows = TsRow::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .all(db) .await?; Ok(rows .into_iter() .map(|r| crate::data_dto::TimeseriesItem { period: r.period.unwrap_or_default(), count: r.count, }) .collect()) } /// 对账扫描: 检查指定插件所有实体的跨插件引用是否有悬空引用 pub async fn reconcile_references( plugin_id: Uuid, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> AppResult { let manifest_id = resolve_manifest_id(plugin_id, tenant_id, db).await?; // 获取该插件所有实体 let entities = plugin_entity::Entity::find() .filter(plugin_entity::Column::PluginId.eq(plugin_id)) .filter(plugin_entity::Column::TenantId.eq(tenant_id)) .filter(plugin_entity::Column::DeletedAt.is_null()) .all(db) .await?; let mut valid_count: i64 = 0; let mut dangling_count: i64 = 0; let mut details = Vec::new(); for entity_rec in &entities { let schema: crate::manifest::PluginEntity = serde_json::from_value(entity_rec.schema_json.clone()) .map_err(|e| AppError::Internal(format!("解析 entity schema 失败: {}", e)))?; // 找出所有有 ref_entity 的字段 let ref_fields: Vec<&PluginField> = schema .fields .iter() .filter(|f| f.ref_entity.is_some()) .collect(); if ref_fields.is_empty() { continue; } let table_name = DynamicTableManager::table_name(&manifest_id, &entity_rec.entity_name); for field in &ref_fields { let col = sanitize_identifier(&field.name); #[derive(FromQueryResult)] #[allow(dead_code)] // FromQueryResult 映射需要 id 字段,通过 is_some 检查 struct RefRow { id: Uuid, // 动态列 — SeaORM 无法直接映射,用 JSON 构建 } // 查询所有有 ref 值的记录 let ref_sql = format!( "SELECT id, {} as ref_val FROM {} WHERE tenant_id = $1 AND deleted_at IS NULL AND {} IS NOT NULL", col, table_name, col, ); #[derive(FromQueryResult)] struct RefValRow { id: Uuid, ref_val: String, } let rows = RefValRow::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, ref_sql, [tenant_id.into()], )) .all(db) .await?; for row in rows { // 验证 ref_val 是有效的 UUID 且目标记录存在 let Ok(target_uuid) = Uuid::parse_str(&row.ref_val) else { continue; }; let ref_entity_name = field.ref_entity.as_deref().unwrap_or(""); let ref_plugin = field.ref_plugin.as_deref().unwrap_or(&manifest_id); let target_table = DynamicTableManager::table_name(ref_plugin, ref_entity_name); let check_sql = format!( "SELECT COUNT(*) as cnt FROM {} WHERE id = $1 AND tenant_id = $2 AND deleted_at IS NULL", target_table, ); #[derive(FromQueryResult)] struct CountRow { cnt: i64, } let count_row = CountRow::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, check_sql, [target_uuid.into(), tenant_id.into()], )) .one(db) .await? .unwrap_or(CountRow { cnt: 0 }); if count_row.cnt > 0 { valid_count += 1; } else { dangling_count += 1; details.push(crate::data_dto::DanglingRef { entity: entity_rec.entity_name.clone(), field: field.name.clone(), record_id: row.id.to_string(), dangling_value: row.ref_val, }); } } } } Ok(crate::data_dto::ReconciliationReport { valid_count, dangling_count, details, }) } } /// 从 plugins 表解析 manifest metadata.id(如 "erp-crm") pub async fn resolve_manifest_id( plugin_id: Uuid, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> AppResult { let model = plugin::Entity::find() .filter(plugin::Column::Id.eq(plugin_id)) .filter(plugin::Column::TenantId.eq(tenant_id)) .filter(plugin::Column::DeletedAt.is_null()) .one(db) .await? .ok_or_else(|| AppError::NotFound(format!("插件 {} 不存在", plugin_id)))?; let manifest: crate::manifest::PluginManifest = serde_json::from_value(model.manifest_json) .map_err(|e| AppError::Internal(format!("解析插件 manifest 失败: {}", e)))?; Ok(manifest.metadata.id) } /// 从 plugin_entities 表获取实体完整信息(带租户隔离) /// 注意:此函数不填充 generated_fields,仅用于非 list 场景 async fn resolve_entity_info( plugin_id: Uuid, entity_name: &str, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> AppResult { let entity = plugin_entity::Entity::find() .filter(plugin_entity::Column::PluginId.eq(plugin_id)) .filter(plugin_entity::Column::TenantId.eq(tenant_id)) .filter(plugin_entity::Column::EntityName.eq(entity_name)) .filter(plugin_entity::Column::DeletedAt.is_null()) .one(db) .await? .ok_or_else(|| { AppError::NotFound(format!("插件实体 {}/{} 不存在", plugin_id, entity_name)) })?; Ok(EntityInfo { table_name: entity.table_name, schema_json: entity.schema_json, generated_fields: vec![], // 旧路径,不追踪 generated_fields }) } /// 从缓存或数据库获取实体信息(带 generated_fields 解析) pub async fn resolve_entity_info_cached( plugin_id: Uuid, entity_name: &str, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, cache: &moka::sync::Cache, ) -> AppResult { let cache_key = format!("{}:{}:{}", plugin_id, entity_name, tenant_id); if let Some(info) = cache.get(&cache_key) { return Ok(info); } let entity = plugin_entity::Entity::find() .filter(plugin_entity::Column::PluginId.eq(plugin_id)) .filter(plugin_entity::Column::TenantId.eq(tenant_id)) .filter(plugin_entity::Column::EntityName.eq(entity_name)) .filter(plugin_entity::Column::DeletedAt.is_null()) .one(db) .await? .ok_or_else(|| { AppError::NotFound(format!("插件实体 {}/{} 不存在", plugin_id, entity_name)) })?; // 解析 generated_fields let entity_def: crate::manifest::PluginEntity = serde_json::from_value(entity.schema_json.clone()) .map_err(|e| AppError::Internal(format!("解析 entity schema 失败: {}", e)))?; let generated_fields: Vec = entity_def .fields .iter() .filter(|f| f.field_type.supports_generated_column()) .filter(|f| { f.unique || f.sortable == Some(true) || f.filterable == Some(true) || (f.required && (f.sortable == Some(true) || f.filterable == Some(true))) }) .map(|f| sanitize_identifier(&f.name)) .collect(); let info = EntityInfo { table_name: entity.table_name, schema_json: entity.schema_json, generated_fields, }; cache.insert(cache_key, info.clone()); Ok(info) } /// 跨插件实体解析 — 按 manifest_id + entity_name 查找目标插件的实体信息 pub async fn resolve_cross_plugin_entity( target_manifest_id: &str, entity_name: &str, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> AppResult { let entity = plugin_entity::Entity::find() .filter(plugin_entity::Column::ManifestId.eq(target_manifest_id)) .filter(plugin_entity::Column::EntityName.eq(entity_name)) .filter(plugin_entity::Column::TenantId.eq(tenant_id)) .filter(plugin_entity::Column::DeletedAt.is_null()) .one(db) .await? .ok_or_else(|| { AppError::NotFound(format!( "跨插件实体 {}/{} 不存在或未公开", target_manifest_id, entity_name )) })?; let entity_def: crate::manifest::PluginEntity = serde_json::from_value(entity.schema_json.clone()) .map_err(|e| AppError::Internal(format!("解析 entity schema 失败: {}", e)))?; let generated_fields: Vec = entity_def .fields .iter() .filter(|f| f.field_type.supports_generated_column()) .filter(|f| { f.unique || f.sortable == Some(true) || f.filterable == Some(true) || (f.required && (f.sortable == Some(true) || f.filterable == Some(true))) }) .map(|f| sanitize_identifier(&f.name)) .collect(); Ok(EntityInfo { table_name: entity.table_name, schema_json: entity.schema_json, generated_fields, }) } /// 检查目标插件是否安装且活跃 pub async fn is_plugin_active( target_manifest_id: &str, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> bool { // 通过 plugin_entities 的 manifest_id 找到 plugin_id,再检查 plugins 表状态 let entity = plugin_entity::Entity::find() .filter(plugin_entity::Column::ManifestId.eq(target_manifest_id)) .filter(plugin_entity::Column::TenantId.eq(tenant_id)) .filter(plugin_entity::Column::DeletedAt.is_null()) .one(db) .await; let Some(entity) = entity.ok().flatten() else { return false; }; let plugin = plugin::Entity::find_by_id(entity.plugin_id) .filter(plugin::Column::TenantId.eq(tenant_id)) .filter(plugin::Column::DeletedAt.is_null()) .one(db) .await; matches!(plugin.ok().flatten(), Some(p) if p.status == "running" || p.status == "installed") } /// 校验数据:检查 required 字段 + 正则校验 fn validate_data(data: &serde_json::Value, fields: &[PluginField]) -> AppResult<()> { let obj = data .as_object() .ok_or_else(|| AppError::Validation("data 必须是 JSON 对象".to_string()))?; for field in fields { let label = field.display_name.as_deref().unwrap_or(&field.name); // required 检查 if field.required && !obj.contains_key(&field.name) { return Err(AppError::Validation(format!("字段 '{}' 不能为空", label))); } // 正则校验 if let Some(validation) = &field.validation && let Some(pattern) = &validation.pattern && let Some(val) = obj.get(&field.name) { let str_val = val.as_str().unwrap_or(""); if !str_val.is_empty() { let re = regex::Regex::new(pattern) .map_err(|e| AppError::Internal(format!("正则表达式编译失败: {}", e)))?; if !re.is_match(str_val) { let default_msg = format!("字段 '{}' 格式不正确", label); let msg = validation.message.as_deref().unwrap_or(&default_msg); return Err(AppError::Validation(msg.to_string())); } } } } Ok(()) } /// 校验外键引用 — 检查 ref_entity 字段指向的记录是否存在 /// 支持同插件引用和跨插件引用(ref_plugin 字段) /// 核心原则:跨插件引用目标插件未安装时跳过校验(软警告) #[allow(clippy::too_many_arguments)] async fn validate_ref_entities( data: &serde_json::Value, fields: &[PluginField], current_entity: &str, plugin_id: Uuid, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, is_create: bool, record_id: Option, ) -> AppResult<()> { let obj = data .as_object() .ok_or_else(|| AppError::Validation("data 必须是 JSON 对象".to_string()))?; for field in fields { let Some(ref_entity_name) = &field.ref_entity else { continue; }; let Some(val) = obj.get(&field.name) else { continue; }; let str_val = val.as_str().unwrap_or("").trim().to_string(); if str_val.is_empty() && !field.required { continue; } if str_val.is_empty() { continue; } let ref_id = Uuid::parse_str(&str_val).map_err(|_| { AppError::Validation(format!( "字段 '{}' 的值 '{}' 不是有效的 UUID", field.display_name.as_deref().unwrap_or(&field.name), str_val )) })?; // 自引用 + create:跳过(记录尚未存在) if ref_entity_name == current_entity && field.ref_plugin.is_none() && is_create { continue; } // 自引用 + update:检查是否引用自身 if ref_entity_name == current_entity && field.ref_plugin.is_none() && !is_create && let Some(rid) = record_id && ref_id == rid { continue; } // 确定目标表名 let ref_table = if let Some(target_plugin) = &field.ref_plugin { // 跨插件引用 — 检查目标插件是否活跃 if !is_plugin_active(target_plugin, tenant_id, db).await { // 目标插件未安装/禁用 → 跳过校验(软警告,不阻塞) tracing::debug!( field = %field.name, target_plugin = %target_plugin, "跨插件引用目标插件未活跃,跳过校验" ); continue; } // 目标插件活跃 → 解析目标表名 match resolve_cross_plugin_entity(target_plugin, ref_entity_name, tenant_id, db).await { Ok(info) => info.table_name, Err(e) => { tracing::warn!( field = %field.name, target_plugin = %target_plugin, entity = %ref_entity_name, error = %e, "跨插件实体解析失败,跳过校验" ); continue; } } } else { // 同插件引用 — 使用原有逻辑 let manifest_id = resolve_manifest_id(plugin_id, tenant_id, db).await?; DynamicTableManager::table_name(&manifest_id, ref_entity_name) }; let check_sql = format!( "SELECT 1 as check_result FROM \"{}\" WHERE id = $1 AND tenant_id = $2 AND deleted_at IS NULL LIMIT 1", ref_table ); #[derive(FromQueryResult)] #[allow(dead_code)] // FromQueryResult 映射需要 check_result 字段,仅检查是否存在 struct ExistsCheck { check_result: Option, } let result = ExistsCheck::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, check_sql, [ref_id.into(), tenant_id.into()], )) .one(db) .await?; if result.is_none() { return Err(AppError::Validation(format!( "引用的 {} 记录不存在(ID: {})", ref_entity_name, ref_id ))); } } Ok(()) } /// 循环引用检测 — 用于 no_cycle 字段 async fn check_no_cycle( record_id: Uuid, field: &PluginField, data: &serde_json::Value, table_name: &str, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> AppResult<()> { let Some(val) = data.get(&field.name) else { return Ok(()); }; let new_parent = val.as_str().unwrap_or("").trim().to_string(); if new_parent.is_empty() { return Ok(()); } let new_parent_id = Uuid::parse_str(&new_parent) .map_err(|_| AppError::Validation("parent_id 不是有效的 UUID".to_string()))?; let field_name = sanitize_identifier(&field.name); let mut visited = vec![record_id]; let mut current_id = new_parent_id; for _ in 0..100 { if visited.contains(¤t_id) { let label = field.display_name.as_deref().unwrap_or(&field.name); return Err(AppError::Validation(format!( "字段 '{}' 形成循环引用", label ))); } visited.push(current_id); let query_sql = format!( "SELECT data->>'{}' as parent FROM \"{}\" WHERE id = $1 AND tenant_id = $2 AND deleted_at IS NULL", field_name, table_name ); #[derive(FromQueryResult)] struct ParentRow { parent: Option, } let row = ParentRow::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, query_sql, [current_id.into(), tenant_id.into()], )) .one(db) .await?; match row { Some(r) => { let parent = r.parent.unwrap_or_default().trim().to_string(); if parent.is_empty() { break; } current_id = Uuid::parse_str(&parent) .map_err(|_| AppError::Internal("parent_id 不是有效的 UUID".to_string()))?; } None => break, } } Ok(()) } /// 从 DataScopeParams 构建 SQL 条件片段和参数 fn build_scope_sql( scope: &Option, generated_fields: &[String], next_param_idx: usize, ) -> (String, Vec) { match scope { Some(s) => DynamicTableManager::build_data_scope_condition_with_params( &s.scope_level, &s.user_id, &s.owner_field, &s.dept_member_ids, next_param_idx, generated_fields, ), None => (String::new(), vec![]), } } /// 将数据权限条件合并到现有 SQL 中 /// /// `scope_condition` 是 `(sql_fragment, params)` 元组。 /// sql_fragment 格式为 `"field = $N OR ..."`,可直接拼接到 WHERE 子句。 fn merge_scope_condition(sql: String, scope_condition: &(String, Vec)) -> String { if scope_condition.0.is_empty() { return sql; } // 在 "deleted_at IS NULL" 之后追加 scope 条件 // 因为所有查询都包含 WHERE ... AND "deleted_at" IS NULL ... // 我们在合适的位置追加 AND (scope_condition) if sql.contains("\"deleted_at\" IS NULL") { sql.replace( "\"deleted_at\" IS NULL", &format!("\"deleted_at\" IS NULL AND ({})", scope_condition.0), ) } else if sql.contains("deleted_at IS NULL") { sql.replace( "deleted_at IS NULL", &format!("deleted_at IS NULL AND ({})", scope_condition.0), ) } else { // 回退:直接追加到 WHERE 子句末尾 sql } } #[cfg(test)] mod validate_tests { use super::*; use crate::manifest::{FieldValidation, PluginField, PluginFieldType}; fn make_field(name: &str, pattern: Option<&str>, message: Option<&str>) -> PluginField { PluginField { name: name.to_string(), field_type: PluginFieldType::String, required: false, validation: pattern.map(|p| FieldValidation { pattern: Some(p.to_string()), message: message.map(|m| m.to_string()), }), ..PluginField::default_for_field() } } #[test] fn validate_phone_pattern_rejects_invalid() { let fields = vec![make_field( "phone", Some("^1[3-9]\\d{9}$"), Some("手机号格式不正确"), )]; let data = serde_json::json!({"phone": "1234"}); let result = validate_data(&data, &fields); assert!(result.is_err()); } #[test] fn validate_phone_pattern_accepts_valid() { let fields = vec![make_field( "phone", Some("^1[3-9]\\d{9}$"), Some("手机号格式不正确"), )]; let data = serde_json::json!({"phone": "13812345678"}); let result = validate_data(&data, &fields); assert!(result.is_ok()); } #[test] fn validate_empty_optional_field_skips_pattern() { let fields = vec![make_field("phone", Some("^1[3-9]\\d{9}$"), None)]; let data = serde_json::json!({"phone": ""}); let result = validate_data(&data, &fields); assert!(result.is_ok()); } }