Files
hms/docs/superpowers/specs/2026-04-25-notification-realtime-architecture-design.md
iven d5950a5179 docs(spec): 全渠道通知与实时架构设计
QA 审查发现的事件消费断裂、无实时推送、微信模板为空等问题。
5 Phase 交付: 通知分发器 → SSE → 微信推送 → WebSocket → HealthDataProvider。
2026-04-25 19:24:37 +08:00

16 KiB
Raw Blame History

HMS 全渠道通知与实时架构设计

日期: 2026-04-25 状态: Draft 范围: 通知管道、实时推送、WebSocket 咨询、HealthDataProvider 渐进实现

1. 背景与动机

QA 审查发现 HMS 系统存在以下关键缺口:

  • 事件消费断裂: appointment.*health.* 事件已发布但无消费者,患者预约后无任何通知
  • 无实时推送: 服务端无 WebSocket/SSE 基础设施,所有数据靠轮询获取
  • 微信模板 ID 为空: 小程序推送通知不可用
  • HealthDataProvider 全是 stub: AI 集成无数据访问入口

2. 设计决策

决策点 选择 理由
通知覆盖范围 全渠道(站内+微信+实时) 医疗场景需要及时通知
实时技术 混合 WS+SSE WS 适合双向聊天SSE 适合单向通知
HealthDataProvider 保留 trait渐进实现 保持抽象边界,等 AI 需求明确
小程序离线模式 延迟,当前不做 优先级低于实时通知
管理仪表盘 独立推进 与通知架构无强依赖

3. 架构概览

┌─────────────────────────────────────────────────────────────────┐
│                     事件总线 (erp-core, 已有)                    │
│  appointment.* / health.* / workflow.* / message.*              │
└───────┬──────────────────────────────┬──────────────────────────┘
        │                              │
        ▼                              ▼
┌───────────────────────┐   ┌────────────────────────────────────┐
│ 通知分发器 (新增)       │   │ SSE 推送网关 (新增)                 │
│ • 订阅所有业务事件      │──→│ • GET /api/v1/notifications/sse    │
│ • 查询用户通知偏好      │   │ • JWT 认证 + 租户过滤               │
│ • 多渠道分发:           │   │ • 心跳 30s                          │
│   → 站内消息 (已有)     │   │ • 自动重连 (Last-Event-ID)          │
│   → 微信推送 (新增)     │   └────────────────────────────────────┘
│   → SSE 实时推送 ↗      │
└───────────────────────┘

┌─────────────────────────────────────────────────────────────────┐
│ WebSocket 网关 (新增) — 仅用于咨询聊天                            │
│ • WS /api/v1/health/consultations/{id}/ws                      │
│ • JWT 握手认证                                                   │
│ • 消息先持久化再广播                                              │
│ • 在线状态(只记录 last_active_at不做 presence 系统)           │
└─────────────────────────────────────────────────────────────────┘

4. Phase 分解与实施计划

Phase A: 通知分发器立即2-3 天)

目标: 让健康事件能自动生成站内通知消息。

实现位置: crates/erp-message/src/notification_dispatcher.rs(新增)

核心逻辑:

订阅 EventBus (prefix: "")
  → 匹配 event_type:
     "appointment.created"   → 通知患者 "预约已创建"
     "appointment.confirmed" → 通知患者 "预约已确认"
     "appointment.cancelled" → 通知患者 "预约已取消"
     "appointment.completed" → 通知患者 "预约已完成"
     "appointment.no_show"   → 通知患者 "预约已标记爽约"
     "follow_up.created"     → 通知医生 "新随访任务"
     "follow_up.overdue"     → 通知医生 "随访任务逾期" (需新增事件发布)
     "health.vital_signs.created" → 通知医生 "患者录入生命体征"
     "lab_report.uploaded"   → 通知患者 "化验报告已出"
  → 查询用户通知偏好 (subscription_service)
  → 按偏好分发到对应渠道 (当前只有站内消息)

注意: 事件类型名称必须与后端实际发布的一致。appointment.* 通过 format!("appointment.{}", status) 动态生成;follow_up.createdlab_report.uploaded 分别在 follow_up_service.rshealth_data_service.rs 中硬编码。follow_up.overdue 事件当前不存在,需在 Phase A 中新增。

事件 payload 要求:

  • appointment.*: 需要 patient_id, doctor_id, status
  • follow_up.created: 需要 task_id, patient_id, doctor_id
  • follow_up.overdue: 需要 task_id, patient_id, doctor_id(新增)
  • lab_report.uploaded: 需要 patient_id, report_id
  • health.vital_signs.created: 需要 patient_id

变更文件:

  • 新增: crates/erp-message/src/notification_dispatcher.rs
  • 修改: crates/erp-message/src/module.rs — 启动分发器
  • 修改: crates/erp-server/src/main.rs — 调用启动
  • 修改: crates/erp-health/src/service/follow_up_service.rscheck_overdue_tasks 发布 follow_up.overdue 事件
  • 修改: crates/erp-health/src/service/follow_up_service.rscreate_task 发布 follow_up.created 事件(如尚未发布)

验证:

  1. 创建预约 → 确认 messages 表有新记录
  2. 化验报告创建 → 确认患者收到通知
  3. 随访逾期 → 确认医生收到通知

Phase B: SSE 推送网关(紧随 Phase A2-3 天)

目标: Web 端和小程序端能实时接收通知。

实现位置: crates/erp-server/src/sse.rs(新增)

API 端点:

  • GET /api/v1/notifications/sse — SSE 连接端点
  • 请求头: Authorization: Bearer <jwt>
  • 响应头: Content-Type: text/event-stream
  • 事件格式:
    event: notification
    data: {"id":"...","title":"预约已确认","body":"...","priority":"normal","business_type":"appointment","business_id":"..."}
    

核心逻辑:

  1. JWT 认证,提取 user_idtenant_id
  2. 订阅 EventBustenant_id + 用户相关的 patient_id/user_id 过滤
  3. 维护连接池(DashMap<Uuid, mpsc::Sender>),支持多设备同时连接
  4. 心跳每 30s 发送 :heartbeat\n\n
  5. 支持 Last-Event-ID 断线重连。新建 notification_events 表(含 recipient_idevent_idchannelcreated_at)用于高效按用户重放;domain_events 表无 recipient_id 列,不适合直接用于 SSE 重放

前端集成:

  • Web: apps/web/src/hooks/useEventSource.ts — 封装 EventSource + 自动重连
  • 小程序: 不支持原生 SSE使用轮询 GET /api/v1/notifications/sse/poll?after=<last_id> 降级方案

变更文件:

  • 新增: crates/erp-server/src/sse.rs
  • 新增: apps/web/src/hooks/useEventSource.ts
  • 修改: crates/erp-server/src/main.rs — 注册 SSE 路由
  • 修改: crates/erp-message/src/notification_dispatcher.rs — 同时推送到 SSE 连接池
  • 修改: crates/erp-server/Cargo.toml — 添加 dashmap 依赖
  • 新增: 迁移文件 — 创建 notification_events

验证:

  1. Web 端连接 SSE → 后端创建预约 → Web 端实时收到通知
  2. 断线重连 → 补发遗漏事件
  3. 多设备连接 → 均收到通知

Phase C: 微信模板消息推送Phase B 之后1-2 天)

目标: 小程序端通过微信推送通知患者。

前置条件: 在微信公众平台注册 3 个模板消息:

  1. 预约提醒模板 — 包含:医生名、预约时间、科室
  2. 随访提醒模板 — 包含:随访类型、截止日期、医生名
  3. 报告通知模板 — 包含:报告类型、出报告日期

实现位置: crates/erp-auth/src/service/wechat_notify_service.rs(新增)

核心逻辑:

通知分发器 → 检查用户偏好.channel_preferences 包含 "wechat"
  → 查询 wechat_users 表获取 openid
  → 调用微信模板消息 API
  → 记录推送结果到 message_audit 表

微信 API: POST https://api.weixin.qq.com/cgi-bin/message/template/send

变更文件:

  • 新增: crates/erp-auth/src/service/wechat_notify_service.rs
  • 修改: crates/erp-message/src/notification_dispatcher.rs — 添加微信渠道
  • 修改: apps/miniprogram/src/services/wechat-templates.ts — 填入实际模板 ID

验证:

  1. 预约确认 → 患者微信收到模板消息
  2. 随访到期 → 患者微信收到提醒
  3. 推送失败 → 记录日志,不影响主流程

Phase D: WebSocket 咨询聊天Phase B 之后3-4 天)

目标: 替换咨询消息的轮询为 WebSocket 实时通信。

API 端点:

  • WS /api/v1/health/consultations/{session_id}/ws
  • 握手: ?token=<jwt>

协议:

// 客户端 → 服务端
{"type": "message", "content": "医生您好..."}
{"type": "typing", "is_typing": true}

// 服务端 → 客户端
{"type": "message", "id": "...", "sender_id": "...", "content": "...", "created_at": "..."}
{"type": "message_ack", "temp_id": "...", "server_id": "..."}  // 确认客户端临时消息
{"type": "user_typing", "user_id": "...", "is_typing": true}
{"type": "session_closed", "reason": "..."}

核心逻辑:

  1. JWT 认证,验证用户是 session 的参与方
  2. 消息先持久化到 consultation_message 表,再广播给 session 内所有连接
  3. 乐观发送客户端立即显示消息temp_id服务端确认后替换为 server_id
  4. 连接管理:每个 session 一个 broadcast channelDashMap<Uuid, broadcast::Sender>
  5. 速率限制:每用户每分钟最多 60 条消息,超出返回 error frame
  6. 降级方案:保留现有 HTTP 轮询端点 GET /health/consultation-messages,客户端在 WS 连接失败时自动切换为轮询

变更文件:

  • 新增: crates/erp-health/src/ws/ — WebSocket handler + 连接管理
  • 修改: crates/erp-health/src/module.rs — 注册 WS 路由
  • 新增: apps/web/src/hooks/useConsultationWS.ts — WebSocket hook
  • 修改: apps/web/src/pages/health/ConsultationDetail.tsx — 使用 WS 替代轮询
  • 新增: apps/miniprogram/src/services/websocket.ts — 小程序 WS 服务

验证:

  1. 打开咨询详情 → WS 连接建立
  2. 发送消息 → 对方实时收到
  3. 网络断开 → 自动重连 + 补发消息
  4. 关闭会话 → WS 关闭 + 双方收到通知

Phase E: HealthDataProvider 渐进实现 + 管理仪表盘远期5-7 天)

前置条件: 修复 admin_list_orders 传递 nil UUID 的 bug见 QA 审查计划 Phase 2.1),此 bug 应独立于本设计文档提前修复。

HealthDataProvider:

  • 标记 trait 为 #[doc(hidden)] + #[unstable]
  • 将 trait 拆分为 CoreHealthDataProvider稳定2 个方法)和 ExtendedHealthDataProvider实验性2 个方法):
    • CoreHealthDataProvider:
      • get_lab_report → 委托给 health_data_service::get_lab_report
      • get_patient_summary → 委托给 patient_service::get_health_summary
    • ExtendedHealthDataProvider(保持 stub:
      • get_vital_signs — 等 AI 需求明确后实现
      • get_full_report — 等 AI 需求明确后实现
  • AI 模块仅依赖 CoreHealthDataProvider,获得编译时保证
  • 在 AI 模块集成时验证接口适配性,根据实际使用情况决定是否合并回单一 trait

管理仪表盘:

  • 修复 admin_list_orders(当前传 nil UUID 的 bug
  • 新增管理员统计 API: GET /api/v1/health/admin/stats
    • 返回: 总预约数、按状态分布、积分兑换趋势、活跃患者数
  • 新增管理仪表盘页面: apps/web/src/pages/health/AdminDashboard.tsx

5. 数据模型变更

通知事件记录表(新增)

CREATE TABLE notification_events (
    id UUID PRIMARY KEY,
    tenant_id UUID NOT NULL,
    recipient_id UUID NOT NULL,          -- 接收者用户 ID
    event_id UUID NOT NULL,              -- 关联 domain_events.id
    channel VARCHAR(20) NOT NULL,         -- 'in_app' | 'wechat' | 'sse'
    status VARCHAR(20) DEFAULT 'pending', -- 'pending' | 'sent' | 'failed'
    created_at TIMESTAMPTZ NOT NULL,
    INDEX idx_notification_recipient (tenant_id, recipient_id, created_at DESC)
);

此表用于 SSE 断线重放,按 recipient_id 高效查询用户遗漏的通知。

通知偏好扩展

message_subscriptions 表字段扩展(复用已有 channel_preferences JSON 列):

  • channel_preferences JSON — 值格式: ["in_app", "wechat", "sse"](列已存在,需统一值格式)
  • 新增: quiet_hours_start TIME — 免打扰开始时间
  • 新增: quiet_hours_end TIME — 免打扰结束时间

SSE 连接管理(内存中,不持久化)

struct ConnectionManager {
    // user_id → 该用户的所有 SSE 连接
    connections: DashMap<Uuid, Vec<mpsc::Sender<SseEvent>>>,
    // session_id → 该咨询的所有 WS 连接
    ws_sessions: DashMap<Uuid, broadcast::Sender<WsMessage>>,
}

6. 错误处理策略

场景 处理方式
站内消息发送失败 重试 3 次,仍失败则记录 dead-letter
SSE 连接断开 客户端自动重连,服务端通过 Last-Event-ID 补发
微信推送失败 记录日志,降级为站内消息,不影响主流程
WS 连接断开 客户端自动重连,服务端补发断线期间消息
通知偏好查询失败 使用默认偏好(全渠道)

7. 性能考虑

  • SSE 连接数: 单用户最多 3 个设备连接,超出自动关闭最旧连接
  • WS 连接数: 单 session 最多 10 个连接(考虑多标签页)
  • 事件过滤: SSE 端在服务端完成用户+租户过滤,不传输不相关事件
  • 心跳: SSE 30sWS 45s超时自动断开
  • 消息广播: 使用 broadcast channel内存开销 O(连接数 × 活跃 session 数)

8. 安全考虑

  • SSE 和 WS 均需 JWT 认证
  • WS 在握手时验证用户是 session 的参与方patient 或 doctor
  • SSE 只推送当前用户+当前租户的事件
  • 微信 openid 不暴露给前端,仅服务端使用
  • 通知内容不包含 PII身份证、手机号仅显示摘要信息

9. 验证计划

Phase A 验证

  1. 创建预约 → messages 表新增记录 → 患者用户 ID 为 recipient
  2. 化验报告创建 → 患者收到通知
  3. 随访任务逾期 → 医生收到通知
  4. 用户设置免打扰 → 免打扰时段内不收到通知

Phase B 验证

  1. Web 端连接 SSE → 实时接收通知
  2. 断开网络 10s → 恢复 → 补发遗漏事件
  3. 并发 100 个 SSE 连接 → 性能不退化

Phase C 验证

  1. 预约确认 → 患者微信收到模板消息
  2. 推送失败 → 站内消息正常发送

Phase D 验证

  1. 医生和患者同时在线 → 消息实时到达
  2. 发送消息 → 服务端持久化 → 对方收到
  3. 断线重连 → 消息不丢失
  4. 关闭会话 → WS 正常关闭

10. 风险与缓解

风险 影响 缓解措施
微信模板消息审核不通过 无法推送微信通知 降级为站内消息 + SSE
SSE 连接数过多消耗内存 服务性能下降 单用户限制连接数 + 空闲超时断开
WebSocket 代理/防火墙阻断 咨询聊天不可用 保留 HTTP 轮询作为降级方案
事件总线延迟 通知不及时 监控 lag 指标,超出阈值告警