diff --git a/docs/superpowers/specs/2026-04-25-notification-realtime-architecture-design.md b/docs/superpowers/specs/2026-04-25-notification-realtime-architecture-design.md new file mode 100644 index 0000000..355bb81 --- /dev/null +++ b/docs/superpowers/specs/2026-04-25-notification-realtime-architecture-design.md @@ -0,0 +1,331 @@ +# HMS 全渠道通知与实时架构设计 + +> 日期: 2026-04-25 +> 状态: Draft +> 范围: 通知管道、实时推送、WebSocket 咨询、HealthDataProvider 渐进实现 + +## 1. 背景与动机 + +QA 审查发现 HMS 系统存在以下关键缺口: + +- **事件消费断裂**: `appointment.*` 和 `health.*` 事件已发布但无消费者,患者预约后无任何通知 +- **无实时推送**: 服务端无 WebSocket/SSE 基础设施,所有数据靠轮询获取 +- **微信模板 ID 为空**: 小程序推送通知不可用 +- **HealthDataProvider 全是 stub**: AI 集成无数据访问入口 + +## 2. 设计决策 + +| 决策点 | 选择 | 理由 | +|--------|------|------| +| 通知覆盖范围 | 全渠道(站内+微信+实时) | 医疗场景需要及时通知 | +| 实时技术 | 混合 WS+SSE | WS 适合双向聊天,SSE 适合单向通知 | +| HealthDataProvider | 保留 trait,渐进实现 | 保持抽象边界,等 AI 需求明确 | +| 小程序离线模式 | 延迟,当前不做 | 优先级低于实时通知 | +| 管理仪表盘 | 独立推进 | 与通知架构无强依赖 | + +## 3. 架构概览 + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ 事件总线 (erp-core, 已有) │ +│ appointment.* / health.* / workflow.* / message.* │ +└───────┬──────────────────────────────┬──────────────────────────┘ + │ │ + ▼ ▼ +┌───────────────────────┐ ┌────────────────────────────────────┐ +│ 通知分发器 (新增) │ │ SSE 推送网关 (新增) │ +│ • 订阅所有业务事件 │──→│ • GET /api/v1/notifications/sse │ +│ • 查询用户通知偏好 │ │ • JWT 认证 + 租户过滤 │ +│ • 多渠道分发: │ │ • 心跳 30s │ +│ → 站内消息 (已有) │ │ • 自动重连 (Last-Event-ID) │ +│ → 微信推送 (新增) │ └────────────────────────────────────┘ +│ → SSE 实时推送 ↗ │ +└───────────────────────┘ + +┌─────────────────────────────────────────────────────────────────┐ +│ WebSocket 网关 (新增) — 仅用于咨询聊天 │ +│ • WS /api/v1/health/consultations/{id}/ws │ +│ • JWT 握手认证 │ +│ • 消息先持久化再广播 │ +│ • 在线状态(只记录 last_active_at,不做 presence 系统) │ +└─────────────────────────────────────────────────────────────────┘ +``` + +## 4. Phase 分解与实施计划 + +### Phase A: 通知分发器(立即,2-3 天) + +**目标**: 让健康事件能自动生成站内通知消息。 + +**实现位置**: `crates/erp-message/src/notification_dispatcher.rs`(新增) + +**核心逻辑**: +``` +订阅 EventBus (prefix: "") + → 匹配 event_type: + "appointment.created" → 通知患者 "预约已创建" + "appointment.confirmed" → 通知患者 "预约已确认" + "appointment.cancelled" → 通知患者 "预约已取消" + "appointment.completed" → 通知患者 "预约已完成" + "appointment.no_show" → 通知患者 "预约已标记爽约" + "follow_up.created" → 通知医生 "新随访任务" + "follow_up.overdue" → 通知医生 "随访任务逾期" (需新增事件发布) + "health.vital_signs.created" → 通知医生 "患者录入生命体征" + "lab_report.uploaded" → 通知患者 "化验报告已出" + → 查询用户通知偏好 (subscription_service) + → 按偏好分发到对应渠道 (当前只有站内消息) +``` + +> **注意**: 事件类型名称必须与后端实际发布的一致。`appointment.*` 通过 +> `format!("appointment.{}", status)` 动态生成;`follow_up.created` 和 +> `lab_report.uploaded` 分别在 `follow_up_service.rs` 和 `health_data_service.rs` +> 中硬编码。`follow_up.overdue` 事件当前不存在,需在 Phase A 中新增。 + +**事件 payload 要求**: +- `appointment.*`: 需要 `patient_id`, `doctor_id`, `status` +- `follow_up.created`: 需要 `task_id`, `patient_id`, `doctor_id` +- `follow_up.overdue`: 需要 `task_id`, `patient_id`, `doctor_id`(新增) +- `lab_report.uploaded`: 需要 `patient_id`, `report_id` +- `health.vital_signs.created`: 需要 `patient_id` + +**变更文件**: +- 新增: `crates/erp-message/src/notification_dispatcher.rs` +- 修改: `crates/erp-message/src/module.rs` — 启动分发器 +- 修改: `crates/erp-server/src/main.rs` — 调用启动 +- 修改: `crates/erp-health/src/service/follow_up_service.rs` — `check_overdue_tasks` 发布 `follow_up.overdue` 事件 +- 修改: `crates/erp-health/src/service/follow_up_service.rs` — `create_task` 发布 `follow_up.created` 事件(如尚未发布) + +**验证**: +1. 创建预约 → 确认 `messages` 表有新记录 +2. 化验报告创建 → 确认患者收到通知 +3. 随访逾期 → 确认医生收到通知 + +### Phase B: SSE 推送网关(紧随 Phase A,2-3 天) + +**目标**: Web 端和小程序端能实时接收通知。 + +**实现位置**: `crates/erp-server/src/sse.rs`(新增) + +**API 端点**: +- `GET /api/v1/notifications/sse` — SSE 连接端点 +- 请求头: `Authorization: Bearer ` +- 响应头: `Content-Type: text/event-stream` +- 事件格式: + ``` + event: notification + data: {"id":"...","title":"预约已确认","body":"...","priority":"normal","business_type":"appointment","business_id":"..."} + ``` + +**核心逻辑**: +1. JWT 认证,提取 `user_id` 和 `tenant_id` +2. 订阅 EventBus,按 `tenant_id` + 用户相关的 `patient_id`/`user_id` 过滤 +3. 维护连接池(`DashMap`),支持多设备同时连接 +4. 心跳每 30s 发送 `:heartbeat\n\n` +5. 支持 `Last-Event-ID` 断线重连。新建 `notification_events` 表(含 `recipient_id`、`event_id`、`channel`、`created_at`)用于高效按用户重放;`domain_events` 表无 `recipient_id` 列,不适合直接用于 SSE 重放 + +**前端集成**: +- Web: `apps/web/src/hooks/useEventSource.ts` — 封装 EventSource + 自动重连 +- 小程序: 不支持原生 SSE,使用轮询 `GET /api/v1/notifications/sse/poll?after=` 降级方案 + +**变更文件**: +- 新增: `crates/erp-server/src/sse.rs` +- 新增: `apps/web/src/hooks/useEventSource.ts` +- 修改: `crates/erp-server/src/main.rs` — 注册 SSE 路由 +- 修改: `crates/erp-message/src/notification_dispatcher.rs` — 同时推送到 SSE 连接池 +- 修改: `crates/erp-server/Cargo.toml` — 添加 `dashmap` 依赖 +- 新增: 迁移文件 — 创建 `notification_events` 表 + +**验证**: +1. Web 端连接 SSE → 后端创建预约 → Web 端实时收到通知 +2. 断线重连 → 补发遗漏事件 +3. 多设备连接 → 均收到通知 + +### Phase C: 微信模板消息推送(Phase B 之后,1-2 天) + +**目标**: 小程序端通过微信推送通知患者。 + +**前置条件**: 在微信公众平台注册 3 个模板消息: +1. 预约提醒模板 — 包含:医生名、预约时间、科室 +2. 随访提醒模板 — 包含:随访类型、截止日期、医生名 +3. 报告通知模板 — 包含:报告类型、出报告日期 + +**实现位置**: `crates/erp-auth/src/service/wechat_notify_service.rs`(新增) + +**核心逻辑**: +``` +通知分发器 → 检查用户偏好.channel_preferences 包含 "wechat" + → 查询 wechat_users 表获取 openid + → 调用微信模板消息 API + → 记录推送结果到 message_audit 表 +``` + +**微信 API**: `POST https://api.weixin.qq.com/cgi-bin/message/template/send` + +**变更文件**: +- 新增: `crates/erp-auth/src/service/wechat_notify_service.rs` +- 修改: `crates/erp-message/src/notification_dispatcher.rs` — 添加微信渠道 +- 修改: `apps/miniprogram/src/services/wechat-templates.ts` — 填入实际模板 ID + +**验证**: +1. 预约确认 → 患者微信收到模板消息 +2. 随访到期 → 患者微信收到提醒 +3. 推送失败 → 记录日志,不影响主流程 + +### Phase D: WebSocket 咨询聊天(Phase B 之后,3-4 天) + +**目标**: 替换咨询消息的轮询为 WebSocket 实时通信。 + +**API 端点**: +- `WS /api/v1/health/consultations/{session_id}/ws` +- 握手: `?token=` + +**协议**: +```json +// 客户端 → 服务端 +{"type": "message", "content": "医生您好..."} +{"type": "typing", "is_typing": true} + +// 服务端 → 客户端 +{"type": "message", "id": "...", "sender_id": "...", "content": "...", "created_at": "..."} +{"type": "message_ack", "temp_id": "...", "server_id": "..."} // 确认客户端临时消息 +{"type": "user_typing", "user_id": "...", "is_typing": true} +{"type": "session_closed", "reason": "..."} +``` + +**核心逻辑**: +1. JWT 认证,验证用户是 session 的参与方 +2. 消息先持久化到 `consultation_message` 表,再广播给 session 内所有连接 +3. 乐观发送:客户端立即显示消息(temp_id),服务端确认后替换为 server_id +4. 连接管理:每个 session 一个 broadcast channel,`DashMap` +5. 速率限制:每用户每分钟最多 60 条消息,超出返回 error frame +6. 降级方案:保留现有 HTTP 轮询端点 `GET /health/consultation-messages`,客户端在 WS 连接失败时自动切换为轮询 + +**变更文件**: +- 新增: `crates/erp-health/src/ws/` — WebSocket handler + 连接管理 +- 修改: `crates/erp-health/src/module.rs` — 注册 WS 路由 +- 新增: `apps/web/src/hooks/useConsultationWS.ts` — WebSocket hook +- 修改: `apps/web/src/pages/health/ConsultationDetail.tsx` — 使用 WS 替代轮询 +- 新增: `apps/miniprogram/src/services/websocket.ts` — 小程序 WS 服务 + +**验证**: +1. 打开咨询详情 → WS 连接建立 +2. 发送消息 → 对方实时收到 +3. 网络断开 → 自动重连 + 补发消息 +4. 关闭会话 → WS 关闭 + 双方收到通知 + +### Phase E: HealthDataProvider 渐进实现 + 管理仪表盘(远期,5-7 天) + +> **前置条件**: 修复 `admin_list_orders` 传递 nil UUID 的 bug(见 QA 审查计划 Phase 2.1),此 bug 应独立于本设计文档提前修复。 + +**HealthDataProvider**: +- 标记 trait 为 `#[doc(hidden)]` + `#[unstable]` +- 将 trait 拆分为 `CoreHealthDataProvider`(稳定,2 个方法)和 `ExtendedHealthDataProvider`(实验性,2 个方法): + - `CoreHealthDataProvider`: + - `get_lab_report` → 委托给 `health_data_service::get_lab_report` + - `get_patient_summary` → 委托给 `patient_service::get_health_summary` + - `ExtendedHealthDataProvider`(保持 stub): + - `get_vital_signs` — 等 AI 需求明确后实现 + - `get_full_report` — 等 AI 需求明确后实现 +- AI 模块仅依赖 `CoreHealthDataProvider`,获得编译时保证 +- 在 AI 模块集成时验证接口适配性,根据实际使用情况决定是否合并回单一 trait + +**管理仪表盘**: +- 修复 `admin_list_orders`(当前传 nil UUID 的 bug) +- 新增管理员统计 API: `GET /api/v1/health/admin/stats` + - 返回: 总预约数、按状态分布、积分兑换趋势、活跃患者数 +- 新增管理仪表盘页面: `apps/web/src/pages/health/AdminDashboard.tsx` + +## 5. 数据模型变更 + +### 通知事件记录表(新增) + +```sql +CREATE TABLE notification_events ( + id UUID PRIMARY KEY, + tenant_id UUID NOT NULL, + recipient_id UUID NOT NULL, -- 接收者用户 ID + event_id UUID NOT NULL, -- 关联 domain_events.id + channel VARCHAR(20) NOT NULL, -- 'in_app' | 'wechat' | 'sse' + status VARCHAR(20) DEFAULT 'pending', -- 'pending' | 'sent' | 'failed' + created_at TIMESTAMPTZ NOT NULL, + INDEX idx_notification_recipient (tenant_id, recipient_id, created_at DESC) +); +``` + +此表用于 SSE 断线重放,按 `recipient_id` 高效查询用户遗漏的通知。 + +### 通知偏好扩展 + +`message_subscriptions` 表字段扩展(复用已有 `channel_preferences` JSON 列): +- `channel_preferences` JSON — 值格式: `["in_app", "wechat", "sse"]`(列已存在,需统一值格式) +- 新增: `quiet_hours_start` TIME — 免打扰开始时间 +- 新增: `quiet_hours_end` TIME — 免打扰结束时间 + +### SSE 连接管理(内存中,不持久化) + +```rust +struct ConnectionManager { + // user_id → 该用户的所有 SSE 连接 + connections: DashMap>>, + // session_id → 该咨询的所有 WS 连接 + ws_sessions: DashMap>, +} +``` + +## 6. 错误处理策略 + +| 场景 | 处理方式 | +|------|----------| +| 站内消息发送失败 | 重试 3 次,仍失败则记录 dead-letter | +| SSE 连接断开 | 客户端自动重连,服务端通过 Last-Event-ID 补发 | +| 微信推送失败 | 记录日志,降级为站内消息,不影响主流程 | +| WS 连接断开 | 客户端自动重连,服务端补发断线期间消息 | +| 通知偏好查询失败 | 使用默认偏好(全渠道) | + +## 7. 性能考虑 + +- **SSE 连接数**: 单用户最多 3 个设备连接,超出自动关闭最旧连接 +- **WS 连接数**: 单 session 最多 10 个连接(考虑多标签页) +- **事件过滤**: SSE 端在服务端完成用户+租户过滤,不传输不相关事件 +- **心跳**: SSE 30s,WS 45s,超时自动断开 +- **消息广播**: 使用 broadcast channel,内存开销 O(连接数 × 活跃 session 数) + +## 8. 安全考虑 + +- SSE 和 WS 均需 JWT 认证 +- WS 在握手时验证用户是 session 的参与方(patient 或 doctor) +- SSE 只推送当前用户+当前租户的事件 +- 微信 openid 不暴露给前端,仅服务端使用 +- 通知内容不包含 PII(身份证、手机号),仅显示摘要信息 + +## 9. 验证计划 + +### Phase A 验证 +1. 创建预约 → `messages` 表新增记录 → 患者用户 ID 为 recipient +2. 化验报告创建 → 患者收到通知 +3. 随访任务逾期 → 医生收到通知 +4. 用户设置免打扰 → 免打扰时段内不收到通知 + +### Phase B 验证 +1. Web 端连接 SSE → 实时接收通知 +2. 断开网络 10s → 恢复 → 补发遗漏事件 +3. 并发 100 个 SSE 连接 → 性能不退化 + +### Phase C 验证 +1. 预约确认 → 患者微信收到模板消息 +2. 推送失败 → 站内消息正常发送 + +### Phase D 验证 +1. 医生和患者同时在线 → 消息实时到达 +2. 发送消息 → 服务端持久化 → 对方收到 +3. 断线重连 → 消息不丢失 +4. 关闭会话 → WS 正常关闭 + +## 10. 风险与缓解 + +| 风险 | 影响 | 缓解措施 | +|------|------|----------| +| 微信模板消息审核不通过 | 无法推送微信通知 | 降级为站内消息 + SSE | +| SSE 连接数过多消耗内存 | 服务性能下降 | 单用户限制连接数 + 空闲超时断开 | +| WebSocket 代理/防火墙阻断 | 咨询聊天不可用 | 保留 HTTP 轮询作为降级方案 | +| 事件总线延迟 | 通知不及时 | 监控 lag 指标,超出阈值告警 |