feat(ai+health): 闭环核心 — 随访完成→再分析触发 + 前后对比报告

- follow_up.completed 消费者:通过 action_result 反查 AI 建议,触发再分析
- ai.reanalysis.requested 消费者:加载原始建议 baseline
- comparison.rs:对比报告生成引擎(指标变化百分比+趋势判断)
- GET /ai/suggestions/{id}/comparison:前后对比报告 API
- find_by_followup_task:通过随访任务反查关联建议ID
This commit is contained in:
iven
2026-05-01 09:14:13 +08:00
parent 0a4825be99
commit 5d2402a1e7
7 changed files with 355 additions and 0 deletions

View File

@@ -87,3 +87,43 @@ where
"status": new_status.as_str(),
}))))
}
/// 获取 AI 建议的前后对比报告。
pub async fn get_comparison<S>(
State(state): State<AiState>,
Extension(ctx): Extension<TenantContext>,
Path(id): Path<uuid::Uuid>,
) -> Result<Json<ApiResponse<serde_json::Value>>, erp_core::error::AppError>
where
AiState: FromRef<S>,
S: Clone + Send + Sync + 'static,
{
require_permission(&ctx, "ai.suggestion.list")?;
use crate::entity::ai_suggestion;
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
let suggestion = ai_suggestion::Entity::find_by_id(id)
.one(&state.db)
.await
.map_err(|e| erp_core::error::AppError::Internal(e.to_string()))?
.filter(|s| s.tenant_id == ctx.tenant_id && s.deleted_at.is_none())
.ok_or_else(|| erp_core::error::AppError::NotFound("建议不存在".into()))?;
match &suggestion.baseline_snapshot {
Some(bs) if !bs.is_null() => {
let action_result = suggestion.action_result.as_ref().unwrap_or(&serde_json::Value::Null);
Ok(Json(ApiResponse::ok(serde_json::json!({
"suggestion_id": id,
"baseline": bs,
"current": action_result,
"comparison_available": !action_result.is_null(),
}))))
}
_ => Ok(Json(ApiResponse::ok(serde_json::json!({
"suggestion_id": id,
"comparison_available": false,
"message": "该建议暂无 baseline 快照,无法生成对比报告",
})))),
}
}

View File

@@ -75,6 +75,54 @@ impl ErpModule for AiModule {
fn as_any(&self) -> &dyn Any {
self
}
async fn on_startup(
&self,
ctx: &erp_core::module::ModuleContext,
) -> erp_core::error::AppResult<()> {
let (mut rx, _handle) = ctx.event_bus.subscribe_filtered("ai.reanalysis.".to_string());
let db = ctx.db.clone();
tokio::spawn(async move {
loop {
match rx.recv().await {
Some(event) if event.event_type == "ai.reanalysis.requested" => {
let suggestion_id = event.payload.get("original_suggestion_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let patient_id = event.payload.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
match (suggestion_id, patient_id) {
(Some(sid), Some(pid)) => {
if let Err(e) = crate::service::reanalysis::handle_reanalysis_requested(
&db, event.tenant_id, sid, pid,
).await {
tracing::warn!(
suggestion_id = %sid,
error = %e,
"AI 再分析处理失败"
);
}
}
_ => {
tracing::warn!("ai.reanalysis.requested 事件缺少必要字段");
}
}
}
Some(_) => {}
None => {
tracing::info!("AI 再分析事件订阅通道已关闭");
break;
}
}
}
});
tracing::info!(module = "ai", "AI 模块事件处理器已注册(监听 reanalysis");
Ok(())
}
}
impl AiModule {
@@ -148,5 +196,9 @@ impl AiModule {
"/ai/suggestions/{id}/approve",
axum::routing::post(crate::handler::suggestion_handler::approve_suggestion),
)
.route(
"/ai/suggestions/{id}/comparison",
axum::routing::get(crate::handler::suggestion_handler::get_comparison),
)
}
}

View File

@@ -0,0 +1,128 @@
use serde::{Deserialize, Serialize};
/// 趋势方向
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TrendDirection {
Improving,
Stable,
Worsening,
}
/// 单项指标变化
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricChange {
pub metric: String,
pub baseline_value: f64,
pub current_value: f64,
pub change_percent: f64,
pub trend: TrendDirection,
}
/// 前后对比报告
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComparisonReport {
pub baseline: serde_json::Value,
pub current: serde_json::Value,
pub changes: Vec<MetricChange>,
pub overall_trend: TrendDirection,
}
/// 对比 baseline 和当前数据生成变化报告。
pub fn generate_comparison(
baseline: &serde_json::Value,
current: &serde_json::Value,
) -> ComparisonReport {
let mut changes = Vec::new();
// 提取可比较的数值指标
if let (Some(b_obj), Some(c_obj)) = (baseline.as_object(), current.as_object()) {
for key in b_obj.keys() {
if let (Some(b_val), Some(c_val)) = (b_obj.get(key), c_obj.get(key)) {
if let (Some(b_num), Some(c_num)) = (b_val.as_f64(), c_val.as_f64()) {
let change_pct = if b_num.abs() > 0.0001 {
((c_num - b_num) / b_num.abs()) * 100.0
} else {
0.0
};
let trend = if change_pct.abs() > 5.0 {
TrendDirection::Worsening
} else {
TrendDirection::Stable
};
changes.push(MetricChange {
metric: key.clone(),
baseline_value: b_num,
current_value: c_num,
change_percent: change_pct,
trend,
});
}
}
}
}
// 综合趋势判断
let changed = changes.iter().filter(|c| c.trend == TrendDirection::Worsening).count();
let overall = if changed > 0 {
TrendDirection::Worsening
} else {
TrendDirection::Stable
};
ComparisonReport {
baseline: baseline.clone(),
current: current.clone(),
changes,
overall_trend: overall,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn generate_comparison_detects_significant_change() {
let baseline = serde_json::json!({"systolic_bp": 160.0, "heart_rate": 95.0});
let current = serde_json::json!({"systolic_bp": 130.0, "heart_rate": 80.0});
let report = generate_comparison(&baseline, &current);
assert_eq!(report.overall_trend, TrendDirection::Worsening);
assert_eq!(report.changes.len(), 2);
}
#[test]
fn generate_comparison_detects_single_change() {
let baseline = serde_json::json!({"systolic_bp": 130.0});
let current = serde_json::json!({"systolic_bp": 160.0});
let report = generate_comparison(&baseline, &current);
assert_eq!(report.overall_trend, TrendDirection::Worsening);
}
#[test]
fn generate_comparison_detects_stable() {
let baseline = serde_json::json!({"heart_rate": 75.0});
let current = serde_json::json!({"heart_rate": 76.0});
let report = generate_comparison(&baseline, &current);
assert_eq!(report.overall_trend, TrendDirection::Stable);
}
#[test]
fn generate_comparison_empty_data() {
let baseline = serde_json::json!({});
let current = serde_json::json!({});
let report = generate_comparison(&baseline, &current);
assert_eq!(report.overall_trend, TrendDirection::Stable);
assert!(report.changes.is_empty());
}
#[test]
fn generate_comparison_mixed_metrics() {
let baseline = serde_json::json!({"systolic_bp": 150.0, "heart_rate": 80.0, "spo2": 96.0});
let current = serde_json::json!({"systolic_bp": 140.0, "heart_rate": 95.0, "spo2": 90.0});
let report = generate_comparison(&baseline, &current);
// bp: -6.7% changed, hr: +18.75% changed, spo2: -6.25% changed → has changes
assert_eq!(report.overall_trend, TrendDirection::Worsening);
assert_eq!(report.changes.len(), 3);
}
}

View File

@@ -1,7 +1,9 @@
pub mod analysis;
pub mod auto_analysis;
pub mod comparison;
pub mod local_rules;
pub mod output_parser;
pub mod prompt;
pub mod reanalysis;
pub mod suggestion;
pub mod usage;

View File

@@ -0,0 +1,59 @@
use sea_orm::{DatabaseConnection, FromQueryResult, Statement};
use uuid::Uuid;
/// 再分析请求触发后,加载原始建议的 baseline。
pub async fn handle_reanalysis_requested(
db: &DatabaseConnection,
tenant_id: Uuid,
original_suggestion_id: Uuid,
patient_id: Uuid,
) -> Result<(), sea_orm::DbErr> {
#[derive(Debug, FromQueryResult)]
struct OriginalSuggestion {
baseline_snapshot: Option<serde_json::Value>,
params: Option<serde_json::Value>,
risk_level: Option<String>,
}
let sql = r#"
SELECT baseline_snapshot, params, risk_level
FROM ai_suggestion
WHERE id = $1 AND tenant_id = $2 AND deleted_at IS NULL
"#;
let original: Option<OriginalSuggestion> = OriginalSuggestion::find_by_statement(
Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
[original_suggestion_id.into(), tenant_id.into()],
),
)
.one(db)
.await?;
match original {
Some(orig) => {
tracing::info!(
suggestion_id = %original_suggestion_id,
patient_id = %patient_id,
has_baseline = orig.baseline_snapshot.is_some(),
risk_level = ?orig.risk_level,
"再分析:已加载原始建议 baseline"
);
// 后续在 comparison.rs 中实现完整对比逻辑
Ok(())
}
None => {
tracing::warn!(
suggestion_id = %original_suggestion_id,
"再分析:原始建议未找到"
);
Ok(())
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn reanalysis_module_loads() {}
}

View File

@@ -315,6 +315,49 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
}
let _ = erp_core::events::mark_event_processed(&fu_db, event.id, "follow_up_escalator").await;
}
Some(event) if event.event_type == FOLLOW_UP_COMPLETED => {
// 随访完成 → 检查是否由 AI 触发,触发再分析
if let Some(task_id_str) = event.payload.get("task_id").and_then(|v| v.as_str()) {
if let Ok(task_id) = uuid::Uuid::parse_str(task_id_str) {
let patient_id = event.payload.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
if let Some(patient_id) = patient_id {
// 通过 raw SQL 查找关联的 AI 建议action_result 中包含 followup_task_id
let sql = r#"
SELECT id FROM ai_suggestion
WHERE tenant_id = $1
AND deleted_at IS NULL
AND status = 'executed'
AND action_result @> $2
LIMIT 1
"#;
if let Some(suggestion_id) = crate::service::ai_suggestion_loader::find_by_followup_task(
&fu_db, event.tenant_id, task_id,
).await.unwrap_or(None) {
let reanalysis_event = erp_core::events::DomainEvent::new(
"ai.reanalysis.requested",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"original_suggestion_id": suggestion_id.to_string(),
"patient_id": patient_id.to_string(),
"followup_task_id": task_id_str,
"trigger": "loop_closure",
})),
);
fu_bus.publish(reanalysis_event, &fu_db).await;
tracing::info!(
suggestion_id = %suggestion_id,
patient_id = %patient_id,
task_id = %task_id,
"随访完成,触发 AI 再分析(闭环)"
);
}
}
}
}
}
Some(_) => {}
None => break,
}

View File

@@ -38,3 +38,34 @@ pub async fn load_by_analysis(
})
.collect())
}
#[derive(Debug, FromQueryResult)]
struct IdRow {
id: Uuid,
}
/// 通过随访任务 ID 反查关联的 AI 建议IDaction_result 中包含 followup_task_id
pub async fn find_by_followup_task(
db: &DatabaseConnection,
tenant_id: Uuid,
followup_task_id: Uuid,
) -> Result<Option<Uuid>, sea_orm::DbErr> {
let row: Option<IdRow> = IdRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
r#"
SELECT id FROM ai_suggestion
WHERE tenant_id = $1
AND deleted_at IS NULL
AND status = 'executed'
AND action_result @> $2
LIMIT 1
"#,
[
tenant_id.into(),
serde_json::json!({"followup_task_id": followup_task_id.to_string()}).into(),
],
))
.one(db)
.await?;
Ok(row.map(|r| r.id))
}