From 29b19a90f6941a34f5e47db1f056d616d2bc6447 Mon Sep 17 00:00:00 2001 From: iven Date: Sun, 26 Apr 2026 22:14:34 +0800 Subject: [PATCH] =?UTF-8?q?docs:=20=E5=AE=9E=E6=97=B6=E4=BD=93=E5=BE=81?= =?UTF-8?q?=E9=87=87=E9=9B=86=E4=B8=8E=E6=99=BA=E8=83=BD=E5=91=8A=E8=AD=A6?= =?UTF-8?q?=E7=B3=BB=E7=BB=9F=E8=AE=BE=E8=AE=A1=E8=A7=84=E6=A0=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 健康手环 BLE 采集 → REST API 批量提交 → 降采样 Pipeline → 规则引擎告警 → SSE 实时推送的完整闭环设计。 覆盖数据模型(分区表+降采样+告警规则)、 小程序 BLE 适配器抽象、三层告警引擎、 SSE 推送扩展,分三阶段实施。 --- ...26-realtime-vital-signs-pipeline-design.md | 968 ++++++++++++++++++ 1 file changed, 968 insertions(+) create mode 100644 docs/superpowers/specs/2026-04-26-realtime-vital-signs-pipeline-design.md diff --git a/docs/superpowers/specs/2026-04-26-realtime-vital-signs-pipeline-design.md b/docs/superpowers/specs/2026-04-26-realtime-vital-signs-pipeline-design.md new file mode 100644 index 0000000..aad8b4b --- /dev/null +++ b/docs/superpowers/specs/2026-04-26-realtime-vital-signs-pipeline-design.md @@ -0,0 +1,968 @@ +# HMS 实时体征采集与智能告警系统 — 设计规格 + +> 日期: 2026-04-26 | 状态: Draft | 作者: Claude + 用户协作 + +--- + +## 1. 背景与目标 + +### 1.1 业务背景 + +HMS 平台当前已实现体征数据的手动录入和危急值阈值告警。但慢病患者(高血压、糖尿病、肾病)的健康数据采集严重依赖患者主动录入,依从性差、数据稀疏、无法捕捉短时波动。 + +健康手环(如小米手环、华为手环)的普及率为 **24 小时被动采集** 提供了硬件基础。心率、血氧、步数、睡眠等数据可连续产生,通过小程序 BLE 同步到 HMS,实现: + +- **从"主动录入"到"被动采集"** — 患者无感,数据密度提升 100 倍 +- **从"单次阈值"到"趋势预警"** — 连续超标、趋势恶化等规则引擎 +- **从"事后查看"到"实时看板"** — 医生通过 SSE 实时感知患者状态变化 + +### 1.2 目标用户与场景 + +| 用户 | 场景 | 价值 | +|------|------|------| +| **慢病患者** | 佩戴手环,打开小程序自动同步数据 | 免去手动录入,持续监测 | +| **主管医生** | Web 端实时看板查看患者体征趋势 | 及时发现异常,干预更早 | +| **护士站** | 接收高危告警推送 | 快速响应危急情况 | +| **体检中心** | 出具体检报告时参考连续监测数据 | 报告更全面 | + +### 1.3 成功指标 + +| 指标 | 基线 | 目标 | +|------|------|------| +| 体征数据日均采集量 | ~10 条/患者(手动) | > 1000 条/患者(手环) | +| 危急值告警延迟 | 分钟级(轮询) | < 5 秒(SSE 推送) | +| 趋势预警准确率 | N/A(无此能力) | > 85%(首版) | +| 患者数据同步频率 | 不定(手动) | 每天至少 1 次(自动) | + +### 1.4 范围边界 + +**做:** + +- 小程序 BLE 采集模块(DeviceAdapter 抽象 + 手环适配) +- 后端设备数据摄入 API(批量提交 + 降采样) +- 告警规则引擎(单次阈值 + 连续超标 + 趋势恶化) +- SSE 推送扩展(体征更新 + 告警通知) +- 数据分区与降采样策略 + +**不做(本设计不覆盖):** + +- ICU 级实时监控(亚秒级,需要专用系统) +- 医疗设备网关(HL7/串口,场景不同) +- AI 驱动的异常检测(erp-ai 模块职责) +- 非手环设备适配(血压计、血糖仪 — 后续扩展) +- 多实例部署的事件总线演进(单实例足够) + +--- + +## 2. 系统架构 + +### 2.1 端到端数据流 + +``` +健康手环 ─BLE批量同步→ 微信小程序 ─REST API→ HMS后端 + │ + ┌─────────────┼──────────────┐ + │ │ │ + 快速路径(同步) 异步路径 SSE推送 + 校验+存储+事件 告警引擎 体征更新 + (< 200ms) 统计聚合 告警通知 + 趋势分析 + │ │ │ + ▼ ▼ ▼ + PostgreSQL EventBus 医生Web端 + (分区+降采样) → 消息模块 实时看板 +``` + +### 2.2 模块划分与职责 + +| 模块 | 位置 | 职责 | +|------|------|------| +| **BLE 采集层** | 小程序 `services/ble/` | 设备发现、连接、数据读取、格式化 | +| **DeviceAdapter** | 小程序 `services/ble/adapters/` | 统一接口,屏蔽设备差异 | +| **摄入 API** | `erp-health` handler | 批量接收、校验、存储、降采样触发 | +| **降采样 Pipeline** | `erp-health` service | 原始数据聚合为小时/日级汇总 | +| **告警引擎** | `erp-health` service | 规则加载、评估、告警生成 | +| **SSE 扩展** | `erp-message` handler | 新增体征/告警事件推送 | +| **EventBus 事件** | `erp-core` | 新增 `device.readings.synced`、`alert.triggered` 事件类型 | + +### 2.3 与现有 HMS 架构的集成点 + +| 集成点 | 现有 | 扩展 | +|--------|------|------| +| EventBus | `tokio::broadcast` 进程内 | 新增事件类型,不改架构 | +| SSE Handler | 仅推送 `message.sent` | 扩展支持 `device.readings.synced`、`alert.triggered` | +| vital_signs 表 | 手动录入数据 | 新增 `device_readings` 表存储高频数据,`vital_signs` 保持不变 | +| 通知分发 | `erp-message` 内联 | 复用现有通知管道,新增告警通知类型 | +| 小程序 | Taro 4.2 + React 18 | 新增 BLE 采集服务层 | + +### 2.4 技术选型与权衡 + +| 决策 | 选择 | 理由 | +|------|------|------| +| 设备通信 | 微信 BLE API | 小程序原生支持,无需额外 SDK | +| 数据传输 | REST API 批量提交 | 简单可靠,实时性依赖患者操作频率(每天 1-2 次) | +| 降采样存储 | PostgreSQL 窗口函数 | 无需引入 TimescaleDB/InfluxDB,现有栈即可 | +| 告警引擎 | 规则 + 滑动窗口 SQL | 覆盖 80% 临床场景,不需要流式计算框架 | +| 数据分区 | PostgreSQL RANGE 分区 | 按月分区 `device_readings`,90 天保留原始数据 | +| SSE 推送 | 扩展现有 SSE handler | 不引入 WebSocket(单向推送够用) | + +--- + +## 3. 数据模型 + +### 3.1 原始设备数据表 (device_readings) + +存储从手环同步的原始高频数据,按 `measured_at` 做月分区。 + +```sql +CREATE TABLE device_readings ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + tenant_id UUID NOT NULL, + patient_id UUID NOT NULL, + device_id VARCHAR(64), -- 设备唯一标识(MAC 地址哈希) + device_type VARCHAR(32) NOT NULL, -- 'heart_rate', 'blood_oxygen', 'steps', 'sleep', 'temperature', 'stress' + device_model VARCHAR(64), -- 'Xiaomi Band 8', 'Huawei Band 9' + raw_value JSONB NOT NULL, -- 原始值 {"hr": 72, "confidence": 0.95} + measured_at TIMESTAMPTZ NOT NULL, -- 设备上的测量时间 + created_at TIMESTAMPTZ DEFAULT NOW(), + deleted_at TIMESTAMPTZ -- 软删除 +) PARTITION BY RANGE (measured_at); + +-- 按月分区模板(自动创建未来 3 个月) +CREATE TABLE device_readings_2026_05 PARTITION OF device_readings + FOR VALUES FROM ('2026-05-01') TO ('2026-06-01'); +-- ... 每月一个分区 + +-- 核心索引 +CREATE INDEX idx_dr_tenant_patient ON device_readings (tenant_id, patient_id, measured_at DESC); +CREATE INDEX idx_dr_device_type ON device_readings (tenant_id, device_type, measured_at DESC); + +-- 注意:不包含 updated_by / version 字段 +-- 原始设备数据是不可变的(写入后不修改),不需要乐观锁 +``` + +**JSONB raw_value 结构(按 device_type):** + +```jsonc +// heart_rate +{ "hr": 72, "confidence": 0.95 } + +// blood_oxygen +{ "spo2": 98 } + +// steps +{ "steps": 8500, "distance_m": 6200, "calories": 320 } + +// sleep +{ "stage": "deep", "duration_min": 45 } + +// temperature +{ "temp_celsius": 36.5 } +``` + +### 3.2 降采样表 (vital_signs_hourly) + +存储小时级聚合数据,长期保留,用于趋势查询和告警引擎。 + +```sql +CREATE TABLE vital_signs_hourly ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + tenant_id UUID NOT NULL, + patient_id UUID NOT NULL, + device_type VARCHAR(32) NOT NULL, + hour_start TIMESTAMPTZ NOT NULL, -- 小时起始时间 + min_val FLOAT, + max_val FLOAT, + avg_val FLOAT NOT NULL, + sample_count INT NOT NULL DEFAULT 1, + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW(), + version INT NOT NULL DEFAULT 1, + + UNIQUE (tenant_id, patient_id, device_type, hour_start) +); + +-- 告警引擎和趋势查询的主要读取路径 +CREATE INDEX idx_vsh_tenant_patient ON vital_signs_hourly + (tenant_id, patient_id, device_type, hour_start DESC); +``` + +### 3.3 告警规则表 (alert_rules) + +存储可配置的告警规则,按租户隔离。 + +```sql +CREATE TABLE alert_rules ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + tenant_id UUID NOT NULL, + name VARCHAR(128) NOT NULL, -- "心率持续过高" + description TEXT, + device_type VARCHAR(32) NOT NULL, -- 触发的指标类型 + condition_type VARCHAR(32) NOT NULL, -- 'single_threshold' / 'consecutive' / 'trend' + -- 阈值条件 (JSONB 灵活存储不同类型规则的参数) + condition_params JSONB NOT NULL, + -- single_threshold: { "direction": "above", "value": 100 } + -- consecutive: { "count": 3, "direction": "above", "value": 140 } + -- trend: { "window_hours": 168, "delta": 20, "direction": "up" } + severity VARCHAR(16) NOT NULL DEFAULT 'warning', -- 'info' / 'warning' / 'critical' / 'urgent' + is_active BOOLEAN NOT NULL DEFAULT true, + -- 适用范围(可选过滤) + apply_tags JSONB, -- ["hypertension", "diabetes"] 患者标签 + notify_roles JSONB NOT NULL DEFAULT '["attending_doctor"]', + cooldown_minutes INT NOT NULL DEFAULT 60, -- 同一规则对同一患者的告警冷却时间 + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW(), + created_by UUID, + updated_by UUID, + deleted_at TIMESTAMPTZ, + version INT NOT NULL DEFAULT 1 +); +``` + +### 3.4 告警记录表 (alerts) + +```sql +CREATE TABLE alerts ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + tenant_id UUID NOT NULL, + patient_id UUID NOT NULL, + rule_id UUID NOT NULL REFERENCES alert_rules(id) ON DELETE RESTRICT, -- 禁止硬删除规则 + severity VARCHAR(16) NOT NULL, + title VARCHAR(256) NOT NULL, -- "心率持续过高 - 患者张三" + detail JSONB NOT NULL, -- 告警上下文 + -- { "metric": "heart_rate", "values": [105, 108, 112], "threshold": 100, "window": "1h" } + status VARCHAR(16) NOT NULL DEFAULT 'pending', -- 'pending' / 'acknowledged' / 'resolved' / 'dismissed' + acknowledged_by UUID, + acknowledged_at TIMESTAMPTZ, + resolved_at TIMESTAMPTZ, + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW(), + deleted_at TIMESTAMPTZ, + version INT NOT NULL DEFAULT 1 +); + +CREATE INDEX idx_alerts_tenant_patient ON alerts (tenant_id, patient_id, created_at DESC); +CREATE INDEX idx_alerts_status ON alerts (tenant_id, status, created_at DESC); +``` + +### 3.5 数据保留与清理策略 + +| 数据层级 | 保留期 | 清理方式 | +|----------|--------|---------| +| `device_readings`(原始) | 90 天 | 超期分区 DROP(月级) | +| `vital_signs_hourly`(小时) | 2 年 | 后台任务清理 | +| `vital_signs`(日级,已有) | 永久 | 不清理 | +| `alerts`(告警) | 1 年 | 软删除 + 归档 | + +降采样 Pipeline 触发时机: + +1. **写入时同步降采样** — 原始数据写入后,立即 upsert 对应小时聚合记录 +2. **后台修正任务** — 每 6 小时运行一次,修正因乱序数据导致的聚合偏差 + +### 3.6 与现有 vital_signs 表的关系 + +**并存不合并。** 两张表服务于不同场景: + +| 特征 | vital_signs(现有) | device_readings(新增) | +|------|-------------------|----------------------| +| 数据来源 | 手动录入 / 医护录入 | 设备自动采集 | +| 数据频率 | 每天 1-3 条 | 每天数百到数千条 | +| 指标类型 | 血压、血糖、体重、尿量等 | 心率、血氧、步数、睡眠等 | +| 用途 | 门诊/查房记录 | 连续监测与趋势分析 | + +查询 API 提供统一视图,内部按 `source` 字段路由到对应表。 + +--- + +## 4. 小程序 BLE 采集模块 + +### 4.1 DeviceAdapter 统一接口 + +```typescript +// services/ble/types.ts + +/** 统一的标准化读数 */ +interface NormalizedReading { + patientId: string; + deviceType: DeviceType; + deviceModel: string; // 'Xiaomi Band 8' + deviceId: string; // 设备唯一标识 + values: Record; // { hr: 72, confidence: 0.95 } + measuredAt: string; // ISO 8601 + source: 'ble_device'; +} + +type DeviceType = + | 'heart_rate' | 'blood_oxygen' | 'steps' + | 'sleep' | 'temperature' | 'stress'; + +/** 设备适配器统一接口 */ +interface DeviceAdapter { + /** 设备类型标识 */ + readonly deviceType: DeviceType; + /** 支持的设备型号(用于自动识别) */ + readonly supportedModels: string[]; + /** 服务 UUID(用于 BLE 扫描过滤) */ + readonly serviceUUIDs: string[]; + + /** 发现设备 */ + discover(timeoutMs?: number): Promise; + /** 连接设备 */ + connect(deviceId: string): Promise; + /** 批量读取历史数据 */ + readHistory(conn: BLEConnection, since: Date): Promise; + /** 断开连接 */ + disconnect(conn: BLEConnection): Promise; +} + +interface BLEDevice { + deviceId: string; + name: string; + rssi: number; + model?: string; // 从广播数据解析 +} + +interface BLEConnection { + deviceId: string; + connected: boolean; +} +``` + +### 4.2 健康手环 Adapter 实现 + +以小米手环为例(首版支持): + +```typescript +// services/ble/adapters/XiaomiBandAdapter.ts + +class XiaomiBandAdapter implements DeviceAdapter { + readonly deviceType = 'heart_rate'; // 首版仅支持心率 + readonly supportedModels = ['Mi Band 8', 'Mi Band 7', 'Xiaomi Smart Band 8']; + readonly serviceUUIDs = [ + '0000180d-0000-1000-8000-00805f9b34fb', // Heart Rate Service (标准) + // 小米私有服务 UUID + ]; + + async discover(timeoutMs = 10000): Promise { + // 使用 Taro.openBluetoothAdapter + startBluetoothDevicesDiscovery + // 过滤 supportedModels 中的设备名 + } + + async readHistory(conn: BLEConnection, since: Date): Promise { + // 小米手环私有协议:读取活动数据特征值 + // 解析二进制协议 → NormalizedReading[] + // 按时间过滤 since 之后的数据 + } +} +``` + +### 4.3 BLE 连接管理与错误恢复 + +```typescript +// services/ble/BLEManager.ts + +class BLEManager { + private adapters: Map = new Map(); + private connection: BLEConnection | null = null; + + /** 注册适配器 */ + registerAdapter(adapter: DeviceAdapter): void; + + /** 自动扫描并识别设备 */ + async scanAndIdentify(): Promise<{ adapter: DeviceAdapter; device: BLEDevice } | null>; + + /** 同步数据(主入口) */ + async syncData(patientId: string, since: Date): Promise { + try { + // 1. 扫描 → 识别设备 → 选择 Adapter + // 2. 连接 → 读取历史数据 → 格式化 + // 3. 批量提交到 HMS API + // 4. 断开连接 + // 5. 返回同步结果 + } catch (error) { + // 错误恢复:连接失败 → 重试 1 次 → 提示用户 + // 数据提交失败 → 本地缓存 → 下次同步时补传 + } + } +} + +interface SyncResult { + success: boolean; + readingsCount: number; + dateRange: { from: string; to: string }; + errors?: string[]; +} +``` + +### 4.4 数据同步流程 + +``` +用户打开小程序 → 检查蓝牙权限 + │ + ▼ +自动扫描附近设备 → 匹配已注册 Adapter? + │ │ + Yes No → 提示"未发现设备" + ▼ +BLE 连接 → 读取上次同步时间 + │ + ▼ +批量读取 since 上次同步的数据 + │ + ▼ +POST /api/v1/health/device-readings/batch + │ │ + 成功 失败 → 缓存到本地存储 + │ │ + ▼ ▼ +更新同步时间戳 下次同步时补传 +显示同步结果 +``` + +### 4.5 数据格式规范化 + +Adapter 负责将设备原始数据翻译为 `NormalizedReading`: + +``` +小米手环原始数据 (二进制): + [0x01, 0x48, 0x00, ...] → 心率 72 bpm + +Adapter 翻译: + NormalizedReading { + deviceType: 'heart_rate', + values: { hr: 72 }, + measuredAt: '2026-05-15T08:30:00Z', + source: 'ble_device' + } +``` + +### 4.6 离线缓存策略 + +使用小程序本地存储(Taro.setStorageSync)缓存未提交的数据: + +```typescript +interface PendingSync { + readings: NormalizedReading[]; + createdAt: string; + retryCount: number; +} + +// 缓存 key: `pending_sync_${patientId}` +// 最大缓存: 5000 条(超出时丢弃最旧的) +// 提交成功后清除缓存 +// 下次 syncData 时检查缓存,优先补传 +``` + +--- + +## 5. 后端 API 设计 + +### 5.1 设备数据摄入 API + +**批量提交端点:** + +``` +POST /api/v1/health/device-readings/batch +Authorization: Bearer +Content-Type: application/json + +Request: +{ + "deviceId": "abc123", + "deviceModel": "Xiaomi Band 8", + "readings": [ + { + "deviceType": "heart_rate", + "values": { "hr": 72 }, + "measuredAt": "2026-05-15T08:30:00Z" + } + // ... 最多 500 条/请求 + ] +} + +Response 200: +{ + "success": true, + "data": { + "accepted": 498, + "duplicates": 2, + "earliest": "2026-05-14T22:00:00Z", + "latest": "2026-05-15T08:30:00Z" + } +} +``` + +**去重机制:** 以 `(patient_id, device_id, device_type, measured_at)` 为唯一约束,冲突时忽略(幂等)。 + +**限流:** 单用户每分钟最多 10 次批量提交。 + +### 5.2 快速路径 vs 异步路径 + +```rust +// erp-health/src/service/device_reading_service.rs + +pub async fn batch_create_readings( + db: &DatabaseConnection, + event_bus: &EventBus, + tenant_id: Uuid, + patient_id: Uuid, + readings: Vec, +) -> AppResult { + // ── 快速路径(同步,< 200ms)── + + // 1. 校验患者存在性 + let patient = find_patient(db, tenant_id, patient_id).await?; + + // 1.5 校验设备绑定关系(防止数据注入) + verify_device_binding(db, tenant_id, patient_id, &device_id).await?; + + // 2. 批量插入(ON CONFLICT DO NOTHING 去重) + let inserted = batch_insert_readings(db, tenant_id, patient_id, &readings).await?; + + // 3. 同步降采样 upsert(每条原始数据更新对应小时聚合) + upsert_hourly_aggregates(db, tenant_id, patient_id, &readings).await?; + + // 4. 发布 EventBus 事件(异步消费者处理告警和推送) + // 注意:publish 签名需要 db 参数用于 outbox 持久化 + event_bus.publish(DomainEvent { + event_type: "device.readings.synced".into(), + tenant_id, + payload: json!({ "patient_id": patient_id, "count": inserted }), + }, db).await?; + + Ok(BatchResult { accepted: inserted, .. }) +} +``` + +```rust +// 异步消费者(erp-health 事件订阅) +// 监听 "device.readings.synced" 事件 + +async fn on_readings_synced(event: DomainEvent) { + // 1. 加载该患者适用的告警规则 + // 2. 逐规则评估(滑动窗口查询) + // 3. 匹配规则 → 生成 Alert → 发布 alert.triggered 事件 + // 4. 统计聚合更新(异步,不阻塞) +} +``` + +### 5.3 降采样 Pipeline + +```rust +/// 同步降采样:写入原始数据后立即更新小时聚合 +async fn upsert_hourly_aggregates( + db: &DatabaseConnection, + tenant_id: Uuid, + patient_id: Uuid, + readings: &[DeviceReadingInput], +) -> AppResult<()> { + // 按 (device_type, hour_start) 分组 + // 每组计算 min/max/avg/count + + // INSERT ... ON CONFLICT (tenant_id, patient_id, device_type, hour_start) + // DO UPDATE SET + // min_val = LEAST(min_val, EXCLUDED.min_val), + // max_val = GREATEST(max_val, EXCLUDED.max_val), + // avg_val = (avg_val * sample_count + EXCLUDED.total) / (sample_count + EXCLUDED.count), + // sample_count = sample_count + EXCLUDED.count + + Ok(()) +} +``` + +### 5.4 查询 API + +``` +# 原始数据查询(最近 N 条) +GET /api/v1/health/device-readings?patient_id=xxx&device_type=heart_rate&hours=24 + +# 降采样数据查询(趋势图用) +GET /api/v1/health/device-readings/hourly?patient_id=xxx&device_type=heart_rate&days=7 + +# 告警列表 +GET /api/v1/health/alerts?patient_id=xxx&status=pending + +# 告警确认 +PUT /api/v1/health/alerts/{id}/acknowledge + +# 告警规则管理 +GET/POST/PUT/DELETE /api/v1/health/alert-rules +``` + +### 5.5 EventBus 事件扩展 + +新增事件类型: + +| 事件 | 触发时机 | Payload | +|------|---------|---------| +| `device.readings.synced` | 手环数据批量提交成功 | `{ patient_id, count, device_model, date_range }` | +| `alert.triggered` | 告警规则匹配 | `{ alert_id, patient_id, rule_name, severity, detail }` | +| `alert.acknowledged` | 医生确认告警 | `{ alert_id, acknowledged_by }` | + +订阅关系: + +| 订阅者 | 订阅事件 | 动作 | +|--------|---------|------| +| 告警引擎 | `device.readings.synced` | 规则评估 | +| SSE Handler | `alert.triggered` | 推送告警通知到医生 | +| SSE Handler | `device.readings.synced` | 推送体征更新到看板 | +| 消息模块 | `alert.triggered` | 生成站内告警消息 | + +--- + +## 6. 告警引擎 + +### 6.1 规则模型 + +```rust +// erp-health/src/service/alert_engine.rs + +#[derive(Debug, Clone, Deserialize)] +pub enum AlertCondition { + /// 单次阈值判断 + SingleThreshold { + direction: Direction, // Above / Below + value: f64, + }, + /// 连续 N 次超标 + Consecutive { + count: usize, + direction: Direction, + value: f64, + window_hours: Option, // 可选时间窗口限制 + }, + /// 趋势变化量 + Trend { + window_hours: i32, // 观察窗口 + delta: f64, // 变化量 + direction: Direction, // Up / Down + }, +} + +#[derive(Debug, Clone, Deserialize)] +pub enum Direction { Above, Below } +``` + +### 6.2 规则评估流程 + +```rust +pub async fn evaluate_rules( + db: &DatabaseConnection, + tenant_id: Uuid, + patient_id: Uuid, + device_type: &str, + event_bus: &EventBus, +) -> AppResult> { + // 1. 加载该租户激活的规则(按 device_type 过滤) + let rules = load_active_rules(db, tenant_id, device_type).await?; + + let mut alerts = Vec::new(); + + for rule in rules { + // 检查冷却期(同一规则对同一患者) + if is_in_cooldown(db, rule.id, patient_id, rule.cooldown_minutes).await? { + continue; + } + + // 2. 按条件类型评估 + let triggered = match &rule.condition { + AlertCondition::SingleThreshold { direction, value } => { + // 查最近一条数据判断 + evaluate_single(db, tenant_id, patient_id, &rule.device_type, + *direction, *value).await? + } + AlertCondition::Consecutive { count, direction, value, window_hours } => { + // 滑动窗口查询最近 N 条数据 + evaluate_consecutive(db, tenant_id, patient_id, &rule.device_type, + *count, *direction, *value, *window_hours).await? + } + AlertCondition::Trend { window_hours, delta, direction } => { + // 查询窗口内数据做斜率计算 + evaluate_trend(db, tenant_id, patient_id, &rule.device_type, + *window_hours, *delta, *direction).await? + } + }; + + if triggered { + let alert = create_alert(db, tenant_id, patient_id, &rule).await?; + alerts.push(alert); + + // 发布事件(通知管道 + SSE 推送) + event_bus.publish(alert_triggered_event(&alert)).await?; + } + } + + Ok(alerts) +} +``` + +### 6.3 滑动窗口查询实现 + +```sql +-- Consecutive: 连续 N 次超标 +-- 查询最近 N 条记录,检查是否全部超标 +SELECT avg_val FROM vital_signs_hourly +WHERE tenant_id = $1 + AND patient_id = $2 + AND device_type = $3 + AND hour_start > NOW() - ($4 * interval '1 hour') +ORDER BY hour_start DESC +LIMIT $5; + +-- Rust 侧检查:是否连续 N 条都满足条件 +-- (all recent readings exceed threshold, no "break" in the sequence) + +-- Trend: 窗口内趋势 +-- 线性回归斜率近似:用首尾差值 / 时间跨度 +WITH window_data AS ( + SELECT avg_val, hour_start, + ROW_NUMBER() OVER (ORDER BY hour_start) as rn, + COUNT(*) OVER () as total + FROM vital_signs_hourly + WHERE tenant_id = $1 AND patient_id = $2 AND device_type = $3 + AND hour_start > NOW() - ($4 * interval '1 hour') + ORDER BY hour_start +) +SELECT + (MAX(avg_val) - MIN(avg_val)) as delta, + MAX(hour_start) - MIN(hour_start) as timespan +FROM window_data; +``` + +### 6.4 告警通知管道 + +``` +Alert 触发 + │ + ├── EventBus → SSE Handler → 医生 Web 端实时弹窗 + │ + ├── EventBus → 消息模块 → 站内告警消息 + │ + └── (未来) 微信模板消息 → 医生小程序推送 +``` + +### 6.5 规则管理 API + +``` +# 创建规则 +POST /api/v1/health/alert-rules +{ + "name": "心率持续过高", + "deviceType": "heart_rate", + "conditionType": "consecutive", + "conditionParams": { + "count": 3, + "direction": "above", + "value": 100, + "windowHours": 1 + }, + "severity": "warning", + "notifyRoles": ["attending_doctor"], + "cooldownMinutes": 60 +} + +# 查询规则 +GET /api/v1/health/alert-rules?is_active=true + +# 更新规则 +PUT /api/v1/health/alert-rules/{id} + +# 禁用规则 +PUT /api/v1/health/alert-rules/{id}/deactivate +``` + +--- + +## 7. SSE 推送扩展 + +### 7.1 现有 SSE 架构扩展 + +当前 SSE Handler (`sse_handler.rs`) 仅订阅 `message.sent` 事件。扩展方案:新增订阅源,复用现有 SSE 连接。 + +```rust +// 改造 sse_handler.rs 的事件订阅 + +async fn sse_stream( + rx: BroadcastReceiver, + tenant_id: Uuid, + user_id: Uuid, +) -> Sse>> { + // 现有: subscribe_filtered("message.sent") + // 扩展: 改为全量 subscribe() + 自行过滤 + // 原因: 新增事件前缀不同 (device. / alert. / message.),无法用单一前缀覆盖 + + let stream = async_stream::stream! { + loop { + match rx.recv().await { + Ok(event) => { + // 消息通知(已有) + if event.event_type == "message.sent" + && matches_tenant_user(&event, tenant_id, user_id) { + yield Ok(Event::default() + .event("message") + .data(event.payload.to_string())); + } + + // 告警通知(新增)— 推送给患者的主治医生 + if event.event_type == "alert.triggered" + && matches_tenant_doctor(&event, tenant_id, user_id) { + yield Ok(Event::default() + .event("alert") + .data(event.payload.to_string())); + } + + // 体征更新(新增)— 推送给正在查看该患者的医生 + if event.event_type == "device.readings.synced" + && matches_tenant_doctor(&event, tenant_id, user_id) { + yield Ok(Event::default() + .event("vital_update") + .data(event.payload.to_string())); + } + } + Err(Lagged(n)) => tracing::warn!("SSE lagged {n} events"), + Err(_) => break, + } + } + }; + + Sse::new(stream).keep_alive(KeepAlive::default()) +} +``` + +### 7.2 新增事件类型与 SSE event 映射 + +| EventBus 事件 | SSE event | 推送对象 | Payload | +|---------------|-----------|---------|---------| +| `message.sent` | `message` | 消息接收者(已有) | 消息内容 | +| `alert.triggered` | `alert` | 患者的主治医生 | 告警详情 | +| `device.readings.synced` | `vital_update` | 患者的主治医生 | 同步摘要 | + +### 7.3 医生端实时看板数据推送 + +前端监听 `vital_update` 事件,更新实时看板: + +```typescript +// apps/web 前端 SSE 监听扩展 +eventSource.addEventListener('vital_update', (e) => { + const data = JSON.parse(e.data); + // 更新患者详情页的体征图表(如果正在查看该患者) + updateVitalSignsChart(data.patient_id, data.summary); +}); + +eventSource.addEventListener('alert', (e) => { + const data = JSON.parse(e.data); + // 显示告警弹窗 + 声音提示 + showAlertNotification(data); +}); +``` + +--- + +## 8. 多租户与安全 + +### 8.1 租户隔离策略 + +| 数据层 | 隔离方式 | +|--------|---------| +| `device_readings` | `tenant_id` 列过滤(中间件注入),分区键不含 tenant_id | +| `vital_signs_hourly` | `tenant_id` 列过滤 + UNIQUE 约束含 tenant_id | +| `alert_rules` | `tenant_id` 列过滤,每租户独立配置规则 | +| `alerts` | `tenant_id` 列过滤 | + +API 端点全部走现有 JWT 中间件,`tenant_id` 从 token 提取,不需要额外开发。 + +### 8.2 设备认证与数据完整性 + +**设备绑定:** 每个设备通过 `device_id`(MAC 哈希)绑定到患者。首次同步时创建绑定关系,后续只接受已绑定设备的数据。 + +```sql +CREATE TABLE patient_devices ( + id UUID PRIMARY KEY, + tenant_id UUID NOT NULL, + patient_id UUID NOT NULL, + device_id VARCHAR(64) NOT NULL, -- 设备唯一标识 + device_model VARCHAR(64), + device_type VARCHAR(32), + bound_at TIMESTAMPTZ DEFAULT NOW(), + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW(), + created_by UUID, + updated_by UUID, + deleted_at TIMESTAMPTZ, -- 软删除(解绑时设置) + version INT NOT NULL DEFAULT 1, + UNIQUE (tenant_id, patient_id, device_id) +); +``` + +**数据完整性:** + +- API 通过 JWT 认证,确保只有授权用户(患者本人或医护)能提交数据 +- 批量提交有大小限制(500 条/请求)和频率限制(10 次/分钟/用户) +- `measured_at` 不接受未来时间(服务端校验) + +### 8.3 PII 加密扩展 + +手环数据本身不包含 PII(心率、步数等不是个人身份信息),但 `patient_id` 关联关系需要保护。 + +- `device_readings` 表的 `patient_id` 字段不加密(性能考虑,高频查询) +- `alerts` 表的告警详情可能包含患者姓名,走现有 PII 加密流程 +- 查询 API 返回时,按现有 PII 解密规则处理 + +--- + +## 9. 实施路线图 + +### Phase 1:数据管道 + 手环适配(基础能力) + +**目标:** 跑通"手环 → 小程序 → API → 数据库"的完整链路 + +| 任务 | 涉及模块 | 预估复杂度 | +|------|---------|-----------| +| 数据库迁移:`device_readings` 分区表 + `vital_signs_hourly` + `patient_devices` | migration | 中 | +| 后端摄入 API:`POST /device-readings/batch` + 降采样 upsert | erp-health | 高 | +| EventBus 事件扩展:`device.readings.synced` | erp-core + erp-health | 低 | +| 小程序 BLE 模块:DeviceAdapter 接口 + 小米手环 Adapter | miniprogram | 高 | +| 小程序同步页面:设备扫描 + 数据同步 UI | miniprogram | 中 | +| 查询 API:`GET /device-readings` + `/hourly` | erp-health | 中 | + +**验收标准:** + +- [ ] 小程序能连接小米手环并读取心率数据 +- [ ] 数据通过 API 批量提交到 HMS 后端 +- [ ] `device_readings` 和 `vital_signs_hourly` 表正确写入 +- [ ] 手动执行降采样查询验证数据正确 + +### Phase 2:告警引擎 + SSE 推送(智能预警) + +**目标:** 告警规则可配置、评估准确、推送实时 + +| 任务 | 涉及模块 | 预估复杂度 | +|------|---------|-----------| +| 数据库迁移:`alert_rules` + `alerts` 表 | migration | 低 | +| 告警引擎核心:规则加载 + 评估 + 生成告警 | erp-health | 高 | +| SSE 扩展:新增 `alert` / `vital_update` 事件推送 | erp-server | 中 | +| 规则管理 API:CRUD + 启停 | erp-health | 中 | +| 告警确认/处置 API | erp-health | 低 | +| 医生 Web 端告警通知 UI | web frontend | 中 | +| 医生 Web 端体征实时看板 | web frontend | 高 | + +**验收标准:** + +- [ ] 可通过 API 配置告警规则 +- [ ] 手环数据提交后,匹配规则自动触发告警 +- [ ] 告警通过 SSE 实时推送到医生端 +- [ ] 医生可确认/处置告警 +- [ ] 冷却期机制正常工作 + +### Phase 3:高级分析 + 更多设备(扩展能力) + +**目标:** 支持更多设备类型,提供更丰富的分析能力 + +| 任务 | 说明 | +|------|------| +| 更多手环适配 | 华为手环、OPPO 手环等 Adapter | +| 血压计/血糖仪 BLE 适配 | 扩展 DeviceType,新增 Adapter | +| 数据看板增强 | 周/月趋势对比、患者分组统计 | +| 降采样后台修正任务 | 修正乱序数据导致的聚合偏差 | +| 数据清理任务 | 自动 DROP 超期分区 | +| (远期) 微信模板消息推送 | 告警推送到医生小程序 |