fix(ai): 修复自动分析管道 — 补全建议生成 + 事件发布
自动分析批处理(auto_analysis.rs)在完成流式分析后仅保存结果, 缺少三个关键步骤导致关怀引擎无法启动: 1. 不解析双通道输出(StructuredOutput)→ 无结构化建议 2. 不调用 SuggestionService.create_suggestions() → 无建议记录 3. 不发布 ai.analysis.completed 事件 → 下游消费者无感知 修复方案:提取 post_process_analysis() 共享函数,统一处理 解析→创建建议→发布事件的后处理逻辑,SSE handler 和自动分析共用。
This commit is contained in:
@@ -9,7 +9,6 @@ use serde::Deserialize;
|
|||||||
use std::convert::Infallible;
|
use std::convert::Infallible;
|
||||||
|
|
||||||
use crate::dto::{AnalysisSseEvent, AnalysisType};
|
use crate::dto::{AnalysisSseEvent, AnalysisType};
|
||||||
use crate::service::suggestion::SuggestionService;
|
|
||||||
use crate::state::AiState;
|
use crate::state::AiState;
|
||||||
|
|
||||||
pub mod suggestion_handler;
|
pub mod suggestion_handler;
|
||||||
@@ -534,49 +533,19 @@ fn build_sse_stream(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let metadata = serde_json::json!({"analysis_type": analysis_type});
|
let metadata = serde_json::json!({"analysis_type": analysis_type});
|
||||||
let _ = state.analysis.complete_analysis(analysis_id, full_content.clone(), metadata).await;
|
let _ = state.analysis.complete_analysis(analysis_id, full_content.clone(), metadata.clone()).await;
|
||||||
|
|
||||||
// 解析双通道输出并创建建议记录
|
// 后处理:解析双通道输出、创建建议、发布事件
|
||||||
let parsed = crate::service::output_parser::parse_dual_channel(&full_content).unwrap_or(
|
crate::service::post_process::post_process_analysis(
|
||||||
crate::dto::suggestion::ParsedOutput {
|
&state,
|
||||||
text_content: full_content.clone(),
|
analysis_id,
|
||||||
structured: None,
|
&full_content,
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut event_payload = serde_json::json!({
|
|
||||||
"analysis_id": analysis_id,
|
|
||||||
"analysis_type": analysis_type,
|
|
||||||
"patient_id": patient_id,
|
|
||||||
"doctor_id": doctor_id,
|
|
||||||
});
|
|
||||||
|
|
||||||
if let Some(ref structured) = parsed.structured {
|
|
||||||
event_payload["risk_level"] = serde_json::json!(structured.risk_level.as_str());
|
|
||||||
event_payload["suggestion_count"] = serde_json::json!(structured.suggestions.len());
|
|
||||||
|
|
||||||
if !structured.suggestions.is_empty() {
|
|
||||||
let _ = SuggestionService::create_suggestions(
|
|
||||||
&state.db,
|
|
||||||
tenant_id,
|
|
||||||
analysis_id,
|
|
||||||
&structured.suggestions,
|
|
||||||
structured.risk_level,
|
|
||||||
&structured.baseline_summary,
|
|
||||||
Some(doctor_id),
|
|
||||||
).await;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
let _ = SuggestionService::mark_parse_failed(&state.db, analysis_id).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 发布 AI 分析完成事件
|
|
||||||
let event = erp_core::events::DomainEvent::new(
|
|
||||||
"ai.analysis.completed",
|
|
||||||
tenant_id,
|
tenant_id,
|
||||||
erp_core::events::build_event_payload(event_payload),
|
patient_id,
|
||||||
);
|
doctor_id,
|
||||||
state.event_bus.publish(event, &state.db).await;
|
analysis_type,
|
||||||
|
metadata,
|
||||||
|
).await;
|
||||||
|
|
||||||
let done_event = AnalysisSseEvent::Done {
|
let done_event = AnalysisSseEvent::Done {
|
||||||
analysis_id,
|
analysis_id,
|
||||||
|
|||||||
@@ -208,11 +208,33 @@ async fn analyze_tenant_high_risk_patients(
|
|||||||
});
|
});
|
||||||
if let Err(e) = state
|
if let Err(e) = state
|
||||||
.analysis
|
.analysis
|
||||||
.complete_analysis(analysis_id, full_content, metadata)
|
.complete_analysis(analysis_id, full_content.clone(), metadata.clone())
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
tracing::warn!(error = %e, "保存分析结果失败");
|
tracing::warn!(error = %e, "保存分析结果失败");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 后处理:解析双通道输出、创建建议、发布事件
|
||||||
|
let result = super::post_process::post_process_analysis(
|
||||||
|
state,
|
||||||
|
analysis_id,
|
||||||
|
&full_content,
|
||||||
|
tenant_id,
|
||||||
|
patient_id,
|
||||||
|
system_user_id,
|
||||||
|
"trend",
|
||||||
|
metadata,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
tracing::info!(
|
||||||
|
patient_id = %patient_id,
|
||||||
|
analysis_id = %analysis_id,
|
||||||
|
risk_level = ?result.risk_level,
|
||||||
|
suggestion_count = result.suggestion_count,
|
||||||
|
"自动分析后处理完成"
|
||||||
|
);
|
||||||
|
|
||||||
analyzed += 1;
|
analyzed += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ pub mod auto_analysis;
|
|||||||
pub mod comparison;
|
pub mod comparison;
|
||||||
pub mod local_rules;
|
pub mod local_rules;
|
||||||
pub mod output_parser;
|
pub mod output_parser;
|
||||||
|
pub mod post_process;
|
||||||
pub mod prompt;
|
pub mod prompt;
|
||||||
pub mod reanalysis;
|
pub mod reanalysis;
|
||||||
pub mod suggestion;
|
pub mod suggestion;
|
||||||
|
|||||||
92
crates/erp-ai/src/service/post_process.rs
Normal file
92
crates/erp-ai/src/service/post_process.rs
Normal file
@@ -0,0 +1,92 @@
|
|||||||
|
//! 分析后处理 — 解析双通道输出、创建建议、发布事件
|
||||||
|
//!
|
||||||
|
//! 被 SSE handler(`build_sse_stream`)和自动分析(`auto_analysis`)共用。
|
||||||
|
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::dto::suggestion::ParsedOutput;
|
||||||
|
use crate::service::output_parser;
|
||||||
|
use crate::service::suggestion::SuggestionService;
|
||||||
|
use crate::state::AiState;
|
||||||
|
|
||||||
|
/// 分析后处理结果
|
||||||
|
pub struct PostProcessResult {
|
||||||
|
pub risk_level: Option<String>,
|
||||||
|
pub suggestion_count: usize,
|
||||||
|
pub suggestion_ids: Vec<Uuid>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 对完成的分析执行后处理:解析双通道输出、创建建议、发布事件
|
||||||
|
pub async fn post_process_analysis(
|
||||||
|
state: &AiState,
|
||||||
|
analysis_id: Uuid,
|
||||||
|
full_content: &str,
|
||||||
|
tenant_id: Uuid,
|
||||||
|
patient_id: Uuid,
|
||||||
|
user_id: Uuid,
|
||||||
|
analysis_type: &str,
|
||||||
|
_metadata: serde_json::Value,
|
||||||
|
) -> PostProcessResult {
|
||||||
|
// 1. 解析双通道输出
|
||||||
|
let parsed = output_parser::parse_dual_channel(full_content).unwrap_or(ParsedOutput {
|
||||||
|
text_content: full_content.to_string(),
|
||||||
|
structured: None,
|
||||||
|
});
|
||||||
|
|
||||||
|
// 2. 构建事件 payload
|
||||||
|
let mut event_payload = serde_json::json!({
|
||||||
|
"analysis_id": analysis_id,
|
||||||
|
"analysis_type": analysis_type,
|
||||||
|
"patient_id": patient_id,
|
||||||
|
"doctor_id": user_id,
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut risk_level_str: Option<String> = None;
|
||||||
|
let mut suggestion_ids = Vec::new();
|
||||||
|
|
||||||
|
if let Some(ref structured) = parsed.structured {
|
||||||
|
risk_level_str = Some(structured.risk_level.as_str().to_string());
|
||||||
|
event_payload["risk_level"] = serde_json::json!(structured.risk_level.as_str());
|
||||||
|
event_payload["suggestion_count"] = serde_json::json!(structured.suggestions.len());
|
||||||
|
|
||||||
|
// 3. 创建建议记录
|
||||||
|
if !structured.suggestions.is_empty() {
|
||||||
|
match SuggestionService::create_suggestions(
|
||||||
|
&state.db,
|
||||||
|
tenant_id,
|
||||||
|
analysis_id,
|
||||||
|
&structured.suggestions,
|
||||||
|
structured.risk_level,
|
||||||
|
&structured.baseline_summary,
|
||||||
|
Some(user_id),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(ids) => suggestion_ids = ids,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(
|
||||||
|
analysis_id = %analysis_id,
|
||||||
|
error = %e,
|
||||||
|
"自动分析创建建议记录失败"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
let _ = SuggestionService::mark_parse_failed(&state.db, analysis_id).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. 发布 AI 分析完成事件
|
||||||
|
let event = erp_core::events::DomainEvent::new(
|
||||||
|
"ai.analysis.completed",
|
||||||
|
tenant_id,
|
||||||
|
erp_core::events::build_event_payload(event_payload),
|
||||||
|
);
|
||||||
|
state.event_bus.publish(event, &state.db).await;
|
||||||
|
|
||||||
|
PostProcessResult {
|
||||||
|
risk_level: risk_level_str,
|
||||||
|
suggestion_count: suggestion_ids.len(),
|
||||||
|
suggestion_ids,
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user