feat(health): vital_signs_daily 日聚合表 + Entity + service
- 新增 vital_signs_daily 表迁移(带唯一索引 tenant+patient+device_type+date) - 新增 SeaORM Entity(含 percentile_95 统计字段) - 实现日聚合 service:从 hourly 聚合到 daily(支持 upsert) - 实现 aggregate_daily_for_all_tenants 多租户遍历聚合 - 实现 query_daily 范围查询 - 单元测试:percentile 计算验证
This commit is contained in:
@@ -42,4 +42,5 @@ pub mod offline_event_registration;
|
|||||||
pub mod medication_record;
|
pub mod medication_record;
|
||||||
pub mod medication_reminder;
|
pub mod medication_reminder;
|
||||||
pub mod vital_signs;
|
pub mod vital_signs;
|
||||||
|
pub mod vital_signs_daily;
|
||||||
pub mod vital_signs_hourly;
|
pub mod vital_signs_hourly;
|
||||||
|
|||||||
45
crates/erp-health/src/entity/vital_signs_daily.rs
Normal file
45
crates/erp-health/src/entity/vital_signs_daily.rs
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
use sea_orm::entity::prelude::*;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)]
|
||||||
|
#[sea_orm(table_name = "vital_signs_daily")]
|
||||||
|
pub struct Model {
|
||||||
|
#[sea_orm(primary_key, auto_increment = false)]
|
||||||
|
pub id: Uuid,
|
||||||
|
pub tenant_id: Uuid,
|
||||||
|
pub patient_id: Uuid,
|
||||||
|
pub device_type: String,
|
||||||
|
pub date_bucket: chrono::NaiveDate,
|
||||||
|
#[sea_orm(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub min_val: Option<f64>,
|
||||||
|
#[sea_orm(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub max_val: Option<f64>,
|
||||||
|
pub avg_val: f64,
|
||||||
|
pub sample_count: i32,
|
||||||
|
#[sea_orm(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub percentile_95: Option<f64>,
|
||||||
|
pub created_at: DateTimeUtc,
|
||||||
|
pub updated_at: DateTimeUtc,
|
||||||
|
pub version: i32,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||||
|
pub enum Relation {
|
||||||
|
#[sea_orm(
|
||||||
|
belongs_to = "super::patient::Entity",
|
||||||
|
from = "Column::PatientId",
|
||||||
|
to = "super::patient::Column::Id"
|
||||||
|
)]
|
||||||
|
Patient,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Related<super::patient::Entity> for Entity {
|
||||||
|
fn to() -> RelationDef {
|
||||||
|
Relation::Patient.def()
|
||||||
|
}
|
||||||
|
fn via() -> Option<RelationDef> {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ActiveModelBehavior for ActiveModel {}
|
||||||
@@ -29,4 +29,5 @@ pub mod seed;
|
|||||||
pub mod stats_service;
|
pub mod stats_service;
|
||||||
pub mod trend_service;
|
pub mod trend_service;
|
||||||
pub mod trend_stats;
|
pub mod trend_stats;
|
||||||
|
pub mod vital_signs_daily_service;
|
||||||
pub mod validation;
|
pub mod validation;
|
||||||
|
|||||||
167
crates/erp-health/src/service/vital_signs_daily_service.rs
Normal file
167
crates/erp-health/src/service/vital_signs_daily_service.rs
Normal file
@@ -0,0 +1,167 @@
|
|||||||
|
use chrono::NaiveDate;
|
||||||
|
use sea_orm::*;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::entity::vital_signs_daily;
|
||||||
|
use crate::error::HealthResult;
|
||||||
|
|
||||||
|
/// 从 vital_signs_hourly 聚合指定日期的数据到 vital_signs_daily
|
||||||
|
pub async fn aggregate_daily(
|
||||||
|
db: &DatabaseConnection,
|
||||||
|
tenant_id: Uuid,
|
||||||
|
date: NaiveDate,
|
||||||
|
) -> HealthResult<u64> {
|
||||||
|
let start_of_day = date.and_hms_opt(0, 0, 0).unwrap().and_utc();
|
||||||
|
let end_of_day = date.and_hms_opt(23, 59, 59).unwrap().and_utc();
|
||||||
|
|
||||||
|
let hourly_rows = crate::entity::vital_signs_hourly::Entity::find()
|
||||||
|
.filter(crate::entity::vital_signs_hourly::Column::TenantId.eq(tenant_id))
|
||||||
|
.filter(crate::entity::vital_signs_hourly::Column::HourStart.gte(start_of_day))
|
||||||
|
.filter(crate::entity::vital_signs_hourly::Column::HourStart.lte(end_of_day))
|
||||||
|
.all(db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let mut grouped: std::collections::HashMap<(Uuid, String), Vec<_>> =
|
||||||
|
std::collections::HashMap::new();
|
||||||
|
for row in &hourly_rows {
|
||||||
|
let key = (row.patient_id, row.device_type.clone());
|
||||||
|
grouped.entry(key).or_default().push(row.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut upserted = 0u64;
|
||||||
|
for ((patient_id, device_type), rows) in grouped {
|
||||||
|
let avg_val = rows.iter().map(|r| r.avg_val).sum::<f64>() / rows.len() as f64;
|
||||||
|
let min_val = rows.iter().filter_map(|r| r.min_val).reduce(f64::min);
|
||||||
|
let max_val = rows.iter().filter_map(|r| r.max_val).reduce(f64::max);
|
||||||
|
let sample_count: i32 = rows.iter().map(|r| r.sample_count).sum();
|
||||||
|
let all_avgs: Vec<f64> = rows.iter().map(|r| r.avg_val).collect();
|
||||||
|
let percentile_95 = if all_avgs.len() >= 2 {
|
||||||
|
Some(calculate_percentile(&all_avgs, 95.0))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
let result = vital_signs_daily::Entity::insert(vital_signs_daily::ActiveModel {
|
||||||
|
id: Set(Uuid::now_v7()),
|
||||||
|
tenant_id: Set(tenant_id),
|
||||||
|
patient_id: Set(patient_id),
|
||||||
|
device_type: Set(device_type),
|
||||||
|
date_bucket: Set(date),
|
||||||
|
min_val: Set(min_val),
|
||||||
|
max_val: Set(max_val),
|
||||||
|
avg_val: Set(avg_val),
|
||||||
|
sample_count: Set(sample_count),
|
||||||
|
percentile_95: Set(percentile_95),
|
||||||
|
created_at: Set(chrono::Utc::now()),
|
||||||
|
updated_at: Set(chrono::Utc::now()),
|
||||||
|
version: Set(1),
|
||||||
|
})
|
||||||
|
.on_conflict(
|
||||||
|
sea_orm::sea_query::OnConflict::columns([
|
||||||
|
vital_signs_daily::Column::TenantId,
|
||||||
|
vital_signs_daily::Column::PatientId,
|
||||||
|
vital_signs_daily::Column::DeviceType,
|
||||||
|
vital_signs_daily::Column::DateBucket,
|
||||||
|
])
|
||||||
|
.update_columns([
|
||||||
|
vital_signs_daily::Column::MinVal,
|
||||||
|
vital_signs_daily::Column::MaxVal,
|
||||||
|
vital_signs_daily::Column::AvgVal,
|
||||||
|
vital_signs_daily::Column::SampleCount,
|
||||||
|
vital_signs_daily::Column::Percentile95,
|
||||||
|
vital_signs_daily::Column::UpdatedAt,
|
||||||
|
])
|
||||||
|
.to_owned(),
|
||||||
|
)
|
||||||
|
.exec(db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
upserted += 1;
|
||||||
|
let _ = result;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(upserted)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 遍历所有租户执行日聚合
|
||||||
|
pub async fn aggregate_daily_for_all_tenants(
|
||||||
|
db: &DatabaseConnection,
|
||||||
|
date: NaiveDate,
|
||||||
|
) -> HealthResult<u64> {
|
||||||
|
let start_of_day = date.and_hms_opt(0, 0, 0).unwrap().and_utc();
|
||||||
|
let end_of_day = date.and_hms_opt(23, 59, 59).unwrap().and_utc();
|
||||||
|
|
||||||
|
let hourly_rows = crate::entity::vital_signs_hourly::Entity::find()
|
||||||
|
.filter(crate::entity::vital_signs_hourly::Column::HourStart.gte(start_of_day))
|
||||||
|
.filter(crate::entity::vital_signs_hourly::Column::HourStart.lte(end_of_day))
|
||||||
|
.all(db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let tenant_ids: std::collections::HashSet<Uuid> =
|
||||||
|
hourly_rows.iter().map(|r| r.tenant_id).collect();
|
||||||
|
|
||||||
|
let mut total = 0u64;
|
||||||
|
for tenant_id in tenant_ids {
|
||||||
|
total += aggregate_daily(db, tenant_id, date).await?;
|
||||||
|
}
|
||||||
|
Ok(total)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 计算百分位数
|
||||||
|
fn calculate_percentile(values: &[f64], percentile: f64) -> f64 {
|
||||||
|
let mut sorted = values.to_vec();
|
||||||
|
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
|
||||||
|
let idx = (percentile / 100.0 * (sorted.len() - 1) as f64).ceil() as usize;
|
||||||
|
sorted[idx.min(sorted.len() - 1)]
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 查询日聚合数据
|
||||||
|
pub async fn query_daily(
|
||||||
|
db: &DatabaseConnection,
|
||||||
|
tenant_id: Uuid,
|
||||||
|
patient_id: Option<Uuid>,
|
||||||
|
device_type: Option<String>,
|
||||||
|
start_date: NaiveDate,
|
||||||
|
end_date: NaiveDate,
|
||||||
|
) -> HealthResult<Vec<vital_signs_daily::Model>> {
|
||||||
|
let mut query = vital_signs_daily::Entity::find()
|
||||||
|
.filter(vital_signs_daily::Column::TenantId.eq(tenant_id))
|
||||||
|
.filter(vital_signs_daily::Column::DateBucket.gte(start_date))
|
||||||
|
.filter(vital_signs_daily::Column::DateBucket.lte(end_date));
|
||||||
|
|
||||||
|
if let Some(pid) = patient_id {
|
||||||
|
query = query.filter(vital_signs_daily::Column::PatientId.eq(pid));
|
||||||
|
}
|
||||||
|
if let Some(dt) = device_type {
|
||||||
|
query = query.filter(vital_signs_daily::Column::DeviceType.eq(dt));
|
||||||
|
}
|
||||||
|
|
||||||
|
let results = query.all(db).await?;
|
||||||
|
Ok(results)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_calculate_percentile_95() {
|
||||||
|
let values = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0];
|
||||||
|
let p95 = calculate_percentile(&values, 95.0);
|
||||||
|
assert!(p95 > 9.0 && p95 <= 10.0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_calculate_percentile_edge_single() {
|
||||||
|
let values = vec![5.0];
|
||||||
|
let p95 = calculate_percentile(&values, 95.0);
|
||||||
|
assert_eq!(p95, 5.0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_calculate_percentile_two_values() {
|
||||||
|
let values = vec![10.0, 20.0];
|
||||||
|
let p95 = calculate_percentile(&values, 95.0);
|
||||||
|
assert_eq!(p95, 20.0);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -103,6 +103,7 @@ mod m20260501_000100_seed_action_inbox_menu;
|
|||||||
mod m20260502_000101_seed_health_dictionaries;
|
mod m20260502_000101_seed_health_dictionaries;
|
||||||
mod m20260502_000102_seed_warning_thresholds;
|
mod m20260502_000102_seed_warning_thresholds;
|
||||||
mod m20260502_000103_seed_follow_up_template_menu;
|
mod m20260502_000103_seed_follow_up_template_menu;
|
||||||
|
mod m20260504_000104_create_vital_signs_daily;
|
||||||
|
|
||||||
pub struct Migrator;
|
pub struct Migrator;
|
||||||
|
|
||||||
@@ -213,6 +214,7 @@ impl MigratorTrait for Migrator {
|
|||||||
Box::new(m20260502_000101_seed_health_dictionaries::Migration),
|
Box::new(m20260502_000101_seed_health_dictionaries::Migration),
|
||||||
Box::new(m20260502_000102_seed_warning_thresholds::Migration),
|
Box::new(m20260502_000102_seed_warning_thresholds::Migration),
|
||||||
Box::new(m20260502_000103_seed_follow_up_template_menu::Migration),
|
Box::new(m20260502_000103_seed_follow_up_template_menu::Migration),
|
||||||
|
Box::new(m20260504_000104_create_vital_signs_daily::Migration),
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,73 @@
|
|||||||
|
use sea_orm_migration::prelude::*;
|
||||||
|
|
||||||
|
#[derive(DeriveMigrationName)]
|
||||||
|
pub struct Migration;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl MigrationTrait for Migration {
|
||||||
|
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
manager
|
||||||
|
.create_table(
|
||||||
|
Table::create()
|
||||||
|
.table(Alias::new("vital_signs_daily"))
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(Alias::new("id"))
|
||||||
|
.uuid()
|
||||||
|
.not_null()
|
||||||
|
.default(Expr::val("gen_random_uuid()")),
|
||||||
|
)
|
||||||
|
.col(ColumnDef::new(Alias::new("tenant_id")).uuid().not_null())
|
||||||
|
.col(ColumnDef::new(Alias::new("patient_id")).uuid().not_null())
|
||||||
|
.col(ColumnDef::new(Alias::new("device_type")).string().not_null())
|
||||||
|
.col(ColumnDef::new(Alias::new("date_bucket")).date().not_null())
|
||||||
|
.col(ColumnDef::new(Alias::new("min_val")).double())
|
||||||
|
.col(ColumnDef::new(Alias::new("max_val")).double())
|
||||||
|
.col(ColumnDef::new(Alias::new("avg_val")).double().not_null())
|
||||||
|
.col(ColumnDef::new(Alias::new("sample_count")).integer().not_null())
|
||||||
|
.col(ColumnDef::new(Alias::new("percentile_95")).double())
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(Alias::new("created_at"))
|
||||||
|
.timestamp_with_time_zone()
|
||||||
|
.not_null()
|
||||||
|
.default(Expr::val("NOW()")),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(Alias::new("updated_at"))
|
||||||
|
.timestamp_with_time_zone()
|
||||||
|
.not_null()
|
||||||
|
.default(Expr::val("NOW()")),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(Alias::new("version"))
|
||||||
|
.integer()
|
||||||
|
.not_null()
|
||||||
|
.default(1),
|
||||||
|
)
|
||||||
|
.primary_key(Index::create().col(Alias::new("id")))
|
||||||
|
.to_owned(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
manager
|
||||||
|
.create_index(
|
||||||
|
Index::create()
|
||||||
|
.name("idx_vital_signs_daily_unique")
|
||||||
|
.table(Alias::new("vital_signs_daily"))
|
||||||
|
.col(Alias::new("tenant_id"))
|
||||||
|
.col(Alias::new("patient_id"))
|
||||||
|
.col(Alias::new("device_type"))
|
||||||
|
.col(Alias::new("date_bucket"))
|
||||||
|
.unique()
|
||||||
|
.to_owned(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
manager
|
||||||
|
.drop_table(Table::drop().table(Alias::new("vital_signs_daily")).to_owned())
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user