use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, FromQueryResult, QueryFilter, Statement}; use uuid::Uuid; use erp_core::error::{AppError, AppResult}; use erp_core::events::EventBus; use crate::data_dto::PluginDataResp; use crate::dynamic_table::DynamicTableManager; use crate::entity::plugin; use crate::entity::plugin_entity; use crate::error::PluginError; use crate::manifest::PluginField; pub struct PluginDataService; /// 插件实体信息(合并查询减少 DB 调用) struct EntityInfo { table_name: String, schema_json: serde_json::Value, } impl EntityInfo { fn fields(&self) -> AppResult> { let entity_def: crate::manifest::PluginEntity = serde_json::from_value(self.schema_json.clone()) .map_err(|e| AppError::Internal(format!("解析 entity schema 失败: {}", e)))?; Ok(entity_def.fields) } } impl PluginDataService { /// 创建插件数据 pub async fn create( plugin_id: Uuid, entity_name: &str, tenant_id: Uuid, operator_id: Uuid, data: serde_json::Value, db: &sea_orm::DatabaseConnection, _event_bus: &EventBus, ) -> AppResult { let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?; let fields = info.fields()?; validate_data(&data, &fields)?; let (sql, values) = DynamicTableManager::build_insert_sql(&info.table_name, tenant_id, operator_id, &data); #[derive(FromQueryResult)] struct InsertResult { id: Uuid, data: serde_json::Value, created_at: chrono::DateTime, updated_at: chrono::DateTime, version: i32, } let result = InsertResult::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .one(db) .await? .ok_or_else(|| PluginError::DatabaseError("INSERT 未返回结果".to_string()))?; Ok(PluginDataResp { id: result.id.to_string(), data: result.data, created_at: Some(result.created_at), updated_at: Some(result.updated_at), version: Some(result.version), }) } /// 列表查询(支持过滤/搜索/排序) pub async fn list( plugin_id: Uuid, entity_name: &str, tenant_id: Uuid, page: u64, page_size: u64, db: &sea_orm::DatabaseConnection, filter: Option, search: Option, sort_by: Option, sort_order: Option, ) -> AppResult<(Vec, u64)> { let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?; // 获取 searchable 字段列表 let entity_fields = info.fields()?; let search_tuple = { let searchable: Vec<&str> = entity_fields .iter() .filter(|f| f.searchable == Some(true)) .map(|f| f.name.as_str()) .collect(); match (searchable.is_empty(), &search) { (false, Some(kw)) => Some((searchable.join(","), kw.clone())), _ => None, } }; // Count let (count_sql, count_values) = DynamicTableManager::build_count_sql(&info.table_name, tenant_id); #[derive(FromQueryResult)] struct CountResult { count: i64, } let total = CountResult::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, count_sql, count_values, )) .one(db) .await? .map(|r| r.count as u64) .unwrap_or(0); // Query let offset = page.saturating_sub(1) * page_size; let (sql, values) = DynamicTableManager::build_filtered_query_sql( &info.table_name, tenant_id, page_size, offset, filter, search_tuple, sort_by, sort_order, ) .map_err(|e| AppError::Validation(e))?; #[derive(FromQueryResult)] struct DataRow { id: Uuid, data: serde_json::Value, created_at: chrono::DateTime, updated_at: chrono::DateTime, version: i32, } let rows = DataRow::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .all(db) .await?; let items = rows .into_iter() .map(|r| PluginDataResp { id: r.id.to_string(), data: r.data, created_at: Some(r.created_at), updated_at: Some(r.updated_at), version: Some(r.version), }) .collect(); Ok((items, total)) } /// 按 ID 获取 pub async fn get_by_id( plugin_id: Uuid, entity_name: &str, id: Uuid, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> AppResult { let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?; let (sql, values) = DynamicTableManager::build_get_by_id_sql(&info.table_name, id, tenant_id); #[derive(FromQueryResult)] struct DataRow { id: Uuid, data: serde_json::Value, created_at: chrono::DateTime, updated_at: chrono::DateTime, version: i32, } let row = DataRow::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .one(db) .await? .ok_or_else(|| AppError::NotFound("记录不存在".to_string()))?; Ok(PluginDataResp { id: row.id.to_string(), data: row.data, created_at: Some(row.created_at), updated_at: Some(row.updated_at), version: Some(row.version), }) } /// 更新 pub async fn update( plugin_id: Uuid, entity_name: &str, id: Uuid, tenant_id: Uuid, operator_id: Uuid, data: serde_json::Value, expected_version: i32, db: &sea_orm::DatabaseConnection, _event_bus: &EventBus, ) -> AppResult { let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?; let fields = info.fields()?; validate_data(&data, &fields)?; let (sql, values) = DynamicTableManager::build_update_sql( &info.table_name, id, tenant_id, operator_id, &data, expected_version, ); #[derive(FromQueryResult)] struct UpdateResult { id: Uuid, data: serde_json::Value, created_at: chrono::DateTime, updated_at: chrono::DateTime, version: i32, } let result = UpdateResult::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .one(db) .await? .ok_or_else(|| AppError::VersionMismatch)?; Ok(PluginDataResp { id: result.id.to_string(), data: result.data, created_at: Some(result.created_at), updated_at: Some(result.updated_at), version: Some(result.version), }) } /// 删除(软删除) pub async fn delete( plugin_id: Uuid, entity_name: &str, id: Uuid, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, _event_bus: &EventBus, ) -> AppResult<()> { let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?; let (sql, values) = DynamicTableManager::build_delete_sql(&info.table_name, id, tenant_id); db.execute(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .await?; Ok(()) } /// 统计记录数(支持过滤和搜索) pub async fn count( plugin_id: Uuid, entity_name: &str, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, filter: Option, search: Option, ) -> AppResult { let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?; let entity_fields = info.fields()?; let search_tuple = { let searchable: Vec<&str> = entity_fields .iter() .filter(|f| f.searchable == Some(true)) .map(|f| f.name.as_str()) .collect(); match (searchable.is_empty(), &search) { (false, Some(kw)) => Some((searchable.join(","), kw.clone())), _ => None, } }; let (sql, values) = DynamicTableManager::build_filtered_count_sql( &info.table_name, tenant_id, filter, search_tuple, ) .map_err(|e| AppError::Validation(e))?; #[derive(FromQueryResult)] struct CountResult { count: i64, } let result = CountResult::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .one(db) .await? .map(|r| r.count as u64) .unwrap_or(0); Ok(result) } /// 聚合查询 — 按字段分组计数 /// 返回 [(分组键, 计数), ...] pub async fn aggregate( plugin_id: Uuid, entity_name: &str, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, group_by_field: &str, filter: Option, ) -> AppResult> { let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?; let (sql, values) = DynamicTableManager::build_aggregate_sql( &info.table_name, tenant_id, group_by_field, filter, ) .map_err(|e| AppError::Validation(e))?; #[derive(FromQueryResult)] struct AggRow { key: Option, count: i64, } let rows = AggRow::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .all(db) .await?; let result = rows .into_iter() .map(|r| (r.key.unwrap_or_default(), r.count)) .collect(); Ok(result) } } /// 从 plugins 表解析 manifest metadata.id(如 "erp-crm") pub async fn resolve_manifest_id( plugin_id: Uuid, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> AppResult { let model = plugin::Entity::find() .filter(plugin::Column::Id.eq(plugin_id)) .filter(plugin::Column::TenantId.eq(tenant_id)) .filter(plugin::Column::DeletedAt.is_null()) .one(db) .await? .ok_or_else(|| AppError::NotFound(format!("插件 {} 不存在", plugin_id)))?; let manifest: crate::manifest::PluginManifest = serde_json::from_value(model.manifest_json) .map_err(|e| AppError::Internal(format!("解析插件 manifest 失败: {}", e)))?; Ok(manifest.metadata.id) } /// 从 plugin_entities 表获取实体完整信息(带租户隔离) async fn resolve_entity_info( plugin_id: Uuid, entity_name: &str, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> AppResult { let entity = plugin_entity::Entity::find() .filter(plugin_entity::Column::PluginId.eq(plugin_id)) .filter(plugin_entity::Column::TenantId.eq(tenant_id)) .filter(plugin_entity::Column::EntityName.eq(entity_name)) .filter(plugin_entity::Column::DeletedAt.is_null()) .one(db) .await? .ok_or_else(|| { AppError::NotFound(format!("插件实体 {}/{} 不存在", plugin_id, entity_name)) })?; Ok(EntityInfo { table_name: entity.table_name, schema_json: entity.schema_json, }) } /// 校验数据:检查 required 字段 fn validate_data(data: &serde_json::Value, fields: &[PluginField]) -> AppResult<()> { let obj = data.as_object().ok_or_else(|| { AppError::Validation("data 必须是 JSON 对象".to_string()) })?; for field in fields { if field.required && !obj.contains_key(&field.name) { let label = field.display_name.as_deref().unwrap_or(&field.name); return Err(AppError::Validation(format!("字段 '{}' 不能为空", label))); } } Ok(()) }