Files
hms/crates/erp-health/src/event.rs
iven 84afeaf9f2
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled
feat(health): 事件消费者补全 + 无效消费者清理
新增消费者:
- appointment.created → 患者预约创建通知
- consultation.opened/closed/new_message → 咨询全流程通知
- follow_up.created → 随访任务分配通知
- points.earned/exchanged/expired → 积分变动通知

清理:
- 删除 message.sent no-op 消费者(仅打日志无实际作用)
- 为 workflow.task.completed 消费者补充幂等检查
- 孤立事件率从 57% 降至 ~20%(剩余为 TODO 预留项)
2026-05-03 09:51:26 +08:00

829 lines
43 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use erp_core::events::EventBus;
use uuid::Uuid;
// ---------------------------------------------------------------------------
// Event type constants — managed centrally so that hard-coded strings are not
// scattered across the service layer.
// ---------------------------------------------------------------------------
// Appointments
pub const APPOINTMENT_CREATED: &str = "appointment.created";
// appointment.confirmed / appointment.cancelled etc. are assembled dynamically,
// so they have no constant here.
// Alerts
pub const ALERT_TRIGGERED: &str = "alert.triggered";
// Informed consent
pub const CONSENT_GRANTED: &str = "consent.granted";
pub const CONSENT_REVOKED: &str = "consent.revoked";
// Articles
pub const ARTICLE_PUBLISHED: &str = "article.published";
pub const ARTICLE_REJECTED: &str = "article.rejected";
// Consultations
pub const CONSULTATION_OPENED: &str = "consultation.opened";
pub const CONSULTATION_CLOSED: &str = "consultation.closed";
pub const CONSULTATION_NEW_MESSAGE: &str = "consultation.new_message";
// Device data
pub const DEVICE_READINGS_SYNCED: &str = "device.readings.synced";
// Doctors
pub const DOCTOR_ONLINE_STATUS_CHANGED: &str = "doctor.online_status_changed";
// Follow-ups
pub const FOLLOW_UP_CREATED: &str = "follow_up.created";
pub const FOLLOW_UP_COMPLETED: &str = "follow_up.completed";
pub const FOLLOW_UP_OVERDUE: &str = "follow_up.overdue";
// Daily monitoring
pub const DAILY_MONITORING_CREATED: &str = "daily_monitoring.created";
// Health data
pub const LAB_REPORT_UPLOADED: &str = "lab_report.uploaded";
pub const LAB_REPORT_REVIEWED: &str = "lab_report.reviewed";
pub const HEALTH_DATA_CRITICAL_ALERT: &str = "health_data.critical_alert";
// Patients
pub const PATIENT_CREATED: &str = "patient.created";
pub const PATIENT_UPDATED: &str = "patient.updated";
// TODO: the patient-verification and death-record flows behind the two constants
// below are not implemented yet; they are deferred to a later iteration.
pub const PATIENT_VERIFIED: &str = "patient.verified";
pub const PATIENT_DECEASED: &str = "patient.deceased";
// Points
pub const POINTS_EXPIRED: &str = "points.expired";
pub const POINTS_EARNED: &str = "points.earned";
pub const POINTS_EXCHANGED: &str = "points.exchanged";
/// Compatibility shim that keeps the old signature alive; it performs no subscriptions.
///
/// The handler logic has moved to `on_startup`, which goes through
/// `register_handlers_with_state`. The `_bus` argument is intentionally unused.
pub fn register_handlers(_bus: &EventBus) {
// Event handlers have moved to on_startup → register_handlers_with_state.
}
/// 带 HealthState 的事件订阅 — 在模块 on_startup 时调用
pub fn register_handlers_with_state(state: crate::state::HealthState) {
// workflow.task.completed → 更新随访任务状态为 completed
let (mut workflow_rx, _wf_handle) = state.event_bus.subscribe_filtered("workflow.task.".to_string());
let wf_db = state.db.clone();
tokio::spawn(async move {
loop {
match workflow_rx.recv().await {
Some(event) if event.event_type == "workflow.task.completed" => {
if erp_core::events::is_event_processed(&wf_db, event.id, "workflow_task_consumer").await.unwrap_or(false) {
continue;
}
let task_id = event.payload.get("task_id").and_then(|v| v.as_str()).and_then(|s| uuid::Uuid::parse_str(s).ok());
match task_id {
Some(task_id) => {
match crate::service::follow_up_service::complete_task_by_system(
&wf_db, task_id, event.tenant_id,
)
.await
{
Ok(()) => {
tracing::info!(
event_id = %event.id,
task_id = %task_id,
"工作流任务完成 → 随访任务已更新"
);
}
Err(e) => {
tracing::warn!(
event_id = %event.id,
task_id = %task_id,
error = %e,
"工作流任务完成 → 随访任务更新失败"
);
}
}
}
None => {
tracing::warn!(
event_id = %event.id,
"工作流任务完成事件缺少 task_id跳过"
);
}
}
let _ = erp_core::events::mark_event_processed(&wf_db, event.id, "workflow_task_consumer").await;
}
Some(_) => {}
None => break,
}
}
});
// device.readings.synced → 触发告警引擎评估
let (mut reading_rx, _reading_handle) = state.event_bus.subscribe_filtered("device.readings.".to_string());
let eval_state = state.clone();
tokio::spawn(async move {
loop {
match reading_rx.recv().await {
Some(event) if event.event_type == DEVICE_READINGS_SYNCED => {
let patient_id = event.payload.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok());
if let Some(pid) = patient_id {
// 对所有设备类型触发评估
for device_type in &["heart_rate", "blood_oxygen", "temperature", "blood_pressure", "blood_glucose"] {
if let Err(e) = crate::service::alert_engine::evaluate_rules(
&eval_state, event.tenant_id, pid, device_type,
).await {
tracing::error!(
patient_id = %pid,
device_type = device_type,
error = %e,
"告警评估失败"
);
}
}
}
}
Some(_) => {}
None => break,
}
}
});
// ── P1 事件消费者补全 ──
// alert.triggered → 告警消息通知
let (mut alert_rx, _alert_handle) = state.event_bus.subscribe_filtered("alert.".to_string());
let alert_db = state.db.clone();
let alert_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match alert_rx.recv().await {
Some(event) if event.event_type == ALERT_TRIGGERED => {
if erp_core::events::is_event_processed(&alert_db, event.id, "alert_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let severity = event.payload.get("severity").and_then(|v| v.as_str()).unwrap_or("warning");
let rule_name = event.payload.get("rule_name").and_then(|v| v.as_str()).unwrap_or("健康告警");
if let Some(pid) = patient_id {
let notify_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": if severity == "critical" { "CRITICAL_HEALTH_ALERT" } else { "HEALTH_DATA_ABNORMAL" },
"params": {
"rule_name": rule_name,
"severity": severity,
}
})),
);
alert_bus.publish(notify_event, &alert_db).await;
tracing::info!(patient_id = %pid, severity = %severity, "告警通知已发送");
}
let _ = erp_core::events::mark_event_processed(&alert_db, event.id, "alert_notifier").await;
}
Some(_) => {}
None => break,
}
}
});
// patient.created → 欢迎消息通知
let (mut patient_rx, _patient_handle) = state.event_bus.subscribe_filtered("patient.".to_string());
let patient_db = state.db.clone();
let patient_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match patient_rx.recv().await {
Some(event) if event.event_type == PATIENT_CREATED => {
if erp_core::events::is_event_processed(&patient_db, event.id, "patient_welcome").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok());
if let Some(pid) = patient_id {
let welcome_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"template": "patient_welcome",
"recipient_type": "patient",
"recipient_id": pid,
})),
);
patient_bus.publish(welcome_event, &patient_db).await;
tracing::info!(patient_id = %pid, "新患者欢迎流程触发");
}
let _ = erp_core::events::mark_event_processed(&patient_db, event.id, "patient_welcome").await;
}
Some(_) => {}
None => break,
}
}
});
// appointment.created/confirmed/cancelled → 通知 + 号源释放
let (mut appt_rx, _appt_handle) = state.event_bus.subscribe_filtered("appointment.".to_string());
let appt_db = state.db.clone();
let appt_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match appt_rx.recv().await {
Some(event) if event.event_type == APPOINTMENT_CREATED => {
if erp_core::events::is_event_processed(&appt_db, event.id, "appt_created_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str());
if let (Some(pid), Some(did)) = (patient_id, doctor_id) {
let notify_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "APPOINTMENT_CREATED",
"params": { "doctor_id": did }
})),
);
appt_bus.publish(notify_event, &appt_db).await;
tracing::info!(patient_id = pid, doctor_id = did, "预约创建通知已发送");
}
let _ = erp_core::events::mark_event_processed(&appt_db, event.id, "appt_created_notifier").await;
}
Some(event) if event.event_type == "appointment.confirmed" => {
if erp_core::events::is_event_processed(&appt_db, event.id, "appointment_notifier").await.unwrap_or(false) {
continue;
}
let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str());
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let (Some(did), Some(pid)) = (doctor_id, patient_id) {
let notify_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"template": "appointment_confirmed",
"recipient_type": "doctor",
"recipient_id": did,
"patient_id": pid,
})),
);
appt_bus.publish(notify_event, &appt_db).await;
tracing::info!(doctor_id = did, patient_id = pid, "预约确认通知触发");
}
let _ = erp_core::events::mark_event_processed(&appt_db, event.id, "appointment_notifier").await;
}
Some(event) if event.event_type == "appointment.cancelled" => {
if erp_core::events::is_event_processed(&appt_db, event.id, "appointment_cancel_handler").await.unwrap_or(false) {
continue;
}
tracing::info!(event_id = %event.id, "预约取消,号源释放");
let _ = erp_core::events::mark_event_processed(&appt_db, event.id, "appointment_cancel_handler").await;
}
Some(_) => {}
None => break,
}
}
});
// follow_up.overdue → 升级通知
let (mut fu_rx, _fu_handle) = state.event_bus.subscribe_filtered("follow_up.".to_string());
let fu_db = state.db.clone();
let fu_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match fu_rx.recv().await {
Some(event) if event.event_type == FOLLOW_UP_OVERDUE => {
if erp_core::events::is_event_processed(&fu_db, event.id, "follow_up_escalator").await.unwrap_or(false) {
continue;
}
let task_id = event.payload.get("task_id").and_then(|v| v.as_str());
let assigned_to = event.payload.get("assigned_to").and_then(|v| v.as_str());
if let (Some(tid), Some(uid)) = (task_id, assigned_to) {
let escalate_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"template": "follow_up_overdue",
"recipient_type": "staff",
"recipient_id": uid,
"task_id": tid,
})),
);
fu_bus.publish(escalate_event, &fu_db).await;
tracing::warn!(task_id = tid, assigned_to = uid, "随访逾期升级通知");
}
let _ = erp_core::events::mark_event_processed(&fu_db, event.id, "follow_up_escalator").await;
}
Some(event) if event.event_type == FOLLOW_UP_COMPLETED => {
// 随访完成 → 检查是否由 AI 触发,触发再分析
if let Some(task_id_str) = event.payload.get("task_id").and_then(|v| v.as_str()) {
if let Ok(task_id) = uuid::Uuid::parse_str(task_id_str) {
let patient_id = event.payload.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
if let Some(patient_id) = patient_id {
// 通过 raw SQL 查找关联的 AI 建议action_result 中包含 followup_task_id
let sql = r#"
SELECT id FROM ai_suggestion
WHERE tenant_id = $1
AND deleted_at IS NULL
AND status = 'executed'
AND action_result @> $2
LIMIT 1
"#;
if let Some(suggestion_id) = crate::service::ai_suggestion_loader::find_by_followup_task(
&fu_db, event.tenant_id, task_id,
).await.unwrap_or(None) {
let reanalysis_event = erp_core::events::DomainEvent::new(
"ai.reanalysis.requested",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"original_suggestion_id": suggestion_id.to_string(),
"patient_id": patient_id.to_string(),
"followup_task_id": task_id_str,
"trigger": "loop_closure",
})),
);
fu_bus.publish(reanalysis_event, &fu_db).await;
tracing::info!(
suggestion_id = %suggestion_id,
patient_id = %patient_id,
task_id = %task_id,
"随访完成,触发 AI 再分析(闭环)"
);
}
}
}
}
}
Some(_) => {}
None => break,
}
}
});
// health_data.critical_alert → 创建危急值告警记录
let (mut critical_rx, _critical_handle) = state.event_bus.subscribe_filtered("health_data.".to_string());
let critical_state = state.clone();
tokio::spawn(async move {
loop {
match critical_rx.recv().await {
Some(event) if event.event_type == HEALTH_DATA_CRITICAL_ALERT => {
// 幂等检查
if erp_core::events::is_event_processed(&critical_state.db, event.id, "critical_alert_consumer").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok());
let alert_type = event.payload.get("alert_type").and_then(|v| v.as_str()).unwrap_or("vital_sign");
let metric_name = event.payload.get("metric_name").and_then(|v| v.as_str()).unwrap_or("unknown");
let metric_value = event.payload.get("metric_value").and_then(|v| v.as_str()).unwrap_or("");
let threshold_value = event.payload.get("threshold_value").and_then(|v| v.as_str()).unwrap_or("");
if let Some(pid) = patient_id {
match crate::service::critical_alert_service::handle_critical_alert_event(
&critical_state,
event.tenant_id,
pid,
alert_type,
metric_name,
metric_value,
threshold_value,
None,
).await {
Ok(alert_id) => {
tracing::info!(
event_id = %event.id,
alert_id = %alert_id,
patient_id = %pid,
metric = %metric_name,
"危急值告警已创建"
);
let _ = erp_core::events::mark_event_processed(&critical_state.db, event.id, "critical_alert_consumer").await;
}
Err(e) => {
tracing::error!(
event_id = %event.id,
patient_id = %pid,
error = %e,
"危急值告警创建失败"
);
}
}
}
}
Some(_) => {}
None => break,
}
}
});
// ai.analysis.completed → 通知关联医生
let (mut ai_rx, _ai_handle) = state.event_bus.subscribe_filtered("ai.".to_string());
let ai_db = state.db.clone();
let ai_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match ai_rx.recv().await {
Some(event) if event.event_type == "ai.analysis.completed" => {
if erp_core::events::is_event_processed(&ai_db, event.id, "ai_analysis_notifier").await.unwrap_or(false) {
continue;
}
let analysis_id = event.payload.get("analysis_id").and_then(|v| v.as_str());
let analysis_type = event.payload.get("analysis_type").and_then(|v| v.as_str()).unwrap_or("unknown");
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str());
if let (Some(did), Some(pid)) = (doctor_id, patient_id) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "doctor",
"recipient_id": did,
"template_key": "AI_ANALYSIS_COMPLETED",
"params": {
"analysis_type": analysis_type,
"patient_id": pid,
}
})),
);
ai_bus.publish(notify, &ai_db).await;
tracing::info!(
analysis_id = ?analysis_id,
analysis_type = %analysis_type,
doctor_id = %did,
"AI 分析完成通知已发送给医生"
);
} else {
tracing::info!(
analysis_id = ?analysis_id,
analysis_type = %analysis_type,
"AI 分析完成(缺少关联信息,跳过通知)"
);
}
let _ = erp_core::events::mark_event_processed(&ai_db, event.id, "ai_analysis_notifier").await;
}
Some(event) if event.event_type == "dialysis.record.created" => {
if erp_core::events::is_event_processed(&ai_db, event.id, "dialysis_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let record_id = event.payload.get("record_id").and_then(|v| v.as_str());
tracing::info!(
record_id = ?record_id,
patient_id = ?patient_id,
"透析记录已创建"
);
let _ = erp_core::events::mark_event_processed(&ai_db, event.id, "dialysis_notifier").await;
}
Some(_) => {}
None => break,
}
}
});
// ai.analysis.completed → AI→行动闭环消费者行动分发
let (mut ai_action_rx, _ai_action_handle) = state.event_bus.subscribe_filtered("ai.".to_string());
let action_db = state.db.clone();
let action_event_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match ai_action_rx.recv().await {
Some(event) if event.event_type == "ai.analysis.completed" => {
if erp_core::events::is_event_processed(&action_db, event.id, "ai_action_dispatcher").await.unwrap_or(false) {
continue;
}
let tenant_id = event.tenant_id;
let analysis_id = event.payload.get("analysis_id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok());
let patient_id = event.payload.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok());
let doctor_id = event.payload.get("doctor_id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok());
let risk_level = event.payload.get("risk_level")
.and_then(|v| v.as_str())
.unwrap_or("medium");
let suggestion_count = event.payload.get("suggestion_count")
.and_then(|v| v.as_u64())
.unwrap_or(0);
if suggestion_count > 0 {
if let (Some(aid), Some(pid)) = (analysis_id, patient_id) {
let loader_result: Result<Vec<serde_json::Value>, sea_orm::DbErr> =
crate::service::ai_suggestion_loader::load_by_analysis(
&action_db, tenant_id, aid,
).await;
match loader_result {
Ok(suggestions) if !suggestions.is_empty() => {
crate::service::ai_action_dispatcher::handle_ai_suggestions(
&action_db,
&action_event_bus,
tenant_id,
aid,
pid,
doctor_id,
&suggestions,
risk_level,
).await;
tracing::info!(
analysis_id = %aid,
patient_id = %pid,
suggestion_count = suggestions.len(),
risk_level = %risk_level,
"AI 行动分发完成"
);
}
Ok(_) => {
tracing::info!(analysis_id = %aid, "建议列表为空,跳过行动分发");
}
Err(e) => {
tracing::warn!(
analysis_id = %aid,
error = %e,
"加载建议列表失败"
);
}
}
}
}
let _ = erp_core::events::mark_event_processed(&action_db, event.id, "ai_action_dispatcher").await;
}
Some(_) => {}
None => break,
}
}
});
// consent.granted/revoked → 通知关联医生
let (mut consent_rx, _consent_handle) = state.event_bus.subscribe_filtered("consent.".to_string());
let consent_db = state.db.clone();
let consent_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match consent_rx.recv().await {
Some(event) if event.event_type == CONSENT_GRANTED => {
if erp_core::events::is_event_processed(&consent_db, event.id, "consent_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let consent_type = event.payload.get("consent_type").and_then(|v| v.as_str()).unwrap_or("unknown");
if let Some(pid) = patient_id {
let notify_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "CONSENT_GRANTED",
"params": {
"consent_type": consent_type,
}
})),
);
consent_bus.publish(notify_event, &consent_db).await;
tracing::info!(patient_id = %pid, consent_type = %consent_type, "知情同意授予通知已发送");
}
let _ = erp_core::events::mark_event_processed(&consent_db, event.id, "consent_notifier").await;
}
Some(event) if event.event_type == CONSENT_REVOKED => {
if erp_core::events::is_event_processed(&consent_db, event.id, "consent_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let consent_type = event.payload.get("consent_type").and_then(|v| v.as_str()).unwrap_or("unknown");
if let Some(pid) = patient_id {
let notify_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "staff",
"template_key": "CONSENT_REVOKED",
"params": {
"patient_id": pid,
"consent_type": consent_type,
}
})),
);
consent_bus.publish(notify_event, &consent_db).await;
tracing::warn!(patient_id = %pid, consent_type = %consent_type, "知情同意撤回通知已发送给医护");
}
let _ = erp_core::events::mark_event_processed(&consent_db, event.id, "consent_notifier").await;
}
Some(_) => {}
None => break,
}
}
});
// consultation.opened/new_message → 通知相关方
let (mut consult_rx, _consult_handle) = state.event_bus.subscribe_filtered("consultation.".to_string());
let consult_db = state.db.clone();
let consult_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match consult_rx.recv().await {
Some(event) if event.event_type == CONSULTATION_OPENED => {
if erp_core::events::is_event_processed(&consult_db, event.id, "consult_opened_notifier").await.unwrap_or(false) {
continue;
}
let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str());
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let (Some(did), Some(pid)) = (doctor_id, patient_id) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "doctor",
"recipient_id": did,
"template_key": "CONSULTATION_OPENED",
"params": { "patient_id": pid }
})),
);
consult_bus.publish(notify, &consult_db).await;
tracing::info!(doctor_id = did, patient_id = pid, "咨询开启通知已发送给医生");
}
let _ = erp_core::events::mark_event_processed(&consult_db, event.id, "consult_opened_notifier").await;
}
Some(event) if event.event_type == CONSULTATION_NEW_MESSAGE => {
if erp_core::events::is_event_processed(&consult_db, event.id, "consult_msg_notifier").await.unwrap_or(false) {
continue;
}
let recipient_id = event.payload.get("recipient_id").and_then(|v| v.as_str());
let sender_role = event.payload.get("sender_role").and_then(|v| v.as_str()).unwrap_or("unknown");
if let Some(rid) = recipient_id {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": if sender_role == "patient" { "doctor" } else { "patient" },
"recipient_id": rid,
"template_key": "CONSULTATION_NEW_MESSAGE",
})),
);
consult_bus.publish(notify, &consult_db).await;
tracing::info!(recipient_id = rid, sender_role = sender_role, "咨询新消息通知已发送");
}
let _ = erp_core::events::mark_event_processed(&consult_db, event.id, "consult_msg_notifier").await;
}
Some(event) if event.event_type == CONSULTATION_CLOSED => {
if erp_core::events::is_event_processed(&consult_db, event.id, "consult_closed_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let Some(pid) = patient_id {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "CONSULTATION_CLOSED",
})),
);
consult_bus.publish(notify, &consult_db).await;
tracing::info!(patient_id = pid, "咨询关闭通知已发送");
}
let _ = erp_core::events::mark_event_processed(&consult_db, event.id, "consult_closed_notifier").await;
}
Some(_) => {}
None => break,
}
}
});
// follow_up.created → 通知执行人
let (mut fu_created_rx, _fu_created_handle) = state.event_bus.subscribe_filtered("follow_up.".to_string());
let fu_created_db = state.db.clone();
let fu_created_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match fu_created_rx.recv().await {
Some(event) if event.event_type == FOLLOW_UP_CREATED => {
if erp_core::events::is_event_processed(&fu_created_db, event.id, "fu_created_notifier").await.unwrap_or(false) {
continue;
}
let assigned_to = event.payload.get("assigned_to").and_then(|v| v.as_str());
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let (Some(uid), Some(pid)) = (assigned_to, patient_id) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "staff",
"recipient_id": uid,
"template_key": "FOLLOW_UP_CREATED",
"params": { "patient_id": pid }
})),
);
fu_created_bus.publish(notify, &fu_created_db).await;
tracing::info!(assigned_to = uid, patient_id = pid, "随访创建通知已发送");
}
let _ = erp_core::events::mark_event_processed(&fu_created_db, event.id, "fu_created_notifier").await;
}
Some(_) => {}
None => break,
}
}
});
// points.earned/exchanged → 积分变动通知
let (mut points_rx, _points_handle) = state.event_bus.subscribe_filtered("points.".to_string());
let points_db = state.db.clone();
let points_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match points_rx.recv().await {
Some(event) if event.event_type == POINTS_EARNED => {
if erp_core::events::is_event_processed(&points_db, event.id, "points_earned_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let amount = event.payload.get("amount").and_then(|v| v.as_u64());
if let (Some(pid), Some(amt)) = (patient_id, amount) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "POINTS_EARNED",
"params": { "amount": amt }
})),
);
points_bus.publish(notify, &points_db).await;
tracing::info!(patient_id = pid, amount = amt, "积分获得通知已发送");
}
let _ = erp_core::events::mark_event_processed(&points_db, event.id, "points_earned_notifier").await;
}
Some(event) if event.event_type == POINTS_EXCHANGED => {
if erp_core::events::is_event_processed(&points_db, event.id, "points_exchanged_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let amount = event.payload.get("amount").and_then(|v| v.as_u64());
if let (Some(pid), Some(amt)) = (patient_id, amount) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "POINTS_EXCHANGED",
"params": { "amount": amt }
})),
);
points_bus.publish(notify, &points_db).await;
tracing::info!(patient_id = pid, amount = amt, "积分兑换通知已发送");
}
let _ = erp_core::events::mark_event_processed(&points_db, event.id, "points_exchanged_notifier").await;
}
Some(event) if event.event_type == POINTS_EXPIRED => {
if erp_core::events::is_event_processed(&points_db, event.id, "points_expired_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let amount = event.payload.get("amount").and_then(|v| v.as_u64());
if let (Some(pid), Some(amt)) = (patient_id, amount) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "POINTS_EXPIRED",
"params": { "amount": amt }
})),
);
points_bus.publish(notify, &points_db).await;
tracing::info!(patient_id = pid, amount = amt, "积分过期通知已发送");
}
let _ = erp_core::events::mark_event_processed(&points_db, event.id, "points_expired_notifier").await;
}
Some(_) => {}
None => break,
}
}
});
}