feat(ai): 定期自动分析定时任务 — 每 24 小时扫描高风险患者
新增 auto_analysis.rs 服务: - 启动后延迟 5 分钟,每 24 小时执行一次 - 查找所有活跃租户中高风险患者(异常体征指标) - 自动调用趋势分析并存储分析结果 - 每租户限制 50 名患者,防止过载 - erp-server main.rs 中注册后台任务
This commit is contained in:
284
crates/erp-ai/src/service/auto_analysis.rs
Normal file
284
crates/erp-ai/src/service/auto_analysis.rs
Normal file
@@ -0,0 +1,284 @@
|
||||
//! 定期自动分析服务 — 对高风险患者执行 AI 趋势分析
|
||||
//!
|
||||
//! 每 24 小时执行一次,扫描所有租户中最近有异常体征记录的患者,
|
||||
//! 自动触发趋势分析并存储结果。
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use erp_core::health_provider::{HealthDataProvider, TimeRange};
|
||||
use sea_orm::{ColumnTrait, EntityTrait, FromQueryResult, QueryFilter, Statement};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::dto::AnalysisType;
|
||||
use crate::state::AiState;
|
||||
|
||||
/// 启动自动趋势分析后台任务
|
||||
pub fn start_auto_analysis(state: AiState) {
|
||||
tokio::spawn(async move {
|
||||
// 首次启动延迟 5 分钟,等待服务完全就绪
|
||||
tokio::time::sleep(Duration::from_secs(300)).await;
|
||||
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(86400));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
if let Err(e) = run_auto_analysis(&state).await {
|
||||
tracing::warn!(error = %e, "自动趋势分析任务执行失败");
|
||||
}
|
||||
}
|
||||
});
|
||||
tracing::info!("自动趋势分析任务已启动(每 24 小时执行一次)");
|
||||
}
|
||||
|
||||
/// 执行一次自动分析
|
||||
async fn run_auto_analysis(state: &AiState) -> Result<(), String> {
|
||||
// 查找所有活跃租户
|
||||
let tenants = find_active_tenants(&state.db).await?;
|
||||
if tenants.is_empty() {
|
||||
tracing::debug!("无活跃租户,跳过自动分析");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut total_analyzed = 0u32;
|
||||
let mut total_errors = 0u32;
|
||||
|
||||
for tenant_id in tenants {
|
||||
match analyze_tenant_high_risk_patients(state, tenant_id).await {
|
||||
Ok(count) => total_analyzed += count,
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
tenant_id = %tenant_id,
|
||||
error = %e,
|
||||
"租户自动分析失败"
|
||||
);
|
||||
total_errors += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
total_analyzed,
|
||||
total_errors,
|
||||
"自动趋势分析任务完成"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 查找所有活跃租户 ID
|
||||
async fn find_active_tenants(
|
||||
db: &sea_orm::DatabaseConnection,
|
||||
) -> Result<Vec<Uuid>, String> {
|
||||
#[derive(Debug, FromQueryResult)]
|
||||
struct TenantId {
|
||||
id: Uuid,
|
||||
}
|
||||
|
||||
let rows: Vec<TenantId> = TenantId::find_by_statement(Statement::from_string(
|
||||
sea_orm::DatabaseBackend::Postgres,
|
||||
"SELECT id FROM tenant WHERE deleted_at IS NULL".to_string(),
|
||||
))
|
||||
.all(db)
|
||||
.await
|
||||
.map_err(|e| format!("查询租户失败: {e}"))?;
|
||||
|
||||
Ok(rows.into_iter().map(|r| r.id).collect())
|
||||
}
|
||||
|
||||
/// 查找指定租户中需要关注的高风险患者并执行分析
|
||||
///
|
||||
/// 高风险判定:最近 7 天内有 2 次以上异常体征记录的患者
|
||||
async fn analyze_tenant_high_risk_patients(
|
||||
state: &AiState,
|
||||
tenant_id: Uuid,
|
||||
) -> Result<u32, String> {
|
||||
// 查找高风险患者 ID 列表
|
||||
let patient_ids = find_high_risk_patients(&state.db, tenant_id).await?;
|
||||
|
||||
if patient_ids.is_empty() {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
tenant_id = %tenant_id,
|
||||
patient_count = patient_ids.len(),
|
||||
"开始自动趋势分析"
|
||||
);
|
||||
|
||||
let mut analyzed = 0u32;
|
||||
let system_user_id = Uuid::nil(); // 系统自动分析使用 nil user_id
|
||||
|
||||
let metrics = vec![
|
||||
"systolic_bp_morning".to_string(),
|
||||
"diastolic_bp_morning".to_string(),
|
||||
"heart_rate".to_string(),
|
||||
"weight".to_string(),
|
||||
"blood_sugar".to_string(),
|
||||
];
|
||||
|
||||
let range = TimeRange {
|
||||
start: chrono::Utc::now() - chrono::Duration::days(90),
|
||||
end: chrono::Utc::now(),
|
||||
};
|
||||
|
||||
// 获取 prompt 模板
|
||||
let prompt = state
|
||||
.prompt
|
||||
.get_active_prompt(tenant_id, "health_trend_analysis")
|
||||
.await
|
||||
.map_err(|e| format!("获取 prompt 失败: {e}"))?;
|
||||
|
||||
let model_config = &prompt.model_config;
|
||||
let model = model_config["model"]
|
||||
.as_str()
|
||||
.unwrap_or("claude-sonnet-4-6")
|
||||
.to_string();
|
||||
let temperature = model_config["temperature"].as_f64().unwrap_or(0.3) as f32;
|
||||
let max_tokens = model_config["max_tokens"].as_u64().unwrap_or(2048) as u32;
|
||||
|
||||
for patient_id in patient_ids {
|
||||
// 获取趋势分析数据
|
||||
let trend_data = match state
|
||||
.health_provider
|
||||
.get_trend_analysis_data(tenant_id, patient_id, &metrics, &range)
|
||||
.await
|
||||
{
|
||||
Ok(data) => data,
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
patient_id = %patient_id,
|
||||
error = %e,
|
||||
"获取趋势数据失败,跳过"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// 脱敏
|
||||
let sanitized_data = match state.analysis.sanitizer.sanitize_trend_analysis(&trend_data) {
|
||||
Ok(data) => data,
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
patient_id = %patient_id,
|
||||
error = %e,
|
||||
"数据脱敏失败,跳过"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// 执行流式分析(非阻塞收集完整结果)
|
||||
match state
|
||||
.analysis
|
||||
.stream_analyze(
|
||||
tenant_id,
|
||||
system_user_id,
|
||||
patient_id,
|
||||
AnalysisType::Trends,
|
||||
patient_id.to_string(),
|
||||
prompt.system_prompt.clone(),
|
||||
prompt.user_prompt_template.clone(),
|
||||
sanitized_data,
|
||||
model.clone(),
|
||||
temperature,
|
||||
max_tokens,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok((stream, analysis_id, _provider)) => {
|
||||
// 收集完整流内容
|
||||
use futures::StreamExt;
|
||||
let mut full_content = String::new();
|
||||
let mut stream = stream;
|
||||
|
||||
while let Some(chunk) = stream.next().await {
|
||||
match chunk {
|
||||
Ok(text) => full_content.push_str(&text),
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "流式分析出错");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 标记分析完成
|
||||
if !full_content.is_empty() {
|
||||
let metadata = serde_json::json!({
|
||||
"auto_analysis": true,
|
||||
"trigger": "scheduled",
|
||||
});
|
||||
if let Err(e) = state
|
||||
.analysis
|
||||
.complete_analysis(analysis_id, full_content, metadata)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(error = %e, "保存分析结果失败");
|
||||
}
|
||||
analyzed += 1;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
patient_id = %patient_id,
|
||||
error = %e,
|
||||
"发起分析失败"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(analyzed)
|
||||
}
|
||||
|
||||
/// 查找高风险患者:最近 7 天内有体征记录且存在异常指标的患者
|
||||
async fn find_high_risk_patients(
|
||||
db: &sea_orm::DatabaseConnection,
|
||||
tenant_id: Uuid,
|
||||
) -> Result<Vec<Uuid>, String> {
|
||||
#[derive(Debug, FromQueryResult)]
|
||||
struct PatientIdRow {
|
||||
patient_id: Uuid,
|
||||
}
|
||||
|
||||
// 查找最近 7 天内有体征记录的活跃患者
|
||||
// 使用简化的高风险判定:最近 7 天有记录且 heart_rate 或 blood_sugar 偏离正常范围
|
||||
let sql = r#"
|
||||
SELECT DISTINCT vs.patient_id
|
||||
FROM vital_signs vs
|
||||
WHERE vs.tenant_id = $1
|
||||
AND vs.deleted_at IS NULL
|
||||
AND vs.record_date >= CURRENT_DATE - INTERVAL '7 days'
|
||||
AND (
|
||||
(vs.heart_rate IS NOT NULL AND (vs.heart_rate < 60 OR vs.heart_rate > 100))
|
||||
OR (vs.blood_sugar IS NOT NULL AND (vs.blood_sugar < 3.9 OR vs.blood_sugar > 11.1))
|
||||
OR (vs.systolic_bp_morning IS NOT NULL AND (vs.systolic_bp_morning > 140 OR vs.systolic_bp_morning < 90))
|
||||
OR (vs.spo2 IS NOT NULL AND vs.spo2 < 95)
|
||||
)
|
||||
LIMIT 50
|
||||
"#;
|
||||
|
||||
let rows: Vec<PatientIdRow> = PatientIdRow::find_by_statement(Statement::from_sql_and_values(
|
||||
sea_orm::DatabaseBackend::Postgres,
|
||||
sql,
|
||||
[tenant_id.into()],
|
||||
))
|
||||
.all(db)
|
||||
.await
|
||||
.map_err(|e| format!("查询高风险患者失败: {e}"))?;
|
||||
|
||||
Ok(rows.into_iter().map(|r| r.patient_id).collect())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_duration_为24小时() {
|
||||
assert_eq!(Duration::from_secs(86400).as_secs(), 86400);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_initial_delay_为5分钟() {
|
||||
assert_eq!(Duration::from_secs(300).as_secs(), 300);
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
pub mod analysis;
|
||||
pub mod auto_analysis;
|
||||
pub mod prompt;
|
||||
pub mod usage;
|
||||
|
||||
@@ -480,6 +480,10 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
};
|
||||
|
||||
// Start auto trend analysis (every 24h, scans high-risk patients)
|
||||
erp_ai::service::auto_analysis::start_auto_analysis(ai_state.clone());
|
||||
tracing::info!("Auto trend analysis scheduler started");
|
||||
|
||||
// Build shared state
|
||||
let pii_crypto = if config.crypto.kek == "__MUST_SET_VIA_ENV__" {
|
||||
#[cfg(debug_assertions)]
|
||||
|
||||
Reference in New Issue
Block a user