diff --git a/crates/erp-health/src/entity/mod.rs b/crates/erp-health/src/entity/mod.rs index 62acc9e..cf18c4a 100644 --- a/crates/erp-health/src/entity/mod.rs +++ b/crates/erp-health/src/entity/mod.rs @@ -42,4 +42,5 @@ pub mod offline_event_registration; pub mod medication_record; pub mod medication_reminder; pub mod vital_signs; +pub mod vital_signs_daily; pub mod vital_signs_hourly; diff --git a/crates/erp-health/src/entity/vital_signs_daily.rs b/crates/erp-health/src/entity/vital_signs_daily.rs new file mode 100644 index 0000000..76094db --- /dev/null +++ b/crates/erp-health/src/entity/vital_signs_daily.rs @@ -0,0 +1,45 @@ +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)] +#[sea_orm(table_name = "vital_signs_daily")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub id: Uuid, + pub tenant_id: Uuid, + pub patient_id: Uuid, + pub device_type: String, + pub date_bucket: chrono::NaiveDate, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub min_val: Option, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub max_val: Option, + pub avg_val: f64, + pub sample_count: i32, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub percentile_95: Option, + pub created_at: DateTimeUtc, + pub updated_at: DateTimeUtc, + pub version: i32, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::patient::Entity", + from = "Column::PatientId", + to = "super::patient::Column::Id" + )] + Patient, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Patient.def() + } + fn via() -> Option { + None + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/crates/erp-health/src/service/mod.rs b/crates/erp-health/src/service/mod.rs index 1b89bbd..94e2b77 100644 --- a/crates/erp-health/src/service/mod.rs +++ b/crates/erp-health/src/service/mod.rs @@ -29,4 +29,5 @@ pub mod seed; pub mod stats_service; pub mod trend_service; pub mod trend_stats; +pub mod vital_signs_daily_service; pub mod validation; diff --git a/crates/erp-health/src/service/vital_signs_daily_service.rs b/crates/erp-health/src/service/vital_signs_daily_service.rs new file mode 100644 index 0000000..9028e2d --- /dev/null +++ b/crates/erp-health/src/service/vital_signs_daily_service.rs @@ -0,0 +1,167 @@ +use chrono::NaiveDate; +use sea_orm::*; +use uuid::Uuid; + +use crate::entity::vital_signs_daily; +use crate::error::HealthResult; + +/// 从 vital_signs_hourly 聚合指定日期的数据到 vital_signs_daily +pub async fn aggregate_daily( + db: &DatabaseConnection, + tenant_id: Uuid, + date: NaiveDate, +) -> HealthResult { + let start_of_day = date.and_hms_opt(0, 0, 0).unwrap().and_utc(); + let end_of_day = date.and_hms_opt(23, 59, 59).unwrap().and_utc(); + + let hourly_rows = crate::entity::vital_signs_hourly::Entity::find() + .filter(crate::entity::vital_signs_hourly::Column::TenantId.eq(tenant_id)) + .filter(crate::entity::vital_signs_hourly::Column::HourStart.gte(start_of_day)) + .filter(crate::entity::vital_signs_hourly::Column::HourStart.lte(end_of_day)) + .all(db) + .await?; + + let mut grouped: std::collections::HashMap<(Uuid, String), Vec<_>> = + std::collections::HashMap::new(); + for row in &hourly_rows { + let key = (row.patient_id, row.device_type.clone()); + grouped.entry(key).or_default().push(row.clone()); + } + + let mut upserted = 0u64; + for ((patient_id, device_type), rows) in grouped { + let avg_val = rows.iter().map(|r| r.avg_val).sum::() / rows.len() as f64; + let min_val = rows.iter().filter_map(|r| r.min_val).reduce(f64::min); + let max_val = rows.iter().filter_map(|r| r.max_val).reduce(f64::max); + let sample_count: i32 = rows.iter().map(|r| r.sample_count).sum(); + let all_avgs: Vec = rows.iter().map(|r| r.avg_val).collect(); + let percentile_95 = if all_avgs.len() >= 2 { + Some(calculate_percentile(&all_avgs, 95.0)) + } else { + None + }; + + let result = vital_signs_daily::Entity::insert(vital_signs_daily::ActiveModel { + id: Set(Uuid::now_v7()), + tenant_id: Set(tenant_id), + patient_id: Set(patient_id), + device_type: Set(device_type), + date_bucket: Set(date), + min_val: Set(min_val), + max_val: Set(max_val), + avg_val: Set(avg_val), + sample_count: Set(sample_count), + percentile_95: Set(percentile_95), + created_at: Set(chrono::Utc::now()), + updated_at: Set(chrono::Utc::now()), + version: Set(1), + }) + .on_conflict( + sea_orm::sea_query::OnConflict::columns([ + vital_signs_daily::Column::TenantId, + vital_signs_daily::Column::PatientId, + vital_signs_daily::Column::DeviceType, + vital_signs_daily::Column::DateBucket, + ]) + .update_columns([ + vital_signs_daily::Column::MinVal, + vital_signs_daily::Column::MaxVal, + vital_signs_daily::Column::AvgVal, + vital_signs_daily::Column::SampleCount, + vital_signs_daily::Column::Percentile95, + vital_signs_daily::Column::UpdatedAt, + ]) + .to_owned(), + ) + .exec(db) + .await?; + + upserted += 1; + let _ = result; + } + + Ok(upserted) +} + +/// 遍历所有租户执行日聚合 +pub async fn aggregate_daily_for_all_tenants( + db: &DatabaseConnection, + date: NaiveDate, +) -> HealthResult { + let start_of_day = date.and_hms_opt(0, 0, 0).unwrap().and_utc(); + let end_of_day = date.and_hms_opt(23, 59, 59).unwrap().and_utc(); + + let hourly_rows = crate::entity::vital_signs_hourly::Entity::find() + .filter(crate::entity::vital_signs_hourly::Column::HourStart.gte(start_of_day)) + .filter(crate::entity::vital_signs_hourly::Column::HourStart.lte(end_of_day)) + .all(db) + .await?; + + let tenant_ids: std::collections::HashSet = + hourly_rows.iter().map(|r| r.tenant_id).collect(); + + let mut total = 0u64; + for tenant_id in tenant_ids { + total += aggregate_daily(db, tenant_id, date).await?; + } + Ok(total) +} + +/// 计算百分位数 +fn calculate_percentile(values: &[f64], percentile: f64) -> f64 { + let mut sorted = values.to_vec(); + sorted.sort_by(|a, b| a.partial_cmp(b).unwrap()); + let idx = (percentile / 100.0 * (sorted.len() - 1) as f64).ceil() as usize; + sorted[idx.min(sorted.len() - 1)] +} + +/// 查询日聚合数据 +pub async fn query_daily( + db: &DatabaseConnection, + tenant_id: Uuid, + patient_id: Option, + device_type: Option, + start_date: NaiveDate, + end_date: NaiveDate, +) -> HealthResult> { + let mut query = vital_signs_daily::Entity::find() + .filter(vital_signs_daily::Column::TenantId.eq(tenant_id)) + .filter(vital_signs_daily::Column::DateBucket.gte(start_date)) + .filter(vital_signs_daily::Column::DateBucket.lte(end_date)); + + if let Some(pid) = patient_id { + query = query.filter(vital_signs_daily::Column::PatientId.eq(pid)); + } + if let Some(dt) = device_type { + query = query.filter(vital_signs_daily::Column::DeviceType.eq(dt)); + } + + let results = query.all(db).await?; + Ok(results) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_calculate_percentile_95() { + let values = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]; + let p95 = calculate_percentile(&values, 95.0); + assert!(p95 > 9.0 && p95 <= 10.0); + } + + #[test] + fn test_calculate_percentile_edge_single() { + let values = vec![5.0]; + let p95 = calculate_percentile(&values, 95.0); + assert_eq!(p95, 5.0); + } + + #[test] + fn test_calculate_percentile_two_values() { + let values = vec![10.0, 20.0]; + let p95 = calculate_percentile(&values, 95.0); + assert_eq!(p95, 20.0); + } +} diff --git a/crates/erp-server/migration/src/lib.rs b/crates/erp-server/migration/src/lib.rs index f0fbf75..326aa80 100644 --- a/crates/erp-server/migration/src/lib.rs +++ b/crates/erp-server/migration/src/lib.rs @@ -103,6 +103,7 @@ mod m20260501_000100_seed_action_inbox_menu; mod m20260502_000101_seed_health_dictionaries; mod m20260502_000102_seed_warning_thresholds; mod m20260502_000103_seed_follow_up_template_menu; +mod m20260504_000104_create_vital_signs_daily; pub struct Migrator; @@ -213,6 +214,7 @@ impl MigratorTrait for Migrator { Box::new(m20260502_000101_seed_health_dictionaries::Migration), Box::new(m20260502_000102_seed_warning_thresholds::Migration), Box::new(m20260502_000103_seed_follow_up_template_menu::Migration), + Box::new(m20260504_000104_create_vital_signs_daily::Migration), ] } } diff --git a/crates/erp-server/migration/src/m20260504_000104_create_vital_signs_daily.rs b/crates/erp-server/migration/src/m20260504_000104_create_vital_signs_daily.rs new file mode 100644 index 0000000..7ed4d26 --- /dev/null +++ b/crates/erp-server/migration/src/m20260504_000104_create_vital_signs_daily.rs @@ -0,0 +1,73 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(Alias::new("vital_signs_daily")) + .col( + ColumnDef::new(Alias::new("id")) + .uuid() + .not_null() + .default(Expr::val("gen_random_uuid()")), + ) + .col(ColumnDef::new(Alias::new("tenant_id")).uuid().not_null()) + .col(ColumnDef::new(Alias::new("patient_id")).uuid().not_null()) + .col(ColumnDef::new(Alias::new("device_type")).string().not_null()) + .col(ColumnDef::new(Alias::new("date_bucket")).date().not_null()) + .col(ColumnDef::new(Alias::new("min_val")).double()) + .col(ColumnDef::new(Alias::new("max_val")).double()) + .col(ColumnDef::new(Alias::new("avg_val")).double().not_null()) + .col(ColumnDef::new(Alias::new("sample_count")).integer().not_null()) + .col(ColumnDef::new(Alias::new("percentile_95")).double()) + .col( + ColumnDef::new(Alias::new("created_at")) + .timestamp_with_time_zone() + .not_null() + .default(Expr::val("NOW()")), + ) + .col( + ColumnDef::new(Alias::new("updated_at")) + .timestamp_with_time_zone() + .not_null() + .default(Expr::val("NOW()")), + ) + .col( + ColumnDef::new(Alias::new("version")) + .integer() + .not_null() + .default(1), + ) + .primary_key(Index::create().col(Alias::new("id"))) + .to_owned(), + ) + .await?; + + manager + .create_index( + Index::create() + .name("idx_vital_signs_daily_unique") + .table(Alias::new("vital_signs_daily")) + .col(Alias::new("tenant_id")) + .col(Alias::new("patient_id")) + .col(Alias::new("device_type")) + .col(Alias::new("date_bucket")) + .unique() + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(Alias::new("vital_signs_daily")).to_owned()) + .await + } +}