diff --git a/crates/erp-ai/src/handler/mod.rs b/crates/erp-ai/src/handler/mod.rs index 87e758f..3721cd2 100644 --- a/crates/erp-ai/src/handler/mod.rs +++ b/crates/erp-ai/src/handler/mod.rs @@ -9,7 +9,6 @@ use serde::Deserialize; use std::convert::Infallible; use crate::dto::{AnalysisSseEvent, AnalysisType}; -use crate::service::suggestion::SuggestionService; use crate::state::AiState; pub mod suggestion_handler; @@ -534,49 +533,19 @@ fn build_sse_stream( } let metadata = serde_json::json!({"analysis_type": analysis_type}); - let _ = state.analysis.complete_analysis(analysis_id, full_content.clone(), metadata).await; + let _ = state.analysis.complete_analysis(analysis_id, full_content.clone(), metadata.clone()).await; - // 解析双通道输出并创建建议记录 - let parsed = crate::service::output_parser::parse_dual_channel(&full_content).unwrap_or( - crate::dto::suggestion::ParsedOutput { - text_content: full_content.clone(), - structured: None, - }, - ); - - let mut event_payload = serde_json::json!({ - "analysis_id": analysis_id, - "analysis_type": analysis_type, - "patient_id": patient_id, - "doctor_id": doctor_id, - }); - - if let Some(ref structured) = parsed.structured { - event_payload["risk_level"] = serde_json::json!(structured.risk_level.as_str()); - event_payload["suggestion_count"] = serde_json::json!(structured.suggestions.len()); - - if !structured.suggestions.is_empty() { - let _ = SuggestionService::create_suggestions( - &state.db, - tenant_id, - analysis_id, - &structured.suggestions, - structured.risk_level, - &structured.baseline_summary, - Some(doctor_id), - ).await; - } - } else { - let _ = SuggestionService::mark_parse_failed(&state.db, analysis_id).await; - } - - // 发布 AI 分析完成事件 - let event = erp_core::events::DomainEvent::new( - "ai.analysis.completed", + // 后处理:解析双通道输出、创建建议、发布事件 + crate::service::post_process::post_process_analysis( + &state, + analysis_id, + &full_content, tenant_id, - erp_core::events::build_event_payload(event_payload), - ); - state.event_bus.publish(event, &state.db).await; + patient_id, + doctor_id, + analysis_type, + metadata, + ).await; let done_event = AnalysisSseEvent::Done { analysis_id, diff --git a/crates/erp-ai/src/service/auto_analysis.rs b/crates/erp-ai/src/service/auto_analysis.rs index 2b05009..5acb979 100644 --- a/crates/erp-ai/src/service/auto_analysis.rs +++ b/crates/erp-ai/src/service/auto_analysis.rs @@ -208,11 +208,33 @@ async fn analyze_tenant_high_risk_patients( }); if let Err(e) = state .analysis - .complete_analysis(analysis_id, full_content, metadata) + .complete_analysis(analysis_id, full_content.clone(), metadata.clone()) .await { tracing::warn!(error = %e, "保存分析结果失败"); } + + // 后处理:解析双通道输出、创建建议、发布事件 + let result = super::post_process::post_process_analysis( + state, + analysis_id, + &full_content, + tenant_id, + patient_id, + system_user_id, + "trend", + metadata, + ) + .await; + + tracing::info!( + patient_id = %patient_id, + analysis_id = %analysis_id, + risk_level = ?result.risk_level, + suggestion_count = result.suggestion_count, + "自动分析后处理完成" + ); + analyzed += 1; } } diff --git a/crates/erp-ai/src/service/mod.rs b/crates/erp-ai/src/service/mod.rs index 129948a..e36fe35 100644 --- a/crates/erp-ai/src/service/mod.rs +++ b/crates/erp-ai/src/service/mod.rs @@ -3,6 +3,7 @@ pub mod auto_analysis; pub mod comparison; pub mod local_rules; pub mod output_parser; +pub mod post_process; pub mod prompt; pub mod reanalysis; pub mod suggestion; diff --git a/crates/erp-ai/src/service/post_process.rs b/crates/erp-ai/src/service/post_process.rs new file mode 100644 index 0000000..0ca4590 --- /dev/null +++ b/crates/erp-ai/src/service/post_process.rs @@ -0,0 +1,92 @@ +//! 分析后处理 — 解析双通道输出、创建建议、发布事件 +//! +//! 被 SSE handler(`build_sse_stream`)和自动分析(`auto_analysis`)共用。 + +use uuid::Uuid; + +use crate::dto::suggestion::ParsedOutput; +use crate::service::output_parser; +use crate::service::suggestion::SuggestionService; +use crate::state::AiState; + +/// 分析后处理结果 +pub struct PostProcessResult { + pub risk_level: Option, + pub suggestion_count: usize, + pub suggestion_ids: Vec, +} + +/// 对完成的分析执行后处理:解析双通道输出、创建建议、发布事件 +pub async fn post_process_analysis( + state: &AiState, + analysis_id: Uuid, + full_content: &str, + tenant_id: Uuid, + patient_id: Uuid, + user_id: Uuid, + analysis_type: &str, + _metadata: serde_json::Value, +) -> PostProcessResult { + // 1. 解析双通道输出 + let parsed = output_parser::parse_dual_channel(full_content).unwrap_or(ParsedOutput { + text_content: full_content.to_string(), + structured: None, + }); + + // 2. 构建事件 payload + let mut event_payload = serde_json::json!({ + "analysis_id": analysis_id, + "analysis_type": analysis_type, + "patient_id": patient_id, + "doctor_id": user_id, + }); + + let mut risk_level_str: Option = None; + let mut suggestion_ids = Vec::new(); + + if let Some(ref structured) = parsed.structured { + risk_level_str = Some(structured.risk_level.as_str().to_string()); + event_payload["risk_level"] = serde_json::json!(structured.risk_level.as_str()); + event_payload["suggestion_count"] = serde_json::json!(structured.suggestions.len()); + + // 3. 创建建议记录 + if !structured.suggestions.is_empty() { + match SuggestionService::create_suggestions( + &state.db, + tenant_id, + analysis_id, + &structured.suggestions, + structured.risk_level, + &structured.baseline_summary, + Some(user_id), + ) + .await + { + Ok(ids) => suggestion_ids = ids, + Err(e) => { + tracing::warn!( + analysis_id = %analysis_id, + error = %e, + "自动分析创建建议记录失败" + ); + } + } + } + } else { + let _ = SuggestionService::mark_parse_failed(&state.db, analysis_id).await; + } + + // 4. 发布 AI 分析完成事件 + let event = erp_core::events::DomainEvent::new( + "ai.analysis.completed", + tenant_id, + erp_core::events::build_event_payload(event_payload), + ); + state.event_bus.publish(event, &state.db).await; + + PostProcessResult { + risk_level: risk_level_str, + suggestion_count: suggestion_ids.len(), + suggestion_ids, + } +}