use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, FromQueryResult, QueryFilter, Statement}; use uuid::Uuid; use erp_core::error::AppResult; use erp_core::events::EventBus; use crate::data_dto::PluginDataResp; use crate::dynamic_table::DynamicTableManager; use crate::entity::plugin_entity; use crate::error::PluginError; pub struct PluginDataService; impl PluginDataService { /// 创建插件数据 pub async fn create( plugin_id: Uuid, entity_name: &str, tenant_id: Uuid, operator_id: Uuid, data: serde_json::Value, db: &sea_orm::DatabaseConnection, _event_bus: &EventBus, ) -> AppResult { let table_name = resolve_table_name(plugin_id, entity_name, tenant_id, db).await?; let (sql, values) = DynamicTableManager::build_insert_sql(&table_name, tenant_id, operator_id, &data); #[derive(FromQueryResult)] struct InsertResult { id: Uuid, data: serde_json::Value, created_at: chrono::DateTime, updated_at: chrono::DateTime, version: i32, } let result = InsertResult::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .one(db) .await? .ok_or_else(|| PluginError::DatabaseError("INSERT 未返回结果".to_string()))?; Ok(PluginDataResp { id: result.id.to_string(), data: result.data, created_at: Some(result.created_at), updated_at: Some(result.updated_at), version: Some(result.version), }) } /// 列表查询 pub async fn list( plugin_id: Uuid, entity_name: &str, tenant_id: Uuid, page: u64, page_size: u64, db: &sea_orm::DatabaseConnection, ) -> AppResult<(Vec, u64)> { let table_name = resolve_table_name(plugin_id, entity_name, tenant_id, db).await?; // Count let (count_sql, count_values) = DynamicTableManager::build_count_sql(&table_name, tenant_id); #[derive(FromQueryResult)] struct CountResult { count: i64, } let total = CountResult::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, count_sql, count_values, )) .one(db) .await? .map(|r| r.count as u64) .unwrap_or(0); // Query let offset = (page.saturating_sub(1)) * page_size; let (sql, values) = DynamicTableManager::build_query_sql(&table_name, tenant_id, page_size, offset); #[derive(FromQueryResult)] struct DataRow { id: Uuid, data: serde_json::Value, created_at: chrono::DateTime, updated_at: chrono::DateTime, version: i32, } let rows = DataRow::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .all(db) .await?; let items = rows .into_iter() .map(|r| PluginDataResp { id: r.id.to_string(), data: r.data, created_at: Some(r.created_at), updated_at: Some(r.updated_at), version: Some(r.version), }) .collect(); Ok((items, total)) } /// 按 ID 获取 pub async fn get_by_id( plugin_id: Uuid, entity_name: &str, id: Uuid, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> AppResult { let table_name = resolve_table_name(plugin_id, entity_name, tenant_id, db).await?; let (sql, values) = DynamicTableManager::build_get_by_id_sql(&table_name, id, tenant_id); #[derive(FromQueryResult)] struct DataRow { id: Uuid, data: serde_json::Value, created_at: chrono::DateTime, updated_at: chrono::DateTime, version: i32, } let row = DataRow::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .one(db) .await? .ok_or_else(|| erp_core::error::AppError::NotFound("记录不存在".to_string()))?; Ok(PluginDataResp { id: row.id.to_string(), data: row.data, created_at: Some(row.created_at), updated_at: Some(row.updated_at), version: Some(row.version), }) } /// 更新 pub async fn update( plugin_id: Uuid, entity_name: &str, id: Uuid, tenant_id: Uuid, operator_id: Uuid, data: serde_json::Value, expected_version: i32, db: &sea_orm::DatabaseConnection, _event_bus: &EventBus, ) -> AppResult { let table_name = resolve_table_name(plugin_id, entity_name, tenant_id, db).await?; let (sql, values) = DynamicTableManager::build_update_sql( &table_name, id, tenant_id, operator_id, &data, expected_version, ); #[derive(FromQueryResult)] struct UpdateResult { id: Uuid, data: serde_json::Value, created_at: chrono::DateTime, updated_at: chrono::DateTime, version: i32, } let result = UpdateResult::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .one(db) .await? .ok_or_else(|| erp_core::error::AppError::VersionMismatch)?; Ok(PluginDataResp { id: result.id.to_string(), data: result.data, created_at: Some(result.created_at), updated_at: Some(result.updated_at), version: Some(result.version), }) } /// 删除(软删除) pub async fn delete( plugin_id: Uuid, entity_name: &str, id: Uuid, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, _event_bus: &EventBus, ) -> AppResult<()> { let table_name = resolve_table_name(plugin_id, entity_name, tenant_id, db).await?; let (sql, values) = DynamicTableManager::build_delete_sql(&table_name, id, tenant_id); db.execute(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .await?; Ok(()) } } /// 从 plugin_entities 表解析 table_name(带租户隔离) async fn resolve_table_name( plugin_id: Uuid, entity_name: &str, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> AppResult { let entity = plugin_entity::Entity::find() .filter(plugin_entity::Column::PluginId.eq(plugin_id)) .filter(plugin_entity::Column::TenantId.eq(tenant_id)) .filter(plugin_entity::Column::EntityName.eq(entity_name)) .filter(plugin_entity::Column::DeletedAt.is_null()) .one(db) .await? .ok_or_else(|| { erp_core::error::AppError::NotFound(format!( "插件实体 {}/{} 不存在", plugin_id, entity_name )) })?; Ok(entity.table_name) }