Files
hms/docs/superpowers/specs/2026-04-26-realtime-vital-signs-pipeline-design.md
iven 29b19a90f6 docs: 实时体征采集与智能告警系统设计规格
健康手环 BLE 采集 → REST API 批量提交 → 降采样 Pipeline →
规则引擎告警 → SSE 实时推送的完整闭环设计。
覆盖数据模型(分区表+降采样+告警规则)、
小程序 BLE 适配器抽象、三层告警引擎、
SSE 推送扩展,分三阶段实施。
2026-04-26 22:14:34 +08:00

32 KiB
Raw Blame History

HMS 实时体征采集与智能告警系统 — 设计规格

日期: 2026-04-26 | 状态: Draft | 作者: Claude + 用户协作


1. 背景与目标

1.1 业务背景

HMS 平台当前已实现体征数据的手动录入和危急值阈值告警。但慢病患者(高血压、糖尿病、肾病)的健康数据采集严重依赖患者主动录入,依从性差、数据稀疏、无法捕捉短时波动。

健康手环(如小米手环、华为手环)的普及率为 24 小时被动采集 提供了硬件基础。心率、血氧、步数、睡眠等数据可连续产生,通过小程序 BLE 同步到 HMS实现

  • 从"主动录入"到"被动采集" — 患者无感,数据密度提升 100 倍
  • 从"单次阈值"到"趋势预警" — 连续超标、趋势恶化等规则引擎
  • 从"事后查看"到"实时看板" — 医生通过 SSE 实时感知患者状态变化

1.2 目标用户与场景

用户 场景 价值
慢病患者 佩戴手环,打开小程序自动同步数据 免去手动录入,持续监测
主管医生 Web 端实时看板查看患者体征趋势 及时发现异常,干预更早
护士站 接收高危告警推送 快速响应危急情况
体检中心 出具体检报告时参考连续监测数据 报告更全面

1.3 成功指标

指标 基线 目标
体征数据日均采集量 ~10 条/患者(手动) > 1000 条/患者(手环)
危急值告警延迟 分钟级(轮询) < 5 秒SSE 推送)
趋势预警准确率 N/A无此能力 > 85%(首版)
患者数据同步频率 不定(手动) 每天至少 1 次(自动)

1.4 范围边界

做:

  • 小程序 BLE 采集模块DeviceAdapter 抽象 + 手环适配)
  • 后端设备数据摄入 API批量提交 + 降采样)
  • 告警规则引擎(单次阈值 + 连续超标 + 趋势恶化)
  • SSE 推送扩展(体征更新 + 告警通知)
  • 数据分区与降采样策略

不做(本设计不覆盖):

  • ICU 级实时监控(亚秒级,需要专用系统)
  • 医疗设备网关HL7/串口,场景不同)
  • AI 驱动的异常检测erp-ai 模块职责)
  • 非手环设备适配(血压计、血糖仪 — 后续扩展)
  • 多实例部署的事件总线演进(单实例足够)

2. 系统架构

2.1 端到端数据流

健康手环 ─BLE批量同步→ 微信小程序 ─REST API→ HMS后端
                                                  │
                                    ┌─────────────┼──────────────┐
                                    │             │              │
                              快速路径(同步)  异步路径         SSE推送
                              校验+存储+事件  告警引擎         体征更新
                              (< 200ms)       统计聚合         告警通知
                                              趋势分析
                                    │             │              │
                                    ▼             ▼              ▼
                              PostgreSQL     EventBus        医生Web端
                              (分区+降采样)  → 消息模块       实时看板

2.2 模块划分与职责

模块 位置 职责
BLE 采集层 小程序 services/ble/ 设备发现、连接、数据读取、格式化
DeviceAdapter 小程序 services/ble/adapters/ 统一接口,屏蔽设备差异
摄入 API erp-health handler 批量接收、校验、存储、降采样触发
降采样 Pipeline erp-health service 原始数据聚合为小时/日级汇总
告警引擎 erp-health service 规则加载、评估、告警生成
SSE 扩展 erp-message handler 新增体征/告警事件推送
EventBus 事件 erp-core 新增 device.readings.syncedalert.triggered 事件类型

2.3 与现有 HMS 架构的集成点

集成点 现有 扩展
EventBus tokio::broadcast 进程内 新增事件类型,不改架构
SSE Handler 仅推送 message.sent 扩展支持 device.readings.syncedalert.triggered
vital_signs 表 手动录入数据 新增 device_readings 表存储高频数据,vital_signs 保持不变
通知分发 erp-message 内联 复用现有通知管道,新增告警通知类型
小程序 Taro 4.2 + React 18 新增 BLE 采集服务层

2.4 技术选型与权衡

决策 选择 理由
设备通信 微信 BLE API 小程序原生支持,无需额外 SDK
数据传输 REST API 批量提交 简单可靠,实时性依赖患者操作频率(每天 1-2 次)
降采样存储 PostgreSQL 窗口函数 无需引入 TimescaleDB/InfluxDB现有栈即可
告警引擎 规则 + 滑动窗口 SQL 覆盖 80% 临床场景,不需要流式计算框架
数据分区 PostgreSQL RANGE 分区 按月分区 device_readings90 天保留原始数据
SSE 推送 扩展现有 SSE handler 不引入 WebSocket单向推送够用

3. 数据模型

3.1 原始设备数据表 (device_readings)

存储从手环同步的原始高频数据,按 measured_at 做月分区。

CREATE TABLE device_readings (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       UUID NOT NULL,
    patient_id      UUID NOT NULL,
    device_id       VARCHAR(64),                -- 设备唯一标识MAC 地址哈希)
    device_type     VARCHAR(32) NOT NULL,        -- 'heart_rate', 'blood_oxygen', 'steps', 'sleep', 'temperature', 'stress'
    device_model    VARCHAR(64),                 -- 'Xiaomi Band 8', 'Huawei Band 9'
    raw_value       JSONB NOT NULL,              -- 原始值 {"hr": 72, "confidence": 0.95}
    measured_at     TIMESTAMPTZ NOT NULL,         -- 设备上的测量时间
    created_at      TIMESTAMPTZ DEFAULT NOW(),
    deleted_at      TIMESTAMPTZ                  -- 软删除
) PARTITION BY RANGE (measured_at);

-- 按月分区模板(自动创建未来 3 个月)
CREATE TABLE device_readings_2026_05 PARTITION OF device_readings
    FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');
-- ... 每月一个分区

-- 核心索引
CREATE INDEX idx_dr_tenant_patient ON device_readings (tenant_id, patient_id, measured_at DESC);
CREATE INDEX idx_dr_device_type ON device_readings (tenant_id, device_type, measured_at DESC);

-- 注意:不包含 updated_by / version 字段
-- 原始设备数据是不可变的(写入后不修改),不需要乐观锁

JSONB raw_value 结构(按 device_type

// heart_rate
{ "hr": 72, "confidence": 0.95 }

// blood_oxygen
{ "spo2": 98 }

// steps
{ "steps": 8500, "distance_m": 6200, "calories": 320 }

// sleep
{ "stage": "deep", "duration_min": 45 }

// temperature
{ "temp_celsius": 36.5 }

3.2 降采样表 (vital_signs_hourly)

存储小时级聚合数据,长期保留,用于趋势查询和告警引擎。

CREATE TABLE vital_signs_hourly (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       UUID NOT NULL,
    patient_id      UUID NOT NULL,
    device_type     VARCHAR(32) NOT NULL,
    hour_start      TIMESTAMPTZ NOT NULL,         -- 小时起始时间
    min_val         FLOAT,
    max_val         FLOAT,
    avg_val         FLOAT NOT NULL,
    sample_count    INT NOT NULL DEFAULT 1,
    created_at      TIMESTAMPTZ DEFAULT NOW(),
    updated_at      TIMESTAMPTZ DEFAULT NOW(),
    version         INT NOT NULL DEFAULT 1,

    UNIQUE (tenant_id, patient_id, device_type, hour_start)
);

-- 告警引擎和趋势查询的主要读取路径
CREATE INDEX idx_vsh_tenant_patient ON vital_signs_hourly
    (tenant_id, patient_id, device_type, hour_start DESC);

3.3 告警规则表 (alert_rules)

存储可配置的告警规则,按租户隔离。

CREATE TABLE alert_rules (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       UUID NOT NULL,
    name            VARCHAR(128) NOT NULL,         -- "心率持续过高"
    description     TEXT,
    device_type     VARCHAR(32) NOT NULL,           -- 触发的指标类型
    condition_type  VARCHAR(32) NOT NULL,           -- 'single_threshold' / 'consecutive' / 'trend'
    -- 阈值条件 (JSONB 灵活存储不同类型规则的参数)
    condition_params JSONB NOT NULL,
    -- single_threshold: { "direction": "above", "value": 100 }
    -- consecutive: { "count": 3, "direction": "above", "value": 140 }
    -- trend: { "window_hours": 168, "delta": 20, "direction": "up" }
    severity        VARCHAR(16) NOT NULL DEFAULT 'warning',  -- 'info' / 'warning' / 'critical' / 'urgent'
    is_active       BOOLEAN NOT NULL DEFAULT true,
    -- 适用范围(可选过滤)
    apply_tags      JSONB,                          -- ["hypertension", "diabetes"] 患者标签
    notify_roles    JSONB NOT NULL DEFAULT '["attending_doctor"]',
    cooldown_minutes INT NOT NULL DEFAULT 60,       -- 同一规则对同一患者的告警冷却时间
    created_at      TIMESTAMPTZ DEFAULT NOW(),
    updated_at      TIMESTAMPTZ DEFAULT NOW(),
    created_by      UUID,
    updated_by      UUID,
    deleted_at      TIMESTAMPTZ,
    version         INT NOT NULL DEFAULT 1
);

3.4 告警记录表 (alerts)

CREATE TABLE alerts (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       UUID NOT NULL,
    patient_id      UUID NOT NULL,
    rule_id         UUID NOT NULL REFERENCES alert_rules(id) ON DELETE RESTRICT,  -- 禁止硬删除规则
    severity        VARCHAR(16) NOT NULL,
    title           VARCHAR(256) NOT NULL,          -- "心率持续过高 - 患者张三"
    detail          JSONB NOT NULL,                 -- 告警上下文
    -- { "metric": "heart_rate", "values": [105, 108, 112], "threshold": 100, "window": "1h" }
    status          VARCHAR(16) NOT NULL DEFAULT 'pending',  -- 'pending' / 'acknowledged' / 'resolved' / 'dismissed'
    acknowledged_by UUID,
    acknowledged_at TIMESTAMPTZ,
    resolved_at     TIMESTAMPTZ,
    created_at      TIMESTAMPTZ DEFAULT NOW(),
    updated_at      TIMESTAMPTZ DEFAULT NOW(),
    deleted_at      TIMESTAMPTZ,
    version         INT NOT NULL DEFAULT 1
);

CREATE INDEX idx_alerts_tenant_patient ON alerts (tenant_id, patient_id, created_at DESC);
CREATE INDEX idx_alerts_status ON alerts (tenant_id, status, created_at DESC);

3.5 数据保留与清理策略

数据层级 保留期 清理方式
device_readings(原始) 90 天 超期分区 DROP月级
vital_signs_hourly(小时) 2 年 后台任务清理
vital_signs(日级,已有) 永久 不清理
alerts(告警) 1 年 软删除 + 归档

降采样 Pipeline 触发时机:

  1. 写入时同步降采样 — 原始数据写入后,立即 upsert 对应小时聚合记录
  2. 后台修正任务 — 每 6 小时运行一次,修正因乱序数据导致的聚合偏差

3.6 与现有 vital_signs 表的关系

并存不合并。 两张表服务于不同场景:

特征 vital_signs现有 device_readings新增
数据来源 手动录入 / 医护录入 设备自动采集
数据频率 每天 1-3 条 每天数百到数千条
指标类型 血压、血糖、体重、尿量等 心率、血氧、步数、睡眠等
用途 门诊/查房记录 连续监测与趋势分析

查询 API 提供统一视图,内部按 source 字段路由到对应表。


4. 小程序 BLE 采集模块

4.1 DeviceAdapter 统一接口

// services/ble/types.ts

/** 统一的标准化读数 */
interface NormalizedReading {
  patientId: string;
  deviceType: DeviceType;
  deviceModel: string;                // 'Xiaomi Band 8'
  deviceId: string;                   // 设备唯一标识
  values: Record<string, number>;     // { hr: 72, confidence: 0.95 }
  measuredAt: string;                 // ISO 8601
  source: 'ble_device';
}

type DeviceType =
  | 'heart_rate' | 'blood_oxygen' | 'steps'
  | 'sleep' | 'temperature' | 'stress';

/** 设备适配器统一接口 */
interface DeviceAdapter {
  /** 设备类型标识 */
  readonly deviceType: DeviceType;
  /** 支持的设备型号(用于自动识别) */
  readonly supportedModels: string[];
  /** 服务 UUID用于 BLE 扫描过滤) */
  readonly serviceUUIDs: string[];

  /** 发现设备 */
  discover(timeoutMs?: number): Promise<BLEDevice[]>;
  /** 连接设备 */
  connect(deviceId: string): Promise<BLEConnection>;
  /** 批量读取历史数据 */
  readHistory(conn: BLEConnection, since: Date): Promise<NormalizedReading[]>;
  /** 断开连接 */
  disconnect(conn: BLEConnection): Promise<void>;
}

interface BLEDevice {
  deviceId: string;
  name: string;
  rssi: number;
  model?: string;                     // 从广播数据解析
}

interface BLEConnection {
  deviceId: string;
  connected: boolean;
}

4.2 健康手环 Adapter 实现

以小米手环为例(首版支持):

// services/ble/adapters/XiaomiBandAdapter.ts

class XiaomiBandAdapter implements DeviceAdapter {
  readonly deviceType = 'heart_rate';  // 首版仅支持心率
  readonly supportedModels = ['Mi Band 8', 'Mi Band 7', 'Xiaomi Smart Band 8'];
  readonly serviceUUIDs = [
    '0000180d-0000-1000-8000-00805f9b34fb',  // Heart Rate Service (标准)
    // 小米私有服务 UUID
  ];

  async discover(timeoutMs = 10000): Promise<BLEDevice[]> {
    // 使用 Taro.openBluetoothAdapter + startBluetoothDevicesDiscovery
    // 过滤 supportedModels 中的设备名
  }

  async readHistory(conn: BLEConnection, since: Date): Promise<NormalizedReading[]> {
    // 小米手环私有协议:读取活动数据特征值
    // 解析二进制协议 → NormalizedReading[]
    // 按时间过滤 since 之后的数据
  }
}

4.3 BLE 连接管理与错误恢复

// services/ble/BLEManager.ts

class BLEManager {
  private adapters: Map<string, DeviceAdapter> = new Map();
  private connection: BLEConnection | null = null;

  /** 注册适配器 */
  registerAdapter(adapter: DeviceAdapter): void;

  /** 自动扫描并识别设备 */
  async scanAndIdentify(): Promise<{ adapter: DeviceAdapter; device: BLEDevice } | null>;

  /** 同步数据(主入口) */
  async syncData(patientId: string, since: Date): Promise<SyncResult> {
    try {
      // 1. 扫描 → 识别设备 → 选择 Adapter
      // 2. 连接 → 读取历史数据 → 格式化
      // 3. 批量提交到 HMS API
      // 4. 断开连接
      // 5. 返回同步结果
    } catch (error) {
      // 错误恢复:连接失败 → 重试 1 次 → 提示用户
      // 数据提交失败 → 本地缓存 → 下次同步时补传
    }
  }
}

interface SyncResult {
  success: boolean;
  readingsCount: number;
  dateRange: { from: string; to: string };
  errors?: string[];
}

4.4 数据同步流程

用户打开小程序 → 检查蓝牙权限
       │
       ▼
自动扫描附近设备 → 匹配已注册 Adapter
       │                      │
       Yes                    No → 提示"未发现设备"
       ▼
BLE 连接 → 读取上次同步时间
       │
       ▼
批量读取 since 上次同步的数据
       │
       ▼
POST /api/v1/health/device-readings/batch
       │              │
       成功           失败 → 缓存到本地存储
       │                        │
       ▼                        ▼
更新同步时间戳              下次同步时补传
显示同步结果

4.5 数据格式规范化

Adapter 负责将设备原始数据翻译为 NormalizedReading

小米手环原始数据 (二进制):
  [0x01, 0x48, 0x00, ...]  → 心率 72 bpm

Adapter 翻译:
  NormalizedReading {
    deviceType: 'heart_rate',
    values: { hr: 72 },
    measuredAt: '2026-05-15T08:30:00Z',
    source: 'ble_device'
  }

4.6 离线缓存策略

使用小程序本地存储Taro.setStorageSync缓存未提交的数据

interface PendingSync {
  readings: NormalizedReading[];
  createdAt: string;
  retryCount: number;
}

// 缓存 key: `pending_sync_${patientId}`
// 最大缓存: 5000 条(超出时丢弃最旧的)
// 提交成功后清除缓存
// 下次 syncData 时检查缓存,优先补传

5. 后端 API 设计

5.1 设备数据摄入 API

批量提交端点:

POST /api/v1/health/device-readings/batch
Authorization: Bearer <token>
Content-Type: application/json

Request:
{
  "deviceId": "abc123",
  "deviceModel": "Xiaomi Band 8",
  "readings": [
    {
      "deviceType": "heart_rate",
      "values": { "hr": 72 },
      "measuredAt": "2026-05-15T08:30:00Z"
    }
    // ... 最多 500 条/请求
  ]
}

Response 200:
{
  "success": true,
  "data": {
    "accepted": 498,
    "duplicates": 2,
    "earliest": "2026-05-14T22:00:00Z",
    "latest": "2026-05-15T08:30:00Z"
  }
}

去重机制:(patient_id, device_id, device_type, measured_at) 为唯一约束,冲突时忽略(幂等)。

限流: 单用户每分钟最多 10 次批量提交。

5.2 快速路径 vs 异步路径

// erp-health/src/service/device_reading_service.rs

pub async fn batch_create_readings(
    db: &DatabaseConnection,
    event_bus: &EventBus,
    tenant_id: Uuid,
    patient_id: Uuid,
    readings: Vec<DeviceReadingInput>,
) -> AppResult<BatchResult> {
    // ── 快速路径(同步,< 200ms──

    // 1. 校验患者存在性
    let patient = find_patient(db, tenant_id, patient_id).await?;

    // 1.5 校验设备绑定关系(防止数据注入)
    verify_device_binding(db, tenant_id, patient_id, &device_id).await?;

    // 2. 批量插入ON CONFLICT DO NOTHING 去重)
    let inserted = batch_insert_readings(db, tenant_id, patient_id, &readings).await?;

    // 3. 同步降采样 upsert每条原始数据更新对应小时聚合
    upsert_hourly_aggregates(db, tenant_id, patient_id, &readings).await?;

    // 4. 发布 EventBus 事件(异步消费者处理告警和推送)
    // 注意publish 签名需要 db 参数用于 outbox 持久化
    event_bus.publish(DomainEvent {
        event_type: "device.readings.synced".into(),
        tenant_id,
        payload: json!({ "patient_id": patient_id, "count": inserted }),
    }, db).await?;

    Ok(BatchResult { accepted: inserted, .. })
}
// 异步消费者erp-health 事件订阅)
// 监听 "device.readings.synced" 事件

async fn on_readings_synced(event: DomainEvent) {
    // 1. 加载该患者适用的告警规则
    // 2. 逐规则评估(滑动窗口查询)
    // 3. 匹配规则 → 生成 Alert → 发布 alert.triggered 事件
    // 4. 统计聚合更新(异步,不阻塞)
}

5.3 降采样 Pipeline

/// 同步降采样:写入原始数据后立即更新小时聚合
async fn upsert_hourly_aggregates(
    db: &DatabaseConnection,
    tenant_id: Uuid,
    patient_id: Uuid,
    readings: &[DeviceReadingInput],
) -> AppResult<()> {
    // 按 (device_type, hour_start) 分组
    // 每组计算 min/max/avg/count

    // INSERT ... ON CONFLICT (tenant_id, patient_id, device_type, hour_start)
    // DO UPDATE SET
    //   min_val = LEAST(min_val, EXCLUDED.min_val),
    //   max_val = GREATEST(max_val, EXCLUDED.max_val),
    //   avg_val = (avg_val * sample_count + EXCLUDED.total) / (sample_count + EXCLUDED.count),
    //   sample_count = sample_count + EXCLUDED.count

    Ok(())
}

5.4 查询 API

# 原始数据查询(最近 N 条)
GET /api/v1/health/device-readings?patient_id=xxx&device_type=heart_rate&hours=24

# 降采样数据查询(趋势图用)
GET /api/v1/health/device-readings/hourly?patient_id=xxx&device_type=heart_rate&days=7

# 告警列表
GET /api/v1/health/alerts?patient_id=xxx&status=pending

# 告警确认
PUT /api/v1/health/alerts/{id}/acknowledge

# 告警规则管理
GET/POST/PUT/DELETE /api/v1/health/alert-rules

5.5 EventBus 事件扩展

新增事件类型:

事件 触发时机 Payload
device.readings.synced 手环数据批量提交成功 { patient_id, count, device_model, date_range }
alert.triggered 告警规则匹配 { alert_id, patient_id, rule_name, severity, detail }
alert.acknowledged 医生确认告警 { alert_id, acknowledged_by }

订阅关系:

订阅者 订阅事件 动作
告警引擎 device.readings.synced 规则评估
SSE Handler alert.triggered 推送告警通知到医生
SSE Handler device.readings.synced 推送体征更新到看板
消息模块 alert.triggered 生成站内告警消息

6. 告警引擎

6.1 规则模型

// erp-health/src/service/alert_engine.rs

#[derive(Debug, Clone, Deserialize)]
pub enum AlertCondition {
    /// 单次阈值判断
    SingleThreshold {
        direction: Direction,  // Above / Below
        value: f64,
    },
    /// 连续 N 次超标
    Consecutive {
        count: usize,
        direction: Direction,
        value: f64,
        window_hours: Option<i32>,  // 可选时间窗口限制
    },
    /// 趋势变化量
    Trend {
        window_hours: i32,    // 观察窗口
        delta: f64,           // 变化量
        direction: Direction, // Up / Down
    },
}

#[derive(Debug, Clone, Deserialize)]
pub enum Direction { Above, Below }

6.2 规则评估流程

pub async fn evaluate_rules(
    db: &DatabaseConnection,
    tenant_id: Uuid,
    patient_id: Uuid,
    device_type: &str,
    event_bus: &EventBus,
) -> AppResult<Vec<Alert>> {
    // 1. 加载该租户激活的规则(按 device_type 过滤)
    let rules = load_active_rules(db, tenant_id, device_type).await?;

    let mut alerts = Vec::new();

    for rule in rules {
        // 检查冷却期(同一规则对同一患者)
        if is_in_cooldown(db, rule.id, patient_id, rule.cooldown_minutes).await? {
            continue;
        }

        // 2. 按条件类型评估
        let triggered = match &rule.condition {
            AlertCondition::SingleThreshold { direction, value } => {
                // 查最近一条数据判断
                evaluate_single(db, tenant_id, patient_id, &rule.device_type,
                    *direction, *value).await?
            }
            AlertCondition::Consecutive { count, direction, value, window_hours } => {
                // 滑动窗口查询最近 N 条数据
                evaluate_consecutive(db, tenant_id, patient_id, &rule.device_type,
                    *count, *direction, *value, *window_hours).await?
            }
            AlertCondition::Trend { window_hours, delta, direction } => {
                // 查询窗口内数据做斜率计算
                evaluate_trend(db, tenant_id, patient_id, &rule.device_type,
                    *window_hours, *delta, *direction).await?
            }
        };

        if triggered {
            let alert = create_alert(db, tenant_id, patient_id, &rule).await?;
            alerts.push(alert);

            // 发布事件(通知管道 + SSE 推送)
            event_bus.publish(alert_triggered_event(&alert)).await?;
        }
    }

    Ok(alerts)
}

6.3 滑动窗口查询实现

-- Consecutive: 连续 N 次超标
-- 查询最近 N 条记录,检查是否全部超标
SELECT avg_val FROM vital_signs_hourly
WHERE tenant_id = $1
  AND patient_id = $2
  AND device_type = $3
  AND hour_start > NOW() - ($4 * interval '1 hour')
ORDER BY hour_start DESC
LIMIT $5;

-- Rust 侧检查:是否连续 N 条都满足条件
-- (all recent readings exceed threshold, no "break" in the sequence)

-- Trend: 窗口内趋势
-- 线性回归斜率近似:用首尾差值 / 时间跨度
WITH window_data AS (
    SELECT avg_val, hour_start,
           ROW_NUMBER() OVER (ORDER BY hour_start) as rn,
           COUNT(*) OVER () as total
    FROM vital_signs_hourly
    WHERE tenant_id = $1 AND patient_id = $2 AND device_type = $3
      AND hour_start > NOW() - ($4 * interval '1 hour')
    ORDER BY hour_start
)
SELECT
    (MAX(avg_val) - MIN(avg_val)) as delta,
    MAX(hour_start) - MIN(hour_start) as timespan
FROM window_data;

6.4 告警通知管道

Alert 触发
    │
    ├── EventBus → SSE Handler → 医生 Web 端实时弹窗
    │
    ├── EventBus → 消息模块 → 站内告警消息
    │
    └── (未来) 微信模板消息 → 医生小程序推送

6.5 规则管理 API

# 创建规则
POST /api/v1/health/alert-rules
{
  "name": "心率持续过高",
  "deviceType": "heart_rate",
  "conditionType": "consecutive",
  "conditionParams": {
    "count": 3,
    "direction": "above",
    "value": 100,
    "windowHours": 1
  },
  "severity": "warning",
  "notifyRoles": ["attending_doctor"],
  "cooldownMinutes": 60
}

# 查询规则
GET /api/v1/health/alert-rules?is_active=true

# 更新规则
PUT /api/v1/health/alert-rules/{id}

# 禁用规则
PUT /api/v1/health/alert-rules/{id}/deactivate

7. SSE 推送扩展

7.1 现有 SSE 架构扩展

当前 SSE Handler (sse_handler.rs) 仅订阅 message.sent 事件。扩展方案:新增订阅源,复用现有 SSE 连接。

// 改造 sse_handler.rs 的事件订阅

async fn sse_stream(
    rx: BroadcastReceiver<DomainEvent>,
    tenant_id: Uuid,
    user_id: Uuid,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // 现有: subscribe_filtered("message.sent")
    // 扩展: 改为全量 subscribe() + 自行过滤
    // 原因: 新增事件前缀不同 (device. / alert. / message.),无法用单一前缀覆盖

    let stream = async_stream::stream! {
        loop {
            match rx.recv().await {
                Ok(event) => {
                    // 消息通知(已有)
                    if event.event_type == "message.sent"
                        && matches_tenant_user(&event, tenant_id, user_id) {
                        yield Ok(Event::default()
                            .event("message")
                            .data(event.payload.to_string()));
                    }

                    // 告警通知(新增)— 推送给患者的主治医生
                    if event.event_type == "alert.triggered"
                        && matches_tenant_doctor(&event, tenant_id, user_id) {
                        yield Ok(Event::default()
                            .event("alert")
                            .data(event.payload.to_string()));
                    }

                    // 体征更新(新增)— 推送给正在查看该患者的医生
                    if event.event_type == "device.readings.synced"
                        && matches_tenant_doctor(&event, tenant_id, user_id) {
                        yield Ok(Event::default()
                            .event("vital_update")
                            .data(event.payload.to_string()));
                    }
                }
                Err(Lagged(n)) => tracing::warn!("SSE lagged {n} events"),
                Err(_) => break,
            }
        }
    };

    Sse::new(stream).keep_alive(KeepAlive::default())
}

7.2 新增事件类型与 SSE event 映射

EventBus 事件 SSE event 推送对象 Payload
message.sent message 消息接收者(已有) 消息内容
alert.triggered alert 患者的主治医生 告警详情
device.readings.synced vital_update 患者的主治医生 同步摘要

7.3 医生端实时看板数据推送

前端监听 vital_update 事件,更新实时看板:

// apps/web 前端 SSE 监听扩展
eventSource.addEventListener('vital_update', (e) => {
  const data = JSON.parse(e.data);
  // 更新患者详情页的体征图表(如果正在查看该患者)
  updateVitalSignsChart(data.patient_id, data.summary);
});

eventSource.addEventListener('alert', (e) => {
  const data = JSON.parse(e.data);
  // 显示告警弹窗 + 声音提示
  showAlertNotification(data);
});

8. 多租户与安全

8.1 租户隔离策略

数据层 隔离方式
device_readings tenant_id 列过滤(中间件注入),分区键不含 tenant_id
vital_signs_hourly tenant_id 列过滤 + UNIQUE 约束含 tenant_id
alert_rules tenant_id 列过滤,每租户独立配置规则
alerts tenant_id 列过滤

API 端点全部走现有 JWT 中间件,tenant_id 从 token 提取,不需要额外开发。

8.2 设备认证与数据完整性

设备绑定: 每个设备通过 device_idMAC 哈希)绑定到患者。首次同步时创建绑定关系,后续只接受已绑定设备的数据。

CREATE TABLE patient_devices (
    id          UUID PRIMARY KEY,
    tenant_id   UUID NOT NULL,
    patient_id  UUID NOT NULL,
    device_id   VARCHAR(64) NOT NULL,      -- 设备唯一标识
    device_model VARCHAR(64),
    device_type VARCHAR(32),
    bound_at    TIMESTAMPTZ DEFAULT NOW(),
    created_at  TIMESTAMPTZ DEFAULT NOW(),
    updated_at  TIMESTAMPTZ DEFAULT NOW(),
    created_by  UUID,
    updated_by  UUID,
    deleted_at  TIMESTAMPTZ,               -- 软删除(解绑时设置)
    version     INT NOT NULL DEFAULT 1,
    UNIQUE (tenant_id, patient_id, device_id)
);

数据完整性:

  • API 通过 JWT 认证,确保只有授权用户(患者本人或医护)能提交数据
  • 批量提交有大小限制500 条/请求和频率限制10 次/分钟/用户)
  • measured_at 不接受未来时间(服务端校验)

8.3 PII 加密扩展

手环数据本身不包含 PII心率、步数等不是个人身份信息patient_id 关联关系需要保护。

  • device_readings 表的 patient_id 字段不加密(性能考虑,高频查询)
  • alerts 表的告警详情可能包含患者姓名,走现有 PII 加密流程
  • 查询 API 返回时,按现有 PII 解密规则处理

9. 实施路线图

Phase 1数据管道 + 手环适配(基础能力)

目标: 跑通"手环 → 小程序 → API → 数据库"的完整链路

任务 涉及模块 预估复杂度
数据库迁移:device_readings 分区表 + vital_signs_hourly + patient_devices migration
后端摄入 APIPOST /device-readings/batch + 降采样 upsert erp-health
EventBus 事件扩展:device.readings.synced erp-core + erp-health
小程序 BLE 模块DeviceAdapter 接口 + 小米手环 Adapter miniprogram
小程序同步页面:设备扫描 + 数据同步 UI miniprogram
查询 APIGET /device-readings + /hourly erp-health

验收标准:

  • 小程序能连接小米手环并读取心率数据
  • 数据通过 API 批量提交到 HMS 后端
  • device_readingsvital_signs_hourly 表正确写入
  • 手动执行降采样查询验证数据正确

Phase 2告警引擎 + SSE 推送(智能预警)

目标: 告警规则可配置、评估准确、推送实时

任务 涉及模块 预估复杂度
数据库迁移:alert_rules + alerts migration
告警引擎核心:规则加载 + 评估 + 生成告警 erp-health
SSE 扩展:新增 alert / vital_update 事件推送 erp-server
规则管理 APICRUD + 启停 erp-health
告警确认/处置 API erp-health
医生 Web 端告警通知 UI web frontend
医生 Web 端体征实时看板 web frontend

验收标准:

  • 可通过 API 配置告警规则
  • 手环数据提交后,匹配规则自动触发告警
  • 告警通过 SSE 实时推送到医生端
  • 医生可确认/处置告警
  • 冷却期机制正常工作

Phase 3高级分析 + 更多设备(扩展能力)

目标: 支持更多设备类型,提供更丰富的分析能力

任务 说明
更多手环适配 华为手环、OPPO 手环等 Adapter
血压计/血糖仪 BLE 适配 扩展 DeviceType新增 Adapter
数据看板增强 周/月趋势对比、患者分组统计
降采样后台修正任务 修正乱序数据导致的聚合偏差
数据清理任务 自动 DROP 超期分区
(远期) 微信模板消息推送 告警推送到医生小程序