feat(ai): 集成双通道输出解析到 SSE handler — 自动创建建议记录

在 build_sse_stream 完成回调中:
- 调用 parse_dual_channel 解析 AI 输出
- 有结构化建议时调用 SuggestionService::create_suggestions 创建记录
- 解析失败时调用 mark_parse_failed 记录日志
- 扩展 ai.analysis.completed 事件 payload 含 risk_level + suggestion_count
This commit is contained in:
iven
2026-05-01 08:11:23 +08:00
parent b30897119b
commit 6e761ae22b

View File

@@ -9,6 +9,7 @@ use serde::Deserialize;
use std::convert::Infallible;
use crate::dto::{AnalysisSseEvent, AnalysisType};
use crate::service::suggestion::SuggestionService;
use crate::state::AiState;
// === 分析请求 Body ===
@@ -71,8 +72,10 @@ where
let analysis_id_clone = analysis_id;
let state_clone = state.clone();
let patient_id_clone = uuid::Uuid::nil(); // lab report 场景 patient_id 从 report 关联
let doctor_id_clone = ctx.user_id;
let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "lab_report", ctx.tenant_id);
let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "lab_report", ctx.tenant_id, patient_id_clone, doctor_id_clone);
Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
}
@@ -140,7 +143,7 @@ where
let analysis_id_clone = analysis_id;
let state_clone = state.clone();
let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "trend", ctx.tenant_id);
let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "trend", ctx.tenant_id, uuid::Uuid::nil(), ctx.user_id);
Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
}
@@ -197,7 +200,7 @@ where
let analysis_id_clone = analysis_id;
let state_clone = state.clone();
let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "checkup_plan", ctx.tenant_id);
let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "checkup_plan", ctx.tenant_id, uuid::Uuid::nil(), ctx.user_id);
Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
}
@@ -254,7 +257,7 @@ where
let analysis_id_clone = analysis_id;
let state_clone = state.clone();
let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "report_summary", ctx.tenant_id);
let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "report_summary", ctx.tenant_id, uuid::Uuid::nil(), ctx.user_id);
Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
}
@@ -453,6 +456,8 @@ fn build_sse_stream(
state: AiState,
analysis_type: &'static str,
tenant_id: uuid::Uuid,
patient_id: uuid::Uuid,
doctor_id: uuid::Uuid,
) -> impl futures::Stream<Item = Result<Event, Infallible>> {
async_stream::stream! {
let mut full_content = String::new();
@@ -491,14 +496,45 @@ fn build_sse_stream(
let metadata = serde_json::json!({"analysis_type": analysis_type});
let _ = state.analysis.complete_analysis(analysis_id, full_content.clone(), metadata).await;
// 解析双通道输出并创建建议记录
let parsed = crate::service::output_parser::parse_dual_channel(&full_content).unwrap_or(
crate::dto::suggestion::ParsedOutput {
text_content: full_content.clone(),
structured: None,
},
);
let mut event_payload = serde_json::json!({
"analysis_id": analysis_id,
"analysis_type": analysis_type,
"patient_id": patient_id,
"doctor_id": doctor_id,
});
if let Some(ref structured) = parsed.structured {
event_payload["risk_level"] = serde_json::json!(structured.risk_level.as_str());
event_payload["suggestion_count"] = serde_json::json!(structured.suggestions.len());
if !structured.suggestions.is_empty() {
let _ = SuggestionService::create_suggestions(
&state.db,
tenant_id,
analysis_id,
&structured.suggestions,
structured.risk_level,
&structured.baseline_summary,
Some(doctor_id),
).await;
}
} else {
let _ = SuggestionService::mark_parse_failed(&state.db, analysis_id).await;
}
// 发布 AI 分析完成事件
let event = erp_core::events::DomainEvent::new(
"ai.analysis.completed",
tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"analysis_id": analysis_id,
"analysis_type": analysis_type,
})),
erp_core::events::build_event_payload(event_payload),
);
state.event_bus.publish(event, &state.db).await;