From feab61b13264164e7c829110986537473932f873 Mon Sep 17 00:00:00 2001 From: iven Date: Mon, 4 May 2026 01:14:15 +0800 Subject: [PATCH] =?UTF-8?q?docs(plan):=20IoT=20+=20FHIR=20V1=20Plan=201=20?= =?UTF-8?q?=E2=80=94=20=E6=95=B0=E6=8D=AE=E5=B1=82=E5=A2=9E=E5=BC=BA?= =?UTF-8?q?=E5=AE=9E=E6=96=BD=E8=AE=A1=E5=88=92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 6 个 Task:vital_signs_daily 表迁移 + Entity + Service + patient_devices 增强 + 日聚合 background task + 查询 API。 TDD 流程,每步有具体代码和验证命令。 --- ...2026-05-04-iot-fhir-v1-plan1-data-layer.md | 652 ++++++++++++++++++ 1 file changed, 652 insertions(+) create mode 100644 docs/superpowers/plans/2026-05-04-iot-fhir-v1-plan1-data-layer.md diff --git a/docs/superpowers/plans/2026-05-04-iot-fhir-v1-plan1-data-layer.md b/docs/superpowers/plans/2026-05-04-iot-fhir-v1-plan1-data-layer.md new file mode 100644 index 0000000..dce6af0 --- /dev/null +++ b/docs/superpowers/plans/2026-05-04-iot-fhir-v1-plan1-data-layer.md @@ -0,0 +1,652 @@ +# IoT + FHIR V1 Plan 1: 数据层增强 + +> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** 新增 vital_signs_daily 日聚合表、增强 patient_devices 表字段、实现日聚合 background task,为 FHIR API 和监控看板提供数据基础。 + +**Architecture:** 在现有 erp-health 模块内扩展。新增 SeaORM Entity + Migration 对应日聚合表;为 patient_devices 添加迁移扩展列;日聚合任务复用现有 `tokio::spawn` + interval 模式,从 vital_signs_hourly 聚合为日级数据。 + +**Tech Stack:** Rust / SeaORM / PostgreSQL / tokio / Axum + +**Spec:** `docs/superpowers/specs/2026-05-04-iot-fhir-platform-ecosystem-design.md` §3 + +--- + +## Chunk 1: vital_signs_daily 表 + Entity + +### Task 1: 日聚合表迁移 + +**Files:** +- Create: `crates/erp-server/migration/src/m20260504_000105_create_vital_signs_daily.rs` +- Modify: `crates/erp-server/migration/src/lib.rs` + +- [ ] **Step 1: 创建迁移文件** + +```rust +// crates/erp-server/migration/src/m20260504_000105_create_vital_signs_daily.rs +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(Alias::new("vital_signs_daily")) + .col( + ColumnDef::new(Alias::new("id")) + .uuid() + .not_null() + .default(Expr::val("gen_random_uuid()")), + ) + .col(ColumnDef::new(Alias::new("tenant_id")).uuid().not_null()) + .col(ColumnDef::new(Alias::new("patient_id")).uuid().not_null()) + .col(ColumnDef::new(Alias::new("device_type")).string().not_null()) + .col(ColumnDef::new(Alias::new("date_bucket")).date().not_null()) + .col(ColumnDef::new(Alias::new("min_val")).double()) + .col(ColumnDef::new(Alias::new("max_val")).double()) + .col(ColumnDef::new(Alias::new("avg_val")).double().not_null()) + .col(ColumnDef::new(Alias::new("sample_count")).integer().not_null()) + .col(ColumnDef::new(Alias::new("percentile_95")).double()) + .col( + ColumnDef::new(Alias::new("created_at")) + .timestamp_with_time_zone() + .not_null() + .default(Expr::val("NOW()")), + ) + .col( + ColumnDef::new(Alias::new("updated_at")) + .timestamp_with_time_zone() + .not_null() + .default(Expr::val("NOW()")), + ) + .col( + ColumnDef::new(Alias::new("version")) + .integer() + .not_null() + .default(1), + ) + .primary_key(Index::create().col(Alias::new("id"))) + .to_owned(), + ) + .await?; + + manager + .create_index( + Index::create() + .name("idx_vital_signs_daily_unique") + .table(Alias::new("vital_signs_daily")) + .col(Alias::new("tenant_id")) + .col(Alias::new("patient_id")) + .col(Alias::new("device_type")) + .col(Alias::new("date_bucket")) + .unique() + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(Alias::new("vital_signs_daily")).to_owned()) + .await + } +} +``` + +- [ ] **Step 2: 注册迁移** + +在 `crates/erp-server/migration/src/lib.rs` 顶部添加: +```rust +mod m20260504_000105_create_vital_signs_daily; +``` + +在 `Migrator::migrations()` 的 `vec![]` 末尾追加: +```rust +Box::new(m20260504_000105_create_vital_signs_daily::Migration), +``` + +- [ ] **Step 3: 验证迁移编译** + +Run: `cargo check -p erp-server` +Expected: 编译通过,无错误 + +- [ ] **Step 4: Commit** + +```bash +git add crates/erp-server/migration/src/m20260504_000105_create_vital_signs_daily.rs crates/erp-server/migration/src/lib.rs +git commit -m "feat(db): 新增 vital_signs_daily 日聚合表迁移" +``` + +--- + +### Task 2: 日聚合 Entity + +**Files:** +- Create: `crates/erp-health/src/entity/vital_signs_daily.rs` +- Modify: `crates/erp-health/src/entity/mod.rs` + +- [ ] **Step 1: 创建 Entity 文件** + +参考 `vital_signs_hourly.rs` 的结构,创建 `vital_signs_daily.rs`: + +```rust +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)] +#[sea_orm(table_name = "vital_signs_daily")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub id: Uuid, + pub tenant_id: Uuid, + pub patient_id: Uuid, + pub device_type: String, + pub date_bucket: chrono::NaiveDate, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub min_val: Option, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub max_val: Option, + pub avg_val: f64, + pub sample_count: i32, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub percentile_95: Option, + pub created_at: DateTimeUtc, + pub updated_at: DateTimeUtc, + pub version: i32, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::patient::Entity", + from = "Column::PatientId", + to = "super::patient::Column::Id" + )] + Patient, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Patient.def() + } + fn via() -> Option { + None + } +} + +impl ActiveModelBehavior for ActiveModel {} +``` + +- [ ] **Step 2: 注册 Entity 模块** + +在 `crates/erp-health/src/entity/mod.rs` 添加: +```rust +pub mod vital_signs_daily; +``` + +- [ ] **Step 3: 验证编译** + +Run: `cargo check -p erp-health` +Expected: 编译通过 + +- [ ] **Step 4: Commit** + +```bash +git add crates/erp-health/src/entity/vital_signs_daily.rs crates/erp-health/src/entity/mod.rs +git commit -m "feat(health): 新增 VitalSignsDaily SeaORM Entity" +``` + +--- + +### Task 3: 日聚合 Service 函数 + +**Files:** +- Create: `crates/erp-health/src/service/vital_signs_daily_service.rs` +- Modify: `crates/erp-health/src/service/mod.rs` + +- [ ] **Step 1: 编写日聚合 service 单元测试** + +```rust +// 文件末尾的 tests 模块 +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_build_daily_agg_query_basic() { + // 验证聚合查询构建器产生正确的 SQL 片段 + let query = build_daily_agg_query(Uuid::new_v4(), chrono::NaiveDate::from_ymd_opt(2026, 5, 4).unwrap()); + assert!(query.contains("vital_signs_hourly")); + assert!(query.contains("date_bucket")); + } + + #[test] + fn test_calculate_percentile_95() { + let values = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]; + let p95 = calculate_percentile(&values, 95.0); + assert!(p95 > 9.0 && p95 <= 10.0); + } +} +``` + +- [ ] **Step 2: 运行测试验证失败** + +Run: `cargo test -p erp-health vital_signs_daily` +Expected: 编译失败(函数未定义) + +- [ ] **Step 3: 实现 service** + +```rust +use chrono::NaiveDate; +use sea_orm::*; +use uuid::Uuid; + +use crate::entity::vital_signs_daily; +use crate::error::HealthResult; + +/// 从 vital_signs_hourly 聚合指定日期的数据到 vital_signs_daily +pub async fn aggregate_daily( + db: &DatabaseConnection, + tenant_id: Uuid, + date: NaiveDate, +) -> HealthResult { + let start_of_day = date.and_hms_opt(0, 0, 0).unwrap().and_utc(); + let end_of_day = date.and_hms_opt(23, 59, 59).unwrap().and_utc(); + + // 查询当天所有小时聚合数据 + let hourly_rows = entity::vital_signs_hourly::Entity::find() + .filter(entity::vital_signs_hourly::Column::TenantId.eq(tenant_id)) + .filter(entity::vital_signs_hourly::Column::HourStart.gte(start_of_day)) + .filter(entity::vital_signs_hourly::Column::HourStart.lte(end_of_day)) + .all(db) + .await?; + + // 按 (patient_id, device_type) 分组聚合 + let mut grouped: std::collections::HashMap<(Uuid, String), Vec<&_>> = std::collections::HashMap::new(); + for row in &hourly_rows { + let key = (row.patient_id, row.device_type.clone()); + grouped.entry(key).or_default().push(row); + } + + let mut upserted = 0u64; + for ((patient_id, device_type), rows) in grouped { + let avg_val = rows.iter().map(|r| r.avg_val).sum::() / rows.len() as f64; + let min_val = rows.iter().filter_map(|r| r.min_val).reduce(f64::min); + let max_val = rows.iter().filter_map(|r| r.max_val).reduce(f64::max); + let sample_count: i32 = rows.iter().map(|r| r.sample_count).sum(); + let all_avgs: Vec = rows.iter().map(|r| r.avg_val).collect(); + let percentile_95 = if all_avgs.len() >= 2 { + Some(calculate_percentile(&all_avgs, 95.0)) + } else { + None + }; + + // Upsert: ON CONFLICT UPDATE + let result = vital_signs_daily::Entity::insert( + vital_signs_daily::ActiveModel { + id: Set(Uuid::now_v7()), + tenant_id: Set(tenant_id), + patient_id: Set(patient_id), + device_type: Set(device_type), + date_bucket: Set(date), + min_val: Set(min_val), + max_val: Set(max_val), + avg_val: Set(avg_val), + sample_count: Set(sample_count), + percentile_95: Set(percentile_95), + created_at: Set(chrono::Utc::now()), + updated_at: Set(chrono::Utc::now()), + version: Set(1), + }, + ) + .on_conflict( + sea_orm::sea_query::OnConflict::columns([ + vital_signs_daily::Column::TenantId, + vital_signs_daily::Column::PatientId, + vital_signs_daily::Column::DeviceType, + vital_signs_daily::Column::DateBucket, + ]) + .update_columns([ + vital_signs_daily::Column::MinVal, + vital_signs_daily::Column::MaxVal, + vital_signs_daily::Column::AvgVal, + vital_signs_daily::Column::SampleCount, + vital_signs_daily::Column::Percentile95, + vital_signs_daily::Column::UpdatedAt, + ]) + .to_owned(), + ) + .exec(db) + .await?; + + upserted += result.last_insert_id as u64; + } + + Ok(upserted) +} + +/// 计算百分位数 +fn calculate_percentile(values: &[f64], percentile: f64) -> f64 { + let mut sorted = values.to_vec(); + sorted.sort_by(|a, b| a.partial_cmp(b).unwrap()); + let idx = (percentile / 100.0 * (sorted.len() - 1) as f64).ceil() as usize; + sorted[idx.min(sorted.len() - 1)] +} + +/// 查询日聚合数据 +pub async fn query_daily( + db: &DatabaseConnection, + tenant_id: Uuid, + patient_id: Option, + device_type: Option, + start_date: NaiveDate, + end_date: NaiveDate, +) -> HealthResult> { + let mut query = vital_signs_daily::Entity::find() + .filter(vital_signs_daily::Column::TenantId.eq(tenant_id)) + .filter(vital_signs_daily::Column::DateBucket.gte(start_date)) + .filter(vital_signs_daily::Column::DateBucket.lte(end_date)); + + if let Some(pid) = patient_id { + query = query.filter(vital_signs_daily::Column::PatientId.eq(pid)); + } + if let Some(dt) = device_type { + query = query.filter(vital_signs_daily::Column::DeviceType.eq(dt)); + } + + let results = query.all(db).await?; + Ok(results) +} +``` + +- [ ] **Step 4: 注册 service 模块** + +在 `crates/erp-health/src/service/mod.rs` 添加: +```rust +pub mod vital_signs_daily_service; +``` + +- [ ] **Step 5: 验证编译 + 测试通过** + +Run: `cargo test -p erp-health vital_signs_daily` +Expected: 测试通过 + +- [ ] **Step 6: Commit** + +```bash +git add crates/erp-health/src/service/vital_signs_daily_service.rs crates/erp-health/src/service/mod.rs +git commit -m "feat(health): 日聚合 service — 从 hourly 聚合到 daily" +``` + +--- + +## Chunk 2: patient_devices 表增强 + +### Task 4: patient_devices 迁移(新增列) + +**Files:** +- Create: `crates/erp-server/migration/src/m20260504_000106_alter_patient_devices_add_status.rs` +- Modify: `crates/erp-server/migration/src/lib.rs` + +- [ ] **Step 1: 创建迁移文件** + +```rust +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Alias::new("patient_devices")) + .add_column(ColumnDef::new(Alias::new("status")).string().not_null().default("active")) + .add_column(ColumnDef::new(Alias::new("firmware_version")).string()) + .add_column(ColumnDef::new(Alias::new("manufacturer")).string()) + .add_column(ColumnDef::new(Alias::new("connection_type")).string().not_null().default("ble")) + .add_column(ColumnDef::new(Alias::new("metadata")).json_binary()) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Alias::new("patient_devices")) + .drop_column(Alias::new("status")) + .drop_column(Alias::new("firmware_version")) + .drop_column(Alias::new("manufacturer")) + .drop_column(Alias::new("connection_type")) + .drop_column(Alias::new("metadata")) + .to_owned(), + ) + .await + } +} +``` + +- [ ] **Step 2: 注册迁移** + +在 `lib.rs` 添加 `mod m20260504_000106_alter_patient_devices_add_status;` 并在 migrations vec 末尾追加对应 `Box::new(...)`。 + +- [ ] **Step 3: 更新 Entity** + +在 `crates/erp-health/src/entity/patient_devices.rs` 的 `Model` struct 中添加字段: + +```rust +pub status: String, +#[sea_orm(skip_serializing_if = "Option::is_none")] +pub firmware_version: Option, +#[sea_orm(skip_serializing_if = "Option::is_none")] +pub manufacturer: Option, +pub connection_type: String, +#[sea_orm(skip_serializing_if = "Option::is_none")] +pub metadata: Option, +``` + +- [ ] **Step 4: 验证编译** + +Run: `cargo check -p erp-health` +Expected: 编译通过 + +- [ ] **Step 5: Commit** + +```bash +git add crates/erp-server/migration/src/m20260504_000106_alter_patient_devices_add_status.rs crates/erp-server/migration/src/lib.rs crates/erp-health/src/entity/patient_devices.rs +git commit -m "feat(health): patient_devices 增强 — status/firmware/manufacturer/connection_type/metadata" +``` + +--- + +## Chunk 3: 日聚合 Background Task + +### Task 5: 日聚合定时任务 + +**Files:** +- Modify: `crates/erp-health/src/module.rs` + +- [ ] **Step 1: 添加 start_daily_aggregation 任务** + +参考现有 `start_overdue_checker` 模式,在 `module.rs` 中添加: + +```rust +pub fn start_daily_aggregation(db: sea_orm::DatabaseConnection) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + // 每天凌晨 2:00 执行(用 interval 模拟:每 24 小时) + let mut interval = tokio::time::interval(Duration::from_secs(24 * 3600)); + + loop { + tokio::select! { + _ = interval.tick() => { + let yesterday = chrono::Local::now().date_naive() - chrono::Duration::days(1); + // TODO: 遍历所有租户,对每个租户调用 aggregate_daily + // 当前单租户阶段,使用配置的默认租户 ID + tracing::info!(date = %yesterday, "Running daily aggregation"); + if let Err(e) = vital_signs_daily_service::aggregate_daily_for_all_tenants(&db, yesterday).await { + tracing::warn!(error = %e, "Daily aggregation task failed"); + } + } + _ = tokio::signal::ctrl_c() => { + break; + } + } + } + }) +} +``` + +- [ ] **Step 2: 添加 aggregate_daily_for_all_tenants 辅助函数** + +在 `vital_signs_daily_service.rs` 中添加: + +```rust +/// 遍历所有租户执行日聚合 +pub async fn aggregate_daily_for_all_tenants( + db: &DatabaseConnection, + date: NaiveDate, +) -> HealthResult { + // 查询有 hourly 数据的活跃租户 + let tenant_ids: Vec = entity::vital_signs_hourly::Entity::find() + .filter(entity::vital_signs_hourly::Column::HourStart.gte( + date.and_hms_opt(0, 0, 0).unwrap().and_utc(), + )) + .filter(entity::vital_signs_hourly::Column::HourStart.lte( + date.and_hms_opt(23, 59, 59).unwrap().and_utc(), + )) + .all(db) + .await? + .iter() + .map(|r| r.tenant_id) + .collect::>() + .into_iter() + .collect(); + + let mut total = 0u64; + for tenant_id in tenant_ids { + total += aggregate_daily(db, tenant_id, date).await?; + } + Ok(total) +} +``` + +- [ ] **Step 3: 在 on_startup 中启动任务** + +在 `HealthModule` 的 `on_startup()` 方法中追加: + +```rust +Self::start_daily_aggregation(state.db.clone()); +``` + +- [ ] **Step 4: 验证编译** + +Run: `cargo check -p erp-health` +Expected: 编译通过 + +- [ ] **Step 5: Commit** + +```bash +git add crates/erp-health/src/service/vital_signs_daily_service.rs crates/erp-health/src/module.rs +git commit -m "feat(health): 日聚合 background task — 每天自动从 hourly 聚合到 daily" +``` + +--- + +## Chunk 4: 日聚合查询 API + +### Task 6: 日聚合查询 Handler + 路由 + +**Files:** +- Create: `crates/erp-health/src/handler/vital_signs_daily_handler.rs` +- Modify: `crates/erp-health/src/handler/mod.rs` +- Modify: `crates/erp-health/src/module.rs`(路由注册) + +- [ ] **Step 1: 创建查询 DTO** + +```rust +use serde::Deserialize; +use utoipa::IntoParams; + +#[derive(Debug, Deserialize, IntoParams)] +pub struct DailyAggQuery { + pub patient_id: Option, + pub device_type: Option, + pub start_date: String, // YYYY-MM-DD + pub end_date: String, // YYYY-MM-DD +} +``` + +- [ ] **Step 2: 创建 Handler** + +```rust +pub async fn get_daily_aggregations( + State(state): State, + Extension(ctx): Extension, + Query(query): Query, +) -> Result +where + HealthState: FromRef, + S: Clone + Send + Sync + 'static, +{ + require_permission(&ctx, "health.device-readings.list")?; + + let start = query.start_date.parse::() + .map_err(|_| HealthError::Validation("Invalid start_date format, expected YYYY-MM-DD".into()))?; + let end = query.end_date.parse::() + .map_err(|_| HealthError::Validation("Invalid end_date format, expected YYYY-MM-DD".into()))?; + + let results = vital_signs_daily_service::query_daily( + &state.db, ctx.tenant_id, query.patient_id, query.device_type, start, end, + ).await?; + + Ok(axum::Json(ApiResponse::ok(results))) +} +``` + +- [ ] **Step 3: 注册 handler 模块和路由** + +在 `handler/mod.rs` 添加 `pub mod vital_signs_daily_handler;` + +在 `module.rs` 的 `protected_routes()` 中追加路由: + +```rust +.route("/health/vital-signs/daily", axum::routing::get(vital_signs_daily_handler::get_daily_aggregations)) +``` + +- [ ] **Step 4: 验证编译** + +Run: `cargo check -p erp-health` +Expected: 编译通过 + +- [ ] **Step 5: 运行全量测试** + +Run: `cargo test --workspace` +Expected: 全部通过 + +- [ ] **Step 6: Commit** + +```bash +git add crates/erp-health/src/handler/vital_signs_daily_handler.rs crates/erp-health/src/handler/mod.rs crates/erp-health/src/module.rs +git commit -m "feat(health): 日聚合查询 API — GET /health/vital-signs/daily" +``` + +--- + +## 验证清单 + +- [ ] `cargo check --workspace` — 编译通过 +- [ ] `cargo test --workspace` — 全部测试通过 +- [ ] 数据库迁移可正/反向执行 +- [ ] 日聚合 background task 可手动触发验证 +- [ ] GET `/api/v1/health/vital-signs/daily` 返回正确的聚合数据 +- [ ] `git push` 推送到远程仓库