feat(health+ai): P2 咨询联动 + AI 巡检消费 — 全链路打通
业务链路打通 5/5 断点全部完成: - 咨询→随访:医生端新增"创建随访"按钮,从咨询会话直接创建随访任务 - 咨询→AI:医生端新增"AI 分析"按钮,对咨询上下文触发 AI 分析 - 告警→咨询:小程序告警详情页新增"在线咨询"快捷入口 - AI 巡检消费:erp-ai 新增 patrol_consumer,订阅 ai.patrol.requested 事件 - 前端联动:Web ConsultationDetail + 小程序 alerts 页面联动实现 后端:2 新 API + 2 handler + 1 service + AI event consumer 前端:Web 2 API + 1 页面改造 + 小程序 2 页面改造 测试:Web consultations.test.ts 9/9 通过
This commit is contained in:
@@ -1 +1,2 @@
|
||||
pub mod copilot_consumer;
|
||||
pub mod patrol_consumer;
|
||||
|
||||
254
crates/erp-ai/src/event/patrol_consumer.rs
Normal file
254
crates/erp-ai/src/event/patrol_consumer.rs
Normal file
@@ -0,0 +1,254 @@
|
||||
//! AI 巡护事件消费者 — 订阅 health 模块发布的 `ai.patrol.requested` 事件,
|
||||
//! 为未处理告警患者自动入队 AI 趋势分析。
|
||||
//!
|
||||
//! 事件来源:erp-health 每日定时扫描有未处理告警的患者,发布此事件。
|
||||
//! 处理策略:幂等消费 + 入队分析(复用 AnalysisQueue),不直接调用 AI Provider。
|
||||
|
||||
use erp_core::events::{EventBus, SubscriptionHandle};
|
||||
use tracing::{info, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
/// 消费者标识,用于幂等检查(processed_events 表的 consumer_id)
|
||||
const CONSUMER_ID: &str = "patrol_consumer";
|
||||
|
||||
/// 订阅事件类型前缀
|
||||
const EVENT_PREFIX: &str = "ai.patrol.";
|
||||
|
||||
/// 巡护请求事件 payload 中提取的结构
|
||||
#[derive(Debug)]
|
||||
struct PatrolRequest {
|
||||
patient_id: Uuid,
|
||||
doctor_id: Option<Uuid>,
|
||||
source: String,
|
||||
reason: String,
|
||||
}
|
||||
|
||||
/// 从事件 payload 中提取巡护请求字段
|
||||
fn extract_patrol_request(payload: &serde_json::Value) -> Option<PatrolRequest> {
|
||||
let patient_id = payload
|
||||
.get("patient_id")
|
||||
.or_else(|| payload.get("data").and_then(|d| d.get("patient_id")))
|
||||
.and_then(|v| v.as_str())
|
||||
.and_then(|s| Uuid::parse_str(s).ok())?;
|
||||
|
||||
let doctor_id = payload
|
||||
.get("doctor_id")
|
||||
.or_else(|| payload.get("data").and_then(|d| d.get("doctor_id")))
|
||||
.and_then(|v| v.as_str())
|
||||
.and_then(|s| Uuid::parse_str(s).ok());
|
||||
|
||||
let source = payload
|
||||
.get("source")
|
||||
.or_else(|| payload.get("data").and_then(|d| d.get("source")))
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("daily_patrol")
|
||||
.to_string();
|
||||
|
||||
let reason = payload
|
||||
.get("reason")
|
||||
.or_else(|| payload.get("data").and_then(|d| d.get("reason")))
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("未指定原因")
|
||||
.to_string();
|
||||
|
||||
Some(PatrolRequest {
|
||||
patient_id,
|
||||
doctor_id,
|
||||
source,
|
||||
reason,
|
||||
})
|
||||
}
|
||||
|
||||
/// 启动巡护事件消费者。
|
||||
///
|
||||
/// 订阅 `ai.patrol.*` 前缀事件,收到 `ai.patrol.requested` 时:
|
||||
/// 1. 幂等检查(跳过已处理事件)
|
||||
/// 2. 提取 patient_id / tenant_id / reason
|
||||
/// 3. 入队 AI 趋势分析(priority=2,与 health_data.critical_alert 同级)
|
||||
/// 4. 标记事件已处理
|
||||
pub fn spawn(db: &sea_orm::DatabaseConnection, event_bus: &EventBus) -> SubscriptionHandle {
|
||||
let (mut rx, handle) = event_bus.subscribe_filtered(EVENT_PREFIX.to_string());
|
||||
let db = db.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
info!("AI 巡护事件消费者已启动,监听前缀: {EVENT_PREFIX}");
|
||||
|
||||
while let Some(event) = rx.recv().await {
|
||||
if event.event_type != "ai.patrol.requested" {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 幂等检查
|
||||
match erp_core::events::is_event_processed(&db, event.id, CONSUMER_ID).await {
|
||||
Ok(true) => {
|
||||
info!(
|
||||
event_id = %event.id,
|
||||
"巡护事件已处理,跳过"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
Ok(false) => {}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
event_id = %event.id,
|
||||
error = %e,
|
||||
"幂等检查失败,继续处理"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// 提取 payload
|
||||
let request = match extract_patrol_request(&event.payload) {
|
||||
Some(r) => r,
|
||||
None => {
|
||||
warn!(
|
||||
event_id = %event.id,
|
||||
"ai.patrol.requested 事件缺少 patient_id 字段,跳过"
|
||||
);
|
||||
// 仍然标记已处理,避免重复消费格式错误的事件
|
||||
let _ =
|
||||
erp_core::events::mark_event_processed(&db, event.id, CONSUMER_ID).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
info!(
|
||||
event_id = %event.id,
|
||||
tenant_id = %event.tenant_id,
|
||||
patient_id = %request.patient_id,
|
||||
source = %request.source,
|
||||
reason = %request.reason,
|
||||
"收到巡护分析请求"
|
||||
);
|
||||
|
||||
// 入队趋势分析 — 复用现有 AnalysisQueue
|
||||
let queue = crate::service::analysis_queue::AnalysisQueue::new(db.clone());
|
||||
let job = crate::service::analysis_queue::AnalysisJob {
|
||||
tenant_id: event.tenant_id,
|
||||
patient_id: request.patient_id,
|
||||
analysis_type: "trend".to_string(),
|
||||
priority: 2, // 与 critical_alert 同级,高于普通分析
|
||||
source_event: Some(event.event_type.clone()),
|
||||
source_ref: format!("patrol:{}", request.reason),
|
||||
created_by: request.doctor_id,
|
||||
};
|
||||
|
||||
match queue.enqueue(job).await {
|
||||
Ok(queue_id) => {
|
||||
info!(
|
||||
queue_id = %queue_id,
|
||||
patient_id = %request.patient_id,
|
||||
tenant_id = %event.tenant_id,
|
||||
"巡护分析已入队"
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
patient_id = %request.patient_id,
|
||||
tenant_id = %event.tenant_id,
|
||||
error = %e,
|
||||
"巡护分析入队失败"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// 标记事件已处理(无论入队成功与否,避免无限重试同一事件)
|
||||
if let Err(e) = erp_core::events::mark_event_processed(&db, event.id, CONSUMER_ID).await
|
||||
{
|
||||
warn!(
|
||||
event_id = %event.id,
|
||||
error = %e,
|
||||
"标记巡护事件已处理失败(非致命)"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
info!("AI 巡护事件消费者已退出");
|
||||
});
|
||||
|
||||
handle
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_extract_patrol_request_full_payload() {
|
||||
let payload = serde_json::json!({
|
||||
"patient_id": "01234567-89ab-7def-8000-000000000001",
|
||||
"doctor_id": "01234567-89ab-7def-8000-000000000002",
|
||||
"source": "daily_patrol",
|
||||
"reason": "告警未处理: 血压异常"
|
||||
});
|
||||
|
||||
let request = extract_patrol_request(&payload).unwrap();
|
||||
assert_eq!(
|
||||
request.patient_id.to_string(),
|
||||
"01234567-89ab-7def-8000-000000000001"
|
||||
);
|
||||
assert!(request.doctor_id.is_some());
|
||||
assert_eq!(request.source, "daily_patrol");
|
||||
assert_eq!(request.reason, "告警未处理: 血压异常");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_extract_patrol_request_nested_in_data() {
|
||||
let payload = serde_json::json!({
|
||||
"schema_version": "v1",
|
||||
"occurred_at": "2026-05-20T10:00:00Z",
|
||||
"data": {
|
||||
"patient_id": "01234567-89ab-7def-8000-000000000001",
|
||||
"source": "daily_patrol",
|
||||
"reason": "告警未处理: 血糖偏高"
|
||||
}
|
||||
});
|
||||
|
||||
let request = extract_patrol_request(&payload).unwrap();
|
||||
assert_eq!(request.source, "daily_patrol");
|
||||
assert!(request.doctor_id.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_extract_patrol_request_missing_patient_id() {
|
||||
let payload = serde_json::json!({
|
||||
"source": "daily_patrol",
|
||||
"reason": "告警未处理"
|
||||
});
|
||||
|
||||
assert!(extract_patrol_request(&payload).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_extract_patrol_request_invalid_uuid() {
|
||||
let payload = serde_json::json!({
|
||||
"patient_id": "not-a-uuid",
|
||||
"source": "daily_patrol"
|
||||
});
|
||||
|
||||
assert!(extract_patrol_request(&payload).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_extract_patrol_request_defaults() {
|
||||
let payload = serde_json::json!({
|
||||
"patient_id": "01234567-89ab-7def-8000-000000000001"
|
||||
});
|
||||
|
||||
let request = extract_patrol_request(&payload).unwrap();
|
||||
assert_eq!(request.source, "daily_patrol");
|
||||
assert_eq!(request.reason, "未指定原因");
|
||||
assert!(request.doctor_id.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_consumer_id_is_stable() {
|
||||
// 确保消费者 ID 不变,否则幂等检查会失效
|
||||
assert_eq!(CONSUMER_ID, "patrol_consumer");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_event_prefix_matches_expected() {
|
||||
assert_eq!(EVENT_PREFIX, "ai.patrol.");
|
||||
}
|
||||
}
|
||||
@@ -363,6 +363,10 @@ impl ErpModule for AiModule {
|
||||
let copilot_handles = crate::event::copilot_consumer::spawn(&ctx.db, &ctx.event_bus);
|
||||
std::mem::forget(copilot_handles);
|
||||
|
||||
// 巡护事件消费者 — 订阅 ai.patrol.requested,为未处理告警患者入队趋势分析
|
||||
let patrol_handle = crate::event::patrol_consumer::spawn(&ctx.db, &ctx.event_bus);
|
||||
std::mem::forget(patrol_handle);
|
||||
|
||||
// 每日凌晨 2:00 批量刷新所有在管患者风险快照
|
||||
let refresh_db = ctx.db.clone();
|
||||
let refresh_event_bus = ctx.event_bus.clone();
|
||||
@@ -406,7 +410,7 @@ impl ErpModule for AiModule {
|
||||
|
||||
tracing::info!(
|
||||
module = "ai",
|
||||
"AI 模块事件处理器已注册(监听 ai.* 事件 + Copilot 事件)"
|
||||
"AI 模块事件处理器已注册(监听 ai.* 事件 + Copilot 事件 + 巡护事件)"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user