feat(plugin): timeseries 聚合 API — date_trunc 时间序列
This commit is contained in:
@@ -83,3 +83,25 @@ pub struct BatchActionReq {
|
||||
/// batch_update 时的更新数据
|
||||
pub data: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
/// 时间序列查询参数
|
||||
#[derive(Debug, Serialize, Deserialize, utoipa::IntoParams)]
|
||||
pub struct TimeseriesParams {
|
||||
/// 时间字段名
|
||||
pub time_field: String,
|
||||
/// 时间粒度: "day" / "week" / "month"
|
||||
pub time_grain: String,
|
||||
/// 开始日期 (ISO)
|
||||
pub start: Option<String>,
|
||||
/// 结束日期 (ISO)
|
||||
pub end: Option<String>,
|
||||
}
|
||||
|
||||
/// 时间序列数据项
|
||||
#[derive(Debug, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct TimeseriesItem {
|
||||
/// 时间周期
|
||||
pub period: String,
|
||||
/// 计数
|
||||
pub count: i64,
|
||||
}
|
||||
|
||||
@@ -581,6 +581,52 @@ impl PluginDataService {
|
||||
// TODO: 未来版本添加 Redis 缓存层
|
||||
Self::aggregate(plugin_id, entity_name, tenant_id, db, group_by_field, filter).await
|
||||
}
|
||||
|
||||
/// 时间序列聚合 — 按时间字段截断为 day/week/month 统计计数
|
||||
pub async fn timeseries(
|
||||
plugin_id: Uuid,
|
||||
entity_name: &str,
|
||||
tenant_id: Uuid,
|
||||
db: &sea_orm::DatabaseConnection,
|
||||
time_field: &str,
|
||||
time_grain: &str,
|
||||
start: Option<String>,
|
||||
end: Option<String>,
|
||||
) -> AppResult<Vec<crate::data_dto::TimeseriesItem>> {
|
||||
let info = resolve_entity_info(plugin_id, entity_name, tenant_id, db).await?;
|
||||
|
||||
let (sql, values) = DynamicTableManager::build_timeseries_sql(
|
||||
&info.table_name,
|
||||
tenant_id,
|
||||
time_field,
|
||||
time_grain,
|
||||
start.as_deref(),
|
||||
end.as_deref(),
|
||||
)
|
||||
.map_err(|e| AppError::Validation(e))?;
|
||||
|
||||
#[derive(FromQueryResult)]
|
||||
struct TsRow {
|
||||
period: Option<String>,
|
||||
count: i64,
|
||||
}
|
||||
|
||||
let rows = TsRow::find_by_statement(Statement::from_sql_and_values(
|
||||
sea_orm::DatabaseBackend::Postgres,
|
||||
sql,
|
||||
values,
|
||||
))
|
||||
.all(db)
|
||||
.await?;
|
||||
|
||||
Ok(rows
|
||||
.into_iter()
|
||||
.map(|r| crate::data_dto::TimeseriesItem {
|
||||
period: r.period.unwrap_or_default(),
|
||||
count: r.count,
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
}
|
||||
|
||||
/// 从 plugins 表解析 manifest metadata.id(如 "erp-crm")
|
||||
|
||||
@@ -783,6 +783,61 @@ impl DynamicTableManager {
|
||||
|
||||
Ok((sql, values))
|
||||
}
|
||||
|
||||
/// 构建时间序列查询 SQL
|
||||
pub fn build_timeseries_sql(
|
||||
table_name: &str,
|
||||
tenant_id: Uuid,
|
||||
time_field: &str,
|
||||
time_grain: &str,
|
||||
start: Option<&str>,
|
||||
end: Option<&str>,
|
||||
) -> Result<(String, Vec<Value>), String> {
|
||||
let clean_field = sanitize_identifier(time_field);
|
||||
let grain = match time_grain {
|
||||
"day" | "week" | "month" => time_grain,
|
||||
_ => return Err(format!("不支持的 time_grain: {}", time_grain)),
|
||||
};
|
||||
|
||||
let mut conditions = vec![
|
||||
"\"tenant_id\" = $1".to_string(),
|
||||
"\"deleted_at\" IS NULL".to_string(),
|
||||
];
|
||||
let mut values: Vec<Value> = vec![tenant_id.into()];
|
||||
let mut param_idx = 2;
|
||||
|
||||
if let Some(s) = start {
|
||||
conditions.push(format!(
|
||||
"(data->>'{}')::timestamp >= ${}",
|
||||
clean_field, param_idx
|
||||
));
|
||||
values.push(Value::String(Some(Box::new(s.to_string()))));
|
||||
param_idx += 1;
|
||||
}
|
||||
if let Some(e) = end {
|
||||
conditions.push(format!(
|
||||
"(data->>'{}')::timestamp < ${}",
|
||||
clean_field, param_idx
|
||||
));
|
||||
values.push(Value::String(Some(Box::new(e.to_string()))));
|
||||
}
|
||||
|
||||
let sql = format!(
|
||||
"SELECT to_char(date_trunc('{}', (data->>'{}')::timestamp), 'YYYY-MM-DD') as period, \
|
||||
COUNT(*) as count \
|
||||
FROM \"{}\" WHERE {} \
|
||||
GROUP BY date_trunc('{}', (data->>'{}')::timestamp) \
|
||||
ORDER BY period",
|
||||
grain,
|
||||
clean_field,
|
||||
table_name,
|
||||
conditions.join(" AND "),
|
||||
grain,
|
||||
clean_field,
|
||||
);
|
||||
|
||||
Ok((sql, values))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -1375,4 +1430,67 @@ mod tests {
|
||||
sql
|
||||
);
|
||||
}
|
||||
|
||||
// ===== build_timeseries_sql 测试 =====
|
||||
|
||||
#[test]
|
||||
fn test_build_timeseries_sql_day_grain() {
|
||||
let (sql, values) = DynamicTableManager::build_timeseries_sql(
|
||||
"plugin_test",
|
||||
Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
|
||||
"occurred_at",
|
||||
"day",
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
assert!(sql.contains("date_trunc('day'"), "应有 day 粒度");
|
||||
assert!(sql.contains("GROUP BY"), "应有 GROUP BY");
|
||||
assert!(sql.contains("ORDER BY period"), "应按 period 排序");
|
||||
assert_eq!(values.len(), 1, "仅 tenant_id");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_build_timeseries_sql_month_grain() {
|
||||
let (sql, _) = DynamicTableManager::build_timeseries_sql(
|
||||
"plugin_test",
|
||||
Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
|
||||
"created_date",
|
||||
"month",
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
assert!(sql.contains("date_trunc('month'"), "应有 month 粒度");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_build_timeseries_sql_with_date_range() {
|
||||
let (sql, values) = DynamicTableManager::build_timeseries_sql(
|
||||
"plugin_test",
|
||||
Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
|
||||
"occurred_at",
|
||||
"week",
|
||||
Some("2026-01-01"),
|
||||
Some("2026-04-01"),
|
||||
)
|
||||
.unwrap();
|
||||
assert!(sql.contains("date_trunc('week'"), "应有 week 粒度");
|
||||
assert!(sql.contains(">="), "应有 start 条件");
|
||||
assert!(sql.contains("<"), "应有 end 条件");
|
||||
assert_eq!(values.len(), 3, "tenant_id + start + end");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_build_timeseries_sql_invalid_grain() {
|
||||
let result = DynamicTableManager::build_timeseries_sql(
|
||||
"plugin_test",
|
||||
Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
|
||||
"occurred_at",
|
||||
"hour",
|
||||
None,
|
||||
None,
|
||||
);
|
||||
assert!(result.is_err(), "不支持的 grain 应报错");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,7 +9,8 @@ use erp_core::types::{ApiResponse, PaginatedResponse, TenantContext};
|
||||
|
||||
use crate::data_dto::{
|
||||
AggregateItem, AggregateQueryParams, BatchActionReq, CountQueryParams, CreatePluginDataReq,
|
||||
PatchPluginDataReq, PluginDataListParams, PluginDataResp, UpdatePluginDataReq,
|
||||
PatchPluginDataReq, PluginDataListParams, PluginDataResp, TimeseriesItem, TimeseriesParams,
|
||||
UpdatePluginDataReq,
|
||||
};
|
||||
use crate::data_service::{PluginDataService, resolve_manifest_id};
|
||||
use crate::state::PluginState;
|
||||
@@ -457,3 +458,43 @@ where
|
||||
|
||||
Ok(Json(ApiResponse::ok(items)))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/api/v1/plugins/{plugin_id}/{entity}/timeseries",
|
||||
params(TimeseriesParams),
|
||||
responses(
|
||||
(status = 200, description = "时间序列数据", body = ApiResponse<Vec<TimeseriesItem>>),
|
||||
),
|
||||
security(("bearer_auth" = [])),
|
||||
tag = "插件数据"
|
||||
)]
|
||||
/// GET /api/v1/plugins/{plugin_id}/{entity}/timeseries — 时间序列聚合
|
||||
pub async fn get_plugin_timeseries<S>(
|
||||
State(state): State<PluginState>,
|
||||
Extension(ctx): Extension<TenantContext>,
|
||||
Path((plugin_id, entity)): Path<(Uuid, String)>,
|
||||
Query(params): Query<TimeseriesParams>,
|
||||
) -> Result<Json<ApiResponse<Vec<TimeseriesItem>>>, AppError>
|
||||
where
|
||||
PluginState: FromRef<S>,
|
||||
S: Clone + Send + Sync + 'static,
|
||||
{
|
||||
let manifest_id = resolve_manifest_id(plugin_id, ctx.tenant_id, &state.db).await?;
|
||||
let fine_perm = compute_permission_code(&manifest_id, &entity, "list");
|
||||
require_permission(&ctx, &fine_perm)?;
|
||||
|
||||
let result = PluginDataService::timeseries(
|
||||
plugin_id,
|
||||
&entity,
|
||||
ctx.tenant_id,
|
||||
&state.db,
|
||||
¶ms.time_field,
|
||||
¶ms.time_grain,
|
||||
params.start,
|
||||
params.end,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(Json(ApiResponse::ok(result)))
|
||||
}
|
||||
|
||||
@@ -91,6 +91,11 @@ impl PluginModule {
|
||||
.route(
|
||||
"/plugins/{plugin_id}/{entity}/batch",
|
||||
post(crate::handler::data_handler::batch_plugin_data::<S>),
|
||||
)
|
||||
// 时间序列路由
|
||||
.route(
|
||||
"/plugins/{plugin_id}/{entity}/timeseries",
|
||||
get(crate::handler::data_handler::get_plugin_timeseries::<S>),
|
||||
);
|
||||
|
||||
admin_routes.merge(data_routes)
|
||||
|
||||
Reference in New Issue
Block a user