docs: 实时体征采集与智能告警系统设计规格

健康手环 BLE 采集 → REST API 批量提交 → 降采样 Pipeline →
规则引擎告警 → SSE 实时推送的完整闭环设计。
覆盖数据模型(分区表+降采样+告警规则)、
小程序 BLE 适配器抽象、三层告警引擎、
SSE 推送扩展,分三阶段实施。
This commit is contained in:
iven
2026-04-26 22:14:34 +08:00
parent 787e64d9a9
commit 29b19a90f6

View File

@@ -0,0 +1,968 @@
# HMS 实时体征采集与智能告警系统 — 设计规格
> 日期: 2026-04-26 | 状态: Draft | 作者: Claude + 用户协作
---
## 1. 背景与目标
### 1.1 业务背景
HMS 平台当前已实现体征数据的手动录入和危急值阈值告警。但慢病患者(高血压、糖尿病、肾病)的健康数据采集严重依赖患者主动录入,依从性差、数据稀疏、无法捕捉短时波动。
健康手环(如小米手环、华为手环)的普及率为 **24 小时被动采集** 提供了硬件基础。心率、血氧、步数、睡眠等数据可连续产生,通过小程序 BLE 同步到 HMS实现
- **从"主动录入"到"被动采集"** — 患者无感,数据密度提升 100 倍
- **从"单次阈值"到"趋势预警"** — 连续超标、趋势恶化等规则引擎
- **从"事后查看"到"实时看板"** — 医生通过 SSE 实时感知患者状态变化
### 1.2 目标用户与场景
| 用户 | 场景 | 价值 |
|------|------|------|
| **慢病患者** | 佩戴手环,打开小程序自动同步数据 | 免去手动录入,持续监测 |
| **主管医生** | Web 端实时看板查看患者体征趋势 | 及时发现异常,干预更早 |
| **护士站** | 接收高危告警推送 | 快速响应危急情况 |
| **体检中心** | 出具体检报告时参考连续监测数据 | 报告更全面 |
### 1.3 成功指标
| 指标 | 基线 | 目标 |
|------|------|------|
| 体征数据日均采集量 | ~10 条/患者(手动) | > 1000 条/患者(手环) |
| 危急值告警延迟 | 分钟级(轮询) | < 5 秒SSE 推送) |
| 趋势预警准确率 | N/A无此能力 | > 85%(首版) |
| 患者数据同步频率 | 不定(手动) | 每天至少 1 次(自动) |
### 1.4 范围边界
**做:**
- 小程序 BLE 采集模块DeviceAdapter 抽象 + 手环适配)
- 后端设备数据摄入 API批量提交 + 降采样)
- 告警规则引擎(单次阈值 + 连续超标 + 趋势恶化)
- SSE 推送扩展(体征更新 + 告警通知)
- 数据分区与降采样策略
**不做(本设计不覆盖):**
- ICU 级实时监控(亚秒级,需要专用系统)
- 医疗设备网关HL7/串口,场景不同)
- AI 驱动的异常检测erp-ai 模块职责)
- 非手环设备适配(血压计、血糖仪 — 后续扩展)
- 多实例部署的事件总线演进(单实例足够)
---
## 2. 系统架构
### 2.1 端到端数据流
```
健康手环 ─BLE批量同步→ 微信小程序 ─REST API→ HMS后端
┌─────────────┼──────────────┐
│ │ │
快速路径(同步) 异步路径 SSE推送
校验+存储+事件 告警引擎 体征更新
(< 200ms) 统计聚合 告警通知
趋势分析
│ │ │
▼ ▼ ▼
PostgreSQL EventBus 医生Web端
(分区+降采样) → 消息模块 实时看板
```
### 2.2 模块划分与职责
| 模块 | 位置 | 职责 |
|------|------|------|
| **BLE 采集层** | 小程序 `services/ble/` | 设备发现、连接、数据读取、格式化 |
| **DeviceAdapter** | 小程序 `services/ble/adapters/` | 统一接口,屏蔽设备差异 |
| **摄入 API** | `erp-health` handler | 批量接收、校验、存储、降采样触发 |
| **降采样 Pipeline** | `erp-health` service | 原始数据聚合为小时/日级汇总 |
| **告警引擎** | `erp-health` service | 规则加载、评估、告警生成 |
| **SSE 扩展** | `erp-message` handler | 新增体征/告警事件推送 |
| **EventBus 事件** | `erp-core` | 新增 `device.readings.synced``alert.triggered` 事件类型 |
### 2.3 与现有 HMS 架构的集成点
| 集成点 | 现有 | 扩展 |
|--------|------|------|
| EventBus | `tokio::broadcast` 进程内 | 新增事件类型,不改架构 |
| SSE Handler | 仅推送 `message.sent` | 扩展支持 `device.readings.synced``alert.triggered` |
| vital_signs 表 | 手动录入数据 | 新增 `device_readings` 表存储高频数据,`vital_signs` 保持不变 |
| 通知分发 | `erp-message` 内联 | 复用现有通知管道,新增告警通知类型 |
| 小程序 | Taro 4.2 + React 18 | 新增 BLE 采集服务层 |
### 2.4 技术选型与权衡
| 决策 | 选择 | 理由 |
|------|------|------|
| 设备通信 | 微信 BLE API | 小程序原生支持,无需额外 SDK |
| 数据传输 | REST API 批量提交 | 简单可靠,实时性依赖患者操作频率(每天 1-2 次) |
| 降采样存储 | PostgreSQL 窗口函数 | 无需引入 TimescaleDB/InfluxDB现有栈即可 |
| 告警引擎 | 规则 + 滑动窗口 SQL | 覆盖 80% 临床场景,不需要流式计算框架 |
| 数据分区 | PostgreSQL RANGE 分区 | 按月分区 `device_readings`90 天保留原始数据 |
| SSE 推送 | 扩展现有 SSE handler | 不引入 WebSocket单向推送够用 |
---
## 3. 数据模型
### 3.1 原始设备数据表 (device_readings)
存储从手环同步的原始高频数据,按 `measured_at` 做月分区。
```sql
CREATE TABLE device_readings (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
patient_id UUID NOT NULL,
device_id VARCHAR(64), -- 设备唯一标识MAC 地址哈希)
device_type VARCHAR(32) NOT NULL, -- 'heart_rate', 'blood_oxygen', 'steps', 'sleep', 'temperature', 'stress'
device_model VARCHAR(64), -- 'Xiaomi Band 8', 'Huawei Band 9'
raw_value JSONB NOT NULL, -- 原始值 {"hr": 72, "confidence": 0.95}
measured_at TIMESTAMPTZ NOT NULL, -- 设备上的测量时间
created_at TIMESTAMPTZ DEFAULT NOW(),
deleted_at TIMESTAMPTZ -- 软删除
) PARTITION BY RANGE (measured_at);
-- 按月分区模板(自动创建未来 3 个月)
CREATE TABLE device_readings_2026_05 PARTITION OF device_readings
FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');
-- ... 每月一个分区
-- 核心索引
CREATE INDEX idx_dr_tenant_patient ON device_readings (tenant_id, patient_id, measured_at DESC);
CREATE INDEX idx_dr_device_type ON device_readings (tenant_id, device_type, measured_at DESC);
-- 注意:不包含 updated_by / version 字段
-- 原始设备数据是不可变的(写入后不修改),不需要乐观锁
```
**JSONB raw_value 结构(按 device_type**
```jsonc
// heart_rate
{ "hr": 72, "confidence": 0.95 }
// blood_oxygen
{ "spo2": 98 }
// steps
{ "steps": 8500, "distance_m": 6200, "calories": 320 }
// sleep
{ "stage": "deep", "duration_min": 45 }
// temperature
{ "temp_celsius": 36.5 }
```
### 3.2 降采样表 (vital_signs_hourly)
存储小时级聚合数据,长期保留,用于趋势查询和告警引擎。
```sql
CREATE TABLE vital_signs_hourly (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
patient_id UUID NOT NULL,
device_type VARCHAR(32) NOT NULL,
hour_start TIMESTAMPTZ NOT NULL, -- 小时起始时间
min_val FLOAT,
max_val FLOAT,
avg_val FLOAT NOT NULL,
sample_count INT NOT NULL DEFAULT 1,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
version INT NOT NULL DEFAULT 1,
UNIQUE (tenant_id, patient_id, device_type, hour_start)
);
-- 告警引擎和趋势查询的主要读取路径
CREATE INDEX idx_vsh_tenant_patient ON vital_signs_hourly
(tenant_id, patient_id, device_type, hour_start DESC);
```
### 3.3 告警规则表 (alert_rules)
存储可配置的告警规则,按租户隔离。
```sql
CREATE TABLE alert_rules (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
name VARCHAR(128) NOT NULL, -- "心率持续过高"
description TEXT,
device_type VARCHAR(32) NOT NULL, -- 触发的指标类型
condition_type VARCHAR(32) NOT NULL, -- 'single_threshold' / 'consecutive' / 'trend'
-- 阈值条件 (JSONB 灵活存储不同类型规则的参数)
condition_params JSONB NOT NULL,
-- single_threshold: { "direction": "above", "value": 100 }
-- consecutive: { "count": 3, "direction": "above", "value": 140 }
-- trend: { "window_hours": 168, "delta": 20, "direction": "up" }
severity VARCHAR(16) NOT NULL DEFAULT 'warning', -- 'info' / 'warning' / 'critical' / 'urgent'
is_active BOOLEAN NOT NULL DEFAULT true,
-- 适用范围(可选过滤)
apply_tags JSONB, -- ["hypertension", "diabetes"] 患者标签
notify_roles JSONB NOT NULL DEFAULT '["attending_doctor"]',
cooldown_minutes INT NOT NULL DEFAULT 60, -- 同一规则对同一患者的告警冷却时间
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
created_by UUID,
updated_by UUID,
deleted_at TIMESTAMPTZ,
version INT NOT NULL DEFAULT 1
);
```
### 3.4 告警记录表 (alerts)
```sql
CREATE TABLE alerts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
patient_id UUID NOT NULL,
rule_id UUID NOT NULL REFERENCES alert_rules(id) ON DELETE RESTRICT, -- 禁止硬删除规则
severity VARCHAR(16) NOT NULL,
title VARCHAR(256) NOT NULL, -- "心率持续过高 - 患者张三"
detail JSONB NOT NULL, -- 告警上下文
-- { "metric": "heart_rate", "values": [105, 108, 112], "threshold": 100, "window": "1h" }
status VARCHAR(16) NOT NULL DEFAULT 'pending', -- 'pending' / 'acknowledged' / 'resolved' / 'dismissed'
acknowledged_by UUID,
acknowledged_at TIMESTAMPTZ,
resolved_at TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
deleted_at TIMESTAMPTZ,
version INT NOT NULL DEFAULT 1
);
CREATE INDEX idx_alerts_tenant_patient ON alerts (tenant_id, patient_id, created_at DESC);
CREATE INDEX idx_alerts_status ON alerts (tenant_id, status, created_at DESC);
```
### 3.5 数据保留与清理策略
| 数据层级 | 保留期 | 清理方式 |
|----------|--------|---------|
| `device_readings`(原始) | 90 天 | 超期分区 DROP月级 |
| `vital_signs_hourly`(小时) | 2 年 | 后台任务清理 |
| `vital_signs`(日级,已有) | 永久 | 不清理 |
| `alerts`(告警) | 1 年 | 软删除 + 归档 |
降采样 Pipeline 触发时机:
1. **写入时同步降采样** — 原始数据写入后,立即 upsert 对应小时聚合记录
2. **后台修正任务** — 每 6 小时运行一次,修正因乱序数据导致的聚合偏差
### 3.6 与现有 vital_signs 表的关系
**并存不合并。** 两张表服务于不同场景:
| 特征 | vital_signs现有 | device_readings新增 |
|------|-------------------|----------------------|
| 数据来源 | 手动录入 / 医护录入 | 设备自动采集 |
| 数据频率 | 每天 1-3 条 | 每天数百到数千条 |
| 指标类型 | 血压、血糖、体重、尿量等 | 心率、血氧、步数、睡眠等 |
| 用途 | 门诊/查房记录 | 连续监测与趋势分析 |
查询 API 提供统一视图,内部按 `source` 字段路由到对应表。
---
## 4. 小程序 BLE 采集模块
### 4.1 DeviceAdapter 统一接口
```typescript
// services/ble/types.ts
/** 统一的标准化读数 */
interface NormalizedReading {
patientId: string;
deviceType: DeviceType;
deviceModel: string; // 'Xiaomi Band 8'
deviceId: string; // 设备唯一标识
values: Record<string, number>; // { hr: 72, confidence: 0.95 }
measuredAt: string; // ISO 8601
source: 'ble_device';
}
type DeviceType =
| 'heart_rate' | 'blood_oxygen' | 'steps'
| 'sleep' | 'temperature' | 'stress';
/** 设备适配器统一接口 */
interface DeviceAdapter {
/** 设备类型标识 */
readonly deviceType: DeviceType;
/** 支持的设备型号(用于自动识别) */
readonly supportedModels: string[];
/** 服务 UUID用于 BLE 扫描过滤) */
readonly serviceUUIDs: string[];
/** 发现设备 */
discover(timeoutMs?: number): Promise<BLEDevice[]>;
/** 连接设备 */
connect(deviceId: string): Promise<BLEConnection>;
/** 批量读取历史数据 */
readHistory(conn: BLEConnection, since: Date): Promise<NormalizedReading[]>;
/** 断开连接 */
disconnect(conn: BLEConnection): Promise<void>;
}
interface BLEDevice {
deviceId: string;
name: string;
rssi: number;
model?: string; // 从广播数据解析
}
interface BLEConnection {
deviceId: string;
connected: boolean;
}
```
### 4.2 健康手环 Adapter 实现
以小米手环为例(首版支持):
```typescript
// services/ble/adapters/XiaomiBandAdapter.ts
class XiaomiBandAdapter implements DeviceAdapter {
readonly deviceType = 'heart_rate'; // 首版仅支持心率
readonly supportedModels = ['Mi Band 8', 'Mi Band 7', 'Xiaomi Smart Band 8'];
readonly serviceUUIDs = [
'0000180d-0000-1000-8000-00805f9b34fb', // Heart Rate Service (标准)
// 小米私有服务 UUID
];
async discover(timeoutMs = 10000): Promise<BLEDevice[]> {
// 使用 Taro.openBluetoothAdapter + startBluetoothDevicesDiscovery
// 过滤 supportedModels 中的设备名
}
async readHistory(conn: BLEConnection, since: Date): Promise<NormalizedReading[]> {
// 小米手环私有协议:读取活动数据特征值
// 解析二进制协议 → NormalizedReading[]
// 按时间过滤 since 之后的数据
}
}
```
### 4.3 BLE 连接管理与错误恢复
```typescript
// services/ble/BLEManager.ts
class BLEManager {
private adapters: Map<string, DeviceAdapter> = new Map();
private connection: BLEConnection | null = null;
/** 注册适配器 */
registerAdapter(adapter: DeviceAdapter): void;
/** 自动扫描并识别设备 */
async scanAndIdentify(): Promise<{ adapter: DeviceAdapter; device: BLEDevice } | null>;
/** 同步数据(主入口) */
async syncData(patientId: string, since: Date): Promise<SyncResult> {
try {
// 1. 扫描 → 识别设备 → 选择 Adapter
// 2. 连接 → 读取历史数据 → 格式化
// 3. 批量提交到 HMS API
// 4. 断开连接
// 5. 返回同步结果
} catch (error) {
// 错误恢复:连接失败 → 重试 1 次 → 提示用户
// 数据提交失败 → 本地缓存 → 下次同步时补传
}
}
}
interface SyncResult {
success: boolean;
readingsCount: number;
dateRange: { from: string; to: string };
errors?: string[];
}
```
### 4.4 数据同步流程
```
用户打开小程序 → 检查蓝牙权限
自动扫描附近设备 → 匹配已注册 Adapter
│ │
Yes No → 提示"未发现设备"
BLE 连接 → 读取上次同步时间
批量读取 since 上次同步的数据
POST /api/v1/health/device-readings/batch
│ │
成功 失败 → 缓存到本地存储
│ │
▼ ▼
更新同步时间戳 下次同步时补传
显示同步结果
```
### 4.5 数据格式规范化
Adapter 负责将设备原始数据翻译为 `NormalizedReading`
```
小米手环原始数据 (二进制):
[0x01, 0x48, 0x00, ...] → 心率 72 bpm
Adapter 翻译:
NormalizedReading {
deviceType: 'heart_rate',
values: { hr: 72 },
measuredAt: '2026-05-15T08:30:00Z',
source: 'ble_device'
}
```
### 4.6 离线缓存策略
使用小程序本地存储Taro.setStorageSync缓存未提交的数据
```typescript
interface PendingSync {
readings: NormalizedReading[];
createdAt: string;
retryCount: number;
}
// 缓存 key: `pending_sync_${patientId}`
// 最大缓存: 5000 条(超出时丢弃最旧的)
// 提交成功后清除缓存
// 下次 syncData 时检查缓存,优先补传
```
---
## 5. 后端 API 设计
### 5.1 设备数据摄入 API
**批量提交端点:**
```
POST /api/v1/health/device-readings/batch
Authorization: Bearer <token>
Content-Type: application/json
Request:
{
"deviceId": "abc123",
"deviceModel": "Xiaomi Band 8",
"readings": [
{
"deviceType": "heart_rate",
"values": { "hr": 72 },
"measuredAt": "2026-05-15T08:30:00Z"
}
// ... 最多 500 条/请求
]
}
Response 200:
{
"success": true,
"data": {
"accepted": 498,
"duplicates": 2,
"earliest": "2026-05-14T22:00:00Z",
"latest": "2026-05-15T08:30:00Z"
}
}
```
**去重机制:**`(patient_id, device_id, device_type, measured_at)` 为唯一约束,冲突时忽略(幂等)。
**限流:** 单用户每分钟最多 10 次批量提交。
### 5.2 快速路径 vs 异步路径
```rust
// erp-health/src/service/device_reading_service.rs
pub async fn batch_create_readings(
db: &DatabaseConnection,
event_bus: &EventBus,
tenant_id: Uuid,
patient_id: Uuid,
readings: Vec<DeviceReadingInput>,
) -> AppResult<BatchResult> {
// ── 快速路径(同步,< 200ms──
// 1. 校验患者存在性
let patient = find_patient(db, tenant_id, patient_id).await?;
// 1.5 校验设备绑定关系(防止数据注入)
verify_device_binding(db, tenant_id, patient_id, &device_id).await?;
// 2. 批量插入ON CONFLICT DO NOTHING 去重)
let inserted = batch_insert_readings(db, tenant_id, patient_id, &readings).await?;
// 3. 同步降采样 upsert每条原始数据更新对应小时聚合
upsert_hourly_aggregates(db, tenant_id, patient_id, &readings).await?;
// 4. 发布 EventBus 事件(异步消费者处理告警和推送)
// 注意publish 签名需要 db 参数用于 outbox 持久化
event_bus.publish(DomainEvent {
event_type: "device.readings.synced".into(),
tenant_id,
payload: json!({ "patient_id": patient_id, "count": inserted }),
}, db).await?;
Ok(BatchResult { accepted: inserted, .. })
}
```
```rust
// 异步消费者erp-health 事件订阅)
// 监听 "device.readings.synced" 事件
async fn on_readings_synced(event: DomainEvent) {
// 1. 加载该患者适用的告警规则
// 2. 逐规则评估(滑动窗口查询)
// 3. 匹配规则 → 生成 Alert → 发布 alert.triggered 事件
// 4. 统计聚合更新(异步,不阻塞)
}
```
### 5.3 降采样 Pipeline
```rust
/// 同步降采样:写入原始数据后立即更新小时聚合
async fn upsert_hourly_aggregates(
db: &DatabaseConnection,
tenant_id: Uuid,
patient_id: Uuid,
readings: &[DeviceReadingInput],
) -> AppResult<()> {
// 按 (device_type, hour_start) 分组
// 每组计算 min/max/avg/count
// INSERT ... ON CONFLICT (tenant_id, patient_id, device_type, hour_start)
// DO UPDATE SET
// min_val = LEAST(min_val, EXCLUDED.min_val),
// max_val = GREATEST(max_val, EXCLUDED.max_val),
// avg_val = (avg_val * sample_count + EXCLUDED.total) / (sample_count + EXCLUDED.count),
// sample_count = sample_count + EXCLUDED.count
Ok(())
}
```
### 5.4 查询 API
```
# 原始数据查询(最近 N 条)
GET /api/v1/health/device-readings?patient_id=xxx&device_type=heart_rate&hours=24
# 降采样数据查询(趋势图用)
GET /api/v1/health/device-readings/hourly?patient_id=xxx&device_type=heart_rate&days=7
# 告警列表
GET /api/v1/health/alerts?patient_id=xxx&status=pending
# 告警确认
PUT /api/v1/health/alerts/{id}/acknowledge
# 告警规则管理
GET/POST/PUT/DELETE /api/v1/health/alert-rules
```
### 5.5 EventBus 事件扩展
新增事件类型:
| 事件 | 触发时机 | Payload |
|------|---------|---------|
| `device.readings.synced` | 手环数据批量提交成功 | `{ patient_id, count, device_model, date_range }` |
| `alert.triggered` | 告警规则匹配 | `{ alert_id, patient_id, rule_name, severity, detail }` |
| `alert.acknowledged` | 医生确认告警 | `{ alert_id, acknowledged_by }` |
订阅关系:
| 订阅者 | 订阅事件 | 动作 |
|--------|---------|------|
| 告警引擎 | `device.readings.synced` | 规则评估 |
| SSE Handler | `alert.triggered` | 推送告警通知到医生 |
| SSE Handler | `device.readings.synced` | 推送体征更新到看板 |
| 消息模块 | `alert.triggered` | 生成站内告警消息 |
---
## 6. 告警引擎
### 6.1 规则模型
```rust
// erp-health/src/service/alert_engine.rs
#[derive(Debug, Clone, Deserialize)]
pub enum AlertCondition {
/// 单次阈值判断
SingleThreshold {
direction: Direction, // Above / Below
value: f64,
},
/// 连续 N 次超标
Consecutive {
count: usize,
direction: Direction,
value: f64,
window_hours: Option<i32>, // 可选时间窗口限制
},
/// 趋势变化量
Trend {
window_hours: i32, // 观察窗口
delta: f64, // 变化量
direction: Direction, // Up / Down
},
}
#[derive(Debug, Clone, Deserialize)]
pub enum Direction { Above, Below }
```
### 6.2 规则评估流程
```rust
pub async fn evaluate_rules(
db: &DatabaseConnection,
tenant_id: Uuid,
patient_id: Uuid,
device_type: &str,
event_bus: &EventBus,
) -> AppResult<Vec<Alert>> {
// 1. 加载该租户激活的规则(按 device_type 过滤)
let rules = load_active_rules(db, tenant_id, device_type).await?;
let mut alerts = Vec::new();
for rule in rules {
// 检查冷却期(同一规则对同一患者)
if is_in_cooldown(db, rule.id, patient_id, rule.cooldown_minutes).await? {
continue;
}
// 2. 按条件类型评估
let triggered = match &rule.condition {
AlertCondition::SingleThreshold { direction, value } => {
// 查最近一条数据判断
evaluate_single(db, tenant_id, patient_id, &rule.device_type,
*direction, *value).await?
}
AlertCondition::Consecutive { count, direction, value, window_hours } => {
// 滑动窗口查询最近 N 条数据
evaluate_consecutive(db, tenant_id, patient_id, &rule.device_type,
*count, *direction, *value, *window_hours).await?
}
AlertCondition::Trend { window_hours, delta, direction } => {
// 查询窗口内数据做斜率计算
evaluate_trend(db, tenant_id, patient_id, &rule.device_type,
*window_hours, *delta, *direction).await?
}
};
if triggered {
let alert = create_alert(db, tenant_id, patient_id, &rule).await?;
alerts.push(alert);
// 发布事件(通知管道 + SSE 推送)
event_bus.publish(alert_triggered_event(&alert)).await?;
}
}
Ok(alerts)
}
```
### 6.3 滑动窗口查询实现
```sql
-- Consecutive: 连续 N 次超标
-- 查询最近 N 条记录,检查是否全部超标
SELECT avg_val FROM vital_signs_hourly
WHERE tenant_id = $1
AND patient_id = $2
AND device_type = $3
AND hour_start > NOW() - ($4 * interval '1 hour')
ORDER BY hour_start DESC
LIMIT $5;
-- Rust 侧检查:是否连续 N 条都满足条件
-- (all recent readings exceed threshold, no "break" in the sequence)
-- Trend: 窗口内趋势
-- 线性回归斜率近似:用首尾差值 / 时间跨度
WITH window_data AS (
SELECT avg_val, hour_start,
ROW_NUMBER() OVER (ORDER BY hour_start) as rn,
COUNT(*) OVER () as total
FROM vital_signs_hourly
WHERE tenant_id = $1 AND patient_id = $2 AND device_type = $3
AND hour_start > NOW() - ($4 * interval '1 hour')
ORDER BY hour_start
)
SELECT
(MAX(avg_val) - MIN(avg_val)) as delta,
MAX(hour_start) - MIN(hour_start) as timespan
FROM window_data;
```
### 6.4 告警通知管道
```
Alert 触发
├── EventBus → SSE Handler → 医生 Web 端实时弹窗
├── EventBus → 消息模块 → 站内告警消息
└── (未来) 微信模板消息 → 医生小程序推送
```
### 6.5 规则管理 API
```
# 创建规则
POST /api/v1/health/alert-rules
{
"name": "心率持续过高",
"deviceType": "heart_rate",
"conditionType": "consecutive",
"conditionParams": {
"count": 3,
"direction": "above",
"value": 100,
"windowHours": 1
},
"severity": "warning",
"notifyRoles": ["attending_doctor"],
"cooldownMinutes": 60
}
# 查询规则
GET /api/v1/health/alert-rules?is_active=true
# 更新规则
PUT /api/v1/health/alert-rules/{id}
# 禁用规则
PUT /api/v1/health/alert-rules/{id}/deactivate
```
---
## 7. SSE 推送扩展
### 7.1 现有 SSE 架构扩展
当前 SSE Handler (`sse_handler.rs`) 仅订阅 `message.sent` 事件。扩展方案:新增订阅源,复用现有 SSE 连接。
```rust
// 改造 sse_handler.rs 的事件订阅
async fn sse_stream(
rx: BroadcastReceiver<DomainEvent>,
tenant_id: Uuid,
user_id: Uuid,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
// 现有: subscribe_filtered("message.sent")
// 扩展: 改为全量 subscribe() + 自行过滤
// 原因: 新增事件前缀不同 (device. / alert. / message.),无法用单一前缀覆盖
let stream = async_stream::stream! {
loop {
match rx.recv().await {
Ok(event) => {
// 消息通知(已有)
if event.event_type == "message.sent"
&& matches_tenant_user(&event, tenant_id, user_id) {
yield Ok(Event::default()
.event("message")
.data(event.payload.to_string()));
}
// 告警通知(新增)— 推送给患者的主治医生
if event.event_type == "alert.triggered"
&& matches_tenant_doctor(&event, tenant_id, user_id) {
yield Ok(Event::default()
.event("alert")
.data(event.payload.to_string()));
}
// 体征更新(新增)— 推送给正在查看该患者的医生
if event.event_type == "device.readings.synced"
&& matches_tenant_doctor(&event, tenant_id, user_id) {
yield Ok(Event::default()
.event("vital_update")
.data(event.payload.to_string()));
}
}
Err(Lagged(n)) => tracing::warn!("SSE lagged {n} events"),
Err(_) => break,
}
}
};
Sse::new(stream).keep_alive(KeepAlive::default())
}
```
### 7.2 新增事件类型与 SSE event 映射
| EventBus 事件 | SSE event | 推送对象 | Payload |
|---------------|-----------|---------|---------|
| `message.sent` | `message` | 消息接收者(已有) | 消息内容 |
| `alert.triggered` | `alert` | 患者的主治医生 | 告警详情 |
| `device.readings.synced` | `vital_update` | 患者的主治医生 | 同步摘要 |
### 7.3 医生端实时看板数据推送
前端监听 `vital_update` 事件,更新实时看板:
```typescript
// apps/web 前端 SSE 监听扩展
eventSource.addEventListener('vital_update', (e) => {
const data = JSON.parse(e.data);
// 更新患者详情页的体征图表(如果正在查看该患者)
updateVitalSignsChart(data.patient_id, data.summary);
});
eventSource.addEventListener('alert', (e) => {
const data = JSON.parse(e.data);
// 显示告警弹窗 + 声音提示
showAlertNotification(data);
});
```
---
## 8. 多租户与安全
### 8.1 租户隔离策略
| 数据层 | 隔离方式 |
|--------|---------|
| `device_readings` | `tenant_id` 列过滤(中间件注入),分区键不含 tenant_id |
| `vital_signs_hourly` | `tenant_id` 列过滤 + UNIQUE 约束含 tenant_id |
| `alert_rules` | `tenant_id` 列过滤,每租户独立配置规则 |
| `alerts` | `tenant_id` 列过滤 |
API 端点全部走现有 JWT 中间件,`tenant_id` 从 token 提取,不需要额外开发。
### 8.2 设备认证与数据完整性
**设备绑定:** 每个设备通过 `device_id`MAC 哈希)绑定到患者。首次同步时创建绑定关系,后续只接受已绑定设备的数据。
```sql
CREATE TABLE patient_devices (
id UUID PRIMARY KEY,
tenant_id UUID NOT NULL,
patient_id UUID NOT NULL,
device_id VARCHAR(64) NOT NULL, -- 设备唯一标识
device_model VARCHAR(64),
device_type VARCHAR(32),
bound_at TIMESTAMPTZ DEFAULT NOW(),
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
created_by UUID,
updated_by UUID,
deleted_at TIMESTAMPTZ, -- 软删除(解绑时设置)
version INT NOT NULL DEFAULT 1,
UNIQUE (tenant_id, patient_id, device_id)
);
```
**数据完整性:**
- API 通过 JWT 认证,确保只有授权用户(患者本人或医护)能提交数据
- 批量提交有大小限制500 条/请求和频率限制10 次/分钟/用户)
- `measured_at` 不接受未来时间(服务端校验)
### 8.3 PII 加密扩展
手环数据本身不包含 PII心率、步数等不是个人身份信息`patient_id` 关联关系需要保护。
- `device_readings` 表的 `patient_id` 字段不加密(性能考虑,高频查询)
- `alerts` 表的告警详情可能包含患者姓名,走现有 PII 加密流程
- 查询 API 返回时,按现有 PII 解密规则处理
---
## 9. 实施路线图
### Phase 1数据管道 + 手环适配(基础能力)
**目标:** 跑通"手环 → 小程序 → API → 数据库"的完整链路
| 任务 | 涉及模块 | 预估复杂度 |
|------|---------|-----------|
| 数据库迁移:`device_readings` 分区表 + `vital_signs_hourly` + `patient_devices` | migration | 中 |
| 后端摄入 API`POST /device-readings/batch` + 降采样 upsert | erp-health | 高 |
| EventBus 事件扩展:`device.readings.synced` | erp-core + erp-health | 低 |
| 小程序 BLE 模块DeviceAdapter 接口 + 小米手环 Adapter | miniprogram | 高 |
| 小程序同步页面:设备扫描 + 数据同步 UI | miniprogram | 中 |
| 查询 API`GET /device-readings` + `/hourly` | erp-health | 中 |
**验收标准:**
- [ ] 小程序能连接小米手环并读取心率数据
- [ ] 数据通过 API 批量提交到 HMS 后端
- [ ] `device_readings``vital_signs_hourly` 表正确写入
- [ ] 手动执行降采样查询验证数据正确
### Phase 2告警引擎 + SSE 推送(智能预警)
**目标:** 告警规则可配置、评估准确、推送实时
| 任务 | 涉及模块 | 预估复杂度 |
|------|---------|-----------|
| 数据库迁移:`alert_rules` + `alerts` 表 | migration | 低 |
| 告警引擎核心:规则加载 + 评估 + 生成告警 | erp-health | 高 |
| SSE 扩展:新增 `alert` / `vital_update` 事件推送 | erp-server | 中 |
| 规则管理 APICRUD + 启停 | erp-health | 中 |
| 告警确认/处置 API | erp-health | 低 |
| 医生 Web 端告警通知 UI | web frontend | 中 |
| 医生 Web 端体征实时看板 | web frontend | 高 |
**验收标准:**
- [ ] 可通过 API 配置告警规则
- [ ] 手环数据提交后,匹配规则自动触发告警
- [ ] 告警通过 SSE 实时推送到医生端
- [ ] 医生可确认/处置告警
- [ ] 冷却期机制正常工作
### Phase 3高级分析 + 更多设备(扩展能力)
**目标:** 支持更多设备类型,提供更丰富的分析能力
| 任务 | 说明 |
|------|------|
| 更多手环适配 | 华为手环、OPPO 手环等 Adapter |
| 血压计/血糖仪 BLE 适配 | 扩展 DeviceType新增 Adapter |
| 数据看板增强 | 周/月趋势对比、患者分组统计 |
| 降采样后台修正任务 | 修正乱序数据导致的聚合偏差 |
| 数据清理任务 | 自动 DROP 超期分区 |
| (远期) 微信模板消息推送 | 告警推送到医生小程序 |