hms/docs/superpowers/plans/2026-05-04-iot-fhir-v1-plan1-data-layer.md
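iven feab61b132 docs(plan): IoT + FHIR V1 Plan 1: data layer enhancement implementation plan
6 tasks: vital_signs_daily table migration + Entity + Service +
patient_devices enhancement + daily aggregation background task + query API.
TDD workflow; every step includes concrete code and verification commands.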
2026-05-04 01:14:15 +08:00

# IoT + FHIR V1 Plan 1: Data Layer Enhancements

> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Add the `vital_signs_daily` daily aggregation table, extend the `patient_devices` table with new columns, and implement a daily aggregation background task, providing the data foundation for the FHIR API and the monitoring dashboard.

**Architecture:** Extend the existing erp-health module. Add a SeaORM Entity and Migration for the daily aggregation table; add a migration that extends `patient_devices` with new columns; the daily aggregation task reuses the existing `tokio::spawn` + interval pattern and rolls `vital_signs_hourly` data up to daily granularity.

**Tech Stack:** Rust / SeaORM / PostgreSQL / tokio / Axum

**Spec:** `docs/superpowers/specs/2026-05-04-iot-fhir-platform-ecosystem-design.md` §3

---
## Chunk 1: vital_signs_daily Table + Entity
### Task 1: Daily Aggregation Table Migration
**Files:**
- Create: `crates/erp-server/migration/src/m20260504_000105_create_vital_signs_daily.rs`
- Modify: `crates/erp-server/migration/src/lib.rs`
- [ ] **Step 1: Create the migration file**
```rust
// crates/erp-server/migration/src/m20260504_000105_create_vital_signs_daily.rs
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .create_table(
                Table::create()
                    .table(Alias::new("vital_signs_daily"))
                    .col(
                        ColumnDef::new(Alias::new("id"))
                            .uuid()
                            .not_null()
                            // Expr::cust emits raw SQL; Expr::val would turn the
                            // text into a quoted string literal default.
                            .default(Expr::cust("gen_random_uuid()")),
                    )
                    .col(ColumnDef::new(Alias::new("tenant_id")).uuid().not_null())
                    .col(ColumnDef::new(Alias::new("patient_id")).uuid().not_null())
                    .col(ColumnDef::new(Alias::new("device_type")).string().not_null())
                    .col(ColumnDef::new(Alias::new("date_bucket")).date().not_null())
                    .col(ColumnDef::new(Alias::new("min_val")).double())
                    .col(ColumnDef::new(Alias::new("max_val")).double())
                    .col(ColumnDef::new(Alias::new("avg_val")).double().not_null())
                    .col(ColumnDef::new(Alias::new("sample_count")).integer().not_null())
                    .col(ColumnDef::new(Alias::new("percentile_95")).double())
                    .col(
                        ColumnDef::new(Alias::new("created_at"))
                            .timestamp_with_time_zone()
                            .not_null()
                            .default(Expr::current_timestamp()),
                    )
                    .col(
                        ColumnDef::new(Alias::new("updated_at"))
                            .timestamp_with_time_zone()
                            .not_null()
                            .default(Expr::current_timestamp()),
                    )
                    .col(
                        ColumnDef::new(Alias::new("version"))
                            .integer()
                            .not_null()
                            .default(1),
                    )
                    .primary_key(Index::create().col(Alias::new("id")))
                    .to_owned(),
            )
            .await?;

        // One row per (tenant, patient, device type, day); the upsert in the
        // service layer relies on this unique index.
        manager
            .create_index(
                Index::create()
                    .name("idx_vital_signs_daily_unique")
                    .table(Alias::new("vital_signs_daily"))
                    .col(Alias::new("tenant_id"))
                    .col(Alias::new("patient_id"))
                    .col(Alias::new("device_type"))
                    .col(Alias::new("date_bucket"))
                    .unique()
                    .to_owned(),
            )
            .await?;

        Ok(())
    }

    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .drop_table(Table::drop().table(Alias::new("vital_signs_daily")).to_owned())
            .await
    }
}
```
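The unique index on `(tenant_id, patient_id, device_type, date_bucket)` is what allows the aggregation to be re-run for the same day without creating duplicate rows. The following is a minimal in-memory sketch of that behaviour using only the standard library. `DailyKey` and `DailyStore` are hypothetical names introduced for illustration, `u128` stands in for UUIDs, and the date is kept as a plain string; none of this is part of the actual schema code.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the unique key of `vital_signs_daily`.
/// `u128` replaces UUIDs and a `YYYY-MM-DD` string replaces the DATE column.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct DailyKey {
    pub tenant_id: u128,
    pub patient_id: u128,
    pub device_type: String,
    pub date_bucket: String,
}

/// Illustrative in-memory table keyed the same way as the unique index.
#[derive(Default)]
pub struct DailyStore {
    rows: HashMap<DailyKey, f64>,
}

impl DailyStore {
    /// Inserts a new row or overwrites the existing row for the same key.
    /// Returns `true` when a row was newly created.
    pub fn upsert(&mut self, key: DailyKey, avg_val: f64) -> bool {
        self.rows.insert(key, avg_val).is_none()
    }

    /// Number of stored rows.
    pub fn len(&self) -> usize {
        self.rows.len()
    }

    /// Looks up the stored average for a key, if any.
    pub fn get(&self, key: &DailyKey) -> Option<f64> {
        self.rows.get(key).copied()
    }
}
```

Upserting the same key twice leaves a single row holding the latest value, which is the property the daily job depends on when it is retried.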
- [ ] **Step 2: Register the migration**
At the top of `crates/erp-server/migration/src/lib.rs`, add:
```rust
mod m20260504_000105_create_vital_signs_daily;
```
Append to the end of the `vec![]` in `Migrator::migrations()`:
```rust
Box::new(m20260504_000105_create_vital_signs_daily::Migration),
```
- [ ] **Step 3: Verify that the migration compiles**
Run: `cargo check -p erp-server`
Expected: compiles with no errors
- [ ] **Step 4: Commit**
```bash
git add crates/erp-server/migration/src/m20260504_000105_create_vital_signs_daily.rs crates/erp-server/migration/src/lib.rs
git commit -m "feat(db): add vital_signs_daily daily aggregation table migration"
```
---
### Task 2: Daily Aggregation Entity
**Files:**
- Create: `crates/erp-health/src/entity/vital_signs_daily.rs`
- Modify: `crates/erp-health/src/entity/mod.rs`
- [ ] **Step 1: Create the Entity file**
Create `vital_signs_daily.rs`, following the structure of `vital_signs_hourly.rs`:
```rust
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "vital_signs_daily")]
pub struct Model {
    #[sea_orm(primary_key, auto_increment = false)]
    pub id: Uuid,
    pub tenant_id: Uuid,
    pub patient_id: Uuid,
    pub device_type: String,
    pub date_bucket: chrono::NaiveDate,
    // skip_serializing_if is a serde attribute, so it belongs under #[serde(...)].
    #[serde(skip_serializing_if = "Option::is_none")]
    pub min_val: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_val: Option<f64>,
    pub avg_val: f64,
    pub sample_count: i32,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub percentile_95: Option<f64>,
    pub created_at: DateTimeUtc,
    pub updated_at: DateTimeUtc,
    pub version: i32,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
    #[sea_orm(
        belongs_to = "super::patient::Entity",
        from = "Column::PatientId",
        to = "super::patient::Column::Id"
    )]
    Patient,
}

impl Related<super::patient::Entity> for Entity {
    fn to() -> RelationDef {
        Relation::Patient.def()
    }

    fn via() -> Option<RelationDef> {
        None
    }
}

impl ActiveModelBehavior for ActiveModel {}
```
- [ ] **Step 2: Register the Entity module**
In `crates/erp-health/src/entity/mod.rs`, add:
```rust
pub mod vital_signs_daily;
```
- [ ] **Step 3: Verify compilation**
Run: `cargo check -p erp-health`
Expected: compiles
- [ ] **Step 4: Commit**
```bash
git add crates/erp-health/src/entity/vital_signs_daily.rs crates/erp-health/src/entity/mod.rs
git commit -m "feat(health): add VitalSignsDaily SeaORM Entity"
```
---
### Task 3: Daily Aggregation Service Functions
**Files:**
- Create: `crates/erp-health/src/service/vital_signs_daily_service.rs`
- Modify: `crates/erp-health/src/service/mod.rs`
- [ ] **Step 1: Write unit tests for the daily aggregation service**
```rust
// tests module at the end of the file
#[cfg(test)]
mod tests {
    use super::*;

    // Step 3 does not define a SQL query builder, so the tests target
    // calculate_percentile, the pure function this plan does implement.
    #[test]
    fn test_calculate_percentile_single_value() {
        // A single sample is its own percentile at any rank.
        assert_eq!(calculate_percentile(&[42.0], 95.0), 42.0);
    }

    #[test]
    fn test_calculate_percentile_95() {
        let values = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0];
        let p95 = calculate_percentile(&values, 95.0);
        assert!(p95 > 9.0 && p95 <= 10.0);
    }
}
```
- [ ] **Step 2: Run the tests and confirm they fail**
Run: `cargo test -p erp-health vital_signs_daily`
Expected: compilation fails (functions not yet defined)
- [ ] **Step 3: Implement the service**
```rust
use std::collections::HashMap;

use chrono::NaiveDate;
use sea_orm::*;
use uuid::Uuid;

// Import `entity` explicitly: the glob import above also exposes sea_orm::entity.
use crate::entity::{self, vital_signs_daily};
use crate::error::HealthResult;

/// Aggregates one day of `vital_signs_hourly` rows into `vital_signs_daily`.
/// Returns the number of (patient, device type) groups that were upserted.
pub async fn aggregate_daily(
    db: &DatabaseConnection,
    tenant_id: Uuid,
    date: NaiveDate,
) -> HealthResult<u64> {
    let start_of_day = date.and_hms_opt(0, 0, 0).unwrap().and_utc();
    let end_of_day = date.and_hms_opt(23, 59, 59).unwrap().and_utc();

    // Load all hourly aggregates for the given UTC day
    let hourly_rows = entity::vital_signs_hourly::Entity::find()
        .filter(entity::vital_signs_hourly::Column::TenantId.eq(tenant_id))
        .filter(entity::vital_signs_hourly::Column::HourStart.gte(start_of_day))
        .filter(entity::vital_signs_hourly::Column::HourStart.lte(end_of_day))
        .all(db)
        .await?;

    // Group by (patient_id, device_type)
    let mut grouped: HashMap<(Uuid, String), Vec<&entity::vital_signs_hourly::Model>> =
        HashMap::new();
    for row in &hourly_rows {
        let key = (row.patient_id, row.device_type.clone());
        grouped.entry(key).or_default().push(row);
    }

    let mut upserted = 0u64;
    for ((patient_id, device_type), rows) in grouped {
        let sample_count: i32 = rows.iter().map(|r| r.sample_count).sum();
        // Weight each hourly average by its sample count so the daily mean matches
        // the mean over raw samples; fall back to a plain mean if counts are zero.
        let avg_val = if sample_count > 0 {
            rows.iter()
                .map(|r| r.avg_val * f64::from(r.sample_count))
                .sum::<f64>()
                / f64::from(sample_count)
        } else {
            rows.iter().map(|r| r.avg_val).sum::<f64>() / rows.len() as f64
        };
        let min_val = rows.iter().filter_map(|r| r.min_val).reduce(f64::min);
        let max_val = rows.iter().filter_map(|r| r.max_val).reduce(f64::max);

        // Approximation: the percentile is taken over hourly averages,
        // because raw samples are not available at this level.
        let all_avgs: Vec<f64> = rows.iter().map(|r| r.avg_val).collect();
        let percentile_95 = if all_avgs.len() >= 2 {
            Some(calculate_percentile(&all_avgs, 95.0))
        } else {
            None
        };

        // Upsert: INSERT ... ON CONFLICT (unique key) DO UPDATE
        vital_signs_daily::Entity::insert(vital_signs_daily::ActiveModel {
            id: Set(Uuid::now_v7()),
            tenant_id: Set(tenant_id),
            patient_id: Set(patient_id),
            device_type: Set(device_type),
            date_bucket: Set(date),
            min_val: Set(min_val),
            max_val: Set(max_val),
            avg_val: Set(avg_val),
            sample_count: Set(sample_count),
            percentile_95: Set(percentile_95),
            created_at: Set(chrono::Utc::now()),
            updated_at: Set(chrono::Utc::now()),
            version: Set(1),
        })
        .on_conflict(
            sea_orm::sea_query::OnConflict::columns([
                vital_signs_daily::Column::TenantId,
                vital_signs_daily::Column::PatientId,
                vital_signs_daily::Column::DeviceType,
                vital_signs_daily::Column::DateBucket,
            ])
            .update_columns([
                vital_signs_daily::Column::MinVal,
                vital_signs_daily::Column::MaxVal,
                vital_signs_daily::Column::AvgVal,
                vital_signs_daily::Column::SampleCount,
                vital_signs_daily::Column::Percentile95,
                vital_signs_daily::Column::UpdatedAt,
            ])
            .to_owned(),
        )
        .exec(db)
        .await?;

        // last_insert_id is a Uuid and cannot be cast to u64; count one row per group.
        upserted += 1;
    }

    Ok(upserted)
}

/// Computes a percentile by rank. Callers must pass a non-empty slice.
fn calculate_percentile(values: &[f64], percentile: f64) -> f64 {
    let mut sorted = values.to_vec();
    // total_cmp defines a total order, so NaN values cannot cause a panic here.
    sorted.sort_by(|a, b| a.total_cmp(b));
    let idx = (percentile / 100.0 * (sorted.len() - 1) as f64).ceil() as usize;
    sorted[idx.min(sorted.len() - 1)]
}

/// Queries daily aggregates for a tenant within an inclusive date range.
pub async fn query_daily(
    db: &DatabaseConnection,
    tenant_id: Uuid,
    patient_id: Option<Uuid>,
    device_type: Option<String>,
    start_date: NaiveDate,
    end_date: NaiveDate,
) -> HealthResult<Vec<vital_signs_daily::Model>> {
    let mut query = vital_signs_daily::Entity::find()
        .filter(vital_signs_daily::Column::TenantId.eq(tenant_id))
        .filter(vital_signs_daily::Column::DateBucket.gte(start_date))
        .filter(vital_signs_daily::Column::DateBucket.lte(end_date));

    if let Some(pid) = patient_id {
        query = query.filter(vital_signs_daily::Column::PatientId.eq(pid));
    }
    if let Some(dt) = device_type {
        query = query.filter(vital_signs_daily::Column::DeviceType.eq(dt));
    }

    let results = query.all(db).await?;
    Ok(results)
}
```
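The grouping and rollup logic in `aggregate_daily` can be exercised without a database. The sketch below is a standard-library-only approximation of that step: `HourlyRow`, `DailyAgg`, and `rollup` are hypothetical names, `u128` stands in for UUIDs, and the field types are assumed to mirror the hourly entity. It weights each hourly average by its `sample_count`, which is one way to obtain a daily mean that equals the mean over the underlying samples.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified view of one `vital_signs_hourly` row.
#[derive(Clone, Debug)]
pub struct HourlyRow {
    pub patient_id: u128,
    pub device_type: String,
    pub min_val: Option<f64>,
    pub max_val: Option<f64>,
    pub avg_val: f64,
    pub sample_count: i32,
}

/// Daily rollup for one (patient, device type) group.
#[derive(Clone, Debug, PartialEq)]
pub struct DailyAgg {
    pub min_val: Option<f64>,
    pub max_val: Option<f64>,
    pub avg_val: f64,
    pub sample_count: i32,
}

/// Groups hourly rows by (patient_id, device_type) and rolls each group up.
pub fn rollup(rows: &[HourlyRow]) -> HashMap<(u128, String), DailyAgg> {
    let mut grouped: HashMap<(u128, String), Vec<&HourlyRow>> = HashMap::new();
    for row in rows {
        grouped
            .entry((row.patient_id, row.device_type.clone()))
            .or_default()
            .push(row);
    }

    grouped
        .into_iter()
        .map(|(key, group)| {
            let sample_count: i32 = group.iter().map(|r| r.sample_count).sum();
            let weighted_sum: f64 = group
                .iter()
                .map(|r| r.avg_val * f64::from(r.sample_count))
                .sum();
            // Weighted mean; plain mean of hourly averages if no counts exist.
            let avg_val = if sample_count > 0 {
                weighted_sum / f64::from(sample_count)
            } else {
                group.iter().map(|r| r.avg_val).sum::<f64>() / group.len() as f64
            };
            let agg = DailyAgg {
                min_val: group.iter().filter_map(|r| r.min_val).reduce(f64::min),
                max_val: group.iter().filter_map(|r| r.max_val).reduce(f64::max),
                avg_val,
                sample_count,
            };
            (key, agg)
        })
        .collect()
}
```

With one hour averaging 60 over 10 samples and another averaging 90 over 30 samples, the weighted daily mean is 82.5, whereas the unweighted mean of the two hourly averages would be 75.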
- [ ] **Step 4: Register the service module**
In `crates/erp-health/src/service/mod.rs`, add:
```rust
pub mod vital_signs_daily_service;
```
- [ ] **Step 5: Verify compilation and passing tests**
Run: `cargo test -p erp-health vital_signs_daily`
Expected: tests pass
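The percentile expectation can also be checked by hand. The plan's index rule is `idx = ceil(p / 100 * (n - 1))`, so for ten sorted values and `p = 95` the index is `ceil(8.55) = 9`, which selects the largest value. Below is a standalone sketch of the same rule; the name `percentile` and the `Option` return type are choices made for this illustration (the service function returns a bare `f64`), so that empty input is reported rather than causing a panic.

```rust
/// Rank-based percentile using the index rule from the plan:
/// idx = ceil(p / 100 * (n - 1)), clamped to the last element.
/// Returns `None` for empty input instead of panicking.
pub fn percentile(values: &[f64], p: f64) -> Option<f64> {
    if values.is_empty() {
        return None;
    }
    let mut sorted = values.to_vec();
    // total_cmp gives a total order over f64, so NaN inputs cannot panic.
    sorted.sort_by(|a, b| a.total_cmp(b));
    let last = sorted.len() - 1;
    let idx = (p / 100.0 * last as f64).ceil() as usize;
    Some(sorted[idx.min(last)])
}
```

Because the rule rounds the index up, small inputs tend to report a value near the maximum; with only 24 hourly averages per day, the stored `percentile_95` will usually be the highest or second-highest hourly average.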
- [ ] **Step 6: Commit**
```bash
git add crates/erp-health/src/service/vital_signs_daily_service.rs crates/erp-health/src/service/mod.rs
git commit -m "feat(health): daily aggregation service, hourly to daily rollup"
```
---
## Chunk 2: patient_devices Table Enhancements
### Task 4: patient_devices Migration (New Columns)
**Files:**
- Create: `crates/erp-server/migration/src/m20260504_000106_alter_patient_devices_add_status.rs`
- Modify: `crates/erp-server/migration/src/lib.rs`
- [ ] **Step 1: Create the migration file**
```rust
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .alter_table(
                Table::alter()
                    .table(Alias::new("patient_devices"))
                    .add_column(ColumnDef::new(Alias::new("status")).string().not_null().default("active"))
                    .add_column(ColumnDef::new(Alias::new("firmware_version")).string())
                    .add_column(ColumnDef::new(Alias::new("manufacturer")).string())
                    .add_column(ColumnDef::new(Alias::new("connection_type")).string().not_null().default("ble"))
                    .add_column(ColumnDef::new(Alias::new("metadata")).json_binary())
                    .to_owned(),
            )
            .await
    }

    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .alter_table(
                Table::alter()
                    .table(Alias::new("patient_devices"))
                    .drop_column(Alias::new("status"))
                    .drop_column(Alias::new("firmware_version"))
                    .drop_column(Alias::new("manufacturer"))
                    .drop_column(Alias::new("connection_type"))
                    .drop_column(Alias::new("metadata"))
                    .to_owned(),
            )
            .await
    }
}
```
- [ ] **Step 2: Register the migration**
In `lib.rs`, add `mod m20260504_000106_alter_patient_devices_add_status;` and append the corresponding `Box::new(...)` to the end of the migrations vec.
- [ ] **Step 3: Update the Entity**
In the `Model` struct in `crates/erp-health/src/entity/patient_devices.rs`, add the fields:
```rust
pub status: String,
// skip_serializing_if is a serde attribute, so it belongs under #[serde(...)].
#[serde(skip_serializing_if = "Option::is_none")]
pub firmware_version: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub manufacturer: Option<String>,
pub connection_type: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<serde_json::Value>,
```
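Since `status` and `connection_type` are stored as free-form strings, callers may want to validate them before writing. The following is a minimal sketch of one option, a small enum with `FromStr`. Only `"active"` appears in the plan (as the column default); the `Inactive` variant and the `DeviceStatus` name are hypothetical and would need to be replaced with the values the spec actually defines. The same pattern could be applied to `connection_type`, whose default is `"ble"`.

```rust
use std::str::FromStr;

/// Device lifecycle status. "active" is the column default from the migration;
/// "inactive" is a hypothetical value added purely for illustration.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeviceStatus {
    Active,
    Inactive,
}

impl DeviceStatus {
    /// The string form stored in the `status` column.
    pub fn as_str(self) -> &'static str {
        match self {
            DeviceStatus::Active => "active",
            DeviceStatus::Inactive => "inactive",
        }
    }
}

impl FromStr for DeviceStatus {
    type Err = String;

    /// Accepts only known values; anything else is rejected with a message.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "active" => Ok(DeviceStatus::Active),
            "inactive" => Ok(DeviceStatus::Inactive),
            other => Err(format!("unknown device status: {}", other)),
        }
    }
}
```

Keeping the column as a string while validating at the boundary avoids a database enum type, at the cost of relying on application code to reject unknown values.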
- [ ] **Step 4: Verify compilation**
Run: `cargo check -p erp-health`
Expected: compiles
- [ ] **Step 5: Commit**
```bash
git add crates/erp-server/migration/src/m20260504_000106_alter_patient_devices_add_status.rs crates/erp-server/migration/src/lib.rs crates/erp-health/src/entity/patient_devices.rs
git commit -m "feat(health): enhance patient_devices with status/firmware/manufacturer/connection_type/metadata"
```
---
## Chunk 3: Daily Aggregation Background Task
### Task 5: Scheduled Daily Aggregation
**Files:**
- Modify: `crates/erp-health/src/module.rs`
- [ ] **Step 1: Add the start_daily_aggregation task**
Following the existing `start_overdue_checker` pattern, add to `module.rs`:
```rust
pub fn start_daily_aggregation(db: sea_orm::DatabaseConnection) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        // Intended schedule: daily at 02:00. The interval only approximates this:
        // its first tick fires immediately at startup, then every 24 hours.
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(24 * 3600));
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    // aggregate_daily buckets by UTC day, so "yesterday" is taken in UTC
                    // to make sure the aggregated day is complete.
                    let yesterday = chrono::Utc::now().date_naive() - chrono::Duration::days(1);
                    // aggregate_daily_for_all_tenants covers every tenant that has
                    // hourly data for that day.
                    tracing::info!(date = %yesterday, "Running daily aggregation");
                    if let Err(e) = vital_signs_daily_service::aggregate_daily_for_all_tenants(&db, yesterday).await {
                        tracing::warn!(error = %e, "Daily aggregation task failed");
                    }
                }
                _ = tokio::signal::ctrl_c() => {
                    break;
                }
            }
        }
    })
}
```
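`tokio::time::interval` completes its first tick immediately and then fires once per period, so the task above runs at process start and every 24 hours after that, rather than at 02:00. If a fixed wall-clock time is wanted, one approach is to compute the delay until the next 02:00 and wait that long before the first run. The helper below is a standard-library sketch of that calculation; `delay_until_next_run` is a hypothetical name, and the caller is assumed to supply the seconds elapsed since midnight in whichever time zone the schedule is defined.

```rust
use std::time::Duration;

const SECS_PER_DAY: u64 = 24 * 3600;

/// Returns how long to wait from `secs_since_midnight` until the next
/// occurrence of `target_hour:00:00` in the same time zone.
pub fn delay_until_next_run(secs_since_midnight: u64, target_hour: u64) -> Duration {
    let now = secs_since_midnight % SECS_PER_DAY;
    let target = (target_hour % 24) * 3600;
    let wait = if target > now {
        target - now
    } else {
        // The target has already passed today (or is exactly now): run tomorrow.
        SECS_PER_DAY - now + target
    };
    Duration::from_secs(wait)
}
```

The resulting `Duration` could be passed to `tokio::time::sleep` before entering the loop, or used as the start offset for `tokio::time::interval_at`, keeping the 24-hour period unchanged.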
- [ ] **Step 2: Add the aggregate_daily_for_all_tenants helper**
In `vital_signs_daily_service.rs`, add:
```rust
/// Runs the daily aggregation for every tenant that has hourly data on `date`.
pub async fn aggregate_daily_for_all_tenants(
    db: &DatabaseConnection,
    date: NaiveDate,
) -> HealthResult<u64> {
    // Collect the distinct tenants that have hourly rows for the day
    let tenant_ids: Vec<Uuid> = entity::vital_signs_hourly::Entity::find()
        .filter(entity::vital_signs_hourly::Column::HourStart.gte(
            date.and_hms_opt(0, 0, 0).unwrap().and_utc(),
        ))
        .filter(entity::vital_signs_hourly::Column::HourStart.lte(
            date.and_hms_opt(23, 59, 59).unwrap().and_utc(),
        ))
        .all(db)
        .await?
        .iter()
        .map(|r| r.tenant_id)
        .collect::<std::collections::HashSet<_>>()
        .into_iter()
        .collect();

    // Note: `?` stops at the first failing tenant; later tenants are skipped.
    let mut total = 0u64;
    for tenant_id in tenant_ids {
        total += aggregate_daily(db, tenant_id, date).await?;
    }
    Ok(total)
}
```
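Because the loop propagates errors with `?`, a failure for one tenant ends the run for all remaining tenants. If per-tenant isolation is preferred, the loop can collect failures and keep going. The following is a synchronous, standard-library sketch of that control flow; `aggregate_all` and its closure parameter are hypothetical and stand in for the async `aggregate_daily` call.

```rust
/// Runs `aggregate` for each tenant and continues past failures.
/// Returns the total number of upserted rows and the tenants that failed.
pub fn aggregate_all<T, E, F>(tenants: &[T], mut aggregate: F) -> (u64, Vec<(T, E)>)
where
    T: Clone,
    F: FnMut(&T) -> Result<u64, E>,
{
    let mut total = 0u64;
    let mut failures = Vec::new();
    for tenant in tenants {
        match aggregate(tenant) {
            Ok(n) => total += n,
            // Record the failure and move on to the next tenant.
            Err(e) => failures.push((tenant.clone(), e)),
        }
    }
    (total, failures)
}
```

The caller can then log each failed tenant individually and still report the rows aggregated for the tenants that succeeded.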
- [ ] **Step 3: Start the task in on_startup**
In the `on_startup()` method of `HealthModule`, append:
```rust
Self::start_daily_aggregation(state.db.clone());
```
- [ ] **Step 4: Verify compilation**
Run: `cargo check -p erp-health`
Expected: compiles
- [ ] **Step 5: Commit**
```bash
git add crates/erp-health/src/service/vital_signs_daily_service.rs crates/erp-health/src/module.rs
git commit -m "feat(health): daily aggregation background task, rolls hourly data up to daily each day"
```
---
## Chunk 4: Daily Aggregation Query API
### Task 6: Daily Aggregation Query Handler + Routes
**Files:**
- Create: `crates/erp-health/src/handler/vital_signs_daily_handler.rs`
- Modify: `crates/erp-health/src/handler/mod.rs`
- Modify: `crates/erp-health/src/module.rs` (route registration)
- [ ] **Step 1: Create the query DTO**
```rust
use serde::Deserialize;
use utoipa::IntoParams;

#[derive(Debug, Deserialize, IntoParams)]
pub struct DailyAggQuery {
    pub patient_id: Option<uuid::Uuid>,
    pub device_type: Option<String>,
    pub start_date: String, // YYYY-MM-DD
    pub end_date: String,   // YYYY-MM-DD
}
```
- [ ] **Step 2: 创建 Handler**
```rust
pub async fn get_daily_aggregations<S>(
    State(state): State<HealthState>,
    Extension(ctx): Extension<TenantContext>,
    Query(query): Query<DailyAggQuery>,
) -> Result<impl IntoResponse, AppError>
where
    HealthState: FromRef<S>,
    S: Clone + Send + Sync + 'static,
{
    require_permission(&ctx, "health.device-readings.list")?;

    let start = query.start_date.parse::<chrono::NaiveDate>()
        .map_err(|_| HealthError::Validation("Invalid start_date format, expected YYYY-MM-DD".into()))?;
    let end = query.end_date.parse::<chrono::NaiveDate>()
        .map_err(|_| HealthError::Validation("Invalid end_date format, expected YYYY-MM-DD".into()))?;

    let results = vital_signs_daily_service::query_daily(
        &state.db, ctx.tenant_id, query.patient_id, query.device_type, start, end,
    ).await?;

    Ok(axum::Json(ApiResponse::ok(results)))
}
```
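The handler checks the format of each date but not that `start_date` is on or before `end_date`; an inverted range would simply return an empty list. If an explicit error is preferred, a range check can be added after parsing. The sketch below shows the idea with the standard library only: `parse_ymd` and `validate_range` are hypothetical helpers, and the hand-written parser is a stand-in for `chrono::NaiveDate` parsing (it checks shape and basic ranges, not month lengths).

```rust
fn bad_date(s: &str) -> String {
    format!("invalid date '{}', expected YYYY-MM-DD", s)
}

/// Parses a strict `YYYY-MM-DD` string into `(year, month, day)`.
/// Only the shape and basic ranges are checked; month lengths are not.
pub fn parse_ymd(s: &str) -> Result<(u32, u32, u32), String> {
    let parts: Vec<&str> = s.split('-').collect();
    let shape_ok = parts.len() == 3
        && parts[0].len() == 4
        && parts[1].len() == 2
        && parts[2].len() == 2
        && parts.iter().all(|p| p.bytes().all(|b| b.is_ascii_digit()));
    if !shape_ok {
        return Err(bad_date(s));
    }
    let y: u32 = parts[0].parse().map_err(|_| bad_date(s))?;
    let m: u32 = parts[1].parse().map_err(|_| bad_date(s))?;
    let d: u32 = parts[2].parse().map_err(|_| bad_date(s))?;
    if !(1..=12).contains(&m) || !(1..=31).contains(&d) {
        return Err(bad_date(s));
    }
    Ok((y, m, d))
}

/// Parses both bounds and rejects ranges where the start is after the end.
pub fn validate_range(
    start: &str,
    end: &str,
) -> Result<((u32, u32, u32), (u32, u32, u32)), String> {
    let s = parse_ymd(start)?;
    let e = parse_ymd(end)?;
    // Tuples compare lexicographically, which matches calendar order here.
    if s > e {
        return Err(format!("start_date {} is after end_date {}", start, end));
    }
    Ok((s, e))
}
```

In the real handler the equivalent check would be a single `start > end` comparison on the two `NaiveDate` values, mapped to the same validation error type used for the format errors.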
- [ ] **Step 3: Register the handler module and route**
In `handler/mod.rs`, add `pub mod vital_signs_daily_handler;`
In `protected_routes()` in `module.rs`, append the route:
```rust
.route("/health/vital-signs/daily", axum::routing::get(vital_signs_daily_handler::get_daily_aggregations))
```
- [ ] **Step 4: Verify compilation**
Run: `cargo check -p erp-health`
Expected: compiles
- [ ] **Step 5: Run the full test suite**
Run: `cargo test --workspace`
Expected: all tests pass
- [ ] **Step 6: Commit**
```bash
git add crates/erp-health/src/handler/vital_signs_daily_handler.rs crates/erp-health/src/handler/mod.rs crates/erp-health/src/module.rs
git commit -m "feat(health): daily aggregation query API, GET /health/vital-signs/daily"
```
---
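## Verification Checklist
- [ ] `cargo check --workspace`: compiles
- [ ] `cargo test --workspace`: all tests pass
- [ ] Database migrations can be applied and rolled back
- [ ] The daily aggregation background task can be triggered manually for verification
- [ ] GET `/api/v1/health/vital-signs/daily` returns correct aggregated data
- [ ] `git push` to the remote repository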