@@ -0,0 +1,968 @@
# HMS 实时体征采集与智能告警系统 — 设计规格
> 日期: 2026-04-26 | 状态: Draft | 作者: Claude + 用户协作
---
## 1. 背景与目标
### 1.1 业务背景
HMS 平台当前已实现体征数据的手动录入和危急值阈值告警。但慢病患者(高血压、糖尿病、肾病)的健康数据采集严重依赖患者主动录入,依从性差、数据稀疏、无法捕捉短时波动。
健康手环(如小米手环、华为手环)的普及率为 **24 小时被动采集 ** 提供了硬件基础。心率、血氧、步数、睡眠等数据可连续产生,通过小程序 BLE 同步到 HMS, 实现:
- **从"主动录入"到"被动采集"** — 患者无感,数据密度提升 100 倍
- **从"单次阈值"到"趋势预警"** — 连续超标、趋势恶化等规则引擎
- **从"事后查看"到"实时看板"** — 医生通过 SSE 实时感知患者状态变化
### 1.2 目标用户与场景
| 用户 | 场景 | 价值 |
|------|------|------|
| **慢病患者 ** | 佩戴手环,打开小程序自动同步数据 | 免去手动录入,持续监测 |
| **主管医生 ** | Web 端实时看板查看患者体征趋势 | 及时发现异常,干预更早 |
| **护士站 ** | 接收高危告警推送 | 快速响应危急情况 |
| **体检中心 ** | 出具体检报告时参考连续监测数据 | 报告更全面 |
### 1.3 成功指标
| 指标 | 基线 | 目标 |
|------|------|------|
| 体征数据日均采集量 | ~10 条/患者(手动) | > 1000 条/患者(手环) |
| 危急值告警延迟 | 分钟级(轮询) | < 5 秒( SSE 推送) |
| 趋势预警准确率 | N/A( 无此能力) | > 85%(首版) |
| 患者数据同步频率 | 不定(手动) | 每天至少 1 次(自动) |
### 1.4 范围边界
**做: **
- 小程序 BLE 采集模块( DeviceAdapter 抽象 + 手环适配)
- 后端设备数据摄入 API( 批量提交 + 降采样)
- 告警规则引擎(单次阈值 + 连续超标 + 趋势恶化)
- SSE 推送扩展(体征更新 + 告警通知)
- 数据分区与降采样策略
**不做(本设计不覆盖): **
- ICU 级实时监控(亚秒级,需要专用系统)
- 医疗设备网关( HL7/串口,场景不同)
- AI 驱动的异常检测( erp-ai 模块职责)
- 非手环设备适配(血压计、血糖仪 — 后续扩展)
- 多实例部署的事件总线演进(单实例足够)
---
## 2. 系统架构
### 2.1 端到端数据流
```
健康手环 ─BLE批量同步→ 微信小程序 ─REST API→ HMS后端
│
┌─────────────┼──────────────┐
│ │ │
快速路径(同步) 异步路径 SSE推送
校验+存储+事件 告警引擎 体征更新
(< 200ms) 统计聚合 告警通知
趋势分析
│ │ │
▼ ▼ ▼
PostgreSQL EventBus 医生Web端
(分区+降采样) → 消息模块 实时看板
```
### 2.2 模块划分与职责
| 模块 | 位置 | 职责 |
|------|------|------|
| **BLE 采集层 ** | 小程序 `services/ble/` | 设备发现、连接、数据读取、格式化 |
| **DeviceAdapter ** | 小程序 `services/ble/adapters/` | 统一接口,屏蔽设备差异 |
| **摄入 API ** | `erp-health` handler | 批量接收、校验、存储、降采样触发 |
| **降采样 Pipeline ** | `erp-health` service | 原始数据聚合为小时/日级汇总 |
| **告警引擎 ** | `erp-health` service | 规则加载、评估、告警生成 |
| **SSE 扩展 ** | `erp-message` handler | 新增体征/告警事件推送 |
| **EventBus 事件 ** | `erp-core` | 新增 `device.readings.synced` 、`alert.triggered` 事件类型 |
### 2.3 与现有 HMS 架构的集成点
| 集成点 | 现有 | 扩展 |
|--------|------|------|
| EventBus | `tokio::broadcast` 进程内 | 新增事件类型,不改架构 |
| SSE Handler | 仅推送 `message.sent` | 扩展支持 `device.readings.synced` 、`alert.triggered` |
| vital_signs 表 | 手动录入数据 | 新增 `device_readings` 表存储高频数据,`vital_signs` 保持不变 |
| 通知分发 | `erp-message` 内联 | 复用现有通知管道,新增告警通知类型 |
| 小程序 | Taro 4.2 + React 18 | 新增 BLE 采集服务层 |
### 2.4 技术选型与权衡
| 决策 | 选择 | 理由 |
|------|------|------|
| 设备通信 | 微信 BLE API | 小程序原生支持,无需额外 SDK |
| 数据传输 | REST API 批量提交 | 简单可靠,实时性依赖患者操作频率(每天 1-2 次) |
| 降采样存储 | PostgreSQL 窗口函数 | 无需引入 TimescaleDB/InfluxDB, 现有栈即可 |
| 告警引擎 | 规则 + 滑动窗口 SQL | 覆盖 80% 临床场景,不需要流式计算框架 |
| 数据分区 | PostgreSQL RANGE 分区 | 按月分区 `device_readings` , 90 天保留原始数据 |
| SSE 推送 | 扩展现有 SSE handler | 不引入 WebSocket( 单向推送够用) |
---
## 3. 数据模型
### 3.1 原始设备数据表 (device_readings)
存储从手环同步的原始高频数据,按 `measured_at` 做月分区。
``` sql
CREATE TABLE device_readings (
id UUID PRIMARY KEY DEFAULT gen_random_uuid ( ) ,
tenant_id UUID NOT NULL ,
patient_id UUID NOT NULL ,
device_id VARCHAR ( 64 ) , -- 设备唯一标识( MAC 地址哈希)
device_type VARCHAR ( 32 ) NOT NULL , -- 'heart_rate', 'blood_oxygen', 'steps', 'sleep', 'temperature', 'stress'
device_model VARCHAR ( 64 ) , -- 'Xiaomi Band 8', 'Huawei Band 9'
raw_value JSONB NOT NULL , -- 原始值 {"hr": 72, "confidence": 0.95}
measured_at TIMESTAMPTZ NOT NULL , -- 设备上的测量时间
created_at TIMESTAMPTZ DEFAULT NOW ( ) ,
deleted_at TIMESTAMPTZ -- 软删除
) PARTITION BY RANGE ( measured_at ) ;
-- 按月分区模板(自动创建未来 3 个月)
CREATE TABLE device_readings_2026_05 PARTITION OF device_readings
FOR VALUES FROM ( ' 2026-05-01 ' ) TO ( ' 2026-06-01 ' ) ;
-- ... 每月一个分区
-- 核心索引
CREATE INDEX idx_dr_tenant_patient ON device_readings ( tenant_id , patient_id , measured_at DESC ) ;
CREATE INDEX idx_dr_device_type ON device_readings ( tenant_id , device_type , measured_at DESC ) ;
-- 注意:不包含 updated_by / version 字段
-- 原始设备数据是不可变的(写入后不修改),不需要乐观锁
```
**JSONB raw_value 结构(按 device_type) : **
``` jsonc
// heart_rate
{ "hr" : 72 , "confidence" : 0.95 }
// blood_oxygen
{ "spo2" : 98 }
// steps
{ "steps" : 8500 , "distance_m" : 6200 , "calories" : 320 }
// sleep
{ "stage" : "deep" , "duration_min" : 45 }
// temperature
{ "temp_celsius" : 36.5 }
```
### 3.2 降采样表 (vital_signs_hourly)
存储小时级聚合数据,长期保留,用于趋势查询和告警引擎。
``` sql
CREATE TABLE vital_signs_hourly (
id UUID PRIMARY KEY DEFAULT gen_random_uuid ( ) ,
tenant_id UUID NOT NULL ,
patient_id UUID NOT NULL ,
device_type VARCHAR ( 32 ) NOT NULL ,
hour_start TIMESTAMPTZ NOT NULL , -- 小时起始时间
min_val FLOAT ,
max_val FLOAT ,
avg_val FLOAT NOT NULL ,
sample_count INT NOT NULL DEFAULT 1 ,
created_at TIMESTAMPTZ DEFAULT NOW ( ) ,
updated_at TIMESTAMPTZ DEFAULT NOW ( ) ,
version INT NOT NULL DEFAULT 1 ,
UNIQUE ( tenant_id , patient_id , device_type , hour_start )
) ;
-- 告警引擎和趋势查询的主要读取路径
CREATE INDEX idx_vsh_tenant_patient ON vital_signs_hourly
( tenant_id , patient_id , device_type , hour_start DESC ) ;
```
### 3.3 告警规则表 (alert_rules)
存储可配置的告警规则,按租户隔离。
``` sql
CREATE TABLE alert_rules (
id UUID PRIMARY KEY DEFAULT gen_random_uuid ( ) ,
tenant_id UUID NOT NULL ,
name VARCHAR ( 128 ) NOT NULL , -- "心率持续过高"
description TEXT ,
device_type VARCHAR ( 32 ) NOT NULL , -- 触发的指标类型
condition_type VARCHAR ( 32 ) NOT NULL , -- 'single_threshold' / 'consecutive' / 'trend'
-- 阈值条件 (JSONB 灵活存储不同类型规则的参数)
condition_params JSONB NOT NULL ,
-- single_threshold: { "direction": "above", "value": 100 }
-- consecutive: { "count": 3, "direction": "above", "value": 140 }
-- trend: { "window_hours": 168, "delta": 20, "direction": "up" }
severity VARCHAR ( 16 ) NOT NULL DEFAULT ' warning ' , -- 'info' / 'warning' / 'critical' / 'urgent'
is_active BOOLEAN NOT NULL DEFAULT true ,
-- 适用范围(可选过滤)
apply_tags JSONB , -- ["hypertension", "diabetes"] 患者标签
notify_roles JSONB NOT NULL DEFAULT ' ["attending_doctor"] ' ,
cooldown_minutes INT NOT NULL DEFAULT 60 , -- 同一规则对同一患者的告警冷却时间
created_at TIMESTAMPTZ DEFAULT NOW ( ) ,
updated_at TIMESTAMPTZ DEFAULT NOW ( ) ,
created_by UUID ,
updated_by UUID ,
deleted_at TIMESTAMPTZ ,
version INT NOT NULL DEFAULT 1
) ;
```
### 3.4 告警记录表 (alerts)
``` sql
CREATE TABLE alerts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid ( ) ,
tenant_id UUID NOT NULL ,
patient_id UUID NOT NULL ,
rule_id UUID NOT NULL REFERENCES alert_rules ( id ) ON DELETE RESTRICT , -- 禁止硬删除规则
severity VARCHAR ( 16 ) NOT NULL ,
title VARCHAR ( 256 ) NOT NULL , -- "心率持续过高 - 患者张三"
detail JSONB NOT NULL , -- 告警上下文
-- { "metric": "heart_rate", "values": [105, 108, 112], "threshold": 100, "window": "1h" }
status VARCHAR ( 16 ) NOT NULL DEFAULT ' pending ' , -- 'pending' / 'acknowledged' / 'resolved' / 'dismissed'
acknowledged_by UUID ,
acknowledged_at TIMESTAMPTZ ,
resolved_at TIMESTAMPTZ ,
created_at TIMESTAMPTZ DEFAULT NOW ( ) ,
updated_at TIMESTAMPTZ DEFAULT NOW ( ) ,
deleted_at TIMESTAMPTZ ,
version INT NOT NULL DEFAULT 1
) ;
CREATE INDEX idx_alerts_tenant_patient ON alerts ( tenant_id , patient_id , created_at DESC ) ;
CREATE INDEX idx_alerts_status ON alerts ( tenant_id , status , created_at DESC ) ;
```
### 3.5 数据保留与清理策略
| 数据层级 | 保留期 | 清理方式 |
|----------|--------|---------|
| `device_readings` (原始) | 90 天 | 超期分区 DROP( 月级) |
| `vital_signs_hourly` (小时) | 2 年 | 后台任务清理 |
| `vital_signs` (日级,已有) | 永久 | 不清理 |
| `alerts` (告警) | 1 年 | 软删除 + 归档 |
降采样 Pipeline 触发时机:
1. **写入时同步降采样 ** — 原始数据写入后,立即 upsert 对应小时聚合记录
2. **后台修正任务 ** — 每 6 小时运行一次,修正因乱序数据导致的聚合偏差
### 3.6 与现有 vital_signs 表的关系
**并存不合并。 ** 两张表服务于不同场景:
| 特征 | vital_signs( 现有) | device_readings( 新增) |
|------|-------------------|----------------------|
| 数据来源 | 手动录入 / 医护录入 | 设备自动采集 |
| 数据频率 | 每天 1-3 条 | 每天数百到数千条 |
| 指标类型 | 血压、血糖、体重、尿量等 | 心率、血氧、步数、睡眠等 |
| 用途 | 门诊/查房记录 | 连续监测与趋势分析 |
查询 API 提供统一视图,内部按 `source` 字段路由到对应表。
---
## 4. 小程序 BLE 采集模块
### 4.1 DeviceAdapter 统一接口
``` typescript
// services/ble/types.ts
/** 统一的标准化读数 */
interface NormalizedReading {
patientId : string ;
deviceType : DeviceType ;
deviceModel : string ; // 'Xiaomi Band 8'
deviceId : string ; // 设备唯一标识
values : Record < string , number > ; // { hr: 72, confidence: 0.95 }
measuredAt : string ; // ISO 8601
source : 'ble_device' ;
}
type DeviceType =
| 'heart_rate' | 'blood_oxygen' | 'steps'
| 'sleep' | 'temperature' | 'stress' ;
/** 设备适配器统一接口 */
interface DeviceAdapter {
/** 设备类型标识 */
readonly deviceType : DeviceType ;
/** 支持的设备型号(用于自动识别) */
readonly supportedModels : string [ ] ;
/** 服务 UUID( 用于 BLE 扫描过滤) */
readonly serviceUUIDs : string [ ] ;
/** 发现设备 */
discover ( timeoutMs? : number ) : Promise < BLEDevice [ ] > ;
/** 连接设备 */
connect ( deviceId : string ) : Promise < BLEConnection > ;
/** 批量读取历史数据 */
readHistory ( conn : BLEConnection , since : Date ) : Promise < NormalizedReading [ ] > ;
/** 断开连接 */
disconnect ( conn : BLEConnection ) : Promise < void > ;
}
interface BLEDevice {
deviceId : string ;
name : string ;
rssi : number ;
model? : string ; // 从广播数据解析
}
interface BLEConnection {
deviceId : string ;
connected : boolean ;
}
```
### 4.2 健康手环 Adapter 实现
以小米手环为例(首版支持):
``` typescript
// services/ble/adapters/XiaomiBandAdapter.ts
class XiaomiBandAdapter implements DeviceAdapter {
readonly deviceType = 'heart_rate' ; // 首版仅支持心率
readonly supportedModels = [ 'Mi Band 8' , 'Mi Band 7' , 'Xiaomi Smart Band 8' ] ;
readonly serviceUUIDs = [
'0000180d-0000-1000-8000-00805f9b34fb' , // Heart Rate Service (标准)
// 小米私有服务 UUID
] ;
async discover ( timeoutMs = 10000 ) : Promise < BLEDevice [ ] > {
// 使用 Taro.openBluetoothAdapter + startBluetoothDevicesDiscovery
// 过滤 supportedModels 中的设备名
}
async readHistory ( conn : BLEConnection , since : Date ) : Promise < NormalizedReading [ ] > {
// 小米手环私有协议:读取活动数据特征值
// 解析二进制协议 → NormalizedReading[]
// 按时间过滤 since 之后的数据
}
}
```
### 4.3 BLE 连接管理与错误恢复
``` typescript
// services/ble/BLEManager.ts
class BLEManager {
private adapters : Map < string , DeviceAdapter > = new Map ( ) ;
private connection : BLEConnection | null = null ;
/** 注册适配器 */
registerAdapter ( adapter : DeviceAdapter ) : void ;
/** 自动扫描并识别设备 */
async scanAndIdentify ( ) : Promise < { adapter : DeviceAdapter ; device : BLEDevice } | null > ;
/** 同步数据(主入口) */
async syncData ( patientId : string , since : Date ) : Promise < SyncResult > {
try {
// 1. 扫描 → 识别设备 → 选择 Adapter
// 2. 连接 → 读取历史数据 → 格式化
// 3. 批量提交到 HMS API
// 4. 断开连接
// 5. 返回同步结果
} catch ( error ) {
// 错误恢复:连接失败 → 重试 1 次 → 提示用户
// 数据提交失败 → 本地缓存 → 下次同步时补传
}
}
}
interface SyncResult {
success : boolean ;
readingsCount : number ;
dateRange : { from : string ; to : string } ;
errors? : string [ ] ;
}
```
### 4.4 数据同步流程
```
用户打开小程序 → 检查蓝牙权限
│
▼
自动扫描附近设备 → 匹配已注册 Adapter?
│ │
Yes No → 提示"未发现设备"
▼
BLE 连接 → 读取上次同步时间
│
▼
批量读取 since 上次同步的数据
│
▼
POST /api/v1/health/device-readings/batch
│ │
成功 失败 → 缓存到本地存储
│ │
▼ ▼
更新同步时间戳 下次同步时补传
显示同步结果
```
### 4.5 数据格式规范化
Adapter 负责将设备原始数据翻译为 `NormalizedReading` :
```
小米手环原始数据 (二进制):
[0x01, 0x48, 0x00, ...] → 心率 72 bpm
Adapter 翻译:
NormalizedReading {
deviceType: 'heart_rate',
values: { hr: 72 },
measuredAt: '2026-05-15T08:30:00Z',
source: 'ble_device'
}
```
### 4.6 离线缓存策略
使用小程序本地存储( Taro.setStorageSync) 缓存未提交的数据:
``` typescript
interface PendingSync {
readings : NormalizedReading [ ] ;
createdAt : string ;
retryCount : number ;
}
// 缓存 key: `pending_sync_${patientId}`
// 最大缓存: 5000 条(超出时丢弃最旧的)
// 提交成功后清除缓存
// 下次 syncData 时检查缓存,优先补传
```
---
## 5. 后端 API 设计
### 5.1 设备数据摄入 API
**批量提交端点: **
```
POST /api/v1/health/device-readings/batch
Authorization: Bearer <token>
Content-Type: application/json
Request:
{
"deviceId": "abc123",
"deviceModel": "Xiaomi Band 8",
"readings": [
{
"deviceType": "heart_rate",
"values": { "hr": 72 },
"measuredAt": "2026-05-15T08:30:00Z"
}
// ... 最多 500 条/请求
]
}
Response 200:
{
"success": true,
"data": {
"accepted": 498,
"duplicates": 2,
"earliest": "2026-05-14T22:00:00Z",
"latest": "2026-05-15T08:30:00Z"
}
}
```
**去重机制: ** 以 `(patient_id, device_id, device_type, measured_at)` 为唯一约束,冲突时忽略(幂等)。
**限流: ** 单用户每分钟最多 10 次批量提交。
### 5.2 快速路径 vs 异步路径
``` rust
// erp-health/src/service/device_reading_service.rs
pub async fn batch_create_readings (
db : & DatabaseConnection ,
event_bus : & EventBus ,
tenant_id : Uuid ,
patient_id : Uuid ,
readings : Vec < DeviceReadingInput > ,
) -> AppResult < BatchResult > {
// ── 快速路径(同步,< 200ms) ──
// 1. 校验患者存在性
let patient = find_patient ( db , tenant_id , patient_id ) . await ? ;
// 1.5 校验设备绑定关系(防止数据注入)
verify_device_binding ( db , tenant_id , patient_id , & device_id ) . await ? ;
// 2. 批量插入( ON CONFLICT DO NOTHING 去重)
let inserted = batch_insert_readings ( db , tenant_id , patient_id , & readings ) . await ? ;
// 3. 同步降采样 upsert( 每条原始数据更新对应小时聚合)
upsert_hourly_aggregates ( db , tenant_id , patient_id , & readings ) . await ? ;
// 4. 发布 EventBus 事件(异步消费者处理告警和推送)
// 注意: publish 签名需要 db 参数用于 outbox 持久化
event_bus . publish ( DomainEvent {
event_type : " device.readings.synced " . into ( ) ,
tenant_id ,
payload : json ! ( { " patient_id " : patient_id , " count " : inserted } ) ,
} , db ) . await ? ;
Ok ( BatchResult { accepted : inserted , .. } )
}
```
``` rust
// 异步消费者( erp-health 事件订阅)
// 监听 "device.readings.synced" 事件
async fn on_readings_synced ( event : DomainEvent ) {
// 1. 加载该患者适用的告警规则
// 2. 逐规则评估(滑动窗口查询)
// 3. 匹配规则 → 生成 Alert → 发布 alert.triggered 事件
// 4. 统计聚合更新(异步,不阻塞)
}
```
### 5.3 降采样 Pipeline
``` rust
/// 同步降采样:写入原始数据后立即更新小时聚合
async fn upsert_hourly_aggregates (
db : & DatabaseConnection ,
tenant_id : Uuid ,
patient_id : Uuid ,
readings : & [ DeviceReadingInput ] ,
) -> AppResult < ( ) > {
// 按 (device_type, hour_start) 分组
// 每组计算 min/max/avg/count
// INSERT ... ON CONFLICT (tenant_id, patient_id, device_type, hour_start)
// DO UPDATE SET
// min_val = LEAST(min_val, EXCLUDED.min_val),
// max_val = GREATEST(max_val, EXCLUDED.max_val),
// avg_val = (avg_val * sample_count + EXCLUDED.total) / (sample_count + EXCLUDED.count),
// sample_count = sample_count + EXCLUDED.count
Ok ( ( ) )
}
```
### 5.4 查询 API
```
# 原始数据查询(最近 N 条)
GET /api/v1/health/device-readings?patient_id=xxx&device_type=heart_rate&hours=24
# 降采样数据查询(趋势图用)
GET /api/v1/health/device-readings/hourly?patient_id=xxx&device_type=heart_rate&days=7
# 告警列表
GET /api/v1/health/alerts?patient_id=xxx&status=pending
# 告警确认
PUT /api/v1/health/alerts/{id}/acknowledge
# 告警规则管理
GET/POST/PUT/DELETE /api/v1/health/alert-rules
```
### 5.5 EventBus 事件扩展
新增事件类型:
| 事件 | 触发时机 | Payload |
|------|---------|---------|
| `device.readings.synced` | 手环数据批量提交成功 | `{ patient_id, count, device_model, date_range }` |
| `alert.triggered` | 告警规则匹配 | `{ alert_id, patient_id, rule_name, severity, detail }` |
| `alert.acknowledged` | 医生确认告警 | `{ alert_id, acknowledged_by }` |
订阅关系:
| 订阅者 | 订阅事件 | 动作 |
|--------|---------|------|
| 告警引擎 | `device.readings.synced` | 规则评估 |
| SSE Handler | `alert.triggered` | 推送告警通知到医生 |
| SSE Handler | `device.readings.synced` | 推送体征更新到看板 |
| 消息模块 | `alert.triggered` | 生成站内告警消息 |
---
## 6. 告警引擎
### 6.1 规则模型
``` rust
// erp-health/src/service/alert_engine.rs
#[ derive(Debug, Clone, Deserialize) ]
pub enum AlertCondition {
/// 单次阈值判断
SingleThreshold {
direction : Direction , // Above / Below
value : f64 ,
} ,
/// 连续 N 次超标
Consecutive {
count : usize ,
direction : Direction ,
value : f64 ,
window_hours : Option < i32 > , // 可选时间窗口限制
} ,
/// 趋势变化量
Trend {
window_hours : i32 , // 观察窗口
delta : f64 , // 变化量
direction : Direction , // Up / Down
} ,
}
#[ derive(Debug, Clone, Deserialize) ]
pub enum Direction { Above , Below }
```
### 6.2 规则评估流程
``` rust
pub async fn evaluate_rules (
db : & DatabaseConnection ,
tenant_id : Uuid ,
patient_id : Uuid ,
device_type : & str ,
event_bus : & EventBus ,
) -> AppResult < Vec < Alert > > {
// 1. 加载该租户激活的规则(按 device_type 过滤)
let rules = load_active_rules ( db , tenant_id , device_type ) . await ? ;
let mut alerts = Vec ::new ( ) ;
for rule in rules {
// 检查冷却期(同一规则对同一患者)
if is_in_cooldown ( db , rule . id , patient_id , rule . cooldown_minutes ) . await ? {
continue ;
}
// 2. 按条件类型评估
let triggered = match & rule . condition {
AlertCondition ::SingleThreshold { direction , value } = > {
// 查最近一条数据判断
evaluate_single ( db , tenant_id , patient_id , & rule . device_type ,
* direction , * value ) . await ?
}
AlertCondition ::Consecutive { count , direction , value , window_hours } = > {
// 滑动窗口查询最近 N 条数据
evaluate_consecutive ( db , tenant_id , patient_id , & rule . device_type ,
* count , * direction , * value , * window_hours ) . await ?
}
AlertCondition ::Trend { window_hours , delta , direction } = > {
// 查询窗口内数据做斜率计算
evaluate_trend ( db , tenant_id , patient_id , & rule . device_type ,
* window_hours , * delta , * direction ) . await ?
}
} ;
if triggered {
let alert = create_alert ( db , tenant_id , patient_id , & rule ) . await ? ;
alerts . push ( alert ) ;
// 发布事件(通知管道 + SSE 推送)
event_bus . publish ( alert_triggered_event ( & alert ) ) . await ? ;
}
}
Ok ( alerts )
}
```
### 6.3 滑动窗口查询实现
``` sql
-- Consecutive: 连续 N 次超标
-- 查询最近 N 条记录,检查是否全部超标
SELECT avg_val FROM vital_signs_hourly
WHERE tenant_id = $ 1
AND patient_id = $ 2
AND device_type = $ 3
AND hour_start > NOW ( ) - ( $ 4 * interval ' 1 hour ' )
ORDER BY hour_start DESC
LIMIT $ 5 ;
-- Rust 侧检查:是否连续 N 条都满足条件
-- (all recent readings exceed threshold, no "break" in the sequence)
-- Trend: 窗口内趋势
-- 线性回归斜率近似:用首尾差值 / 时间跨度
WITH window_data AS (
SELECT avg_val , hour_start ,
ROW_NUMBER ( ) OVER ( ORDER BY hour_start ) as rn ,
COUNT ( * ) OVER ( ) as total
FROM vital_signs_hourly
WHERE tenant_id = $ 1 AND patient_id = $ 2 AND device_type = $ 3
AND hour_start > NOW ( ) - ( $ 4 * interval ' 1 hour ' )
ORDER BY hour_start
)
SELECT
( MAX ( avg_val ) - MIN ( avg_val ) ) as delta ,
MAX ( hour_start ) - MIN ( hour_start ) as timespan
FROM window_data ;
```
### 6.4 告警通知管道
```
Alert 触发
│
├── EventBus → SSE Handler → 医生 Web 端实时弹窗
│
├── EventBus → 消息模块 → 站内告警消息
│
└── (未来) 微信模板消息 → 医生小程序推送
```
### 6.5 规则管理 API
```
# 创建规则
POST /api/v1/health/alert-rules
{
"name": "心率持续过高",
"deviceType": "heart_rate",
"conditionType": "consecutive",
"conditionParams": {
"count": 3,
"direction": "above",
"value": 100,
"windowHours": 1
},
"severity": "warning",
"notifyRoles": ["attending_doctor"],
"cooldownMinutes": 60
}
# 查询规则
GET /api/v1/health/alert-rules?is_active=true
# 更新规则
PUT /api/v1/health/alert-rules/{id}
# 禁用规则
PUT /api/v1/health/alert-rules/{id}/deactivate
```
---
## 7. SSE 推送扩展
### 7.1 现有 SSE 架构扩展
当前 SSE Handler (`sse_handler.rs` ) 仅订阅 `message.sent` 事件。扩展方案:新增订阅源,复用现有 SSE 连接。
``` rust
// 改造 sse_handler.rs 的事件订阅
async fn sse_stream (
rx : BroadcastReceiver < DomainEvent > ,
tenant_id : Uuid ,
user_id : Uuid ,
) -> Sse < impl Stream < Item = Result < Event , Infallible > > > {
// 现有: subscribe_filtered("message.sent")
// 扩展: 改为全量 subscribe() + 自行过滤
// 原因: 新增事件前缀不同 (device. / alert. / message.),无法用单一前缀覆盖
let stream = async_stream ::stream! {
loop {
match rx . recv ( ) . await {
Ok ( event ) = > {
// 消息通知(已有)
if event . event_type = = " message.sent "
& & matches_tenant_user ( & event , tenant_id , user_id ) {
yield Ok ( Event ::default ( )
. event ( " message " )
. data ( event . payload . to_string ( ) ) ) ;
}
// 告警通知(新增)— 推送给患者的主治医生
if event . event_type = = " alert.triggered "
& & matches_tenant_doctor ( & event , tenant_id , user_id ) {
yield Ok ( Event ::default ( )
. event ( " alert " )
. data ( event . payload . to_string ( ) ) ) ;
}
// 体征更新(新增)— 推送给正在查看该患者的医生
if event . event_type = = " device.readings.synced "
& & matches_tenant_doctor ( & event , tenant_id , user_id ) {
yield Ok ( Event ::default ( )
. event ( " vital_update " )
. data ( event . payload . to_string ( ) ) ) ;
}
}
Err ( Lagged ( n ) ) = > tracing ::warn! ( " SSE lagged {n} events " ) ,
Err ( _ ) = > break ,
}
}
} ;
Sse ::new ( stream ) . keep_alive ( KeepAlive ::default ( ) )
}
```
### 7.2 新增事件类型与 SSE event 映射
| EventBus 事件 | SSE event | 推送对象 | Payload |
|---------------|-----------|---------|---------|
| `message.sent` | `message` | 消息接收者(已有) | 消息内容 |
| `alert.triggered` | `alert` | 患者的主治医生 | 告警详情 |
| `device.readings.synced` | `vital_update` | 患者的主治医生 | 同步摘要 |
### 7.3 医生端实时看板数据推送
前端监听 `vital_update` 事件,更新实时看板:
``` typescript
// apps/web 前端 SSE 监听扩展
eventSource . addEventListener ( 'vital_update' , ( e ) = > {
const data = JSON . parse ( e . data ) ;
// 更新患者详情页的体征图表(如果正在查看该患者)
updateVitalSignsChart ( data . patient_id , data . summary ) ;
} ) ;
eventSource . addEventListener ( 'alert' , ( e ) = > {
const data = JSON . parse ( e . data ) ;
// 显示告警弹窗 + 声音提示
showAlertNotification ( data ) ;
} ) ;
```
---
## 8. 多租户与安全
### 8.1 租户隔离策略
| 数据层 | 隔离方式 |
|--------|---------|
| `device_readings` | `tenant_id` 列过滤(中间件注入),分区键不含 tenant_id |
| `vital_signs_hourly` | `tenant_id` 列过滤 + UNIQUE 约束含 tenant_id |
| `alert_rules` | `tenant_id` 列过滤,每租户独立配置规则 |
| `alerts` | `tenant_id` 列过滤 |
API 端点全部走现有 JWT 中间件,`tenant_id` 从 token 提取,不需要额外开发。
### 8.2 设备认证与数据完整性
**设备绑定: ** 每个设备通过 `device_id` ( MAC 哈希)绑定到患者。首次同步时创建绑定关系,后续只接受已绑定设备的数据。
``` sql
CREATE TABLE patient_devices (
id UUID PRIMARY KEY ,
tenant_id UUID NOT NULL ,
patient_id UUID NOT NULL ,
device_id VARCHAR ( 64 ) NOT NULL , -- 设备唯一标识
device_model VARCHAR ( 64 ) ,
device_type VARCHAR ( 32 ) ,
bound_at TIMESTAMPTZ DEFAULT NOW ( ) ,
created_at TIMESTAMPTZ DEFAULT NOW ( ) ,
updated_at TIMESTAMPTZ DEFAULT NOW ( ) ,
created_by UUID ,
updated_by UUID ,
deleted_at TIMESTAMPTZ , -- 软删除(解绑时设置)
version INT NOT NULL DEFAULT 1 ,
UNIQUE ( tenant_id , patient_id , device_id )
) ;
```
**数据完整性: **
- API 通过 JWT 认证,确保只有授权用户(患者本人或医护)能提交数据
- 批量提交有大小限制( 500 条/请求) 和频率限制( 10 次/分钟/用户)
- `measured_at` 不接受未来时间(服务端校验)
### 8.3 PII 加密扩展
手环数据本身不包含 PII( 心率、步数等不是个人身份信息) , 但 `patient_id` 关联关系需要保护。
- `device_readings` 表的 `patient_id` 字段不加密(性能考虑,高频查询)
- `alerts` 表的告警详情可能包含患者姓名,走现有 PII 加密流程
- 查询 API 返回时,按现有 PII 解密规则处理
---
## 9. 实施路线图
### Phase 1: 数据管道 + 手环适配(基础能力)
**目标: ** 跑通"手环 → 小程序 → API → 数据库"的完整链路
| 任务 | 涉及模块 | 预估复杂度 |
|------|---------|-----------|
| 数据库迁移:`device_readings` 分区表 + `vital_signs_hourly` + `patient_devices` | migration | 中 |
| 后端摄入 API: `POST /device-readings/batch` + 降采样 upsert | erp-health | 高 |
| EventBus 事件扩展:`device.readings.synced` | erp-core + erp-health | 低 |
| 小程序 BLE 模块: DeviceAdapter 接口 + 小米手环 Adapter | miniprogram | 高 |
| 小程序同步页面:设备扫描 + 数据同步 UI | miniprogram | 中 |
| 查询 API: `GET /device-readings` + `/hourly` | erp-health | 中 |
**验收标准: **
- [ ] 小程序能连接小米手环并读取心率数据
- [ ] 数据通过 API 批量提交到 HMS 后端
- [ ] `device_readings` 和 `vital_signs_hourly` 表正确写入
- [ ] 手动执行降采样查询验证数据正确
### Phase 2: 告警引擎 + SSE 推送(智能预警)
**目标: ** 告警规则可配置、评估准确、推送实时
| 任务 | 涉及模块 | 预估复杂度 |
|------|---------|-----------|
| 数据库迁移:`alert_rules` + `alerts` 表 | migration | 低 |
| 告警引擎核心:规则加载 + 评估 + 生成告警 | erp-health | 高 |
| SSE 扩展:新增 `alert` / `vital_update` 事件推送 | erp-server | 中 |
| 规则管理 API: CRUD + 启停 | erp-health | 中 |
| 告警确认/处置 API | erp-health | 低 |
| 医生 Web 端告警通知 UI | web frontend | 中 |
| 医生 Web 端体征实时看板 | web frontend | 高 |
**验收标准: **
- [ ] 可通过 API 配置告警规则
- [ ] 手环数据提交后,匹配规则自动触发告警
- [ ] 告警通过 SSE 实时推送到医生端
- [ ] 医生可确认/处置告警
- [ ] 冷却期机制正常工作
### Phase 3: 高级分析 + 更多设备(扩展能力)
**目标: ** 支持更多设备类型,提供更丰富的分析能力
| 任务 | 说明 |
|------|------|
| 更多手环适配 | 华为手环、OPPO 手环等 Adapter |
| 血压计/血糖仪 BLE 适配 | 扩展 DeviceType, 新增 Adapter |
| 数据看板增强 | 周/月趋势对比、患者分组统计 |
| 降采样后台修正任务 | 修正乱序数据导致的聚合偏差 |
| 数据清理任务 | 自动 DROP 超期分区 |
| (远期) 微信模板消息推送 | 告警推送到医生小程序 |