From d93cddc03593a52c1eb03b487d521f34c274600d Mon Sep 17 00:00:00 2001 From: iven Date: Sun, 26 Apr 2026 22:35:52 +0800 Subject: [PATCH] =?UTF-8?q?docs:=20=E5=AE=9E=E6=97=B6=E4=BD=93=E5=BE=81?= =?UTF-8?q?=E9=87=87=E9=9B=86=E5=AE=9E=E6=96=BD=E8=AE=A1=E5=88=92=20?= =?UTF-8?q?=E2=80=94=20Phase=201-2=20=E5=85=B1=2022=20=E4=B8=AA=20Task?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 覆盖数据库迁移(5)、Entity+校验(5)、Service(2)、Handler+路由(2)、 告警引擎(2)、SSE扩展(1)、小程序BLE(3)、Web前端+集成验证(2)。 经 spec review 修正 Critical/Important 问题。 --- ...026-04-26-realtime-vital-signs-pipeline.md | 1066 +++++++++++++++++ 1 file changed, 1066 insertions(+) create mode 100644 docs/superpowers/plans/2026-04-26-realtime-vital-signs-pipeline.md diff --git a/docs/superpowers/plans/2026-04-26-realtime-vital-signs-pipeline.md b/docs/superpowers/plans/2026-04-26-realtime-vital-signs-pipeline.md new file mode 100644 index 0000000..d822599 --- /dev/null +++ b/docs/superpowers/plans/2026-04-26-realtime-vital-signs-pipeline.md @@ -0,0 +1,1066 @@ +# 实时体征采集与智能告警系统 — 实施计划 + +> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** 实现"健康手环 → 小程序 BLE → REST API → PostgreSQL 分区表 → 降采样聚合 → 规则告警 → SSE 推送"的完整闭环。 + +**Architecture:** 小程序通过 BLE 适配器抽象层采集手环数据,批量提交到 HMS REST API。后端分快速路径(校验+存储+降采样+事件)和异步路径(告警评估)。数据存储采用分区表(原始)+ 聚合表(小时级)双轨制。告警引擎基于 PostgreSQL 窗口函数实现滑动窗口规则评估。 + +**Tech Stack:** Rust/Axum/SeaORM/PostgreSQL(后端)+ Taro 4.2/React 18/微信 BLE API(小程序)+ React 19/Ant Design(Web 前端) + +**Spec:** `docs/superpowers/specs/2026-04-26-realtime-vital-signs-pipeline-design.md` + +--- + +## File Structure + +### 新建文件 + +``` +crates/erp-server/migration/src/ + m20260426_000073_create_device_readings.rs # 分区表迁移 + m20260426_000074_create_vital_signs_hourly.rs # 降采样表迁移 + m20260426_000075_create_patient_devices.rs # 设备绑定表迁移 + m20260426_000076_create_alert_rules.rs # 告警规则表迁移 + m20260426_000077_create_alerts.rs # 告警记录表迁移 + +crates/erp-health/src/entity/ + device_readings.rs # 原始设备数据 Entity + vital_signs_hourly.rs # 小时聚合 Entity + patient_devices.rs # 设备绑定 Entity + alert_rules.rs # 告警规则 Entity + alerts.rs # 告警记录 Entity + +crates/erp-health/src/service/ + device_reading_service.rs # 摄入+降采样+查询 service + alert_engine.rs # 告警引擎核心 + alert_service.rs # 告警 CRUD service + +crates/erp-health/src/handler/ + device_reading_handler.rs # 设备数据摄入+查询 handler + alert_handler.rs # 告警 handler + alert_rule_handler.rs # 规则管理 handler + +apps/miniprogram/src/services/ + ble/ # BLE 采集模块目录 + types.ts # 统一类型定义 + BLEManager.ts # 连接管理器 + adapters/ + XiaomiBandAdapter.ts # 小米手环适配器 + device-sync.ts # 设备数据同步 API + +apps/miniprogram/src/pages/ + device-sync/ # 设备同步页面 + index.tsx + index.config.ts + index.scss + +apps/web/src/api/health/ + deviceReadings.ts # 设备数据 API + alerts.ts # 告警 API +``` + +### 修改文件 + +``` +crates/erp-server/migration/src/lib.rs # 注册新迁移 +crates/erp-health/src/entity/mod.rs # 导出新 Entity +crates/erp-health/src/service/mod.rs # 导出新 service +crates/erp-health/src/handler/mod.rs # 导出新 handler +crates/erp-health/src/module.rs # 注册路由+权限+事件订阅 +crates/erp-health/src/service/validation.rs # 新增枚举校验 +crates/erp-message/src/handler/sse_handler.rs # SSE 扩展 +apps/web/src/stores/message.ts # 前端 SSE 监听扩展 +``` + +--- + +## Chunk 1: 数据库迁移(Task 1-5) + +### Task 1: device_readings 分区表迁移 + +**Files:** +- Create: `crates/erp-server/migration/src/m20260426_000073_create_device_readings.rs` +- Modify: `crates/erp-server/migration/src/lib.rs` + +- [ ] **Step 1: 创建迁移文件** + +```rust +// m20260426_000073_create_device_readings.rs +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // 创建分区主表 + manager.exec_stmt( + Table::create() + .table(Alias::new("device_readings")) + .engine("Incompatible") // 占位,用 raw SQL + .to_owned(), + ).await.ok(); // 忽略,下面用 raw SQL + + let sql = r#" + CREATE TABLE IF NOT EXISTS device_readings ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + tenant_id UUID NOT NULL, + patient_id UUID NOT NULL, + device_id VARCHAR(64), + device_type VARCHAR(32) NOT NULL, + device_model VARCHAR(64), + raw_value JSONB NOT NULL, + measured_at TIMESTAMPTZ NOT NULL, + created_at TIMESTAMPTZ DEFAULT NOW(), + deleted_at TIMESTAMPTZ + ) PARTITION BY RANGE (measured_at); + "#; + manager.get_connection().execute_unprepared(sql).await?; + + // 创建唯一约束(去重) + manager.get_connection().execute_unprepared( + "ALTER TABLE device_readings DROP CONSTRAINT IF EXISTS device_readings_pkey;" + ).await.ok(); + + // 用 (device_id, device_type, measured_at) 作为分区键下的唯一约束 + // 注意:分区表的唯一索引必须包含分区键 measured_at + manager.get_connection().execute_unprepared( + "ALTER TABLE device_readings ADD PRIMARY KEY (id, measured_at);" + ).await?; + + // 去重索引 — 分区表唯一索引必须包含分区键,此处已包含 measured_at + // 但 tenant_id/patient_id 不是分区键,不能用 UNIQUE INDEX + // 改用应用层去重(batch_insert 用 ON CONFLICT DO NOTHING) + // 不创建全局唯一索引 + + // 核心查询索引 + manager.get_connection().execute_unprepared( + "CREATE INDEX idx_dr_tenant_patient ON device_readings (tenant_id, patient_id, measured_at DESC);" + ).await?; + + manager.get_connection().execute_unprepared( + "CREATE INDEX idx_dr_device_type ON device_readings (tenant_id, device_type, measured_at DESC);" + ).await?; + + // 创建初始分区(当前月 + 未来 3 个月) + for (suffix, start, end) in [ + ("2026_05", "2026-05-01", "2026-06-01"), + ("2026_06", "2026-06-01", "2026-07-01"), + ("2026_07", "2026-07-01", "2026-08-01"), + ("2026_08", "2026-08-01", "2026-09-01"), + ] { + let partition_sql = format!( + "CREATE TABLE IF NOT EXISTS device_readings_{suffix} PARTITION OF device_readings FOR VALUES FROM ('{start}') TO ('{end}');" + ); + manager.get_connection().execute_unprepared(&partition_sql).await?; + } + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // 分区表需要先删分区再删主表 + for suffix in ["2026_05", "2026_06", "2026_07", "2026_08"] { + manager.get_connection().execute_unprepared( + &format!("DROP TABLE IF EXISTS device_readings_{suffix};") + ).await.ok(); + } + manager.get_connection().execute_unprepared( + "DROP TABLE IF EXISTS device_readings;" + ).await + } +} +``` + +- [ ] **Step 2: 注册迁移到 lib.rs** + +在 `migration/src/lib.rs` 中: +1. 添加 `mod m20260426_000073_create_device_readings;` +2. 在 `migrations()` 的 `vec![]` 末尾追加 `Box::new(m20260426_000073_create_device_readings::Migration),` + +- [ ] **Step 3: 验证编译** + +Run: `cargo check` +Expected: 编译通过 + +- [ ] **Step 4: 运行迁移** + +Run: `cd crates/erp-server && cargo run`(启动后端自动执行迁移) +Expected: 日志中看到 `apply migration m20260426_000073_create_device_readings` + +- [ ] **Step 5: 验证表结构** + +Run: `D:\postgreSQL\bin\psql.exe -U postgres -h localhost -d erp -c "\d device_readings"` +Expected: 表存在,含分区信息 + +- [ ] **Step 6: 提交** + +```bash +git add crates/erp-server/migration/src/m20260426_000073_create_device_readings.rs crates/erp-server/migration/src/lib.rs +git commit -m "feat(db): 创建 device_readings 分区表 — 原始设备数据存储" +``` + +--- + +### Task 2: vital_signs_hourly 降采样表迁移 + +**Files:** +- Create: `crates/erp-server/migration/src/m20260426_000074_create_vital_signs_hourly.rs` +- Modify: `crates/erp-server/migration/src/lib.rs` + +- [ ] **Step 1: 创建迁移文件** + +```rust +// m20260426_000074_create_vital_signs_hourly.rs +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager.create_table( + Table::create() + .table(Alias::new("vital_signs_hourly")) + .col(ColumnDef::new(Alias::new("id")).uuid().not_null().primary_key().default(Expr::cust("gen_random_uuid()"))) + .col(ColumnDef::new(Alias::new("tenant_id")).uuid().not_null()) + .col(ColumnDef::new(Alias::new("patient_id")).uuid().not_null()) + .col(ColumnDef::new(Alias::new("device_type")).string().not_null()) + .col(ColumnDef::new(Alias::new("hour_start")).timestamp_with_time_zone().not_null()) + .col(ColumnDef::new(Alias::new("min_val")).double()) + .col(ColumnDef::new(Alias::new("max_val")).double()) + .col(ColumnDef::new(Alias::new("avg_val")).double().not_null()) + .col(ColumnDef::new(Alias::new("sample_count")).integer().not_null().default(1)) + .col(ColumnDef::new(Alias::new("created_at")).timestamp_with_time_zone().default(Expr::cust("NOW()"))) + .col(ColumnDef::new(Alias::new("updated_at")).timestamp_with_time_zone().default(Expr::cust("NOW()"))) + .col(ColumnDef::new(Alias::new("version")).integer().not_null().default(1)) + .to_owned(), + ).await?; + + // UNIQUE 约束 + manager.create_index( + Index::create() + .name("idx_vsh_unique") + .table(Alias::new("vital_signs_hourly")) + .col(Alias::new("tenant_id")) + .col(Alias::new("patient_id")) + .col(Alias::new("device_type")) + .col(Alias::new("hour_start")) + .unique() + .to_owned(), + ).await?; + + // 查询索引 + manager.create_index( + Index::create() + .name("idx_vsh_tenant_patient") + .table(Alias::new("vital_signs_hourly")) + .col(Alias::new("tenant_id")) + .col(Alias::new("patient_id")) + .col(Alias::new("device_type")) + .col(Alias::new("hour_start")) + .to_owned(), + ).await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager.drop_table(Table::drop().table(Alias::new("vital_signs_hourly")).to_owned()).await + } +} +``` + +- [ ] **Step 2: 注册迁移 + 编译 + 迁移 + 验证 + 提交** + +同 Task 1 的 Step 2-6 模式。提交信息:`feat(db): 创建 vital_signs_hourly 降采样表` + +--- + +### Task 3: patient_devices 设备绑定表迁移 + +**Files:** +- Create: `crates/erp-server/migration/src/m20260426_000075_create_patient_devices.rs` + +- [ ] **Step 1: 创建迁移文件** + +标准 HMS 表结构(id, tenant_id, patient_id, device_id, device_model, device_type, bound_at, created_at, updated_at, created_by, updated_by, deleted_at, version)。含 `UNIQUE (tenant_id, patient_id, device_id)` 约束。 + +遵循 Task 2 的 `Table::create()` 模式。 + +- [ ] **Step 2-6: 注册 + 编译 + 迁移 + 验证 + 提交** + +提交信息:`feat(db): 创建 patient_devices 设备绑定表` + +--- + +### Task 4: alert_rules 告警规则表迁移 + +**Files:** +- Create: `crates/erp-server/migration/src/m20260426_000076_create_alert_rules.rs` + +- [ ] **Step 1: 创建迁移文件** + +字段:id, tenant_id, name, description, device_type, condition_type, condition_params(JSONB), severity, is_active, apply_tags(JSONB), notify_roles(JSONB), created_at, updated_at, created_by, updated_by, deleted_at, version。 + +- [ ] **Step 2-6: 注册 + 编译 + 迁移 + 验证 + 提交** + +提交信息:`feat(db): 创建 alert_rules 告警规则表` + +--- + +### Task 5: alerts 告警记录表迁移 + +**Files:** +- Create: `crates/erp-server/migration/src/m20260426_000077_create_alerts.rs` + +- [ ] **Step 1: 创建迁移文件** + +字段:id, tenant_id, patient_id, rule_id(FK→alert_rules ON DELETE RESTRICT), severity, title, detail(JSONB), status, acknowledged_by, acknowledged_at, resolved_at, created_at, updated_at, deleted_at, version。 + +索引:`idx_alerts_tenant_patient(tenant_id, patient_id, created_at DESC)`, `idx_alerts_status(tenant_id, status, created_at DESC)`。 + +- [ ] **Step 2-6: 注册 + 编译 + 迁移 + 验证 + 提交** + +提交信息:`feat(db): 创建 alerts 告警记录表` + +--- + +## Chunk 2: Entity + Validation(Task 6-10) + +### Task 6: device_readings Entity + +**Files:** +- Create: `crates/erp-health/src/entity/device_readings.rs` +- Modify: `crates/erp-health/src/entity/mod.rs` + +- [ ] **Step 1: 创建 Entity 文件** + +遵循 `vital_signs.rs` 模式: +- `#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)]` +- `#[sea_orm(table_name = "device_readings")]` +- 所有字段用 `Option` 包裹可空字段 +- 不含 `updated_at`/`updated_by`/`version`(不可变数据) +- 主键:`#[sea_orm(primary_key, auto_increment = false)]` id: Uuid +- `Relation` 为空 `#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]` + +- [ ] **Step 2: 在 mod.rs 中导出** + +添加 `pub mod device_readings;` + +- [ ] **Step 3: 验证 `cargo check` 通过** + +- [ ] **Step 4: 提交** + +```bash +git add crates/erp-health/src/entity/device_readings.rs crates/erp-health/src/entity/mod.rs +git commit -m "feat(health): device_readings Entity — 原始设备数据" +``` + +--- + +### Task 7: vital_signs_hourly Entity + +**Files:** +- Create: `crates/erp-health/src/entity/vital_signs_hourly.rs` +- Modify: `crates/erp-health/src/entity/mod.rs` + +同 Task 6 模式,含标准字段(id, tenant_id, created_at, updated_at, version)。不含 `deleted_at`(聚合数据不软删除)。 + +提交信息:`feat(health): vital_signs_hourly Entity — 小时降采样` + +--- + +### Task 8: patient_devices Entity + +**Files:** +- Create: `crates/erp-health/src/entity/patient_devices.rs` + +标准 HMS Entity 模式,含所有标准字段。 + +提交信息:`feat(health): patient_devices Entity — 设备绑定` + +--- + +### Task 9: alert_rules + alerts Entity + +**Files:** +- Create: `crates/erp-health/src/entity/alert_rules.rs` +- Create: `crates/erp-health/src/entity/alerts.rs` + +`alerts` 的 `Relation` 中添加 `belongs_to` 关联到 `alert_rules`。 + +提交信息:`feat(health): alert_rules + alerts Entity — 告警规则与记录` + +--- + +### Task 10: 新增枚举校验 + +**Files:** +- Modify: `crates/erp-health/src/service/validation.rs` + +- [ ] **Step 1: 添加枚举校验函数** + +在 `validation.rs` 中新增: + +```rust +// 使用 validate_enum! 宏添加以下校验 +validate_enum!(validate_device_type, "device_type", &[ + "heart_rate", "blood_oxygen", "steps", "sleep", "temperature", "stress" +]); + +validate_enum!(validate_condition_type, "condition_type", &[ + "single_threshold", "consecutive", "trend" +]); + +validate_enum!(validate_alert_severity, "alert_severity", &[ + "info", "warning", "critical", "urgent" +]); + +validate_enum!(validate_alert_status, "alert_status", &[ + "pending", "acknowledged", "resolved", "dismissed" +]); + +validate_enum!(validate_direction, "direction", &[ + "above", "below", "up", "down" +]); + +// 告警状态转换校验 +pub fn validate_alert_status_transition(current: &str, next: &str) -> HealthResult<()> { + match current { + "pending" => matches!(next, "acknowledged" | "dismissed") + .then_some(()) + .ok_or_else(|| HealthError::InvalidStatusTransition( + format!("alert: {} -> {}", current, next) + )), + "acknowledged" => matches!(next, "resolved" | "dismissed") + .then_some(()) + .ok_or_else(|| HealthError::InvalidStatusTransition( + format!("alert: {} -> {}", current, next) + )), + _ => Err(HealthError::InvalidStatusTransition( + format!("alert: terminal state {} cannot transition", current) + )), + } +} +``` + +- [ ] **Step 2: 添加单元测试** + +在 `#[cfg(test)] mod tests` 中新增对应的测试用例。 + +- [ ] **Step 3: 运行测试 `cargo test -p erp-health -- validation`** + +- [ ] **Step 4: 提交** + +```bash +git add crates/erp-health/src/service/validation.rs +git commit -m "feat(health): 设备类型/告警枚举校验 + 状态转换验证" +``` + +--- + +## Chunk 3: Service 层 — 数据摄入与降采样(Task 11-12) + +### Task 11: device_reading_service — 数据摄入 + 降采样 + +**Files:** +- Create: `crates/erp-health/src/service/device_reading_service.rs` +- Modify: `crates/erp-health/src/service/mod.rs` + +- [ ] **Step 1: 创建 service 文件骨架** + +```rust +// device_reading_service.rs +use crate::entity::{device_readings, vital_signs_hourly, patient_devices, patient}; +use crate::error::{HealthResult, HealthError}; +use crate::state::HealthState; +use erp_core::events::{EventBus, DomainEvent}; +use sea_orm::*; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; +use chrono::{DateTime, Utc, Timelike}; +use std::collections::HashMap; + +// ── DTO ── + +#[derive(Debug, Deserialize, utoipa::ToSchema)] +pub struct BatchReadingRequest { + pub device_id: String, + pub device_model: Option, + pub readings: Vec, +} + +#[derive(Debug, Deserialize, utoipa::ToSchema)] +pub struct ReadingInput { + pub device_type: String, + pub values: serde_json::Value, + pub measured_at: String, // ISO 8601 +} + +#[derive(Debug, Serialize, utoipa::ToSchema)] +pub struct BatchResult { + pub accepted: u64, + pub duplicates: u64, + pub earliest: Option, + pub latest: Option, +} + +// ── 核心函数 ── + +pub async fn batch_create_readings( + state: &HealthState, + tenant_id: Uuid, + patient_id: Uuid, + req: BatchReadingRequest, +) -> HealthResult { + // 1. 校验患者存在 + let _patient = patient::Entity::find_by_id(patient_id) + .filter(patient::Column::TenantId.eq(tenant_id)) + .filter(patient::Column::DeletedAt.is_null()) + .one(&state.db) + .await? + .ok_or_else(|| HealthError::PatientNotFound)?; + + // 2. 校验/创建设备绑定 + ensure_device_binding( + &state.db, tenant_id, patient_id, + &req.device_id, req.device_model.as_deref(), + ).await?; + + // 3. 解析 + 校验 readings + let mut parsed_readings = Vec::with_capacity(req.readings.len().min(500)); + let mut earliest: Option> = None; + let mut latest: Option> = None; + + for r in &req.readings { + // 校验 device_type 枚举 + crate::service::validation::validate_device_type(&r.device_type)?; + + // 解析 measured_at + let measured_at: DateTime = r.measured_at.parse() + .map_err(|_| HealthError::Validation("measured_at 格式无效".into()))?; + + // 校验不接受未来时间 + if measured_at > Utc::now() { + return Err(HealthError::Validation("measured_at 不能是未来时间".into())); + } + + earliest = earliest.map_or(Some(measured_at), |e| Some(e.min(measured_at))); + latest = latest.map_or(Some(measured_at), |l| Some(l.max(measured_at))); + + parsed_readings.push((r, measured_at)); + } + + // 4. 批量插入(ON CONFLICT DO NOTHING) + let total = parsed_readings.len() as u64; + let inserted = batch_insert_readings( + &state.db, tenant_id, patient_id, + &req.device_id, req.device_model.as_deref(), + &parsed_readings, + ).await?; + + // 5. 降采样 upsert + upsert_hourly_aggregates( + &state.db, tenant_id, patient_id, &parsed_readings, + ).await?; + + // 6. 发布 EventBus 事件 + let event = DomainEvent::new( + "device.readings.synced", + tenant_id, + serde_json::json!({ + "patient_id": patient_id, + "count": inserted, + "device_model": req.device_model, + "date_range": { + "from": earliest.map(|t| t.to_rfc3339()), + "to": latest.map(|t| t.to_rfc3339()), + } + }), + ); + // publish 返回 (),fire-and-forget(outbox 持久化在 publish 内部处理) + state.event_bus.publish(event, &state.db).await; + + Ok(BatchResult { + accepted: inserted, + duplicates: total.saturating_sub(inserted), + earliest: earliest.map(|t| t.to_rfc3339()), + latest: latest.map(|t| t.to_rfc3339()), + }) +} +``` + +- [ ] **Step 2: 实现 ensure_device_binding** + +```rust +async fn ensure_device_binding( + db: &DatabaseConnection, + tenant_id: Uuid, + patient_id: Uuid, + device_id: &str, + device_model: Option<&str>, +) -> HealthResult<()> { + let existing = patient_devices::Entity::find() + .filter(patient_devices::Column::TenantId.eq(tenant_id)) + .filter(patient_devices::Column::PatientId.eq(patient_id)) + .filter(patient_devices::Column::DeviceId.eq(device_id)) + .filter(patient_devices::Column::DeletedAt.is_null()) + .one(db) + .await?; + + if existing.is_none() { + // 首次同步,创建绑定 + let binding = patient_devices::ActiveModel { + id: Set(Uuid::now_v7()), + tenant_id: Set(tenant_id), + patient_id: Set(patient_id), + device_id: Set(device_id.to_string()), + device_model: Set(device_model.map(String::from)), + device_type: Set(None), // 由 readings 中推断 + bound_at: Set(Utc::now()), + created_at: Set(Utc::now()), + updated_at: Set(Utc::now()), + created_by: Set(None), + updated_by: Set(None), + deleted_at: Set(None), + version: Set(1), + }; + binding.insert(db).await?; + } + Ok(()) +} +``` + +- [ ] **Step 3: 实现 batch_insert_readings(raw SQL 批量插入)** + +使用 `INSERT ... ON CONFLICT DO NOTHING` 实现。由于 SeaORM 不原生支持分区表的批量 upsert,这里用 raw SQL。 + +- [ ] **Step 4: 实现 upsert_hourly_aggregates** + +按 `(device_type, hour_start)` 分组,计算 min/max/avg/count,执行 `INSERT ... ON CONFLICT DO UPDATE`。 + +- [ ] **Step 5: 在 mod.rs 中导出** + +添加 `pub mod device_reading_service;` + +- [ ] **Step 6: `cargo check` 通过 + 提交** + +```bash +git add crates/erp-health/src/service/device_reading_service.rs crates/erp-health/src/service/mod.rs +git commit -m "feat(health): device_reading_service — 批量摄入+降采样+事件发布" +``` + +--- + +### Task 12: device_reading_service — 查询 API + +**Files:** +- Modify: `crates/erp-health/src/service/device_reading_service.rs` + +在同一个 service 文件中添加查询函数: + +```rust +pub async fn query_device_readings( + state: &HealthState, + tenant_id: Uuid, + patient_id: Uuid, + device_type: Option<&str>, + hours: Option, +) -> HealthResult> { ... } + +pub async fn query_hourly_readings( + state: &HealthState, + tenant_id: Uuid, + patient_id: Uuid, + device_type: &str, + days: i64, +) -> HealthResult> { ... } +``` + +遵循现有 `health_data_service.rs` 的分页查询模式。提交信息:`feat(health): 设备数据查询 API — 原始+降采样` + +--- + +## Chunk 4: Handler 层 + 路由注册(Task 13-14) + +### Task 13: device_reading_handler + +**Files:** +- Create: `crates/erp-health/src/handler/device_reading_handler.rs` +- Modify: `crates/erp-health/src/handler/mod.rs` + +- [ ] **Step 1: 创建 handler** + +遵循 `health_data_handler.rs` 模式: + +```rust +use axum::{extract::{State, Extension, Path, Query}, Json}; +use erp_core::rbac::require_permission; +use erp_core::types::TenantContext; +use erp_core::types::ApiResponse; +use crate::state::HealthState; +use crate::service::device_reading_service; +use serde::Deserialize; + +#[derive(Debug, Deserialize)] +pub struct BatchReadingPath { + pub patient_id: uuid::Uuid, +} + +pub async fn batch_create( + State(state): State, + Extension(ctx): Extension, + Path(path): Path, + Json(body): Json, +) -> Result>, AppError> +where HealthState: std::ops::Deref, // FromRef 模式 +{ + require_permission(&ctx, "health.device-readings.manage")?; + let tenant_id = ctx.tenant_id(); + let result = device_reading_service::batch_create_readings( + &state, tenant_id, path.patient_id, body, + ).await?; + Ok(Json(ApiResponse::ok(result))) +} + +// 查询原始数据 +pub async fn list_readings(...) { ... } + +// 查询降采样数据 +pub async fn list_hourly(...) { ... } +``` + +- [ ] **Step 2: 在 mod.rs 中导出** + +添加 `pub mod device_reading_handler;` + +- [ ] **Step 3: `cargo check` + 提交** + +```bash +git add crates/erp-health/src/handler/device_reading_handler.rs crates/erp-health/src/handler/mod.rs +git commit -m "feat(health): device_reading_handler — 批量摄入+查询端点" +``` + +--- + +### Task 14: 路由注册 + 权限 + 事件订阅 + +**Files:** +- Modify: `crates/erp-health/src/module.rs` + +- [ ] **Step 1: 注册路由** + +在 `protected_routes()` 中添加: + +```rust +// 设备数据路由 +.nest("/health", Router::new() + // ... 现有路由 + .route("/patients/{patient_id}/device-readings/batch", + post(device_reading_handler::batch_create)) + .route("/patients/{patient_id}/device-readings", + get(device_reading_handler::list_readings)) + .route("/patients/{patient_id}/device-readings/hourly", + get(device_reading_handler::list_hourly)) + // 告警路由在 Phase 2 Task 16 中注册 +) +``` + +- [ ] **Step 2: 注册权限** + +在 `permissions()` 中添加: + +```rust +PermissionDescriptor { + code: "health.device-readings.list".into(), + name: "查看设备数据".into(), + description: Some("查看患者的设备采集数据".into()), +}, +PermissionDescriptor { + code: "health.device-readings.manage".into(), + name: "管理设备数据".into(), + description: Some("提交设备采集数据".into()), +}, +PermissionDescriptor { + code: "health.alerts.list".into(), + name: "查看告警".into(), + description: Some("查看告警记录".into()), +}, +PermissionDescriptor { + code: "health.alerts.manage".into(), + name: "管理告警".into(), + description: Some("确认/处置告警".into()), +}, +PermissionDescriptor { + code: "health.alert-rules.list".into(), + name: "查看告警规则".into(), + description: Some("查看告警规则配置".into()), +}, +PermissionDescriptor { + code: "health.alert-rules.manage".into(), + name: "管理告警规则".into(), + description: Some("创建/编辑/启停告警规则".into()), +}, +``` + +- [ ] **Step 3: 注册事件订阅** + +在 `register_event_handlers()` 或 `on_startup()` 中添加对 `device.readings.synced` 事件的订阅(Phase 2 告警引擎消费)。 + +- [ ] **Step 4: `cargo check` + `cargo test --workspace` + 提交** + +```bash +git add crates/erp-health/src/module.rs +git commit -m "feat(health): 注册设备数据路由+权限+事件订阅" +``` + +--- + +## Chunk 5: 告警引擎(Task 15-16) + +### Task 15: alert_engine — 规则评估核心 + +**Files:** +- Create: `crates/erp-health/src/service/alert_engine.rs` + +- [ ] **Step 1: 创建告警引擎** + +核心函数 `evaluate_rules(db, tenant_id, patient_id, device_type, event_bus) -> Vec`: + +1. `load_active_rules()` — 查询 `alert_rules` 表 +2. 遍历规则,检查冷却期(查询最近一条同规则同患者的 alert) +3. 按条件类型调用 `evaluate_single()` / `evaluate_consecutive()` / `evaluate_trend()` +4. 匹配则 `create_alert()` + `event_bus.publish("alert.triggered", ...)` + +滑动窗口查询使用 raw SQL 查询 `vital_signs_hourly` 表。 + +- [ ] **Step 2: 在 mod.rs 导出 + `cargo check` + 提交** + +```bash +git add crates/erp-health/src/service/alert_engine.rs crates/erp-health/src/service/mod.rs +git commit -m "feat(health): alert_engine — 规则评估核心(单次/连续/趋势)" +``` + +--- + +### Task 16: alert_service + alert_rule_service — CRUD + +**Files:** +- Create: `crates/erp-health/src/service/alert_service.rs` +- Create: `crates/erp-health/src/handler/alert_handler.rs` +- Create: `crates/erp-health/src/handler/alert_rule_handler.rs` + +- [ ] **Step 1: alert_service** + +- `list_alerts()` — 分页查询 alerts 表 +- `acknowledge_alert()` — 状态转换 pending→acknowledged +- `resolve_alert()` — 状态转换 acknowledged→resolved +- `dismiss_alert()` — 状态转换 pending/acknowledged→dismissed + +- [ ] **Step 2: alert_rule_service** + +- `list_rules()` — 分页查询 alert_rules +- `create_rule()` — 校验 + 插入 +- `update_rule()` — 乐观锁更新 +- `deactivate_rule()` — 设置 is_active=false + +- [ ] **Step 3: handlers** + +遵循 `device_reading_handler.rs` 模式。 + +- [ ] **Step 4: 在 module.rs 中注册路由(如果 Task 14 未包含则在此注册)** + +- [ ] **Step 5: `cargo check` + `cargo test` + 提交** + +```bash +git add crates/erp-health/src/service/alert_service.rs crates/erp-health/src/handler/alert_handler.rs crates/erp-health/src/handler/alert_rule_handler.rs +git commit -m "feat(health): 告警 CRUD — 规则管理+告警确认+处置" +``` + +--- + +## Chunk 6: SSE 扩展(Task 17) + +### Task 17: SSE Handler 扩展 + 前端监听 + +**Files:** +- Modify: `crates/erp-message/src/handler/sse_handler.rs` +- Modify: `apps/web/src/stores/message.ts` + +- [ ] **Step 1: 改造 SSE Handler** + +将 `subscribe_filtered("message.sent")` 改为 `subscribe()` 全量订阅,在 stream 中按 event_type 分发: + +- `message.sent` → SSE event: `message`(已有逻辑) +- `alert.triggered` → SSE event: `alert`(检查 payload 中的 attending_doctor 匹配当前用户) +- `device.readings.synced` → SSE event: `vital_update`(同上) + +- [ ] **Step 2: 前端 message store 扩展** + +在 `connectSSE()` 中添加 `eventSource.addEventListener('alert', ...)` 和 `eventSource.addEventListener('vital_update', ...)`。 + +- [ ] **Step 3: `cargo check` + `pnpm build` + 提交** + +```bash +git add crates/erp-message/src/handler/sse_handler.rs apps/web/src/stores/message.ts +git commit -m "feat: SSE 推送扩展 — 告警+体征更新事件" +``` + +--- + +## Chunk 7: 小程序 BLE 模块(Task 18-20) + +### Task 18: BLE 类型 + BLEManager + +**Files:** +- Create: `apps/miniprogram/src/services/ble/types.ts` +- Create: `apps/miniprogram/src/services/ble/BLEManager.ts` + +- [ ] **Step 1: 创建 types.ts** + +定义 `NormalizedReading`, `DeviceType`, `DeviceAdapter`, `BLEDevice`, `BLEConnection`, `SyncResult` 接口。 + +- [ ] **Step 2: 创建 BLEManager.ts** + +实现设备注册、扫描、连接、断开、同步的主入口。使用 Taro BLE API。 + +- [ ] **Step 3: 提交** + +```bash +git add apps/miniprogram/src/services/ble/ +git commit -m "feat(miniprogram): BLE 类型定义 + 连接管理器" +``` + +--- + +### Task 19: 小米手环 Adapter + +**Files:** +- Create: `apps/miniprogram/src/services/ble/adapters/XiaomiBandAdapter.ts` + +- [ ] **Step 1: 实现 Adapter** + +实现 `DeviceAdapter` 接口,支持 Mi Band 7/8 的心率数据读取。 + +首版可使用标准 Heart Rate Service UUID (`0x180D`) + Heart Rate Measurement Characteristic (`0x2A37`)。 + +- [ ] **Step 2: 提交** + +```bash +git add apps/miniprogram/src/services/ble/adapters/XiaomiBandAdapter.ts +git commit -m "feat(miniprogram): 小米手环 BLE 适配器 — 心率读取" +``` + +--- + +### Task 20: 设备同步页面 + API + +**Files:** +- Create: `apps/miniprogram/src/services/device-sync.ts` +- Create: `apps/miniprogram/src/pages/device-sync/index.tsx` +- Create: `apps/miniprogram/src/pages/device-sync/index.config.ts` + +- [ ] **Step 1: 创建 device-sync.ts API 层** + +封装 `POST /health/patients/{id}/device-readings/batch` 调用。 + +- [ ] **Step 2: 创建设备同步页面** + +包含:设备扫描列表、连接状态、同步进度、同步结果展示。 + +- [ ] **Step 3: 在 app.config.ts 中注册页面** + +- [ ] **Step 4: 提交** + +```bash +git add apps/miniprogram/src/services/device-sync.ts apps/miniprogram/src/pages/device-sync/ +git commit -m "feat(miniprogram): 设备同步页面 — 扫描+连接+数据上传" +``` + +--- + +## Chunk 8: Web 前端 + 集成验证(Task 21-22) + +### Task 21: Web 端 API + 告警通知 UI + +**Files:** +- Create: `apps/web/src/api/health/deviceReadings.ts` +- Create: `apps/web/src/api/health/alerts.ts` + +- [ ] **Step 1: 创建 API 文件** + +deviceReadings.ts:`queryReadings()`, `queryHourlyReadings()` +alerts.ts:`listAlerts()`, `acknowledgeAlert()`, `listAlertRules()`, `createAlertRule()` + +- [ ] **Step 2: 提交** + +```bash +git add apps/web/src/api/health/deviceReadings.ts apps/web/src/api/health/alerts.ts +git commit -m "feat(web): 设备数据+告警 API 层" +``` + +--- + +### Task 22: 端到端集成验证 + +- [ ] **Step 1: `cargo check --workspace`** + +Expected: 编译通过,0 错误 + +- [ ] **Step 2: `cargo test --workspace`** + +Expected: 所有测试通过 + +- [ ] **Step 3: 启动后端** + +Run: `cd crates/erp-server && cargo run` + +Expected: 服务启动,日志中看到新路由注册 + +- [ ] **Step 4: Swagger 验证** + +访问 `http://localhost:3000/api/docs/openapi.json`,确认新端点出现在文档中 + +- [ ] **Step 5: API 手动测试** + +用 Swagger UI 测试: +1. `POST /health/patients/{id}/device-readings/batch` — 提交测试数据 +2. `GET /health/patients/{id}/device-readings` — 查询原始数据 +3. `GET /health/patients/{id}/device-readings/hourly` — 查询降采样 + +- [ ] **Step 6: 前端构建验证** + +Run: `cd apps/web && pnpm build` +Expected: 构建通过 + +- [ ] **Step 7: 最终提交** + +```bash +git add -A +git commit -m "feat(health): 实时体征采集 Phase 1 — 完整链路验证通过" +git push +``` + +--- + +## Phase 2 实施计划(概要,Phase 1 完成后细化) + +Phase 2 任务基于 Phase 1 的基础设施,在 `module.rs` 的 `on_startup()` 中: + +1. 注册 `device.readings.synced` 事件订阅 → 调用 `alert_engine::evaluate_rules()` +2. 实现 SSE 推送的医生匹配逻辑(查询患者的 attending_doctor) +3. Web 前端告警通知组件 + 体征实时看板 +4. 告警规则管理页面(Ant Design ProTable) + +--- + +## Phase 3 实施计划(概要) + +1. 更多 BLE Adapter(华为/OPPO 手环) +2. 降采样后台修正任务(每 6 小时) +3. 数据清理任务(自动 DROP 超期分区) +4. 周/月趋势对比看板