Files
hms/crates/erp-ai/src/service/risk_service.rs
iven 9576e80175 feat(ai): Phase 2B 洞察→推送→反馈闭环 — 风险评分+通知+建议反馈
- 风险评分引擎 load_patient_data 实装(体征+化验异常)
- refresh_all_patients 高风险自动创建洞察+事件推送
- erp-message 订阅 copilot.insight.created 推送医护通知
- 每日 cron 增加洞察过期清理+建议过期清理
- POST /ai/suggestions/{id}/feedback 建议反馈端点
- SuggestionFeedbackService 反馈服务层
- 小程序健康页建议卡片增加采纳/忽略/咨询医生按钮
2026-05-19 01:19:09 +08:00

374 lines
13 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use crate::copilot::engine::CopilotEngine;
use crate::copilot::rules::RuleData;
use crate::copilot::scoring::RiskScore;
use crate::entity::copilot_risk_snapshots;
use crate::entity::copilot_rules;
use crate::provider::registry::ProviderRegistry;
use erp_core::error::AppResult;
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, FromQueryResult, QueryFilter, Set};
use std::sync::Arc;
use uuid::Uuid;
pub struct RiskService;
impl RiskService {
/// 计算患者风险评分并 UPSERT 快照
pub async fn compute_risk(
db: &sea_orm::DatabaseConnection,
tenant_id: Uuid,
patient_id: Uuid,
) -> AppResult<RiskScore> {
Self::compute_risk_inner(db, tenant_id, patient_id, None).await
}
/// 计算风险评分 + LLM 补充分析并 UPSERT 快照
pub async fn compute_risk_with_llm(
db: &sea_orm::DatabaseConnection,
tenant_id: Uuid,
patient_id: Uuid,
provider_registry: &Arc<ProviderRegistry>,
preferred_provider: &str,
) -> AppResult<RiskScore> {
Self::compute_risk_inner(
db,
tenant_id,
patient_id,
Some((provider_registry, preferred_provider)),
)
.await
}
async fn compute_risk_inner(
db: &sea_orm::DatabaseConnection,
tenant_id: Uuid,
patient_id: Uuid,
llm_ctx: Option<(&Arc<ProviderRegistry>, &str)>,
) -> AppResult<RiskScore> {
let rules = Self::load_rules(db, tenant_id).await?;
let patient_data = Self::load_patient_data(db, tenant_id, patient_id).await?;
// 数据为空时返回低风险,不写入快照(避免虚假评分)
if patient_data.as_object().is_none_or(|m| m.is_empty()) {
tracing::debug!(
patient_id = %patient_id,
"患者数据为空,跳过风险评分写入"
);
return Ok(RiskScore {
score: 0,
level: "low".into(),
matched_rules: vec![],
});
}
let risk = CopilotEngine::assess_patient(&rules, &patient_data);
// LLM 补充分析(不阻塞,失败静默降级)
let llm_summary = if let Some((registry, provider)) = llm_ctx {
crate::copilot::scoring::llm_supplement(registry, provider, &risk, &patient_data).await
} else {
None
};
let now = chrono::Utc::now();
let existing = copilot_risk_snapshots::Entity::find()
.filter(copilot_risk_snapshots::Column::TenantId.eq(tenant_id))
.filter(copilot_risk_snapshots::Column::PatientId.eq(patient_id))
.filter(copilot_risk_snapshots::Column::DeletedAt.is_null())
.one(db)
.await?;
let rule_details = serde_json::json!({
"matched_rules": risk.matched_rules,
});
if let Some(model) = existing {
let mut active: copilot_risk_snapshots::ActiveModel = model.into();
active.risk_score = Set(risk.score);
active.risk_level = Set(risk.level.clone());
active.rule_details = Set(rule_details);
active.llm_summary = Set(llm_summary);
active.computed_at = Set(now);
active.updated_at = Set(now);
active.version_lock = Set(active.version_lock.take().unwrap_or(0) + 1);
active.update(db).await?;
} else {
let id = Uuid::now_v7();
let model = copilot_risk_snapshots::ActiveModel {
id: Set(id),
tenant_id: Set(tenant_id),
patient_id: Set(patient_id),
risk_score: Set(risk.score),
risk_level: Set(risk.level.clone()),
rule_details: Set(rule_details),
llm_summary: Set(llm_summary),
computed_at: Set(now),
data_freshness: Set(None),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(None),
updated_by: Set(None),
deleted_at: Set(None),
version_lock: Set(1),
};
model.insert(db).await?;
}
Ok(risk)
}
/// 查询患者最新风险快照
pub async fn get_latest_risk(
db: &sea_orm::DatabaseConnection,
tenant_id: Uuid,
patient_id: Uuid,
) -> AppResult<Option<copilot_risk_snapshots::Model>> {
let snapshot = copilot_risk_snapshots::Entity::find()
.filter(copilot_risk_snapshots::Column::TenantId.eq(tenant_id))
.filter(copilot_risk_snapshots::Column::PatientId.eq(patient_id))
.filter(copilot_risk_snapshots::Column::DeletedAt.is_null())
.one(db)
.await?;
Ok(snapshot)
}
/// 加载租户的启用规则(含系统级规则)
async fn load_rules(
db: &sea_orm::DatabaseConnection,
tenant_id: Uuid,
) -> AppResult<Vec<RuleData>> {
let rules = copilot_rules::Entity::find()
.filter(copilot_rules::Column::TenantId.eq(tenant_id))
.filter(copilot_rules::Column::Enabled.eq(true))
.filter(copilot_rules::Column::DeletedAt.is_null())
.all(db)
.await?;
let system_rules = copilot_rules::Entity::find()
.filter(copilot_rules::Column::TenantId.eq(Uuid::nil()))
.filter(copilot_rules::Column::Enabled.eq(true))
.filter(copilot_rules::Column::DeletedAt.is_null())
.all(db)
.await?;
let all_rules: Vec<copilot_rules::Model> = rules.into_iter().chain(system_rules).collect();
Ok(all_rules
.into_iter()
.map(|r| {
(
r.id,
r.name,
r.condition_expr,
r.score,
r.severity,
r.suggestion,
)
})
.collect())
}
/// 组装患者数据用于规则评估
/// 从 vital_signs_daily 和 lab_report 加载最新值
async fn load_patient_data(
db: &sea_orm::DatabaseConnection,
tenant_id: Uuid,
patient_id: Uuid,
) -> AppResult<serde_json::Value> {
use sea_orm::FromQueryResult;
// 最新一条体征数据(最近 30 天)
#[derive(FromQueryResult)]
struct VitalRow {
systolic_bp_morning: Option<i32>,
diastolic_bp_morning: Option<i32>,
heart_rate: Option<i32>,
blood_sugar: Option<f64>,
weight: Option<f64>,
spo2: Option<i32>,
body_temperature: Option<f64>,
}
let vital: Option<VitalRow> = VitalRow::find_by_statement(
sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
"SELECT systolic_bp_morning, diastolic_bp_morning, heart_rate, blood_sugar, weight, spo2, body_temperature FROM vital_signs_daily WHERE tenant_id = $1 AND patient_id = $2 AND deleted_at IS NULL ORDER BY record_date DESC LIMIT 1",
[tenant_id.into(), patient_id.into()],
),
)
.one(db)
.await?;
// 最新化验报告异常计数(最近 90 天)
#[derive(FromQueryResult)]
struct LabAbnormal {
report_type: String,
abnormal_count: i64,
}
let lab_abnormals: Vec<LabAbnormal> = LabAbnormal::find_by_statement(
sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
"SELECT report_type, COUNT(*) as abnormal_count FROM lab_reports WHERE tenant_id = $1 AND patient_id = $2 AND deleted_at IS NULL AND is_abnormal = true AND report_date >= NOW() - INTERVAL '90 days' GROUP BY report_type",
[tenant_id.into(), patient_id.into()],
),
)
.all(db)
.await?;
let mut data = serde_json::Map::new();
if let Some(v) = vital {
if let Some(bp_sys) = v.systolic_bp_morning {
data.insert("systolic_bp_morning".into(), serde_json::json!(bp_sys));
}
if let Some(bp_dia) = v.diastolic_bp_morning {
data.insert("diastolic_bp_morning".into(), serde_json::json!(bp_dia));
}
if let Some(hr) = v.heart_rate {
data.insert("heart_rate".into(), serde_json::json!(hr));
}
if let Some(bs) = v.blood_sugar {
data.insert("blood_sugar".into(), serde_json::json!(bs));
}
if let Some(w) = v.weight {
data.insert("weight".into(), serde_json::json!(w));
}
if let Some(spo2) = v.spo2 {
data.insert("spo2".into(), serde_json::json!(spo2));
}
if let Some(temp) = v.body_temperature {
data.insert("body_temperature".into(), serde_json::json!(temp));
}
}
for lab in lab_abnormals {
data.insert(
format!("lab_abnormal_{}", lab.report_type),
serde_json::json!(lab.abnormal_count),
);
}
Ok(serde_json::Value::Object(data))
}
/// 每日批量刷新所有在管患者的风险快照
/// 通过 raw SQL 查询患者列表(因为 erp-ai 不依赖 erp-health entity
pub async fn refresh_all_patients(
db: &sea_orm::DatabaseConnection,
event_bus: Option<&erp_core::events::EventBus>,
) -> AppResult<u64> {
#[derive(sea_orm::FromQueryResult)]
struct PatientRow {
id: Uuid,
tenant_id: Uuid,
}
let patients: Vec<PatientRow> =
PatientRow::find_by_statement(sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
"SELECT id, tenant_id FROM patients WHERE deleted_at IS NULL",
[],
))
.all(db)
.await?;
let total = patients.len() as u64;
for p in &patients {
match Self::compute_risk(db, p.tenant_id, p.id).await {
Ok(risk) => {
if risk.level == "high" || risk.level == "critical" {
Self::create_risk_insight(db, event_bus, p.tenant_id, p.id, &risk).await;
}
}
Err(e) => {
tracing::warn!(
patient_id = %p.id,
tenant_id = %p.tenant_id,
error = %e,
"风险评分刷新失败"
);
}
}
}
Ok(total)
}
/// 为高风险患者创建风险洞察
async fn create_risk_insight(
db: &sea_orm::DatabaseConnection,
event_bus: Option<&erp_core::events::EventBus>,
tenant_id: Uuid,
patient_id: Uuid,
risk: &RiskScore,
) {
let matched_with_severity: Vec<_> = risk
.matched_rules
.iter()
.map(|r| {
(
r.rule_id,
r.name.clone(),
r.score,
r.severity.clone(),
r.suggestion.clone(),
)
})
.collect();
let insights = crate::copilot::engine::generate_anomaly_insights(
&patient_id.to_string(),
&matched_with_severity,
);
for insight_data in insights {
let severity = insight_data["severity"]
.as_str()
.unwrap_or("warning")
.to_string();
let title = insight_data["title"]
.as_str()
.unwrap_or("风险告警")
.to_string();
let content = insight_data
.get("content")
.cloned()
.unwrap_or(insight_data.clone());
match crate::service::insight_service::InsightService::create_insight(
db,
tenant_id,
patient_id,
"daily_scan".into(),
"risk_refresh".into(),
Some(severity.clone()),
title.clone(),
content,
None,
168,
None,
)
.await
{
Ok(_insight_id) => {
if let Some(bus) = event_bus {
let event = erp_core::events::DomainEvent::new(
"copilot.insight.created",
tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"patient_id": patient_id.to_string(),
"insight_type": "daily_scan",
"severity": severity,
"title": title,
})),
);
bus.publish(event, db).await;
}
}
Err(e) => {
tracing::warn!(
patient_id = %patient_id,
error = %e,
"每日扫描洞察创建失败"
);
}
}
}
}
}