Files
erp/crates/erp-plugin/src/data_service.rs
iven 9effa9f942 feat(plugin): 新增数据统计 REST API — count 和 aggregate 端点
- dynamic_table: 新增 build_filtered_count_sql(带过滤/搜索的 COUNT)和 build_aggregate_sql(按字段分组计数)
- data_service: 新增 count 和 aggregate 方法,支持实时统计查询
- data_handler: 新增 count_plugin_data 和 aggregate_plugin_data REST handler
- data_dto: 新增 AggregateItem、AggregateQueryParams、CountQueryParams 类型
- module: 注册 /plugins/{plugin_id}/{entity}/count 和 /aggregate 路由
- 包含 8 个新增单元测试,全部通过
2026-04-16 16:22:33 +08:00

418 lines
13 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, FromQueryResult, QueryFilter, Statement};
use uuid::Uuid;
use erp_core::error::{AppError, AppResult};
use erp_core::events::EventBus;
use crate::data_dto::PluginDataResp;
use crate::dynamic_table::DynamicTableManager;
use crate::entity::plugin_entity;
use crate::error::PluginError;
use crate::manifest::PluginField;
/// Stateless service type grouping the data operations (create, list, get,
/// update, delete, count, aggregate) that run against plugin entity tables.
///
/// It is a unit struct: every operation is an associated function that
/// receives the database connection and tenant context as arguments.
pub struct PluginDataService;
impl PluginDataService {
/// 创建插件数据
pub async fn create(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
operator_id: Uuid,
data: serde_json::Value,
db: &sea_orm::DatabaseConnection,
_event_bus: &EventBus,
) -> AppResult<PluginDataResp> {
// 数据校验
let fields = resolve_entity_fields(plugin_id, entity_name, tenant_id, db).await?;
validate_data(&data, &fields)?;
let table_name = resolve_table_name(plugin_id, entity_name, tenant_id, db).await?;
let (sql, values) =
DynamicTableManager::build_insert_sql(&table_name, tenant_id, operator_id, &data);
#[derive(FromQueryResult)]
struct InsertResult {
id: Uuid,
data: serde_json::Value,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
version: i32,
}
let result = InsertResult::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.one(db)
.await?
.ok_or_else(|| PluginError::DatabaseError("INSERT 未返回结果".to_string()))?;
Ok(PluginDataResp {
id: result.id.to_string(),
data: result.data,
created_at: Some(result.created_at),
updated_at: Some(result.updated_at),
version: Some(result.version),
})
}
/// Lists one page of records for a plugin entity (with filter / search / sort)
/// and returns it together with the total number of matching records.
///
/// # Parameters
/// * `page` / `page_size` - the row offset is `(page - 1) * page_size`;
///   `page == 0` is treated like the first page and the arithmetic saturates
///   instead of overflowing.
/// * `filter` - optional JSON filter forwarded to `DynamicTableManager`.
/// * `search` - optional keyword; it is only applied when the entity declares
///   at least one field with `searchable == Some(true)`. The searchable field
///   names are passed on as a single comma-joined string.
/// * `sort_by` / `sort_order` - forwarded unchanged to the query builder.
///
/// # Returns
/// `(items, total)` where `total` is counted with the same filter and search
/// conditions as the page query, so pagination stays consistent with the
/// rows that are actually returned.
///
/// # Errors
/// * Errors from resolving the entity (table name / field definitions).
/// * `AppError::Validation` when the SQL builders reject the filter or sort.
/// * Database errors from executing the statements.
pub async fn list(
    plugin_id: Uuid,
    entity_name: &str,
    tenant_id: Uuid,
    page: u64,
    page_size: u64,
    db: &sea_orm::DatabaseConnection,
    filter: Option<serde_json::Value>,
    search: Option<String>,
    sort_by: Option<String>,
    sort_order: Option<String>,
) -> AppResult<(Vec<PluginDataResp>, u64)> {
    #[derive(FromQueryResult)]
    struct CountResult {
        count: i64,
    }

    #[derive(FromQueryResult)]
    struct DataRow {
        id: Uuid,
        data: serde_json::Value,
        created_at: chrono::DateTime<chrono::Utc>,
        updated_at: chrono::DateTime<chrono::Utc>,
        version: i32,
    }

    let table_name = resolve_table_name(plugin_id, entity_name, tenant_id, db).await?;

    // Collect the searchable field names; a keyword without any searchable
    // field is ignored.
    let entity_fields = resolve_entity_fields(plugin_id, entity_name, tenant_id, db).await?;
    let searchable: Vec<&str> = entity_fields
        .iter()
        .filter(|f| f.searchable == Some(true))
        .map(|f| f.name.as_str())
        .collect();
    let search_tuple = match search {
        Some(keyword) if !searchable.is_empty() => Some((searchable.join(","), keyword)),
        _ => None,
    };

    // Count with the SAME filter/search as the page query. Counting only the
    // base condition would report a total larger than the filtered result set
    // and break pagination on the caller side.
    let (count_sql, count_values) = DynamicTableManager::build_filtered_count_sql(
        &table_name,
        tenant_id,
        filter.clone(),
        search_tuple.clone(),
    )
    .map_err(AppError::Validation)?;
    let total = CountResult::find_by_statement(Statement::from_sql_and_values(
        sea_orm::DatabaseBackend::Postgres,
        count_sql,
        count_values,
    ))
    .one(db)
    .await?
    // A negative count is not expected; fall back to 0 rather than wrapping.
    .map_or(0, |r| u64::try_from(r.count).unwrap_or(0));

    // Page query (filter / search / sort). Saturating arithmetic keeps a huge
    // `page` or `page_size` from overflowing the offset.
    let offset = page.saturating_sub(1).saturating_mul(page_size);
    let (sql, values) = DynamicTableManager::build_filtered_query_sql(
        &table_name,
        tenant_id,
        page_size,
        offset,
        filter,
        search_tuple,
        sort_by,
        sort_order,
    )
    .map_err(AppError::Validation)?;

    let rows = DataRow::find_by_statement(Statement::from_sql_and_values(
        sea_orm::DatabaseBackend::Postgres,
        sql,
        values,
    ))
    .all(db)
    .await?;

    let items = rows
        .into_iter()
        .map(|r| PluginDataResp {
            id: r.id.to_string(),
            data: r.data,
            created_at: Some(r.created_at),
            updated_at: Some(r.updated_at),
            version: Some(r.version),
        })
        .collect();

    Ok((items, total))
}
/// 按 ID 获取
pub async fn get_by_id(
plugin_id: Uuid,
entity_name: &str,
id: Uuid,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> AppResult<PluginDataResp> {
let table_name = resolve_table_name(plugin_id, entity_name, tenant_id, db).await?;
let (sql, values) = DynamicTableManager::build_get_by_id_sql(&table_name, id, tenant_id);
#[derive(FromQueryResult)]
struct DataRow {
id: Uuid,
data: serde_json::Value,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
version: i32,
}
let row = DataRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.one(db)
.await?
.ok_or_else(|| AppError::NotFound("记录不存在".to_string()))?;
Ok(PluginDataResp {
id: row.id.to_string(),
data: row.data,
created_at: Some(row.created_at),
updated_at: Some(row.updated_at),
version: Some(row.version),
})
}
/// 更新
pub async fn update(
plugin_id: Uuid,
entity_name: &str,
id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
data: serde_json::Value,
expected_version: i32,
db: &sea_orm::DatabaseConnection,
_event_bus: &EventBus,
) -> AppResult<PluginDataResp> {
// 数据校验
let fields = resolve_entity_fields(plugin_id, entity_name, tenant_id, db).await?;
validate_data(&data, &fields)?;
let table_name = resolve_table_name(plugin_id, entity_name, tenant_id, db).await?;
let (sql, values) = DynamicTableManager::build_update_sql(
&table_name,
id,
tenant_id,
operator_id,
&data,
expected_version,
);
#[derive(FromQueryResult)]
struct UpdateResult {
id: Uuid,
data: serde_json::Value,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
version: i32,
}
let result = UpdateResult::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.one(db)
.await?
.ok_or_else(|| AppError::VersionMismatch)?;
Ok(PluginDataResp {
id: result.id.to_string(),
data: result.data,
created_at: Some(result.created_at),
updated_at: Some(result.updated_at),
version: Some(result.version),
})
}
/// Deletes a record of a plugin entity (described as a soft delete by the
/// original author; the actual SQL comes from
/// `DynamicTableManager::build_delete_sql`).
///
/// The execution result is discarded, so the call succeeds even when no row
/// was affected.
///
/// # Errors
/// * Errors from `resolve_table_name` (e.g. unknown entity).
/// * Database errors from executing the statement.
pub async fn delete(
    plugin_id: Uuid,
    entity_name: &str,
    id: Uuid,
    tenant_id: Uuid,
    db: &sea_orm::DatabaseConnection,
    _event_bus: &EventBus,
) -> AppResult<()> {
    let table = resolve_table_name(plugin_id, entity_name, tenant_id, db).await?;
    let (delete_sql, params) = DynamicTableManager::build_delete_sql(&table, id, tenant_id);
    let stmt = Statement::from_sql_and_values(
        sea_orm::DatabaseBackend::Postgres,
        delete_sql,
        params,
    );

    // Only success/failure matters here; the affected-row count is ignored.
    db.execute(stmt).await?;
    Ok(())
}
/// 统计记录数(支持过滤和搜索)
pub async fn count(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
filter: Option<serde_json::Value>,
search: Option<String>,
) -> AppResult<u64> {
let table_name = resolve_table_name(plugin_id, entity_name, tenant_id, db).await?;
// 获取 searchable 字段列表,构建搜索条件
let entity_fields = resolve_entity_fields(plugin_id, entity_name, tenant_id, db).await?;
let search_tuple = {
let searchable: Vec<&str> = entity_fields
.iter()
.filter(|f| f.searchable == Some(true))
.map(|f| f.name.as_str())
.collect();
match (searchable.is_empty(), &search) {
(false, Some(kw)) => Some((searchable.join(","), kw.clone())),
_ => None,
}
};
let (sql, values) = DynamicTableManager::build_filtered_count_sql(
&table_name,
tenant_id,
filter,
search_tuple,
)
.map_err(|e| AppError::Validation(e))?;
#[derive(FromQueryResult)]
struct CountResult {
count: i64,
}
let result = CountResult::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.one(db)
.await?
.map(|r| r.count as u64)
.unwrap_or(0);
Ok(result)
}
/// 聚合查询 — 按字段分组计数
/// 返回 [(分组键, 计数), ...]
pub async fn aggregate(
plugin_id: Uuid,
entity_name: &str,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
group_by_field: &str,
filter: Option<serde_json::Value>,
) -> AppResult<Vec<(String, i64)>> {
let table_name = resolve_table_name(plugin_id, entity_name, tenant_id, db).await?;
let (sql, values) = DynamicTableManager::build_aggregate_sql(
&table_name,
tenant_id,
group_by_field,
filter,
)
.map_err(|e| AppError::Validation(e))?;
#[derive(FromQueryResult)]
struct AggRow {
key: Option<String>,
count: i64,
}
let rows = AggRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
values,
))
.all(db)
.await?;
let result = rows
.into_iter()
.map(|r| (r.key.unwrap_or_default(), r.count))
.collect();
Ok(result)
}
}
/// Resolves the table name of a plugin entity from its `plugin_entity`
/// record, scoped to the given tenant.
///
/// Only records matching the plugin id, tenant id and entity name whose
/// `DeletedAt` column is NULL are considered.
///
/// # Errors
/// * Database errors from the lookup.
/// * `AppError::NotFound` when no matching entity record exists.
async fn resolve_table_name(
    plugin_id: Uuid,
    entity_name: &str,
    tenant_id: Uuid,
    db: &sea_orm::DatabaseConnection,
) -> AppResult<String> {
    let lookup = plugin_entity::Entity::find()
        .filter(plugin_entity::Column::PluginId.eq(plugin_id))
        .filter(plugin_entity::Column::TenantId.eq(tenant_id))
        .filter(plugin_entity::Column::EntityName.eq(entity_name))
        .filter(plugin_entity::Column::DeletedAt.is_null());

    match lookup.one(db).await? {
        Some(model) => Ok(model.table_name),
        None => Err(AppError::NotFound(format!(
            "插件实体 {}/{} 不存在",
            plugin_id, entity_name
        ))),
    }
}
/// Loads the field definitions of a plugin entity from its `plugin_entity`
/// record, scoped to the given tenant.
///
/// The record's `schema_json` is deserialized into
/// `crate::manifest::PluginEntity` and its `fields` are returned.
///
/// # Errors
/// * Database errors from the lookup.
/// * `AppError::NotFound` when no matching, non-deleted entity record exists.
/// * `AppError::Internal` when `schema_json` cannot be deserialized.
async fn resolve_entity_fields(
    plugin_id: Uuid,
    entity_name: &str,
    tenant_id: Uuid,
    db: &sea_orm::DatabaseConnection,
) -> AppResult<Vec<PluginField>> {
    let lookup = plugin_entity::Entity::find()
        .filter(plugin_entity::Column::PluginId.eq(plugin_id))
        .filter(plugin_entity::Column::TenantId.eq(tenant_id))
        .filter(plugin_entity::Column::EntityName.eq(entity_name))
        .filter(plugin_entity::Column::DeletedAt.is_null());

    let model = match lookup.one(db).await? {
        Some(found) => found,
        None => {
            return Err(AppError::NotFound(format!(
                "插件实体 {}/{} 不存在",
                plugin_id, entity_name
            )));
        }
    };

    match serde_json::from_value::<crate::manifest::PluginEntity>(model.schema_json) {
        Ok(definition) => Ok(definition.fields),
        Err(e) => Err(AppError::Internal(format!(
            "解析 entity schema 失败: {}",
            e
        ))),
    }
}
/// Validates a record payload against the entity's field definitions.
///
/// The payload must be a JSON object, and every field marked `required` must
/// be present with a non-null value. An explicit JSON `null` is treated the
/// same as a missing key, because the reported error states that the field
/// must not be empty.
///
/// # Errors
/// * `AppError::Validation` when `data` is not a JSON object.
/// * `AppError::Validation` naming the first required field (by display name
///   when available, otherwise by field name) that is missing or null.
fn validate_data(data: &serde_json::Value, fields: &[PluginField]) -> AppResult<()> {
    let obj = data
        .as_object()
        .ok_or_else(|| AppError::Validation("data 必须是 JSON 对象".to_string()))?;

    // First required field that is absent or explicitly null, in definition order.
    let first_missing = fields
        .iter()
        .filter(|field| field.required)
        .find(|field| obj.get(&field.name).map_or(true, |value| value.is_null()));

    match first_missing {
        Some(field) => {
            let label = field.display_name.as_deref().unwrap_or(&field.name);
            Err(AppError::Validation(format!("字段 '{}' 不能为空", label)))
        }
        None => Ok(()),
    }
}