From 6e761ae22bace35dee6b9e1927d6742a9e4ed974 Mon Sep 17 00:00:00 2001 From: iven Date: Fri, 1 May 2026 08:11:23 +0800 Subject: [PATCH] =?UTF-8?q?feat(ai):=20=E9=9B=86=E6=88=90=E5=8F=8C?= =?UTF-8?q?=E9=80=9A=E9=81=93=E8=BE=93=E5=87=BA=E8=A7=A3=E6=9E=90=E5=88=B0?= =?UTF-8?q?=20SSE=20handler=20=E2=80=94=20=E8=87=AA=E5=8A=A8=E5=88=9B?= =?UTF-8?q?=E5=BB=BA=E5=BB=BA=E8=AE=AE=E8=AE=B0=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在 build_sse_stream 完成回调中: - 调用 parse_dual_channel 解析 AI 输出 - 有结构化建议时调用 SuggestionService::create_suggestions 创建记录 - 解析失败时调用 mark_parse_failed 记录日志 - 扩展 ai.analysis.completed 事件 payload 含 risk_level + suggestion_count --- crates/erp-ai/src/handler/mod.rs | 52 +++++++++++++++++++++++++++----- 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/crates/erp-ai/src/handler/mod.rs b/crates/erp-ai/src/handler/mod.rs index 0a9f521..b0ffcc7 100644 --- a/crates/erp-ai/src/handler/mod.rs +++ b/crates/erp-ai/src/handler/mod.rs @@ -9,6 +9,7 @@ use serde::Deserialize; use std::convert::Infallible; use crate::dto::{AnalysisSseEvent, AnalysisType}; +use crate::service::suggestion::SuggestionService; use crate::state::AiState; // === 分析请求 Body === @@ -71,8 +72,10 @@ where let analysis_id_clone = analysis_id; let state_clone = state.clone(); + let patient_id_clone = uuid::Uuid::nil(); // lab report 场景 patient_id 从 report 关联 + let doctor_id_clone = ctx.user_id; - let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "lab_report", ctx.tenant_id); + let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "lab_report", ctx.tenant_id, patient_id_clone, doctor_id_clone); Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default())) } @@ -140,7 +143,7 @@ where let analysis_id_clone = analysis_id; let state_clone = state.clone(); - let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "trend", ctx.tenant_id); + let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "trend", ctx.tenant_id, uuid::Uuid::nil(), ctx.user_id); Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default())) } @@ -197,7 +200,7 @@ where let analysis_id_clone = analysis_id; let state_clone = state.clone(); - let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "checkup_plan", ctx.tenant_id); + let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "checkup_plan", ctx.tenant_id, uuid::Uuid::nil(), ctx.user_id); Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default())) } @@ -254,7 +257,7 @@ where let analysis_id_clone = analysis_id; let state_clone = state.clone(); - let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "report_summary", ctx.tenant_id); + let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "report_summary", ctx.tenant_id, uuid::Uuid::nil(), ctx.user_id); Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default())) } @@ -453,6 +456,8 @@ fn build_sse_stream( state: AiState, analysis_type: &'static str, tenant_id: uuid::Uuid, + patient_id: uuid::Uuid, + doctor_id: uuid::Uuid, ) -> impl futures::Stream> { async_stream::stream! { let mut full_content = String::new(); @@ -491,14 +496,45 @@ fn build_sse_stream( let metadata = serde_json::json!({"analysis_type": analysis_type}); let _ = state.analysis.complete_analysis(analysis_id, full_content.clone(), metadata).await; + // 解析双通道输出并创建建议记录 + let parsed = crate::service::output_parser::parse_dual_channel(&full_content).unwrap_or( + crate::dto::suggestion::ParsedOutput { + text_content: full_content.clone(), + structured: None, + }, + ); + + let mut event_payload = serde_json::json!({ + "analysis_id": analysis_id, + "analysis_type": analysis_type, + "patient_id": patient_id, + "doctor_id": doctor_id, + }); + + if let Some(ref structured) = parsed.structured { + event_payload["risk_level"] = serde_json::json!(structured.risk_level.as_str()); + event_payload["suggestion_count"] = serde_json::json!(structured.suggestions.len()); + + if !structured.suggestions.is_empty() { + let _ = SuggestionService::create_suggestions( + &state.db, + tenant_id, + analysis_id, + &structured.suggestions, + structured.risk_level, + &structured.baseline_summary, + Some(doctor_id), + ).await; + } + } else { + let _ = SuggestionService::mark_parse_failed(&state.db, analysis_id).await; + } + // 发布 AI 分析完成事件 let event = erp_core::events::DomainEvent::new( "ai.analysis.completed", tenant_id, - erp_core::events::build_event_payload(serde_json::json!({ - "analysis_id": analysis_id, - "analysis_type": analysis_type, - })), + erp_core::events::build_event_payload(event_payload), ); state.event_bus.publish(event, &state.db).await;