refactor(ai): auto_analysis 改为入队模式
高风险患者扫描结果构造 AnalysisJob 入队而非直接调用 Provider 保留定时扫描逻辑(每 24h),分析执行由队列消费者负责
This commit is contained in:
@@ -1,15 +1,10 @@
|
|||||||
//! 定期自动分析服务 — 对高风险患者执行 AI 趋势分析
|
//! 定期自动分析服务 — 对高风险患者入队 AI 趋势分析
|
||||||
//!
|
|
||||||
//! 每 24 小时执行一次,扫描所有租户中最近有异常体征记录的患者,
|
|
||||||
//! 自动触发趋势分析并存储结果。
|
|
||||||
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use erp_core::health_provider::TimeRange;
|
|
||||||
use sea_orm::{FromQueryResult, Statement};
|
use sea_orm::{FromQueryResult, Statement};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::dto::AnalysisType;
|
|
||||||
use crate::state::AiState;
|
use crate::state::AiState;
|
||||||
|
|
||||||
/// 启动自动趋势分析后台任务
|
/// 启动自动趋势分析后台任务
|
||||||
@@ -84,14 +79,13 @@ async fn find_active_tenants(
|
|||||||
Ok(rows.into_iter().map(|r| r.id).collect())
|
Ok(rows.into_iter().map(|r| r.id).collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// 查找指定租户中需要关注的高风险患者并执行分析
|
/// 查找指定租户中需要关注的高风险患者并入队分析
|
||||||
///
|
///
|
||||||
/// 高风险判定:最近 7 天内有 2 次以上异常体征记录的患者
|
/// 高风险判定:最近 7 天内有 2 次以上异常体征记录的患者
|
||||||
async fn analyze_tenant_high_risk_patients(
|
async fn analyze_tenant_high_risk_patients(
|
||||||
state: &AiState,
|
state: &AiState,
|
||||||
tenant_id: Uuid,
|
tenant_id: Uuid,
|
||||||
) -> Result<u32, String> {
|
) -> Result<u32, String> {
|
||||||
// 查找高风险患者 ID 列表
|
|
||||||
let patient_ids = find_high_risk_patients(&state.db, tenant_id).await?;
|
let patient_ids = find_high_risk_patients(&state.db, tenant_id).await?;
|
||||||
|
|
||||||
if patient_ids.is_empty() {
|
if patient_ids.is_empty() {
|
||||||
@@ -101,154 +95,31 @@ async fn analyze_tenant_high_risk_patients(
|
|||||||
tracing::info!(
|
tracing::info!(
|
||||||
tenant_id = %tenant_id,
|
tenant_id = %tenant_id,
|
||||||
patient_count = patient_ids.len(),
|
patient_count = patient_ids.len(),
|
||||||
"开始自动趋势分析"
|
"高风险患者入队趋势分析"
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut analyzed = 0u32;
|
let queue = crate::service::analysis_queue::AnalysisQueue::new(state.db.clone());
|
||||||
let system_user_id = Uuid::nil(); // 系统自动分析使用 nil user_id
|
let mut enqueued = 0u32;
|
||||||
|
|
||||||
let metrics = vec![
|
|
||||||
"systolic_bp_morning".to_string(),
|
|
||||||
"diastolic_bp_morning".to_string(),
|
|
||||||
"heart_rate".to_string(),
|
|
||||||
"weight".to_string(),
|
|
||||||
"blood_sugar".to_string(),
|
|
||||||
];
|
|
||||||
|
|
||||||
let range = TimeRange {
|
|
||||||
start: chrono::Utc::now() - chrono::Duration::days(90),
|
|
||||||
end: chrono::Utc::now(),
|
|
||||||
};
|
|
||||||
|
|
||||||
// 获取 prompt 模板
|
|
||||||
let prompt = state
|
|
||||||
.prompt
|
|
||||||
.get_active_prompt(tenant_id, "health_trend_analysis")
|
|
||||||
.await
|
|
||||||
.map_err(|e| format!("获取 prompt 失败: {e}"))?;
|
|
||||||
|
|
||||||
let model_config = &prompt.model_config;
|
|
||||||
let model = model_config["model"]
|
|
||||||
.as_str()
|
|
||||||
.unwrap_or("claude-sonnet-4-6")
|
|
||||||
.to_string();
|
|
||||||
let temperature = model_config["temperature"].as_f64().unwrap_or(0.3) as f32;
|
|
||||||
let max_tokens = model_config["max_tokens"].as_u64().unwrap_or(2048) as u32;
|
|
||||||
|
|
||||||
for patient_id in patient_ids {
|
for patient_id in patient_ids {
|
||||||
// 获取趋势分析数据
|
let job = crate::service::analysis_queue::AnalysisJob {
|
||||||
let trend_data = match state
|
tenant_id,
|
||||||
.health_provider
|
patient_id,
|
||||||
.get_trend_analysis_data(tenant_id, patient_id, &metrics, &range)
|
analysis_type: "trend".into(),
|
||||||
.await
|
priority: 1,
|
||||||
{
|
source_event: Some("auto_analysis.scheduled".into()),
|
||||||
Ok(data) => data,
|
source_ref: patient_id.to_string(),
|
||||||
Err(e) => {
|
created_by: None,
|
||||||
tracing::warn!(
|
|
||||||
patient_id = %patient_id,
|
|
||||||
error = %e,
|
|
||||||
"获取趋势数据失败,跳过"
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
match queue.enqueue(job).await {
|
||||||
// 脱敏
|
Ok(_) => enqueued += 1,
|
||||||
let sanitized_data = match state.analysis.sanitizer.sanitize_trend_analysis(&trend_data) {
|
|
||||||
Ok(data) => data,
|
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::warn!(
|
tracing::warn!(patient_id = %patient_id, error = %e, "入队失败");
|
||||||
patient_id = %patient_id,
|
|
||||||
error = %e,
|
|
||||||
"数据脱敏失败,跳过"
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// 执行流式分析(非阻塞收集完整结果)
|
|
||||||
match state
|
|
||||||
.analysis
|
|
||||||
.stream_analyze(
|
|
||||||
tenant_id,
|
|
||||||
system_user_id,
|
|
||||||
patient_id,
|
|
||||||
AnalysisType::Trends,
|
|
||||||
patient_id.to_string(),
|
|
||||||
prompt.system_prompt.clone(),
|
|
||||||
prompt.user_prompt_template.clone(),
|
|
||||||
sanitized_data,
|
|
||||||
model.clone(),
|
|
||||||
temperature,
|
|
||||||
max_tokens,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok((stream, analysis_id, _provider)) => {
|
|
||||||
// 收集完整流内容
|
|
||||||
use futures::StreamExt;
|
|
||||||
let mut full_content = String::new();
|
|
||||||
let mut stream = stream;
|
|
||||||
|
|
||||||
while let Some(chunk) = stream.next().await {
|
|
||||||
match chunk {
|
|
||||||
Ok(text) => full_content.push_str(&text),
|
|
||||||
Err(e) => {
|
|
||||||
tracing::warn!(error = %e, "流式分析出错");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 标记分析完成
|
|
||||||
if !full_content.is_empty() {
|
|
||||||
let metadata = serde_json::json!({
|
|
||||||
"auto_analysis": true,
|
|
||||||
"trigger": "scheduled",
|
|
||||||
});
|
|
||||||
if let Err(e) = state
|
|
||||||
.analysis
|
|
||||||
.complete_analysis(analysis_id, full_content.clone(), metadata.clone())
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
tracing::warn!(error = %e, "保存分析结果失败");
|
|
||||||
}
|
|
||||||
|
|
||||||
// 后处理:解析双通道输出、创建建议、发布事件
|
|
||||||
let result = super::post_process::post_process_analysis(
|
|
||||||
state,
|
|
||||||
analysis_id,
|
|
||||||
&full_content,
|
|
||||||
tenant_id,
|
|
||||||
patient_id,
|
|
||||||
system_user_id,
|
|
||||||
"trend",
|
|
||||||
metadata,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
tracing::info!(
|
|
||||||
patient_id = %patient_id,
|
|
||||||
analysis_id = %analysis_id,
|
|
||||||
risk_level = ?result.risk_level,
|
|
||||||
suggestion_count = result.suggestion_count,
|
|
||||||
"自动分析后处理完成"
|
|
||||||
);
|
|
||||||
|
|
||||||
analyzed += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::warn!(
|
|
||||||
patient_id = %patient_id,
|
|
||||||
error = %e,
|
|
||||||
"发起分析失败"
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(analyzed)
|
Ok(enqueued)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// 查找高风险患者:最近 7 天内有体征记录且存在异常指标的患者
|
/// 查找高风险患者:最近 7 天内有体征记录且存在异常指标的患者
|
||||||
|
|||||||
Reference in New Issue
Block a user