Files
hms/crates/erp-plugin/src/handler/data_handler.rs
iven 6d5a711d2c
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled
fix: 修复测试发现的 7 个问题 + 全 workspace clippy 清零
功能修复:
1. 患者创建空名称验证:后端添加 name.trim().is_empty() 检查
2. 仪表盘统计容错:单个查询失败返回零值而非 500
3. FHIR 路由修复:从 /fhir 移到 /api/v1/fhir 保持一致
4. 冻结模块后端中间件:新增 frozen_module_middleware 拦截冻结路径
5. 积分端点权限码:health.health-data.list → health.points.list
6. 角色权限迁移:护士补充 devices.list,运营补充 points.list/manage
7. 测试结果文档:R01-R05 角色测试 + T00/T10 结果归档

Clippy 全 workspace 清零(14→0 errors):
- erp-core: 修复 empty doc line、collapsible if、redundant closure 等 9 处
- erp-health: 修复 too_many_arguments、unused var、unnecessary parens 等 58 处
- erp-ai: 修复 dead_code、unused import 等 11 处
- erp-plugin: 修复 too_many_arguments、wildcard pattern 等 11 处
- erp-server-migration: 修复 enum_variant_names 5 处
- erp-auth/config/workflow/message: 各 1-3 处

工程改进:
- lint-staged 配置迁移到 .lintstagedrc.js(函数式避免文件列表传给 clippy)
- cargo fmt 统一格式化
2026-05-07 23:43:14 +08:00

1122 lines
36 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use axum::Extension;
use axum::extract::{FromRef, Path, Query, State};
use axum::response::Json;
use uuid::Uuid;
use erp_core::error::AppError;
use erp_core::rbac::require_permission;
use erp_core::types::{ApiResponse, PaginatedResponse, TenantContext};
use crate::data_dto::{
AggregateItem, AggregateMultiReq, AggregateMultiRow, AggregateQueryParams, BatchActionReq,
CountQueryParams, CreatePluginDataReq, ExportParams, ImportReq, ImportResult,
PatchPluginDataReq, PluginDataListParams, PluginDataResp, PublicEntityResp,
ReconciliationReport, ResolveLabelsReq, ResolveLabelsResp, TimeseriesItem, TimeseriesParams,
UpdatePluginDataReq, UserViewReq, UserViewResp,
};
use crate::data_service::{DataScopeParams, PluginDataService, resolve_manifest_id};
use crate::state::PluginState;
use sea_orm::{ConnectionTrait, Statement};
/// Looks up the `data_scope` level attached to one of the current user's permissions.
///
/// Follows the `user_roles` -> `role_permissions` -> `permissions` chain for the user and
/// tenant in `ctx` and returns the `data_scope` of the first row whose permission code
/// equals `permission_code`. Falls back to `"all"` when no row matches or the column is NULL.
///
/// # Errors
/// Returns `AppError::Internal` carrying the database error text when the query fails.
async fn get_data_scope(
    ctx: &TenantContext,
    permission_code: &str,
    db: &sea_orm::DatabaseConnection,
) -> Result<String, AppError> {
    use sea_orm::{FromQueryResult, Statement};

    #[derive(FromQueryResult)]
    struct ScopeResult {
        data_scope: Option<String>,
    }

    // NOTE(review): `LIMIT 1` without `ORDER BY` means that when several of the user's roles
    // grant the same permission with different scopes, which one wins is up to the database.
    let statement = Statement::from_sql_and_values(
        sea_orm::DatabaseBackend::Postgres,
        r#"SELECT rp.data_scope
FROM user_roles ur
JOIN role_permissions rp ON rp.role_id = ur.role_id
JOIN permissions p ON p.id = rp.permission_id
WHERE ur.user_id = $1 AND ur.tenant_id = $2 AND p.code = $3
LIMIT 1"#,
        [
            ctx.user_id.into(),
            ctx.tenant_id.into(),
            permission_code.into(),
        ],
    );

    let row = ScopeResult::find_by_statement(statement)
        .one(db)
        .await
        .map_err(|e| AppError::Internal(e.to_string()))?;

    let scope = match row {
        Some(ScopeResult {
            data_scope: Some(level),
        }) => level,
        // No matching row, or a NULL column: unrestricted by default.
        _ => "all".to_string(),
    };
    Ok(scope)
}
/// Returns the ID list used for department-level data scoping.
///
/// Currently this is just a copy of `ctx.department_ids` (an empty list stays empty).
/// `_include_sub_depts` is accepted but ignored; it is reserved for a future recursive
/// department-tree lookup.
///
/// NOTE(review): the values returned are department IDs, while the caller stores them as
/// `dept_member_ids` — confirm the downstream query expects department IDs.
async fn get_dept_members(ctx: &TenantContext, _include_sub_depts: bool) -> Vec<Uuid> {
    ctx.department_ids.clone()
}
/// Builds the permission code required for a plugin data operation.
///
/// The result has the form `{manifest_id}.{entity_name}.{suffix}`, where the suffix is
/// `list` for the read actions `"list"` and `"get"`, and `manage` for every other action.
/// Example: `erp-crm.customer.list`.
fn compute_permission_code(manifest_id: &str, entity_name: &str, action: &str) -> String {
    let is_read_action = matches!(action, "list" | "get");
    let suffix = if is_read_action { "list" } else { "manage" };
    format!("{manifest_id}.{entity_name}.{suffix}")
}
#[utoipa::path(
    get,
    path = "/api/v1/plugins/{plugin_id}/{entity}",
    params(PluginDataListParams),
    responses(
        (status = 200, description = "成功", body = ApiResponse<PaginatedResponse<PluginDataResp>>),
    ),
    security(("bearer_auth" = [])),
    tag = "插件数据"
)]
/// GET /api/v1/plugins/{plugin_id}/{entity} — paginated list of plugin records.
///
/// Requires the `{manifest}.{entity}.list` permission and applies the caller's data scope
/// when the entity has data scoping enabled. `page` defaults to 1 and `page_size` to 20.
///
/// # Errors
/// Propagates errors from manifest resolution, the permission check, data-scope
/// resolution and the list query.
pub async fn list_plugin_data<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity)): Path<(Uuid, String)>,
    Query(params): Query<PluginDataListParams>,
) -> Result<Json<ApiResponse<PaginatedResponse<PluginDataResp>>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    let manifest_id = resolve_manifest_id(plugin_id, ctx.tenant_id, &state.db).await?;
    let fine_perm = compute_permission_code(&manifest_id, &entity, "list");
    require_permission(&ctx, &fine_perm)?;

    // Resolve the data-scope restriction (None means unrestricted).
    let scope = resolve_data_scope(&ctx, &manifest_id, &entity, &fine_perm, &state.db).await?;

    let page = params.page.unwrap_or(1);
    let page_size = params.page_size.unwrap_or(20);

    // Parse the optional filter JSON.
    // NOTE(review): a malformed filter is silently dropped and the query runs unfiltered.
    let filter: Option<serde_json::Value> = params
        .filter
        .as_ref()
        .and_then(|f| serde_json::from_str(f).ok());

    let (items, total) = PluginDataService::list(
        plugin_id,
        &entity,
        ctx.tenant_id,
        page,
        page_size,
        &state.db,
        filter,
        params.search,
        params.sort_by,
        params.sort_order,
        &state.entity_cache,
        scope,
    )
    .await?;

    // Guard against `page_size == 0`: the float division would yield inf/NaN, which casts to
    // u64::MAX or 0 and produces a meaningless page count.
    let total_pages = if page_size == 0 {
        0
    } else {
        (total as f64 / page_size as f64).ceil() as u64
    };

    Ok(Json(ApiResponse::ok(PaginatedResponse {
        data: items,
        total,
        page,
        page_size,
        total_pages,
    })))
}
#[utoipa::path(
    post,
    path = "/api/v1/plugins/{plugin_id}/{entity}",
    request_body = CreatePluginDataReq,
    responses(
        (status = 200, description = "创建成功", body = ApiResponse<PluginDataResp>),
    ),
    security(("bearer_auth" = [])),
    tag = "插件数据"
)]
/// POST /api/v1/plugins/{plugin_id}/{entity} — create a plugin record.
///
/// Requires the `{manifest}.{entity}.manage` permission. The record is created on behalf of
/// the calling user in the caller's tenant.
pub async fn create_plugin_data<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity)): Path<(Uuid, String)>,
    Json(req): Json<CreatePluginDataReq>,
) -> Result<Json<ApiResponse<PluginDataResp>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    let manifest = resolve_manifest_id(plugin_id, ctx.tenant_id, &state.db).await?;
    let required = compute_permission_code(&manifest, &entity, "create");
    require_permission(&ctx, &required)?;

    let created = PluginDataService::create(
        plugin_id,
        &entity,
        ctx.tenant_id,
        ctx.user_id,
        req.data,
        &state.db,
        &state.event_bus,
    )
    .await?;

    Ok(Json(ApiResponse::ok(created)))
}
#[utoipa::path(
    get,
    path = "/api/v1/plugins/{plugin_id}/{entity}/{id}",
    responses(
        (status = 200, description = "成功", body = ApiResponse<PluginDataResp>),
    ),
    security(("bearer_auth" = [])),
    tag = "插件数据"
)]
/// GET /api/v1/plugins/{plugin_id}/{entity}/{id} — fetch a single plugin record.
///
/// Requires the `{manifest}.{entity}.list` permission.
///
/// NOTE(review): unlike the list endpoint, no data-scope restriction is resolved here —
/// confirm that reading by ID is meant to bypass data scoping.
pub async fn get_plugin_data<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity, id)): Path<(Uuid, String, Uuid)>,
) -> Result<Json<ApiResponse<PluginDataResp>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    let manifest = resolve_manifest_id(plugin_id, ctx.tenant_id, &state.db).await?;
    let required = compute_permission_code(&manifest, &entity, "get");
    require_permission(&ctx, &required)?;

    let record =
        PluginDataService::get_by_id(plugin_id, &entity, id, ctx.tenant_id, &state.db).await?;
    let response = ApiResponse::ok(record);
    Ok(Json(response))
}
#[utoipa::path(
    put,
    path = "/api/v1/plugins/{plugin_id}/{entity}/{id}",
    request_body = UpdatePluginDataReq,
    responses(
        (status = 200, description = "更新成功", body = ApiResponse<PluginDataResp>),
    ),
    security(("bearer_auth" = [])),
    tag = "插件数据"
)]
/// PUT /api/v1/plugins/{plugin_id}/{entity}/{id} — full update of a plugin record.
///
/// Requires the `{manifest}.{entity}.manage` permission. The request's `version` is passed
/// through to the service together with the new data.
pub async fn update_plugin_data<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity, id)): Path<(Uuid, String, Uuid)>,
    Json(req): Json<UpdatePluginDataReq>,
) -> Result<Json<ApiResponse<PluginDataResp>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    let manifest = resolve_manifest_id(plugin_id, ctx.tenant_id, &state.db).await?;
    let required = compute_permission_code(&manifest, &entity, "update");
    require_permission(&ctx, &required)?;

    let updated = PluginDataService::update(
        plugin_id,
        &entity,
        id,
        ctx.tenant_id,
        ctx.user_id,
        req.data,
        req.version,
        &state.db,
        &state.event_bus,
    )
    .await?;

    Ok(Json(ApiResponse::ok(updated)))
}
#[utoipa::path(
    patch,
    path = "/api/v1/plugins/{plugin_id}/{entity}/{id}",
    request_body = PatchPluginDataReq,
    responses(
        (status = 200, description = "部分更新成功", body = ApiResponse<PluginDataResp>),
    ),
    security(("bearer_auth" = [])),
    tag = "插件数据"
)]
/// PATCH /api/v1/plugins/{plugin_id}/{entity}/{id} — partial update (fields are merged via
/// `jsonb_set`, per the original endpoint description).
///
/// Requires the `{manifest}.{entity}.manage` permission. Unlike the full update, no event
/// bus is handed to the service here.
pub async fn patch_plugin_data<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity, id)): Path<(Uuid, String, Uuid)>,
    Json(req): Json<PatchPluginDataReq>,
) -> Result<Json<ApiResponse<PluginDataResp>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    let manifest = resolve_manifest_id(plugin_id, ctx.tenant_id, &state.db).await?;
    let required = compute_permission_code(&manifest, &entity, "update");
    require_permission(&ctx, &required)?;

    let patched = PluginDataService::partial_update(
        plugin_id,
        &entity,
        id,
        ctx.tenant_id,
        ctx.user_id,
        req.data,
        req.version,
        &state.db,
    )
    .await?;

    Ok(Json(ApiResponse::ok(patched)))
}
#[utoipa::path(
    delete,
    path = "/api/v1/plugins/{plugin_id}/{entity}/{id}",
    responses(
        (status = 200, description = "删除成功"),
    ),
    security(("bearer_auth" = [])),
    tag = "插件数据"
)]
/// DELETE /api/v1/plugins/{plugin_id}/{entity}/{id} — delete a plugin record.
///
/// Requires the `{manifest}.{entity}.manage` permission. Responds with an empty payload
/// on success.
pub async fn delete_plugin_data<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity, id)): Path<(Uuid, String, Uuid)>,
) -> Result<Json<ApiResponse<()>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    let manifest = resolve_manifest_id(plugin_id, ctx.tenant_id, &state.db).await?;
    let required = compute_permission_code(&manifest, &entity, "delete");
    require_permission(&ctx, &required)?;

    let tenant = ctx.tenant_id;
    PluginDataService::delete(plugin_id, &entity, id, tenant, &state.db, &state.event_bus)
        .await?;

    Ok(Json(ApiResponse::ok(())))
}
#[utoipa::path(
    post,
    path = "/api/v1/plugins/{plugin_id}/{entity}/batch",
    request_body = BatchActionReq,
    responses(
        (status = 200, description = "批量操作成功", body = ApiResponse<u64>),
    ),
    security(("bearer_auth" = [])),
    tag = "插件数据"
)]
/// POST /api/v1/plugins/{plugin_id}/{entity}/batch — batch operation
/// (`batch_delete` / `batch_update`).
///
/// `batch_delete` is checked as a "delete" action and anything else as an "update" action;
/// both currently resolve to the `{manifest}.{entity}.manage` permission. Returns the
/// number of affected records reported by the service.
pub async fn batch_plugin_data<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity)): Path<(Uuid, String)>,
    Json(req): Json<BatchActionReq>,
) -> Result<Json<ApiResponse<u64>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    let manifest = resolve_manifest_id(plugin_id, ctx.tenant_id, &state.db).await?;

    // Unknown actions are treated like updates for the permission check.
    let permission_action = match req.action.as_str() {
        "batch_delete" => "delete",
        _ => "update",
    };
    let required = compute_permission_code(&manifest, &entity, permission_action);
    require_permission(&ctx, &required)?;

    let affected_rows = PluginDataService::batch(
        plugin_id,
        &entity,
        ctx.tenant_id,
        ctx.user_id,
        req,
        &state.db,
    )
    .await?;

    Ok(Json(ApiResponse::ok(affected_rows)))
}
#[utoipa::path(
    get,
    path = "/api/v1/plugins/{plugin_id}/{entity}/count",
    params(CountQueryParams),
    responses(
        (status = 200, description = "成功", body = ApiResponse<u64>),
    ),
    security(("bearer_auth" = [])),
    tag = "插件数据"
)]
/// GET /api/v1/plugins/{plugin_id}/{entity}/count — count matching records.
///
/// Requires the `{manifest}.{entity}.list` permission and honours the caller's data scope.
pub async fn count_plugin_data<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity)): Path<(Uuid, String)>,
    Query(params): Query<CountQueryParams>,
) -> Result<Json<ApiResponse<u64>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    let manifest = resolve_manifest_id(plugin_id, ctx.tenant_id, &state.db).await?;
    let required = compute_permission_code(&manifest, &entity, "list");
    require_permission(&ctx, &required)?;

    // Data-scope restriction for this user (None means unrestricted).
    let scope = resolve_data_scope(&ctx, &manifest, &entity, &required, &state.db).await?;

    // Optional filter JSON; a malformed value is ignored rather than rejected.
    let filter = match &params.filter {
        Some(raw) => serde_json::from_str::<serde_json::Value>(raw).ok(),
        None => None,
    };

    let matched = PluginDataService::count(
        plugin_id,
        &entity,
        ctx.tenant_id,
        &state.db,
        filter,
        params.search,
        scope,
    )
    .await?;

    Ok(Json(ApiResponse::ok(matched)))
}
#[utoipa::path(
    get,
    path = "/api/v1/plugins/{plugin_id}/{entity}/aggregate",
    params(AggregateQueryParams),
    responses(
        (status = 200, description = "成功", body = ApiResponse<Vec<AggregateItem>>),
    ),
    security(("bearer_auth" = [])),
    tag = "插件数据"
)]
/// GET /api/v1/plugins/{plugin_id}/{entity}/aggregate — grouped count query.
///
/// Requires the `{manifest}.{entity}.list` permission and honours the caller's data scope.
/// Each `(key, count)` pair returned by the service becomes one `AggregateItem`.
pub async fn aggregate_plugin_data<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity)): Path<(Uuid, String)>,
    Query(params): Query<AggregateQueryParams>,
) -> Result<Json<ApiResponse<Vec<AggregateItem>>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    let manifest = resolve_manifest_id(plugin_id, ctx.tenant_id, &state.db).await?;
    let required = compute_permission_code(&manifest, &entity, "list");
    require_permission(&ctx, &required)?;

    // Data-scope restriction for this user (None means unrestricted).
    let scope = resolve_data_scope(&ctx, &manifest, &entity, &required, &state.db).await?;

    // Optional filter JSON; a malformed value is ignored rather than rejected.
    let filter = match &params.filter {
        Some(raw) => serde_json::from_str::<serde_json::Value>(raw).ok(),
        None => None,
    };

    let grouped = PluginDataService::aggregate(
        plugin_id,
        &entity,
        ctx.tenant_id,
        &state.db,
        &params.group_by,
        filter,
        scope,
    )
    .await?;

    let mut items = Vec::new();
    for (key, count) in grouped {
        items.push(AggregateItem { key, count });
    }
    Ok(Json(ApiResponse::ok(items)))
}
#[utoipa::path(
    get,
    path = "/api/v1/plugins/{plugin_id}/{entity}/timeseries",
    params(TimeseriesParams),
    responses(
        (status = 200, description = "时间序列数据", body = ApiResponse<Vec<TimeseriesItem>>),
    ),
    security(("bearer_auth" = [])),
    tag = "插件数据"
)]
/// GET /api/v1/plugins/{plugin_id}/{entity}/timeseries — time-series aggregation.
///
/// Requires the `{manifest}.{entity}.list` permission and honours the caller's data scope.
/// The time field, grain and optional start/end bounds come from the query string.
pub async fn get_plugin_timeseries<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity)): Path<(Uuid, String)>,
    Query(params): Query<TimeseriesParams>,
) -> Result<Json<ApiResponse<Vec<TimeseriesItem>>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    let manifest = resolve_manifest_id(plugin_id, ctx.tenant_id, &state.db).await?;
    let required = compute_permission_code(&manifest, &entity, "list");
    require_permission(&ctx, &required)?;

    // Data-scope restriction for this user (None means unrestricted).
    let scope = resolve_data_scope(&ctx, &manifest, &entity, &required, &state.db).await?;

    let series = PluginDataService::timeseries(
        plugin_id,
        &entity,
        ctx.tenant_id,
        &state.db,
        &params.time_field,
        &params.time_grain,
        params.start,
        params.end,
        scope,
    )
    .await?;

    Ok(Json(ApiResponse::ok(series)))
}
/// Resolves the data-scope restriction that applies to the current request.
///
/// Returns `Ok(None)` (no restriction) when the entity does not enable `data_scope`, or when
/// the user's scope level for `fine_perm` is `"all"`. Otherwise returns the parameters the
/// data service needs to restrict rows: the scope level, the user ID, the department ID
/// list from the tenant context, and the fixed owner column name `owner_id`.
async fn resolve_data_scope(
    ctx: &TenantContext,
    manifest_id: &str,
    entity: &str,
    fine_perm: &str,
    db: &sea_orm::DatabaseConnection,
) -> Result<Option<DataScopeParams>, AppError> {
    // Entities that do not opt in to data scoping are never restricted.
    let scoping_enabled = check_entity_data_scope(manifest_id, entity, db).await?;
    if !scoping_enabled {
        return Ok(None);
    }

    let level = get_data_scope(ctx, fine_perm, db).await?;
    if level == "all" {
        return Ok(None);
    }

    let params = DataScopeParams {
        scope_level: level,
        user_id: ctx.user_id,
        dept_member_ids: get_dept_members(ctx, false).await,
        owner_field: "owner_id".to_string(),
    };
    Ok(Some(params))
}
/// Reports whether the given plugin entity has `data_scope` enabled in its schema.
///
/// Looks up the non-deleted `plugin_entity` row for `manifest_id` + `entity_name`, parses
/// its `schema_json` as a `PluginEntity` and returns its `data_scope` flag (`false` when
/// the flag is absent or no such entity exists).
///
/// The lookup is restricted to `manifest_id` so that an entity with the same name in a
/// different plugin cannot decide whether data scoping is enforced here.
///
/// NOTE(review): the lookup is still not tenant-scoped because no tenant ID is passed in;
/// consider threading the tenant ID through if schemas can differ per tenant.
///
/// # Errors
/// Returns `AppError::Internal` when the query fails or the stored schema cannot be parsed.
async fn check_entity_data_scope(
    manifest_id: &str,
    entity_name: &str,
    db: &sea_orm::DatabaseConnection,
) -> Result<bool, AppError> {
    use crate::entity::plugin_entity;
    use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};

    let entity = plugin_entity::Entity::find()
        .filter(plugin_entity::Column::ManifestId.eq(manifest_id))
        .filter(plugin_entity::Column::EntityName.eq(entity_name))
        .filter(plugin_entity::Column::DeletedAt.is_null())
        .one(db)
        .await
        .map_err(|e| AppError::Internal(e.to_string()))?;

    let Some(e) = entity else { return Ok(false) };

    let schema: crate::manifest::PluginEntity = serde_json::from_value(e.schema_json)
        .map_err(|e| AppError::Internal(format!("解析 entity schema 失败: {}", e)))?;
    Ok(schema.data_scope.unwrap_or(false))
}
#[utoipa::path(
    post,
    path = "/api/v1/plugins/{plugin_id}/{entity}/aggregate-multi",
    request_body = AggregateMultiReq,
    responses(
        (status = 200, description = "成功", body = ApiResponse<Vec<AggregateMultiRow>>),
    ),
    security(("bearer_auth" = [])),
    tag = "插件数据"
)]
/// POST /api/v1/plugins/{plugin_id}/{entity}/aggregate-multi — multi-aggregation query.
///
/// Requires the `{manifest}.{entity}.list` permission and honours the caller's data scope.
/// Each requested aggregation is forwarded to the service as a `(func, field)` pair.
pub async fn aggregate_multi_plugin_data<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity)): Path<(Uuid, String)>,
    Json(body): Json<AggregateMultiReq>,
) -> Result<Json<ApiResponse<Vec<AggregateMultiRow>>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    let manifest = resolve_manifest_id(plugin_id, ctx.tenant_id, &state.db).await?;
    let required = compute_permission_code(&manifest, &entity, "list");
    require_permission(&ctx, &required)?;
    let scope = resolve_data_scope(&ctx, &manifest, &entity, &required, &state.db).await?;

    let mut aggregations: Vec<(String, String)> = Vec::new();
    for spec in body.aggregations.iter() {
        aggregations.push((spec.func.clone(), spec.field.clone()));
    }

    let rows = PluginDataService::aggregate_multi(
        plugin_id,
        &entity,
        ctx.tenant_id,
        &state.db,
        &body.group_by,
        &aggregations,
        body.filter,
        scope,
    )
    .await?;

    Ok(Json(ApiResponse::ok(rows)))
}
// ─── Cross-plugin references: batch label resolution ─────────────────────
/// Resolves display labels for reference fields in bulk.
///
/// POST /api/v1/plugins/{plugin_id}/{entity}/resolve-labels
///
/// For every requested field that is a reference field of `entity`, looks up the label
/// column (`ref_label_field`, default `"name"`) of the referenced records and returns:
/// - `labels`: `{ field: { requested_uuid: label | null } }` — keyed by the UUID string
///   exactly as the client sent it, so non-canonical spellings (uppercase, no hyphens)
///   still map to their label;
/// - `meta`: per-field target plugin/entity, label field and whether the target plugin
///   is installed.
///
/// Every requested UUID is mapped to `null` when the target plugin is not installed, the
/// target entity cannot be resolved, the UUID is invalid, or no matching row exists.
/// Fields that are unknown or are not reference fields are skipped.
///
/// Requires the `{manifest}.{entity}.list` permission on the *source* entity.
///
/// # Errors
/// Propagates manifest/entity resolution, permission and database errors, and returns
/// `AppError::Internal` when the stored entity schema cannot be parsed.
pub async fn resolve_ref_labels<S>(
    Path((plugin_id, entity)): Path<(Uuid, String)>,
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Json(body): Json<ResolveLabelsReq>,
) -> Result<Json<ApiResponse<ResolveLabelsResp>>, AppError>
where
    PluginState: FromRef<S>,
{
    use crate::data_service::{is_plugin_active, resolve_cross_plugin_entity};
    use crate::manifest::PluginEntity;
    use sea_orm::{FromQueryResult, Statement};

    #[derive(FromQueryResult)]
    struct LabelRow {
        id: String,
        label: Option<String>,
    }

    /// Maps every requested UUID string to JSON `null`.
    fn null_label_map<'a>(
        requested: impl IntoIterator<Item = &'a String>,
    ) -> serde_json::Map<String, serde_json::Value> {
        requested
            .into_iter()
            .map(|raw| (raw.clone(), serde_json::Value::Null))
            .collect()
    }

    let manifest_id = resolve_manifest_id(plugin_id, ctx.tenant_id, &state.db).await?;
    let fine_perm = compute_permission_code(&manifest_id, &entity, "list");
    require_permission(&ctx, &fine_perm)?;

    // Load the schema of the source entity.
    let entity_info = crate::data_service::resolve_entity_info_cached(
        plugin_id,
        &entity,
        ctx.tenant_id,
        &state.db,
        &state.entity_cache,
    )
    .await?;
    let entity_def: PluginEntity = serde_json::from_value(entity_info.schema_json)
        .map_err(|e| AppError::Internal(format!("解析 entity schema 失败: {}", e)))?;

    let mut labels = serde_json::Map::<String, serde_json::Value>::new();
    let mut meta = serde_json::Map::<String, serde_json::Value>::new();

    for (field_name, uuids) in &body.fields {
        // Only fields that exist and reference another entity are resolvable.
        let Some(field_def) = entity_def.fields.iter().find(|f| &f.name == field_name) else {
            continue;
        };
        let Some(ref_entity_name) = &field_def.ref_entity else {
            continue;
        };
        let target_plugin = field_def.ref_plugin.as_deref().unwrap_or(&manifest_id);
        let label_field = field_def.ref_label_field.as_deref().unwrap_or("name");
        let installed = is_plugin_active(target_plugin, ctx.tenant_id, &state.db).await;

        meta.insert(
            field_name.clone(),
            serde_json::json!({
                "target_plugin": target_plugin,
                "target_entity": ref_entity_name,
                "label_field": label_field,
                "plugin_installed": installed,
            }),
        );

        if !installed {
            // Target plugin is not installed: every UUID resolves to null.
            labels.insert(
                field_name.clone(),
                serde_json::Value::Object(null_label_map(uuids)),
            );
            continue;
        }

        // Resolve the table that stores the referenced entity.
        let target_table = if field_def.ref_plugin.is_some() {
            match resolve_cross_plugin_entity(
                target_plugin,
                ref_entity_name,
                ctx.tenant_id,
                &state.db,
            )
            .await
            {
                Ok(info) => info.table_name,
                Err(_) => {
                    // Best effort: an unresolvable target yields nulls, not an error.
                    labels.insert(
                        field_name.clone(),
                        serde_json::Value::Object(null_label_map(uuids)),
                    );
                    continue;
                }
            }
        } else {
            crate::dynamic_table::DynamicTableManager::table_name(target_plugin, ref_entity_name)
        };

        // Parse each requested UUID once, remembering the client's original spelling so the
        // response can be keyed by it.
        let parsed: Vec<(_, Uuid)> = uuids
            .iter()
            .filter_map(|raw| Uuid::parse_str(raw).ok().map(|id| (raw, id)))
            .collect();
        if parsed.is_empty() {
            // Nothing queryable; stay consistent with the other paths and answer null for
            // everything that was asked.
            labels.insert(
                field_name.clone(),
                serde_json::Value::Object(null_label_map(uuids)),
            );
            continue;
        }

        // `$1` is the tenant; the IDs occupy `$2..`.
        let placeholders = (2..parsed.len() + 2)
            .map(|i| format!("${}", i))
            .collect::<Vec<_>>()
            .join(", ");
        // `label_field` comes from the plugin manifest and lands inside a SQL string literal,
        // so single quotes are doubled to keep it from terminating the literal.
        let sql = format!(
            "SELECT id::text, data->>'{}' as label FROM \"{}\" WHERE id IN ({}) AND tenant_id = $1 AND deleted_at IS NULL",
            label_field.replace('\'', "''"),
            target_table,
            placeholders
        );

        let mut values: Vec<sea_orm::Value> = Vec::with_capacity(parsed.len() + 1);
        values.push(ctx.tenant_id.into());
        values.extend(parsed.iter().map(|(_, id)| sea_orm::Value::from(*id)));

        let rows = LabelRow::find_by_statement(Statement::from_sql_and_values(
            sea_orm::DatabaseBackend::Postgres,
            sql,
            values,
        ))
        .all(&state.db)
        .await?;

        // Canonical (lowercase, hyphenated) ID text -> label; a NULL label becomes "".
        let found: std::collections::HashMap<String, String> = rows
            .into_iter()
            .map(|row| (row.id, row.label.unwrap_or_default()))
            .collect();

        // Start with null for everything requested, then fill in what was found under the
        // key the client actually sent.
        let mut field_labels = null_label_map(uuids);
        for (raw, id) in &parsed {
            if let Some(text) = found.get(&id.to_string()) {
                field_labels.insert((*raw).clone(), serde_json::Value::String(text.clone()));
            }
        }
        labels.insert(field_name.clone(), serde_json::Value::Object(field_labels));
    }

    Ok(Json(ApiResponse::ok(ResolveLabelsResp {
        labels: serde_json::Value::Object(labels),
        meta: serde_json::Value::Object(meta),
    })))
}
// ─── Cross-plugin references: entity registry lookup ─────────────────────
/// Lists every public entity of the caller's tenant that other plugins may reference.
///
/// GET /api/v1/plugin-registry/entities
///
/// The display name is taken from the `display_name` string in the entity's schema JSON,
/// falling back to the entity name when it is missing or not a string.
///
/// NOTE(review): no permission check is performed beyond having a tenant context —
/// confirm this registry is meant to be readable by every authenticated user.
pub async fn list_public_entities<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
) -> Result<Json<ApiResponse<Vec<PublicEntityResp>>>, AppError>
where
    PluginState: FromRef<S>,
{
    use crate::entity::plugin_entity;
    use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};

    let public_rows = plugin_entity::Entity::find()
        .filter(plugin_entity::Column::TenantId.eq(ctx.tenant_id))
        .filter(plugin_entity::Column::IsPublic.eq(true))
        .filter(plugin_entity::Column::DeletedAt.is_null())
        .all(&state.db)
        .await?;

    let mut result: Vec<PublicEntityResp> = Vec::new();
    for row in public_rows.iter() {
        let display_name = match row.schema_json.get("display_name").and_then(|v| v.as_str()) {
            Some(name) => name.to_string(),
            None => row.entity_name.to_string(),
        };
        result.push(PublicEntityResp {
            manifest_id: row.manifest_id.clone(),
            plugin_id: row.plugin_id.to_string(),
            entity_name: row.entity_name.clone(),
            display_name,
        });
    }

    Ok(Json(ApiResponse::ok(result)))
}
// ─── 数据导入导出 ──────────────────────────────────────────────────────
#[utoipa::path(
get,
path = "/api/v1/plugins/{plugin_id}/{entity}/export",
params(ExportParams),
responses(
(status = 200, description = "导出成功"),
),
security(("bearer_auth" = [])),
tag = "插件数据"
)]
/// GET /api/v1/plugins/{plugin_id}/{entity}/export — 导出数据 (JSON/CSV/XLSX)
pub async fn export_plugin_data<S>(
State(state): State<PluginState>,
Extension(ctx): Extension<TenantContext>,
Path((plugin_id, entity)): Path<(Uuid, String)>,
Query(params): Query<ExportParams>,
) -> Result<axum::response::Response, AppError>
where
PluginState: FromRef<S>,
S: Clone + Send + Sync + 'static,
{
use crate::data_dto::ExportPayload;
use axum::body::Body;
use axum::http::{StatusCode, header};
let manifest_id = resolve_manifest_id(plugin_id, ctx.tenant_id, &state.db).await?;
let fine_perm = compute_permission_code(&manifest_id, &entity, "list");
require_permission(&ctx, &fine_perm)?;
let scope = resolve_data_scope(&ctx, &manifest_id, &entity, &fine_perm, &state.db).await?;
let filter: Option<serde_json::Value> = params
.filter
.as_ref()
.and_then(|f| serde_json::from_str(f).ok());
let payload = PluginDataService::export(
plugin_id,
&entity,
ctx.tenant_id,
&state.db,
filter,
params.search,
params.sort_by,
params.sort_order,
params.format,
&state.entity_cache,
scope,
)
.await?;
let filename = format!(
"{}_export_{}",
entity,
chrono::Utc::now().format("%Y%m%d%H%M%S")
);
match payload {
ExportPayload::Json(data) => {
let body = serde_json::to_string(&ApiResponse::ok(data))
.map_err(|e| AppError::Internal(e.to_string()))?;
Ok(axum::response::Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/json")
.body(Body::from(body))
.unwrap())
}
ExportPayload::Csv(bytes) => Ok(axum::response::Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "text/csv; charset=utf-8")
.header(
header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{}.csv\"", filename),
)
.body(Body::from(bytes))
.unwrap()),
ExportPayload::Xlsx(bytes) => Ok(axum::response::Response::builder()
.status(StatusCode::OK)
.header(
header::CONTENT_TYPE,
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
)
.header(
header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{}.xlsx\"", filename),
)
.body(Body::from(bytes))
.unwrap()),
}
}
#[utoipa::path(
    post,
    path = "/api/v1/plugins/{plugin_id}/{entity}/import",
    request_body = ImportReq,
    responses(
        (status = 200, description = "导入完成", body = ApiResponse<ImportResult>),
    ),
    security(("bearer_auth" = [])),
    tag = "插件数据"
)]
/// POST /api/v1/plugins/{plugin_id}/{entity}/import — import rows into an entity.
///
/// Requires the `{manifest}.{entity}.manage` permission. The rows from the request body are
/// handed to the service on behalf of the calling user; its result is returned as-is.
pub async fn import_plugin_data<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity)): Path<(Uuid, String)>,
    Json(req): Json<ImportReq>,
) -> Result<Json<ApiResponse<ImportResult>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    let manifest = resolve_manifest_id(plugin_id, ctx.tenant_id, &state.db).await?;
    let required = compute_permission_code(&manifest, &entity, "create");
    require_permission(&ctx, &required)?;

    let outcome = PluginDataService::import(
        plugin_id,
        &entity,
        ctx.tenant_id,
        ctx.user_id,
        req.rows,
        &state.db,
        &state.event_bus,
    )
    .await?;

    Ok(Json(ApiResponse::ok(outcome)))
}
/// POST /api/v1/plugins/{plugin_id}/reconcile — reference reconciliation scan.
///
/// Requires the `plugin.admin` permission and returns the report produced by
/// `PluginDataService::reconcile_references` for the plugin in the caller's tenant.
#[utoipa::path(
    post,
    path = "/api/v1/plugins/{plugin_id}/reconcile",
    responses(
        (status = 200, description = "对账报告", body = ApiResponse<ReconciliationReport>)
    ),
    tag = "Plugin Data",
)]
pub async fn reconcile_refs<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path(plugin_id): Path<Uuid>,
) -> Result<Json<ApiResponse<crate::data_dto::ReconciliationReport>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    require_permission(&ctx, "plugin.admin")?;

    let tenant = ctx.tenant_id;
    let report = PluginDataService::reconcile_references(plugin_id, tenant, &state.db).await?;
    let response = ApiResponse::ok(report);
    Ok(Json(response))
}
// ─── User-defined views CRUD ─────────────────────────────────────────────
#[utoipa::path(
    get,
    path = "/api/v1/plugins/{plugin_id}/{entity}/views",
    responses(
        (status = 200, description = "视图列表", body = ApiResponse<Vec<UserViewResp>>)
    ),
    tag = "Plugin Views",
)]
/// GET /api/v1/plugins/{plugin_id}/{entity}/views — list the caller's saved views.
///
/// Returns the views stored in `plugin_user_views` for the current tenant, user, plugin
/// manifest ID and entity, newest first. Only the caller's own rows are selected.
pub async fn list_user_views<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity)): Path<(Uuid, String)>,
) -> Result<Json<ApiResponse<Vec<crate::data_dto::UserViewResp>>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    use sea_orm::FromQueryResult;

    #[derive(FromQueryResult)]
    struct ViewRow {
        id: Uuid,
        view_name: String,
        view_config: serde_json::Value,
        is_default: bool,
        created_at: Option<chrono::DateTime<chrono::Utc>>,
        updated_at: Option<chrono::DateTime<chrono::Utc>>,
    }

    // The views table stores the manifest ID in its `plugin_id` column.
    let manifest_id = resolve_manifest_id(plugin_id, ctx.tenant_id, &state.db).await?;

    let query = Statement::from_sql_and_values(
        sea_orm::DatabaseBackend::Postgres,
        "SELECT id, view_name, view_config, is_default, created_at, updated_at \
         FROM plugin_user_views WHERE tenant_id = $1 AND user_id = $2 AND plugin_id = $3 AND entity_name = $4 \
         ORDER BY created_at DESC",
        [
            ctx.tenant_id.into(),
            ctx.user_id.into(),
            manifest_id.clone().into(),
            entity.clone().into(),
        ],
    );
    let view_rows = ViewRow::find_by_statement(query)
        .all(&state.db)
        .await
        .map_err(|e| AppError::Internal(e.to_string()))?;

    let mut items = Vec::new();
    for row in view_rows {
        items.push(UserViewResp {
            id: row.id.to_string(),
            plugin_id: manifest_id.clone(),
            entity_name: entity.clone(),
            view_name: row.view_name,
            view_config: row.view_config,
            is_default: row.is_default,
            created_at: row.created_at,
            updated_at: row.updated_at,
        });
    }

    Ok(Json(ApiResponse::ok(items)))
}
#[utoipa::path(
post,
path = "/api/v1/plugins/{plugin_id}/{entity}/views",
request_body = UserViewReq,
responses(
(status = 200, description = "创建视图", body = ApiResponse<UserViewResp>)
),
tag = "Plugin Views",
)]
pub async fn create_user_view<S>(
State(state): State<PluginState>,
Extension(ctx): Extension<TenantContext>,
Path((plugin_id, entity)): Path<(Uuid, String)>,
Json(req): Json<crate::data_dto::UserViewReq>,
) -> Result<Json<ApiResponse<crate::data_dto::UserViewResp>>, AppError>
where
PluginState: FromRef<S>,
S: Clone + Send + Sync + 'static,
{
let manifest_id = resolve_manifest_id(plugin_id, ctx.tenant_id, &state.db).await?;
let view_id = Uuid::now_v7();
let now = chrono::Utc::now();
let is_default = req.is_default.unwrap_or(false);
let mid = manifest_id.clone();
let ent = entity.clone();
let view_name = req.view_name.clone();
let view_config = req.view_config.clone();
if is_default {
state.db.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
"UPDATE plugin_user_views SET is_default = false \
WHERE tenant_id = $1 AND user_id = $2 AND plugin_id = $3 AND entity_name = $4 AND is_default = true",
[ctx.tenant_id.into(), ctx.user_id.into(), mid.clone().into(), ent.clone().into()],
)).await.map_err(|e| AppError::Internal(e.to_string()))?;
}
state.db.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
"INSERT INTO plugin_user_views (id, tenant_id, user_id, plugin_id, entity_name, view_name, view_config, is_default, created_at, updated_at) \
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
[
view_id.into(), ctx.tenant_id.into(), ctx.user_id.into(),
mid.into(), ent.into(),
view_name.into(), view_config.into(), is_default.into(),
now.into(), now.into(),
],
)).await.map_err(|e| AppError::Internal(e.to_string()))?;
Ok(Json(ApiResponse::ok(UserViewResp {
id: view_id.to_string(),
plugin_id: manifest_id,
entity_name: entity,
view_name: req.view_name,
view_config: req.view_config,
is_default,
created_at: Some(now),
updated_at: Some(now),
})))
}
/// DELETE /api/v1/plugins/{plugin_id}/{entity}/views/{view_id} — delete a saved view.
///
/// Removes the `plugin_user_views` row with the given ID, restricted to the caller's tenant
/// and user. The plugin ID and entity path segments are not used in the query, and the
/// call succeeds even when no row matched.
pub async fn delete_user_view<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((_plugin_id, _entity, view_id)): Path<(Uuid, String, Uuid)>,
) -> Result<Json<ApiResponse<()>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    let delete_stmt = Statement::from_sql_and_values(
        sea_orm::DatabaseBackend::Postgres,
        "DELETE FROM plugin_user_views WHERE id = $1 AND tenant_id = $2 AND user_id = $3",
        [view_id.into(), ctx.tenant_id.into(), ctx.user_id.into()],
    );

    if let Err(db_err) = state.db.execute(delete_stmt).await {
        return Err(AppError::Internal(db_err.to_string()));
    }

    Ok(Json(ApiResponse::ok(())))
}