diff --git a/crates/erp-ai/src/service/auto_analysis.rs b/crates/erp-ai/src/service/auto_analysis.rs index 5acb979..c377ffe 100644 --- a/crates/erp-ai/src/service/auto_analysis.rs +++ b/crates/erp-ai/src/service/auto_analysis.rs @@ -1,15 +1,10 @@ -//! 定期自动分析服务 — 对高风险患者执行 AI 趋势分析 -//! -//! 每 24 小时执行一次,扫描所有租户中最近有异常体征记录的患者, -//! 自动触发趋势分析并存储结果。 +//! 定期自动分析服务 — 对高风险患者入队 AI 趋势分析 use std::time::Duration; -use erp_core::health_provider::TimeRange; use sea_orm::{FromQueryResult, Statement}; use uuid::Uuid; -use crate::dto::AnalysisType; use crate::state::AiState; /// 启动自动趋势分析后台任务 @@ -84,14 +79,13 @@ async fn find_active_tenants( Ok(rows.into_iter().map(|r| r.id).collect()) } -/// 查找指定租户中需要关注的高风险患者并执行分析 +/// 查找指定租户中需要关注的高风险患者并入队分析 /// /// 高风险判定:最近 7 天内有 2 次以上异常体征记录的患者 async fn analyze_tenant_high_risk_patients( state: &AiState, tenant_id: Uuid, ) -> Result { - // 查找高风险患者 ID 列表 let patient_ids = find_high_risk_patients(&state.db, tenant_id).await?; if patient_ids.is_empty() { @@ -101,154 +95,31 @@ async fn analyze_tenant_high_risk_patients( tracing::info!( tenant_id = %tenant_id, patient_count = patient_ids.len(), - "开始自动趋势分析" + "高风险患者入队趋势分析" ); - let mut analyzed = 0u32; - let system_user_id = Uuid::nil(); // 系统自动分析使用 nil user_id - - let metrics = vec![ - "systolic_bp_morning".to_string(), - "diastolic_bp_morning".to_string(), - "heart_rate".to_string(), - "weight".to_string(), - "blood_sugar".to_string(), - ]; - - let range = TimeRange { - start: chrono::Utc::now() - chrono::Duration::days(90), - end: chrono::Utc::now(), - }; - - // 获取 prompt 模板 - let prompt = state - .prompt - .get_active_prompt(tenant_id, "health_trend_analysis") - .await - .map_err(|e| format!("获取 prompt 失败: {e}"))?; - - let model_config = &prompt.model_config; - let model = model_config["model"] - .as_str() - .unwrap_or("claude-sonnet-4-6") - .to_string(); - let temperature = model_config["temperature"].as_f64().unwrap_or(0.3) as f32; - let max_tokens = model_config["max_tokens"].as_u64().unwrap_or(2048) as u32; + let queue = crate::service::analysis_queue::AnalysisQueue::new(state.db.clone()); + let mut enqueued = 0u32; for patient_id in patient_ids { - // 获取趋势分析数据 - let trend_data = match state - .health_provider - .get_trend_analysis_data(tenant_id, patient_id, &metrics, &range) - .await - { - Ok(data) => data, - Err(e) => { - tracing::warn!( - patient_id = %patient_id, - error = %e, - "获取趋势数据失败,跳过" - ); - continue; - } + let job = crate::service::analysis_queue::AnalysisJob { + tenant_id, + patient_id, + analysis_type: "trend".into(), + priority: 1, + source_event: Some("auto_analysis.scheduled".into()), + source_ref: patient_id.to_string(), + created_by: None, }; - - // 脱敏 - let sanitized_data = match state.analysis.sanitizer.sanitize_trend_analysis(&trend_data) { - Ok(data) => data, + match queue.enqueue(job).await { + Ok(_) => enqueued += 1, Err(e) => { - tracing::warn!( - patient_id = %patient_id, - error = %e, - "数据脱敏失败,跳过" - ); - continue; - } - }; - - // 执行流式分析(非阻塞收集完整结果) - match state - .analysis - .stream_analyze( - tenant_id, - system_user_id, - patient_id, - AnalysisType::Trends, - patient_id.to_string(), - prompt.system_prompt.clone(), - prompt.user_prompt_template.clone(), - sanitized_data, - model.clone(), - temperature, - max_tokens, - ) - .await - { - Ok((stream, analysis_id, _provider)) => { - // 收集完整流内容 - use futures::StreamExt; - let mut full_content = String::new(); - let mut stream = stream; - - while let Some(chunk) = stream.next().await { - match chunk { - Ok(text) => full_content.push_str(&text), - Err(e) => { - tracing::warn!(error = %e, "流式分析出错"); - break; - } - } - } - - // 标记分析完成 - if !full_content.is_empty() { - let metadata = serde_json::json!({ - "auto_analysis": true, - "trigger": "scheduled", - }); - if let Err(e) = state - .analysis - .complete_analysis(analysis_id, full_content.clone(), metadata.clone()) - .await - { - tracing::warn!(error = %e, "保存分析结果失败"); - } - - // 后处理:解析双通道输出、创建建议、发布事件 - let result = super::post_process::post_process_analysis( - state, - analysis_id, - &full_content, - tenant_id, - patient_id, - system_user_id, - "trend", - metadata, - ) - .await; - - tracing::info!( - patient_id = %patient_id, - analysis_id = %analysis_id, - risk_level = ?result.risk_level, - suggestion_count = result.suggestion_count, - "自动分析后处理完成" - ); - - analyzed += 1; - } - } - Err(e) => { - tracing::warn!( - patient_id = %patient_id, - error = %e, - "发起分析失败" - ); + tracing::warn!(patient_id = %patient_id, error = %e, "入队失败"); } } } - Ok(analyzed) + Ok(enqueued) } /// 查找高风险患者:最近 7 天内有体征记录且存在异常指标的患者