Files
hms/crates/erp-plugin/src/handler/data_handler.rs
iven 62eea3d20d feat(auth,plugin): Q3 行级数据权限 — user_departments 表 + JWT 注入 department_ids + data_scope 接线
- 新增 user_departments 关联表(migration + entity)
- JWT 中间件查询用户部门并注入 TenantContext.department_ids
- role_permission entity 添加 data_scope 字段
- data_handler 接线行级数据权限过滤(list/count/aggregate)
- DataScopeParams + build_scope_sql + merge_scope_condition 实现全链路
2026-04-17 21:42:40 +08:00

551 lines
17 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use axum::Extension;
use axum::extract::{FromRef, Path, Query, State};
use axum::response::Json;
use uuid::Uuid;
use erp_core::error::AppError;
use erp_core::rbac::require_permission;
use erp_core::types::{ApiResponse, PaginatedResponse, TenantContext};
use crate::data_dto::{
AggregateItem, AggregateQueryParams, BatchActionReq, CountQueryParams, CreatePluginDataReq,
PatchPluginDataReq, PluginDataListParams, PluginDataResp, TimeseriesItem, TimeseriesParams,
UpdatePluginDataReq,
};
use crate::data_service::{DataScopeParams, PluginDataService, resolve_manifest_id};
use crate::state::PluginState;
/// 获取当前用户对指定权限的 data_scope 等级
///
/// 查询 user_roles -> role_permissions -> permissions 链路,
/// 返回匹配权限的 data_scope 设置,默认 "all"。
async fn get_data_scope(
ctx: &TenantContext,
permission_code: &str,
db: &sea_orm::DatabaseConnection,
) -> Result<String, AppError> {
use sea_orm::{FromQueryResult, Statement};
#[derive(FromQueryResult)]
struct ScopeResult {
data_scope: Option<String>,
}
let result = ScopeResult::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
r#"SELECT rp.data_scope
FROM user_roles ur
JOIN role_permissions rp ON rp.role_id = ur.role_id
JOIN permissions p ON p.id = rp.permission_id
WHERE ur.user_id = $1 AND ur.tenant_id = $2 AND p.code = $3
LIMIT 1"#,
[
ctx.user_id.into(),
ctx.tenant_id.into(),
permission_code.into(),
],
))
.one(db)
.await
.map_err(|e| AppError::Internal(e.to_string()))?;
Ok(result
.and_then(|r| r.data_scope)
.unwrap_or_else(|| "all".to_string()))
}
/// Returns the id list used as "department members" for data-scope filtering.
///
/// At present this is simply a copy of `ctx.department_ids`; an empty list in the
/// context yields an empty result. `_include_sub_depts` is accepted but ignored —
/// per the original note it is reserved for a future recursive department-tree lookup.
///
/// NOTE(review): the values returned are the ids stored in `department_ids`, not ids
/// of users belonging to those departments — confirm the consumer expects that.
async fn get_dept_members(
    ctx: &TenantContext,
    _include_sub_depts: bool,
) -> Vec<Uuid> {
    // Copying an empty list already produces an empty list, so no special case is needed.
    ctx.department_ids.to_vec()
}
/// Builds the permission code required for a plugin data operation.
///
/// The code has the shape `{manifest_id}.{entity}.{suffix}`, e.g. `erp-crm.customer.list`.
/// The actions `"list"` and `"get"` use the suffix `list`; every other action
/// (including unknown or empty strings) uses the suffix `manage`.
fn compute_permission_code(manifest_id: &str, entity_name: &str, action: &str) -> String {
    let read_only = matches!(action, "list" | "get");
    let suffix = if read_only { "list" } else { "manage" };
    [manifest_id, entity_name, suffix].join(".")
}
#[utoipa::path(
    get,
    path = "/api/v1/plugins/{plugin_id}/{entity}",
    params(PluginDataListParams),
    responses(
        (status = 200, description = "成功", body = ApiResponse<PaginatedResponse<PluginDataResp>>),
    ),
    security(("bearer_auth" = [])),
    tag = "插件数据"
)]
/// GET /api/v1/plugins/{plugin_id}/{entity} — paginated list
pub async fn list_plugin_data<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity)): Path<(Uuid, String)>,
    Query(params): Query<PluginDataListParams>,
) -> Result<Json<ApiResponse<PaginatedResponse<PluginDataResp>>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    // Authorization: resolve the plugin's manifest id for this tenant, then require
    // the `{manifest}.{entity}.list` permission before touching any data.
    let manifest_id = resolve_manifest_id(plugin_id, ctx.tenant_id, &state.db).await?;
    let fine_perm = compute_permission_code(&manifest_id, &entity, "list");
    require_permission(&ctx, &fine_perm)?;

    // Row-level data scope; `None` means no scope restriction is handed to the service.
    let scope = resolve_data_scope(&ctx, &manifest_id, &entity, &fine_perm, &state.db).await?;

    // Defaults are page 1 / 20 rows. Both values are clamped to at least 1: a
    // client-supplied `page_size=0` would otherwise make the `total_pages` division
    // below produce inf/NaN (which casts to u64::MAX or 0), and `page=0` is not a
    // meaningful 1-based page number.
    let page = params.page.unwrap_or(1).max(1);
    let page_size = params.page_size.unwrap_or(20).max(1);

    // Parse the optional filter JSON.
    // NOTE(review): a filter string that is not valid JSON is silently dropped here,
    // so the request proceeds unfiltered — consider rejecting it instead.
    let filter: Option<serde_json::Value> = params
        .filter
        .as_ref()
        .and_then(|raw| serde_json::from_str(raw).ok());

    let (items, total) = PluginDataService::list(
        plugin_id,
        &entity,
        ctx.tenant_id,
        page,
        page_size,
        &state.db,
        filter,
        params.search,
        params.sort_by,
        params.sort_order,
        &state.entity_cache,
        scope,
    )
    .await?;

    // Ceiling division; safe because `page_size` is at least 1 at this point.
    let total_pages = (total as f64 / page_size as f64).ceil() as u64;

    Ok(Json(ApiResponse::ok(PaginatedResponse {
        data: items,
        total,
        page,
        page_size,
        total_pages,
    })))
}
#[utoipa::path(
    post,
    path = "/api/v1/plugins/{plugin_id}/{entity}",
    request_body = CreatePluginDataReq,
    responses(
        (status = 200, description = "创建成功", body = ApiResponse<PluginDataResp>),
    ),
    security(("bearer_auth" = [])),
    tag = "插件数据"
)]
/// POST /api/v1/plugins/{plugin_id}/{entity} — create
pub async fn create_plugin_data<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity)): Path<(Uuid, String)>,
    Json(req): Json<CreatePluginDataReq>,
) -> Result<Json<ApiResponse<PluginDataResp>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    let tenant_id = ctx.tenant_id;

    // Permission gate: "create" is not a read action, so the required code is
    // `{manifest}.{entity}.manage`.
    let manifest = resolve_manifest_id(plugin_id, tenant_id, &state.db).await?;
    let required = compute_permission_code(&manifest, &entity, "create");
    require_permission(&ctx, &required)?;

    // The caller's user id is forwarded to the service together with the payload;
    // the event bus is handed over as well.
    let created = PluginDataService::create(
        plugin_id,
        &entity,
        tenant_id,
        ctx.user_id,
        req.data,
        &state.db,
        &state.event_bus,
    )
    .await?;

    let body = ApiResponse::ok(created);
    Ok(Json(body))
}
#[utoipa::path(
    get,
    path = "/api/v1/plugins/{plugin_id}/{entity}/{id}",
    responses(
        (status = 200, description = "成功", body = ApiResponse<PluginDataResp>),
    ),
    security(("bearer_auth" = [])),
    tag = "插件数据"
)]
/// GET /api/v1/plugins/{plugin_id}/{entity}/{id} — detail
pub async fn get_plugin_data<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity, id)): Path<(Uuid, String, Uuid)>,
) -> Result<Json<ApiResponse<PluginDataResp>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    let tenant_id = ctx.tenant_id;

    // "get" shares the read permission `{manifest}.{entity}.list` with the list endpoint.
    let manifest = resolve_manifest_id(plugin_id, tenant_id, &state.db).await?;
    let required = compute_permission_code(&manifest, &entity, "get");
    require_permission(&ctx, &required)?;

    // NOTE(review): unlike the list handler, no data scope is resolved or passed here,
    // so a record is fetched by id regardless of row-level scope — confirm intent.
    let record = PluginDataService::get_by_id(plugin_id, &entity, id, tenant_id, &state.db).await?;

    let body = ApiResponse::ok(record);
    Ok(Json(body))
}
#[utoipa::path(
    put,
    path = "/api/v1/plugins/{plugin_id}/{entity}/{id}",
    request_body = UpdatePluginDataReq,
    responses(
        (status = 200, description = "更新成功", body = ApiResponse<PluginDataResp>),
    ),
    security(("bearer_auth" = [])),
    tag = "插件数据"
)]
/// PUT /api/v1/plugins/{plugin_id}/{entity}/{id} — update
pub async fn update_plugin_data<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity, id)): Path<(Uuid, String, Uuid)>,
    Json(req): Json<UpdatePluginDataReq>,
) -> Result<Json<ApiResponse<PluginDataResp>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    let tenant_id = ctx.tenant_id;

    // "update" is a write action, so `{manifest}.{entity}.manage` is required.
    let manifest = resolve_manifest_id(plugin_id, tenant_id, &state.db).await?;
    let required = compute_permission_code(&manifest, &entity, "update");
    require_permission(&ctx, &required)?;

    // `req.version` is forwarded alongside the data
    // (presumably for optimistic locking — verify against the service).
    // NOTE(review): no data scope is applied to this write — confirm intent.
    let updated = PluginDataService::update(
        plugin_id,
        &entity,
        id,
        tenant_id,
        ctx.user_id,
        req.data,
        req.version,
        &state.db,
        &state.event_bus,
    )
    .await?;

    let body = ApiResponse::ok(updated);
    Ok(Json(body))
}
#[utoipa::path(
    patch,
    path = "/api/v1/plugins/{plugin_id}/{entity}/{id}",
    request_body = PatchPluginDataReq,
    responses(
        (status = 200, description = "部分更新成功", body = ApiResponse<PluginDataResp>),
    ),
    security(("bearer_auth" = [])),
    tag = "插件数据"
)]
/// PATCH /api/v1/plugins/{plugin_id}/{entity}/{id} — partial update (fields merged via jsonb_set)
pub async fn patch_plugin_data<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity, id)): Path<(Uuid, String, Uuid)>,
    Json(req): Json<PatchPluginDataReq>,
) -> Result<Json<ApiResponse<PluginDataResp>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    let tenant_id = ctx.tenant_id;

    // Same permission as a full update: `{manifest}.{entity}.manage`.
    let manifest = resolve_manifest_id(plugin_id, tenant_id, &state.db).await?;
    let required = compute_permission_code(&manifest, &entity, "update");
    require_permission(&ctx, &required)?;

    // NOTE(review): unlike the PUT handler, no event bus is passed to the service
    // here, and no data scope is applied — confirm both are intended.
    let patched = PluginDataService::partial_update(
        plugin_id,
        &entity,
        id,
        tenant_id,
        ctx.user_id,
        req.data,
        req.version,
        &state.db,
    )
    .await?;

    let body = ApiResponse::ok(patched);
    Ok(Json(body))
}
#[utoipa::path(
    delete,
    path = "/api/v1/plugins/{plugin_id}/{entity}/{id}",
    responses(
        (status = 200, description = "删除成功"),
    ),
    security(("bearer_auth" = [])),
    tag = "插件数据"
)]
/// DELETE /api/v1/plugins/{plugin_id}/{entity}/{id} — delete
pub async fn delete_plugin_data<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity, id)): Path<(Uuid, String, Uuid)>,
) -> Result<Json<ApiResponse<()>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    let tenant_id = ctx.tenant_id;

    // "delete" is a write action, so `{manifest}.{entity}.manage` is required.
    let manifest = resolve_manifest_id(plugin_id, tenant_id, &state.db).await?;
    let required = compute_permission_code(&manifest, &entity, "delete");
    require_permission(&ctx, &required)?;

    // NOTE(review): no data scope is applied to this delete — confirm intent.
    PluginDataService::delete(
        plugin_id,
        &entity,
        id,
        tenant_id,
        &state.db,
        &state.event_bus,
    )
    .await?;

    // Nothing to return beyond the success envelope.
    let body = ApiResponse::ok(());
    Ok(Json(body))
}
#[utoipa::path(
    post,
    path = "/api/v1/plugins/{plugin_id}/{entity}/batch",
    request_body = BatchActionReq,
    responses(
        (status = 200, description = "批量操作成功", body = ApiResponse<u64>),
    ),
    security(("bearer_auth" = [])),
    tag = "插件数据"
)]
/// POST /api/v1/plugins/{plugin_id}/{entity}/batch — batch operation (batch_delete / batch_update)
pub async fn batch_plugin_data<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity)): Path<(Uuid, String)>,
    Json(req): Json<BatchActionReq>,
) -> Result<Json<ApiResponse<u64>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    let tenant_id = ctx.tenant_id;
    let manifest = resolve_manifest_id(plugin_id, tenant_id, &state.db).await?;

    // Only "batch_delete" is checked as a delete; "batch_update" and any
    // unrecognised action are checked as an update. Both currently resolve to the
    // same `{manifest}.{entity}.manage` code.
    let action = if req.action.as_str() == "batch_delete" {
        "delete"
    } else {
        "update"
    };
    let required = compute_permission_code(&manifest, &entity, action);
    require_permission(&ctx, &required)?;

    // The whole request is handed to the service, which returns a count that is
    // relayed to the client as the affected-row number.
    // NOTE(review): no event bus or data scope is passed here — confirm intent.
    let affected = PluginDataService::batch(
        plugin_id,
        &entity,
        tenant_id,
        ctx.user_id,
        req,
        &state.db,
    )
    .await?;

    let body = ApiResponse::ok(affected);
    Ok(Json(body))
}
#[utoipa::path(
    get,
    path = "/api/v1/plugins/{plugin_id}/{entity}/count",
    params(CountQueryParams),
    responses(
        (status = 200, description = "成功", body = ApiResponse<u64>),
    ),
    security(("bearer_auth" = [])),
    tag = "插件数据"
)]
/// GET /api/v1/plugins/{plugin_id}/{entity}/count — record count
pub async fn count_plugin_data<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity)): Path<(Uuid, String)>,
    Query(params): Query<CountQueryParams>,
) -> Result<Json<ApiResponse<u64>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    let tenant_id = ctx.tenant_id;

    // Counting is a read, so it is gated by `{manifest}.{entity}.list`.
    let manifest = resolve_manifest_id(plugin_id, tenant_id, &state.db).await?;
    let required = compute_permission_code(&manifest, &entity, "list");
    require_permission(&ctx, &required)?;

    // Optional filter JSON; text that fails to parse is treated as "no filter".
    // NOTE(review): silently ignoring a malformed filter widens the result — consider
    // rejecting the request instead.
    let filter = match params.filter.as_ref() {
        Some(raw) => serde_json::from_str::<serde_json::Value>(raw).ok(),
        None => None,
    };

    // NOTE(review): unlike the list handler, no data scope is resolved or passed
    // here, so the count is not narrowed by row-level scope in this handler.
    let total = PluginDataService::count(
        plugin_id,
        &entity,
        tenant_id,
        &state.db,
        filter,
        params.search,
    )
    .await?;

    let body = ApiResponse::ok(total);
    Ok(Json(body))
}
#[utoipa::path(
    get,
    path = "/api/v1/plugins/{plugin_id}/{entity}/aggregate",
    params(AggregateQueryParams),
    responses(
        (status = 200, description = "成功", body = ApiResponse<Vec<AggregateItem>>),
    ),
    security(("bearer_auth" = [])),
    tag = "插件数据"
)]
/// GET /api/v1/plugins/{plugin_id}/{entity}/aggregate — aggregate query
pub async fn aggregate_plugin_data<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity)): Path<(Uuid, String)>,
    Query(params): Query<AggregateQueryParams>,
) -> Result<Json<ApiResponse<Vec<AggregateItem>>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    let tenant_id = ctx.tenant_id;

    // Aggregation is a read, so it is gated by `{manifest}.{entity}.list`.
    let manifest = resolve_manifest_id(plugin_id, tenant_id, &state.db).await?;
    let required = compute_permission_code(&manifest, &entity, "list");
    require_permission(&ctx, &required)?;

    // Optional filter JSON; text that fails to parse is treated as "no filter".
    // NOTE(review): silently ignoring a malformed filter widens the result — consider
    // rejecting the request instead.
    let filter = match params.filter.as_ref() {
        Some(raw) => serde_json::from_str::<serde_json::Value>(raw).ok(),
        None => None,
    };

    // NOTE(review): unlike the list handler, no data scope is resolved or passed
    // here, so the aggregation is not narrowed by row-level scope in this handler.
    let rows = PluginDataService::aggregate(
        plugin_id,
        &entity,
        tenant_id,
        &state.db,
        &params.group_by,
        filter,
    )
    .await?;

    // Each `(key, count)` pair from the service becomes one response item.
    let items: Vec<AggregateItem> = rows
        .into_iter()
        .map(|(group, n)| AggregateItem {
            key: group,
            count: n,
        })
        .collect();

    Ok(Json(ApiResponse::ok(items)))
}
#[utoipa::path(
    get,
    path = "/api/v1/plugins/{plugin_id}/{entity}/timeseries",
    params(TimeseriesParams),
    responses(
        (status = 200, description = "时间序列数据", body = ApiResponse<Vec<TimeseriesItem>>),
    ),
    security(("bearer_auth" = [])),
    tag = "插件数据"
)]
/// GET /api/v1/plugins/{plugin_id}/{entity}/timeseries — time-series aggregation
pub async fn get_plugin_timeseries<S>(
    State(state): State<PluginState>,
    Extension(ctx): Extension<TenantContext>,
    Path((plugin_id, entity)): Path<(Uuid, String)>,
    Query(params): Query<TimeseriesParams>,
) -> Result<Json<ApiResponse<Vec<TimeseriesItem>>>, AppError>
where
    PluginState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    let tenant_id = ctx.tenant_id;

    // Time-series queries are reads, so they are gated by `{manifest}.{entity}.list`.
    let manifest = resolve_manifest_id(plugin_id, tenant_id, &state.db).await?;
    let required = compute_permission_code(&manifest, &entity, "list");
    require_permission(&ctx, &required)?;

    // The time field, grain and optional start/end bounds are forwarded unchanged.
    // NOTE(review): no data scope is resolved or passed here — confirm intent.
    let series = PluginDataService::timeseries(
        plugin_id,
        &entity,
        tenant_id,
        &state.db,
        &params.time_field,
        &params.time_grain,
        params.start,
        params.end,
    )
    .await?;

    let body = ApiResponse::ok(series);
    Ok(Json(body))
}
/// Resolves the row-level data scope to apply for a request.
///
/// Steps:
/// 1. If the entity definition does not enable `data_scope`, no restriction applies.
/// 2. Otherwise the user's scope level for `fine_perm` is looked up; the level
///    `"all"` also means no restriction.
/// 3. For any other level a [`DataScopeParams`] is built from the level, the
///    caller's user id, the ids returned by `get_dept_members`, and the fixed owner
///    field name `"owner_id"`.
///
/// # Returns
/// `Ok(None)` when no restriction applies, `Ok(Some(params))` otherwise.
///
/// # Errors
/// Propagates errors from the entity lookup and the scope-level query.
async fn resolve_data_scope(
    ctx: &TenantContext,
    manifest_id: &str,
    entity: &str,
    fine_perm: &str,
    db: &sea_orm::DatabaseConnection,
) -> Result<Option<DataScopeParams>, AppError> {
    // Entities that did not opt in are never scope-filtered.
    if !check_entity_data_scope(manifest_id, entity, db).await? {
        return Ok(None);
    }

    let level = get_data_scope(ctx, fine_perm, db).await?;
    if level == "all" {
        return Ok(None);
    }

    // Sub-departments are not included (`false`); the helper currently ignores the flag.
    let member_ids = get_dept_members(ctx, false).await;

    let params = DataScopeParams {
        scope_level: level,
        user_id: ctx.user_id,
        dept_member_ids: member_ids,
        owner_field: String::from("owner_id"),
    };
    Ok(Some(params))
}
/// Reports whether an entity definition has `data_scope` enabled.
///
/// Loads one non-deleted `plugin_entity` row whose entity name equals `entity_name`,
/// deserializes its `schema_json` into `crate::manifest::PluginEntity`, and returns
/// that schema's `data_scope` flag (an absent flag counts as `false`).
///
/// # Returns
/// `Ok(false)` when no matching row exists or the flag is unset/false; `Ok(true)`
/// only when the flag is explicitly `true`.
///
/// # Errors
/// `AppError::Internal` on a database error or when the stored schema cannot be parsed.
async fn check_entity_data_scope(
    _manifest_id: &str,
    entity_name: &str,
    db: &sea_orm::DatabaseConnection,
) -> Result<bool, AppError> {
    use crate::entity::plugin_entity;
    use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};

    // NOTE(review): `_manifest_id` is unused and the lookup is by entity name only,
    // so two definitions sharing a name could be confused — confirm whether the
    // query should also be narrowed by plugin/tenant.
    let lookup = plugin_entity::Entity::find()
        .filter(plugin_entity::Column::EntityName.eq(entity_name))
        .filter(plugin_entity::Column::DeletedAt.is_null());

    let row = match lookup.one(db).await {
        Ok(Some(found)) => found,
        Ok(None) => return Ok(false),
        Err(err) => return Err(AppError::Internal(err.to_string())),
    };

    let schema = serde_json::from_value::<crate::manifest::PluginEntity>(row.schema_json)
        .map_err(|err| AppError::Internal(format!("解析 entity schema 失败: {}", err)))?;

    Ok(matches!(schema.data_scope, Some(true)))
}