//! 定期自动分析服务 — 对高风险患者执行 AI 趋势分析 //! //! 每 24 小时执行一次,扫描所有租户中最近有异常体征记录的患者, //! 自动触发趋势分析并存储结果。 use std::time::Duration; use erp_core::health_provider::TimeRange; use sea_orm::{FromQueryResult, Statement}; use uuid::Uuid; use crate::dto::AnalysisType; use crate::state::AiState; /// 启动自动趋势分析后台任务 pub fn start_auto_analysis(state: AiState) { tokio::spawn(async move { // 首次启动延迟 5 分钟,等待服务完全就绪 tokio::time::sleep(Duration::from_secs(300)).await; let mut interval = tokio::time::interval(Duration::from_secs(86400)); loop { interval.tick().await; if let Err(e) = run_auto_analysis(&state).await { tracing::warn!(error = %e, "自动趋势分析任务执行失败"); } } }); tracing::info!("自动趋势分析任务已启动(每 24 小时执行一次)"); } /// 执行一次自动分析 async fn run_auto_analysis(state: &AiState) -> Result<(), String> { // 查找所有活跃租户 let tenants = find_active_tenants(&state.db).await?; if tenants.is_empty() { tracing::debug!("无活跃租户,跳过自动分析"); return Ok(()); } let mut total_analyzed = 0u32; let mut total_errors = 0u32; for tenant_id in tenants { match analyze_tenant_high_risk_patients(state, tenant_id).await { Ok(count) => total_analyzed += count, Err(e) => { tracing::warn!( tenant_id = %tenant_id, error = %e, "租户自动分析失败" ); total_errors += 1; } } } tracing::info!( total_analyzed, total_errors, "自动趋势分析任务完成" ); Ok(()) } /// 查找所有活跃租户 ID async fn find_active_tenants( db: &sea_orm::DatabaseConnection, ) -> Result, String> { #[derive(Debug, FromQueryResult)] struct TenantId { id: Uuid, } let rows: Vec = TenantId::find_by_statement(Statement::from_string( sea_orm::DatabaseBackend::Postgres, "SELECT id FROM tenant WHERE deleted_at IS NULL".to_string(), )) .all(db) .await .map_err(|e| format!("查询租户失败: {e}"))?; Ok(rows.into_iter().map(|r| r.id).collect()) } /// 查找指定租户中需要关注的高风险患者并执行分析 /// /// 高风险判定:最近 7 天内有 2 次以上异常体征记录的患者 async fn analyze_tenant_high_risk_patients( state: &AiState, tenant_id: Uuid, ) -> Result { // 查找高风险患者 ID 列表 let patient_ids = find_high_risk_patients(&state.db, tenant_id).await?; if patient_ids.is_empty() { return Ok(0); } tracing::info!( tenant_id = %tenant_id, patient_count = patient_ids.len(), "开始自动趋势分析" ); let mut analyzed = 0u32; let system_user_id = Uuid::nil(); // 系统自动分析使用 nil user_id let metrics = vec![ "systolic_bp_morning".to_string(), "diastolic_bp_morning".to_string(), "heart_rate".to_string(), "weight".to_string(), "blood_sugar".to_string(), ]; let range = TimeRange { start: chrono::Utc::now() - chrono::Duration::days(90), end: chrono::Utc::now(), }; // 获取 prompt 模板 let prompt = state .prompt .get_active_prompt(tenant_id, "health_trend_analysis") .await .map_err(|e| format!("获取 prompt 失败: {e}"))?; let model_config = &prompt.model_config; let model = model_config["model"] .as_str() .unwrap_or("claude-sonnet-4-6") .to_string(); let temperature = model_config["temperature"].as_f64().unwrap_or(0.3) as f32; let max_tokens = model_config["max_tokens"].as_u64().unwrap_or(2048) as u32; for patient_id in patient_ids { // 获取趋势分析数据 let trend_data = match state .health_provider .get_trend_analysis_data(tenant_id, patient_id, &metrics, &range) .await { Ok(data) => data, Err(e) => { tracing::warn!( patient_id = %patient_id, error = %e, "获取趋势数据失败,跳过" ); continue; } }; // 脱敏 let sanitized_data = match state.analysis.sanitizer.sanitize_trend_analysis(&trend_data) { Ok(data) => data, Err(e) => { tracing::warn!( patient_id = %patient_id, error = %e, "数据脱敏失败,跳过" ); continue; } }; // 执行流式分析(非阻塞收集完整结果) match state .analysis .stream_analyze( tenant_id, system_user_id, patient_id, AnalysisType::Trends, patient_id.to_string(), prompt.system_prompt.clone(), prompt.user_prompt_template.clone(), sanitized_data, model.clone(), temperature, max_tokens, ) .await { Ok((stream, analysis_id, _provider)) => { // 收集完整流内容 use futures::StreamExt; let mut full_content = String::new(); let mut stream = stream; while let Some(chunk) = stream.next().await { match chunk { Ok(text) => full_content.push_str(&text), Err(e) => { tracing::warn!(error = %e, "流式分析出错"); break; } } } // 标记分析完成 if !full_content.is_empty() { let metadata = serde_json::json!({ "auto_analysis": true, "trigger": "scheduled", }); if let Err(e) = state .analysis .complete_analysis(analysis_id, full_content.clone(), metadata.clone()) .await { tracing::warn!(error = %e, "保存分析结果失败"); } // 后处理:解析双通道输出、创建建议、发布事件 let result = super::post_process::post_process_analysis( state, analysis_id, &full_content, tenant_id, patient_id, system_user_id, "trend", metadata, ) .await; tracing::info!( patient_id = %patient_id, analysis_id = %analysis_id, risk_level = ?result.risk_level, suggestion_count = result.suggestion_count, "自动分析后处理完成" ); analyzed += 1; } } Err(e) => { tracing::warn!( patient_id = %patient_id, error = %e, "发起分析失败" ); } } } Ok(analyzed) } /// 查找高风险患者:最近 7 天内有体征记录且存在异常指标的患者 async fn find_high_risk_patients( db: &sea_orm::DatabaseConnection, tenant_id: Uuid, ) -> Result, String> { #[derive(Debug, FromQueryResult)] struct PatientIdRow { patient_id: Uuid, } // 查找最近 7 天内有体征记录的活跃患者 // 使用简化的高风险判定:最近 7 天有记录且 heart_rate 或 blood_sugar 偏离正常范围 let sql = r#" SELECT DISTINCT vs.patient_id FROM vital_signs vs WHERE vs.tenant_id = $1 AND vs.deleted_at IS NULL AND vs.record_date >= CURRENT_DATE - INTERVAL '7 days' AND ( (vs.heart_rate IS NOT NULL AND (vs.heart_rate < 60 OR vs.heart_rate > 100)) OR (vs.blood_sugar IS NOT NULL AND (vs.blood_sugar < 3.9 OR vs.blood_sugar > 11.1)) OR (vs.systolic_bp_morning IS NOT NULL AND (vs.systolic_bp_morning > 140 OR vs.systolic_bp_morning < 90)) OR (vs.spo2 IS NOT NULL AND vs.spo2 < 95) ) LIMIT 50 "#; let rows: Vec = PatientIdRow::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, [tenant_id.into()], )) .all(db) .await .map_err(|e| format!("查询高风险患者失败: {e}"))?; Ok(rows.into_iter().map(|r| r.patient_id).collect()) } #[cfg(test)] mod tests { use super::*; #[test] fn test_duration_为24小时() { assert_eq!(Duration::from_secs(86400).as_secs(), 86400); } #[test] fn test_initial_delay_为5分钟() { assert_eq!(Duration::from_secs(300).as_secs(), 300); } }