feat(health+message): 关怀已送达通知管道 — care.action.performed 事件 + 温暖消息推送
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled

- 新增 CARE_ACTION_PERFORMED 事件常量(care.action.performed)
- care_plan_service 在护理项完成、测量数据更新、干预项创建时发布关怀行动事件
- erp-message 新增 care_plan.activated/completed + care.action.performed 消息处理
- 温暖消息文案:护理计划启动/完成通知、关怀已送达、健康数据已更新
- 事件测试覆盖新常量、payload 契约、通知分支逻辑
This commit is contained in:
iven
2026-05-04 18:56:52 +08:00
parent 0a5290aee4
commit 3ff17382ff
3 changed files with 321 additions and 5 deletions

View File

@@ -63,6 +63,9 @@ pub const CARE_PLAN_UPDATED: &str = "care_plan.updated";
pub const CARE_PLAN_ACTIVATED: &str = "care_plan.activated";
pub const CARE_PLAN_COMPLETED: &str = "care_plan.completed";
// 关怀行动
pub const CARE_ACTION_PERFORMED: &str = "care.action.performed";
/// 兼容旧签名 — 不做任何实际订阅(逻辑已迁移到 on_startup
pub fn register_handlers(_bus: &EventBus) {
// 事件处理器已迁移到 on_startup → register_handlers_with_state
@@ -995,6 +998,11 @@ mod tests {
POINTS_EXPIRED,
POINTS_EARNED,
POINTS_EXCHANGED,
CARE_PLAN_CREATED,
CARE_PLAN_UPDATED,
CARE_PLAN_ACTIVATED,
CARE_PLAN_COMPLETED,
CARE_ACTION_PERFORMED,
];
for t in &all_types {
assert_valid_event_type(t);
@@ -1030,6 +1038,11 @@ mod tests {
POINTS_EXPIRED,
POINTS_EARNED,
POINTS_EXCHANGED,
CARE_PLAN_CREATED,
CARE_PLAN_UPDATED,
CARE_PLAN_ACTIVATED,
CARE_PLAN_COMPLETED,
CARE_ACTION_PERFORMED,
];
let set: HashSet<&&str> = all_types.iter().collect();
assert_eq!(
@@ -1068,6 +1081,11 @@ mod tests {
assert_eq!(POINTS_EXPIRED, "points.expired");
assert_eq!(POINTS_EARNED, "points.earned");
assert_eq!(POINTS_EXCHANGED, "points.exchanged");
assert_eq!(CARE_PLAN_CREATED, "care_plan.created");
assert_eq!(CARE_PLAN_UPDATED, "care_plan.updated");
assert_eq!(CARE_PLAN_ACTIVATED, "care_plan.activated");
assert_eq!(CARE_PLAN_COMPLETED, "care_plan.completed");
assert_eq!(CARE_ACTION_PERFORMED, "care.action.performed");
}
/// 消费者中硬编码的事件类型(非通过常量引用)也必须可被常量覆盖
@@ -1686,6 +1704,15 @@ mod tests {
assert!(LAB_REPORT_REVIEWED.starts_with(prefix));
}
#[test]
fn subscribe_prefix_covers_all_care_plan_events() {
let prefix = "care_plan.";
assert!(CARE_PLAN_CREATED.starts_with(prefix));
assert!(CARE_PLAN_UPDATED.starts_with(prefix));
assert!(CARE_PLAN_ACTIVATED.starts_with(prefix));
assert!(CARE_PLAN_COMPLETED.starts_with(prefix));
}
// ── device.readings.synced 消费者设备类型列表测试 ──────────────────
#[test]
@@ -2145,4 +2172,117 @@ mod tests {
];
assert_eq!(consumer_ids.len(), 23, "消费者总数应为 23");
}
// ── care.action.performed 事件 payload 契约测试 ──────────────────
/// care.action.performed (item_completed) payload 契约
#[test]
fn care_action_item_completed_payload_contract() {
let plan_id = Uuid::now_v7();
let patient_id = Uuid::now_v7();
let payload = build_event_payload(json!({
"plan_id": plan_id.to_string(),
"patient_id": patient_id.to_string(),
"action": "item_completed",
"item_title": "血压监测",
"item_type": "monitoring",
"operator_id": Uuid::now_v7().to_string(),
}));
let pid = payload.get("patient_id").and_then(|v| v.as_str()).and_then(|s| Uuid::parse_str(s).ok());
assert!(pid.is_some(), "消费者需要 patient_id");
let action = payload.get("action").and_then(|v| v.as_str()).unwrap_or("");
assert_eq!(action, "item_completed");
let item_title = payload.get("item_title").and_then(|v| v.as_str()).unwrap_or("护理项目");
assert_eq!(item_title, "血压监测");
}
/// care.action.performed (outcome_measured) payload 契约
#[test]
fn care_action_outcome_measured_payload_contract() {
let plan_id = Uuid::now_v7();
let patient_id = Uuid::now_v7();
let payload = build_event_payload(json!({
"plan_id": plan_id.to_string(),
"patient_id": patient_id.to_string(),
"action": "outcome_measured",
"metric": "血压",
"operator_id": Uuid::now_v7().to_string(),
}));
let pid = payload.get("patient_id").and_then(|v| v.as_str()).and_then(|s| Uuid::parse_str(s).ok());
assert!(pid.is_some(), "消费者需要 patient_id");
let action = payload.get("action").and_then(|v| v.as_str()).unwrap_or("");
assert_eq!(action, "outcome_measured");
let metric = payload.get("metric").and_then(|v| v.as_str()).unwrap_or("健康指标");
assert_eq!(metric, "血压");
}
/// care_plan.activated / care_plan.completed payload 契约
#[test]
fn care_plan_lifecycle_payload_contract() {
let plan_id = Uuid::now_v7();
let patient_id = Uuid::now_v7();
for (event_type, status) in [
(CARE_PLAN_ACTIVATED, "active"),
(CARE_PLAN_COMPLETED, "completed"),
] {
let payload = build_event_payload(json!({
"plan_id": plan_id.to_string(),
"patient_id": patient_id.to_string(),
"status": status,
}));
let pid = payload.get("patient_id").and_then(|v| v.as_str()).and_then(|s| Uuid::parse_str(s).ok());
assert!(pid.is_some(), "{} 消费者需要 patient_id", event_type);
let plid = payload.get("plan_id").and_then(|v| v.as_str()).and_then(|s| Uuid::parse_str(s).ok());
assert!(plid.is_some(), "{} 消费者需要 plan_id", event_type);
}
}
/// 关怀行动通知消息分支逻辑
#[test]
fn care_action_notification_branch_logic() {
// item_completed → 温暖通知
let item_payload = build_event_payload(json!({
"action": "item_completed",
"item_title": "血压监测",
}));
let action = item_payload.get("action").and_then(|v| v.as_str()).unwrap_or("");
let (title, _body) = match action {
"item_completed" => ("关怀已送达".to_string(), "您的护理团队已完成「血压监测」".to_string()),
"outcome_measured" => ("健康数据已更新".to_string(), "数据已记录".to_string()),
_ => ("关怀已送达".to_string(), "正在关注".to_string()),
};
assert_eq!(title, "关怀已送达");
// outcome_measured → 数据更新通知
let outcome_payload = build_event_payload(json!({
"action": "outcome_measured",
"metric": "血压",
}));
let action2 = outcome_payload.get("action").and_then(|v| v.as_str()).unwrap_or("");
let (title2, _body2) = match action2 {
"item_completed" => ("关怀已送达".to_string(), "已完成".to_string()),
"outcome_measured" => ("健康数据已更新".to_string(), "血压数据已记录".to_string()),
_ => ("关怀已送达".to_string(), "正在关注".to_string()),
};
assert_eq!(title2, "健康数据已更新");
// 未知 action → 默认通知
let unknown_payload = build_event_payload(json!({ "action": "unknown" }));
let action3 = unknown_payload.get("action").and_then(|v| v.as_str()).unwrap_or("");
let (title3, _body3) = match action3 {
"item_completed" => ("关怀已送达".to_string(), "已完成".to_string()),
"outcome_measured" => ("健康数据已更新".to_string(), "已记录".to_string()),
_ => ("关怀已送达".to_string(), "正在关注".to_string()),
};
assert_eq!(title3, "关怀已送达");
}
}

View File

@@ -141,7 +141,7 @@ pub async fn update_care_plan(
req: UpdateCarePlanWithVersion,
) -> HealthResult<CarePlanResp> {
let existing = find_plan(state, tenant_id, plan_id).await?;
let _old_status = existing.status.clone(); // 用于后续事件类型判断
let _old_status = existing.status.clone();
let next_ver = check_version(req.version, existing.version)
.map_err(|_| HealthError::VersionMismatch)?;
@@ -285,9 +285,13 @@ pub async fn create_care_plan_item(
operator_id: Option<Uuid>,
req: CreateCarePlanItemReq,
) -> HealthResult<CarePlanItemResp> {
let _plan = find_plan(state, tenant_id, plan_id).await?;
let plan = find_plan(state, tenant_id, plan_id).await?;
validate_item_type(&req.item_type)?;
let is_intervention = req.item_type == "intervention";
let item_title_for_event = req.title.clone();
let item_type_for_event = req.item_type.clone();
let now = Utc::now();
let active = care_plan_item::ActiveModel {
id: Set(Uuid::now_v7()),
@@ -308,6 +312,25 @@ pub async fn create_care_plan_item(
};
let m = active.insert(&state.db).await?;
// 关怀行动事件 — 新增干预项
if is_intervention {
state.event_bus.publish(
DomainEvent::new(
crate::event::CARE_ACTION_PERFORMED,
tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"plan_id": plan_id,
"patient_id": plan.patient_id,
"action": "item_created",
"item_title": item_title_for_event,
"item_type": item_type_for_event,
"operator_id": operator_id,
})),
),
&state.db,
).await;
}
Ok(item_to_resp(m))
}
@@ -319,7 +342,7 @@ pub async fn update_care_plan_item(
operator_id: Option<Uuid>,
req: UpdateCarePlanItemWithVersion,
) -> HealthResult<CarePlanItemResp> {
let _plan = find_plan(state, tenant_id, plan_id).await?;
let plan = find_plan(state, tenant_id, plan_id).await?;
let existing = care_plan_item::Entity::find_by_id(item_id)
.one(&state.db)
.await?
@@ -332,6 +355,8 @@ pub async fn update_care_plan_item(
let next_ver = check_version(req.version, existing.version)
.map_err(|_| HealthError::VersionMismatch)?;
let is_completing = req.data.status.as_deref() == Some("completed");
let mut active: care_plan_item::ActiveModel = existing.into();
let now = Utc::now();
@@ -359,6 +384,26 @@ pub async fn update_care_plan_item(
active.version = Set(next_ver);
let m = active.update(&state.db).await?;
// 关怀行动事件 — 项目完成时通知患者
if is_completing {
state.event_bus.publish(
DomainEvent::new(
crate::event::CARE_ACTION_PERFORMED,
tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"plan_id": plan_id,
"patient_id": plan.patient_id,
"action": "item_completed",
"item_title": m.title,
"item_type": m.item_type,
"operator_id": operator_id,
})),
),
&state.db,
).await;
}
Ok(item_to_resp(m))
}
@@ -475,7 +520,7 @@ pub async fn update_care_plan_outcome(
operator_id: Option<Uuid>,
req: UpdateCarePlanOutcomeWithVersion,
) -> HealthResult<CarePlanOutcomeResp> {
let _plan = find_plan(state, tenant_id, plan_id).await?;
let plan = find_plan(state, tenant_id, plan_id).await?;
let existing = care_plan_outcome::Entity::find_by_id(outcome_id)
.one(&state.db)
.await?
@@ -488,10 +533,12 @@ pub async fn update_care_plan_outcome(
let next_ver = check_version(req.version, existing.version)
.map_err(|_| HealthError::VersionMismatch)?;
let has_new_measurement = req.data.current_value.is_some();
let mut active: care_plan_outcome::ActiveModel = existing.into();
let now = Utc::now();
if req.data.current_value.is_some() {
if has_new_measurement {
active.current_value = Set(req.data.current_value);
active.measured_at = Set(Some(now));
}
@@ -509,6 +556,25 @@ pub async fn update_care_plan_outcome(
active.version = Set(next_ver);
let m = active.update(&state.db).await?;
// 关怀行动事件 — 测量数据更新时通知患者
if has_new_measurement {
state.event_bus.publish(
DomainEvent::new(
crate::event::CARE_ACTION_PERFORMED,
tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"plan_id": plan_id,
"patient_id": plan.patient_id,
"action": "outcome_measured",
"metric": m.metric,
"operator_id": operator_id,
})),
),
&state.db,
).await;
}
Ok(outcome_to_resp(m))
}

View File

@@ -962,6 +962,116 @@ async fn handle_workflow_event(
"医生在线状态变更"
);
}
// 关怀计划激活 — 温暖通知患者
"care_plan.activated" => {
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
"护理计划已启动".to_string(),
"您的护理计划已启动,我们将持续关注您的健康状况。如有任何疑问,请随时咨询您的护理团队。".to_string(),
"normal",
Some("care_plan".to_string()),
event.payload.get("plan_id").and_then(|v| v.as_str()).and_then(|s| uuid::Uuid::parse_str(s).ok()),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 关怀计划完成 — 温暖通知患者
"care_plan.completed" => {
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
"护理计划已完成".to_string(),
"您的护理计划已完成,感谢您这段时间的配合!我们将继续关注您的健康。".to_string(),
"normal",
Some("care_plan".to_string()),
event.payload.get("plan_id").and_then(|v| v.as_str()).and_then(|s| uuid::Uuid::parse_str(s).ok()),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 关怀行动执行 — 温暖通知患者(护理项完成、测量数据记录等)
"care.action.performed" => {
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let action = event
.payload
.get("action")
.and_then(|v| v.as_str())
.unwrap_or("");
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "low", db).await {
return Ok(());
}
let (title, body) = match action {
"item_completed" => {
let item_title = event.payload.get("item_title").and_then(|v| v.as_str()).unwrap_or("护理项目");
(
"关怀已送达".to_string(),
format!("您的护理团队已完成「{}」,感谢您的配合。", item_title),
)
}
"outcome_measured" => {
let metric = event.payload.get("metric").and_then(|v| v.as_str()).unwrap_or("健康指标");
(
"健康数据已更新".to_string(),
format!("您的{}数据已记录,护理团队正在持续关注。", metric),
)
}
_ => {
(
"关怀已送达".to_string(),
"您的护理团队正在关注您的健康状况。".to_string(),
)
}
};
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
title,
body,
"low",
Some("care_action".to_string()),
event.payload.get("plan_id").and_then(|v| v.as_str()).and_then(|s| uuid::Uuid::parse_str(s).ok()),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
_ => {}
}
Ok(())