From a84378ab50daf3b06f926f2caea014dc3a266988 Mon Sep 17 00:00:00 2001 From: iven Date: Tue, 28 Apr 2026 20:02:01 +0800 Subject: [PATCH] =?UTF-8?q?feat(ai):=20=E5=AE=9A=E6=9C=9F=E8=87=AA?= =?UTF-8?q?=E5=8A=A8=E5=88=86=E6=9E=90=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=20=E2=80=94=20=E6=AF=8F=2024=20=E5=B0=8F=E6=97=B6=E6=89=AB?= =?UTF-8?q?=E6=8F=8F=E9=AB=98=E9=A3=8E=E9=99=A9=E6=82=A3=E8=80=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增 auto_analysis.rs 服务: - 启动后延迟 5 分钟,每 24 小时执行一次 - 查找所有活跃租户中高风险患者(异常体征指标) - 自动调用趋势分析并存储分析结果 - 每租户限制 50 名患者,防止过载 - erp-server main.rs 中注册后台任务 --- crates/erp-ai/src/service/auto_analysis.rs | 284 +++++++++++++++++++++ crates/erp-ai/src/service/mod.rs | 1 + crates/erp-server/src/main.rs | 4 + 3 files changed, 289 insertions(+) create mode 100644 crates/erp-ai/src/service/auto_analysis.rs diff --git a/crates/erp-ai/src/service/auto_analysis.rs b/crates/erp-ai/src/service/auto_analysis.rs new file mode 100644 index 0000000..25882e3 --- /dev/null +++ b/crates/erp-ai/src/service/auto_analysis.rs @@ -0,0 +1,284 @@ +//! 定期自动分析服务 — 对高风险患者执行 AI 趋势分析 +//! +//! 每 24 小时执行一次,扫描所有租户中最近有异常体征记录的患者, +//! 自动触发趋势分析并存储结果。 + +use std::time::Duration; + +use erp_core::health_provider::{HealthDataProvider, TimeRange}; +use sea_orm::{ColumnTrait, EntityTrait, FromQueryResult, QueryFilter, Statement}; +use uuid::Uuid; + +use crate::dto::AnalysisType; +use crate::state::AiState; + +/// 启动自动趋势分析后台任务 +pub fn start_auto_analysis(state: AiState) { + tokio::spawn(async move { + // 首次启动延迟 5 分钟,等待服务完全就绪 + tokio::time::sleep(Duration::from_secs(300)).await; + + let mut interval = tokio::time::interval(Duration::from_secs(86400)); + loop { + interval.tick().await; + if let Err(e) = run_auto_analysis(&state).await { + tracing::warn!(error = %e, "自动趋势分析任务执行失败"); + } + } + }); + tracing::info!("自动趋势分析任务已启动(每 24 小时执行一次)"); +} + +/// 执行一次自动分析 +async fn run_auto_analysis(state: &AiState) -> Result<(), String> { + // 查找所有活跃租户 + let tenants = find_active_tenants(&state.db).await?; + if tenants.is_empty() { + tracing::debug!("无活跃租户,跳过自动分析"); + return Ok(()); + } + + let mut total_analyzed = 0u32; + let mut total_errors = 0u32; + + for tenant_id in tenants { + match analyze_tenant_high_risk_patients(state, tenant_id).await { + Ok(count) => total_analyzed += count, + Err(e) => { + tracing::warn!( + tenant_id = %tenant_id, + error = %e, + "租户自动分析失败" + ); + total_errors += 1; + } + } + } + + tracing::info!( + total_analyzed, + total_errors, + "自动趋势分析任务完成" + ); + + Ok(()) +} + +/// 查找所有活跃租户 ID +async fn find_active_tenants( + db: &sea_orm::DatabaseConnection, +) -> Result, String> { + #[derive(Debug, FromQueryResult)] + struct TenantId { + id: Uuid, + } + + let rows: Vec = TenantId::find_by_statement(Statement::from_string( + sea_orm::DatabaseBackend::Postgres, + "SELECT id FROM tenant WHERE deleted_at IS NULL".to_string(), + )) + .all(db) + .await + .map_err(|e| format!("查询租户失败: {e}"))?; + + Ok(rows.into_iter().map(|r| r.id).collect()) +} + +/// 查找指定租户中需要关注的高风险患者并执行分析 +/// +/// 高风险判定:最近 7 天内有 2 次以上异常体征记录的患者 +async fn analyze_tenant_high_risk_patients( + state: &AiState, + tenant_id: Uuid, +) -> Result { + // 查找高风险患者 ID 列表 + let patient_ids = find_high_risk_patients(&state.db, tenant_id).await?; + + if patient_ids.is_empty() { + return Ok(0); + } + + tracing::info!( + tenant_id = %tenant_id, + patient_count = patient_ids.len(), + "开始自动趋势分析" + ); + + let mut analyzed = 0u32; + let system_user_id = Uuid::nil(); // 系统自动分析使用 nil user_id + + let metrics = vec![ + "systolic_bp_morning".to_string(), + "diastolic_bp_morning".to_string(), + "heart_rate".to_string(), + "weight".to_string(), + "blood_sugar".to_string(), + ]; + + let range = TimeRange { + start: chrono::Utc::now() - chrono::Duration::days(90), + end: chrono::Utc::now(), + }; + + // 获取 prompt 模板 + let prompt = state + .prompt + .get_active_prompt(tenant_id, "health_trend_analysis") + .await + .map_err(|e| format!("获取 prompt 失败: {e}"))?; + + let model_config = &prompt.model_config; + let model = model_config["model"] + .as_str() + .unwrap_or("claude-sonnet-4-6") + .to_string(); + let temperature = model_config["temperature"].as_f64().unwrap_or(0.3) as f32; + let max_tokens = model_config["max_tokens"].as_u64().unwrap_or(2048) as u32; + + for patient_id in patient_ids { + // 获取趋势分析数据 + let trend_data = match state + .health_provider + .get_trend_analysis_data(tenant_id, patient_id, &metrics, &range) + .await + { + Ok(data) => data, + Err(e) => { + tracing::warn!( + patient_id = %patient_id, + error = %e, + "获取趋势数据失败,跳过" + ); + continue; + } + }; + + // 脱敏 + let sanitized_data = match state.analysis.sanitizer.sanitize_trend_analysis(&trend_data) { + Ok(data) => data, + Err(e) => { + tracing::warn!( + patient_id = %patient_id, + error = %e, + "数据脱敏失败,跳过" + ); + continue; + } + }; + + // 执行流式分析(非阻塞收集完整结果) + match state + .analysis + .stream_analyze( + tenant_id, + system_user_id, + patient_id, + AnalysisType::Trends, + patient_id.to_string(), + prompt.system_prompt.clone(), + prompt.user_prompt_template.clone(), + sanitized_data, + model.clone(), + temperature, + max_tokens, + ) + .await + { + Ok((stream, analysis_id, _provider)) => { + // 收集完整流内容 + use futures::StreamExt; + let mut full_content = String::new(); + let mut stream = stream; + + while let Some(chunk) = stream.next().await { + match chunk { + Ok(text) => full_content.push_str(&text), + Err(e) => { + tracing::warn!(error = %e, "流式分析出错"); + break; + } + } + } + + // 标记分析完成 + if !full_content.is_empty() { + let metadata = serde_json::json!({ + "auto_analysis": true, + "trigger": "scheduled", + }); + if let Err(e) = state + .analysis + .complete_analysis(analysis_id, full_content, metadata) + .await + { + tracing::warn!(error = %e, "保存分析结果失败"); + } + analyzed += 1; + } + } + Err(e) => { + tracing::warn!( + patient_id = %patient_id, + error = %e, + "发起分析失败" + ); + } + } + } + + Ok(analyzed) +} + +/// 查找高风险患者:最近 7 天内有体征记录且存在异常指标的患者 +async fn find_high_risk_patients( + db: &sea_orm::DatabaseConnection, + tenant_id: Uuid, +) -> Result, String> { + #[derive(Debug, FromQueryResult)] + struct PatientIdRow { + patient_id: Uuid, + } + + // 查找最近 7 天内有体征记录的活跃患者 + // 使用简化的高风险判定:最近 7 天有记录且 heart_rate 或 blood_sugar 偏离正常范围 + let sql = r#" + SELECT DISTINCT vs.patient_id + FROM vital_signs vs + WHERE vs.tenant_id = $1 + AND vs.deleted_at IS NULL + AND vs.record_date >= CURRENT_DATE - INTERVAL '7 days' + AND ( + (vs.heart_rate IS NOT NULL AND (vs.heart_rate < 60 OR vs.heart_rate > 100)) + OR (vs.blood_sugar IS NOT NULL AND (vs.blood_sugar < 3.9 OR vs.blood_sugar > 11.1)) + OR (vs.systolic_bp_morning IS NOT NULL AND (vs.systolic_bp_morning > 140 OR vs.systolic_bp_morning < 90)) + OR (vs.spo2 IS NOT NULL AND vs.spo2 < 95) + ) + LIMIT 50 + "#; + + let rows: Vec = PatientIdRow::find_by_statement(Statement::from_sql_and_values( + sea_orm::DatabaseBackend::Postgres, + sql, + [tenant_id.into()], + )) + .all(db) + .await + .map_err(|e| format!("查询高风险患者失败: {e}"))?; + + Ok(rows.into_iter().map(|r| r.patient_id).collect()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_duration_为24小时() { + assert_eq!(Duration::from_secs(86400).as_secs(), 86400); + } + + #[test] + fn test_initial_delay_为5分钟() { + assert_eq!(Duration::from_secs(300).as_secs(), 300); + } +} diff --git a/crates/erp-ai/src/service/mod.rs b/crates/erp-ai/src/service/mod.rs index 087d0ca..edd4ceb 100644 --- a/crates/erp-ai/src/service/mod.rs +++ b/crates/erp-ai/src/service/mod.rs @@ -1,3 +1,4 @@ pub mod analysis; +pub mod auto_analysis; pub mod prompt; pub mod usage; diff --git a/crates/erp-server/src/main.rs b/crates/erp-server/src/main.rs index 8e59d5a..9fe3d09 100644 --- a/crates/erp-server/src/main.rs +++ b/crates/erp-server/src/main.rs @@ -480,6 +480,10 @@ async fn main() -> anyhow::Result<()> { } }; + // Start auto trend analysis (every 24h, scans high-risk patients) + erp_ai::service::auto_analysis::start_auto_analysis(ai_state.clone()); + tracing::info!("Auto trend analysis scheduler started"); + // Build shared state let pii_crypto = if config.crypto.kek == "__MUST_SET_VIA_ENV__" { #[cfg(debug_assertions)]