From c87760f938eb9558936987133f1c4713020ac8f9 Mon Sep 17 00:00:00 2001 From: iven Date: Fri, 26 Jun 2026 11:11:14 +0800 Subject: [PATCH] =?UTF-8?q?feat(server+ai):=20PP-01=20=E6=AD=BB=E4=BF=A1?= =?UTF-8?q?=E9=87=8D=E8=AF=95=E6=8E=A5=E7=BA=BF=20+=20PP-05b=20AI=20?= =?UTF-8?q?=E9=98=9F=E5=88=97=E6=B6=88=E8=B4=B9=E8=80=85=20=E2=80=94=20?= =?UTF-8?q?=E9=80=9A=E7=94=B5=E5=8D=8A=E6=88=90=E5=93=81=E8=87=AA=E5=8A=A8?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PP-01: retry_dead_letters 已实现但全仓零调用,业务关键事件瞬时故障即永久 滞留死信表。tasks.rs 加 start_retry_dead_letters(每小时,最大重试 5 次) + main.rs 注册。同时落盘 feat 进行中的 cron_heartbeat 就绪门禁 (touch_heartbeat + 给 cleanup/metrics 任务加 heartbeat 参数)。 PP-05b: AnalysisQueue "只入队不消费"(两个入队源 claim_next 零调用), 违反"每个事件必须有消费者"铁律。新增 analysis_worker.rs 后台消费者: claim_next → analysis_type 路由 → AnalysisService → mark_completed/ mark_failed。MVP 打通 trend 链路,lab_report/dialysis_risk 暂 skip (回滚 pending,无假数据)。启动遵循 start_auto_analysis 模式(main.rs)。 --- crates/erp-ai/src/service/analysis_worker.rs | 321 +++++++++++++++++++ crates/erp-ai/src/service/mod.rs | 1 + crates/erp-server/src/main.rs | 9 + crates/erp-server/src/tasks.rs | 55 +++- 4 files changed, 384 insertions(+), 2 deletions(-) create mode 100644 crates/erp-ai/src/service/analysis_worker.rs diff --git a/crates/erp-ai/src/service/analysis_worker.rs b/crates/erp-ai/src/service/analysis_worker.rs new file mode 100644 index 0000000..f655ff5 --- /dev/null +++ b/crates/erp-ai/src/service/analysis_worker.rs @@ -0,0 +1,321 @@ +//! AI 分析队列消费者 — 把 pending 队列任务驱动到 completed/failed。 +//! +//! `module.rs` 的事件入队 + `auto_analysis.rs` 的定时入队把任务写入 `ai_analysis_queue`, +//! 但 `claim_next` 此前无人调用,所有任务永远 pending(违反「每个事件必须有消费者」铁律)。 +//! +//! 本 worker 在后台循环 claim → 路由处理 → mark_completed / mark_failed, +//! 把 erp-health 触发的分析链路真正打通(MVP 聚焦趋势分析 trend,其他类型暂 skip)。 + +use std::time::Duration; + +use erp_core::health_provider::TimeRange; +use futures::StreamExt; +use uuid::Uuid; + +use crate::dto::AnalysisType; +use crate::entity::ai_analysis_queue; +use crate::error::{AiError, AiResult}; +use crate::service::analysis_queue::AnalysisQueue; +use crate::state::AiState; + +/// 轮询间隔:无任务时休眠 10 秒避免空转 +const IDLE_SLEEP: Duration = Duration::from_secs(10); + +/// 启动 AI 分析队列消费者(后台 tokio 任务)。 +/// +/// 不阻塞调用方:`tokio::spawn` 后立即返回。 +/// 在 `erp-server/src/main.rs` 中与 `start_auto_analysis` 一起启动。 +pub fn start_analysis_worker(state: AiState) { + tokio::spawn(async move { + tracing::info!("AI 分析队列消费者已启动(轮询间隔 {:?})", IDLE_SLEEP); + loop { + match process_once(&state).await { + Ok(Processed) => { + // 立即尝试下一个任务,不等待 + } + Ok(Idle) => { + tokio::time::sleep(IDLE_SLEEP).await; + } + Err(e) => { + tracing::warn!(error = %e, "分析队列消费循环异常,休眠后重试"); + tokio::time::sleep(IDLE_SLEEP).await; + } + } + } + }); +} + +enum ProcessOutcome { + /// 成功处理了一个任务(或已路由 skip),立刻尝试下一个 + Processed, + /// 队列空,进入休眠 + Idle, +} + +use ProcessOutcome::{Idle, Processed}; + +async fn process_once(state: &AiState) -> AiResult { + let queue = AnalysisQueue::new(state.db.clone()); + let job = match queue.claim_next(None).await? { + Some(j) => j, + None => return Ok(Idle), + }; + + let job_id = job.id; + tracing::info!( + job_id = %job_id, + tenant_id = %job.tenant_id, + patient_id = %job.patient_id, + analysis_type = %job.analysis_type, + source_event = ?job.source_event, + "已领取分析队列任务,开始处理" + ); + + match job.analysis_type.as_str() { + "trend" => handle_trend(state, &queue, job).await, + other => { + // MVP 阶段:非 trend 类型暂不支持自动消费。 + // 不写假数据,不标 completed(保留 pending 等未来扩展消费者), + // 只记日志后回滚 running → pending 让任务可被未来的处理器接手。 + tracing::info!( + job_id = %job_id, + analysis_type = %other, + "MVP 暂不支持的分析类型,跳过(保持 pending 供未来消费者处理)" + ); + // 回滚事务:claim_next 是事务化的,这里只更新状态不置 completed + rollback_running_to_pending(state, job_id).await?; + Ok(Processed) + } + } +} + +/// 把 running 状态的任务回滚为 pending(用于 MVP 不支持的类型)。 +/// +/// 注意:retry_count 不递增(这是路由跳过而非处理失败),max_retries 不应被消耗。 +async fn rollback_running_to_pending(state: &AiState, job_id: Uuid) -> AiResult<()> { + use sea_orm::ActiveModelTrait; + use sea_orm::EntityTrait; + use sea_orm::Set; + + let now = chrono::Utc::now(); + let entity = ai_analysis_queue::Entity::find_by_id(job_id) + .one(&state.db) + .await? + .ok_or_else(|| AiError::QueueError(format!("队列任务 {job_id} 未找到")))?; + let mut active: ai_analysis_queue::ActiveModel = entity.into(); + active.status = Set("pending".to_string()); + active.started_at = Set(None); + active.updated_at = Set(now); + active.version_lock = Set(active.version_lock.take().unwrap_or(0) + 1); + active.update(&state.db).await?; + Ok(()) +} + +/// 趋势分析:复刻 handler 的 stream_trends 链路,drain 流式结果后 complete。 +async fn handle_trend( + state: &AiState, + queue: &AnalysisQueue, + job: ai_analysis_queue::Model, +) -> AiResult { + let job_id = job.id; + let tenant_id = job.tenant_id; + let patient_id = job.patient_id; + + // 失败统一走 mark_failed(自带 retry_count/max_retries 重试逻辑) + match run_trend_analysis(state, tenant_id, patient_id).await { + Ok(analysis_id) => match queue.mark_completed(job_id, analysis_id).await { + Ok(()) => { + tracing::info!( + job_id = %job_id, + analysis_id = %analysis_id, + "趋势分析任务完成" + ); + Ok(Processed) + } + Err(e) => { + tracing::warn!(job_id = %job_id, error = %e, "mark_completed 失败"); + Ok(Processed) + } + }, + Err(e) => { + let err_msg = e.to_string(); + tracing::warn!( + job_id = %job_id, + patient_id = %patient_id, + error = %err_msg, + "趋势分析处理失败" + ); + match queue.mark_failed(job_id, err_msg).await { + Ok(()) => {} + Err(mfe) => { + tracing::warn!(job_id = %job_id, error = %mfe, "mark_failed 本身失败"); + } + } + Ok(Processed) + } + } +} + +/// 执行一次趋势分析,返回新建的 analysis_id。 +/// +/// 流程对齐 `handler::stream_trends` + `build_sse_stream`: +/// 取趋势数据 → sanitize → 加载 prompt → stream_analyze → drain 流 → complete_analysis。 +async fn run_trend_analysis(state: &AiState, tenant_id: Uuid, patient_id: Uuid) -> AiResult { + let metrics = vec![ + "systolic_bp_morning".to_string(), + "diastolic_bp_morning".to_string(), + "heart_rate".to_string(), + "weight".to_string(), + "blood_sugar".to_string(), + ]; + let range = TimeRange { + start: chrono::Utc::now() - chrono::Duration::days(90), + end: chrono::Utc::now(), + }; + + let trend_data = state + .health_provider + .get_trend_analysis_data(tenant_id, patient_id, &metrics, &range) + .await + .map_err(|e| AiError::ProviderError(format!("获取趋势数据失败: {e}")))?; + + if trend_data.metrics.is_empty() { + // 数据为空不是程序错误,但分析无法进行 → 返回失败让队列走重试/最终失败 + return Err(AiError::ProviderError( + "患者在选定时间段内无体征监测数据".to_string(), + )); + } + + let sanitized_data = state + .analysis + .sanitizer + .sanitize_trend_analysis(&trend_data)?; + + let prompt = state + .prompt + .get_active_prompt(tenant_id, AnalysisType::Trends.prompt_name()) + .await?; + + let (model, temperature, max_tokens) = + resolve_model_config(&prompt.model_config, tenant_id, &state.db).await; + + // 队列任务无 HTTP 上下文,user_id 用 nil 占位(仅用于审计记录) + let system_user = Uuid::nil(); + let (stream, analysis_id, _provider) = state + .analysis + .stream_analyze( + tenant_id, + system_user, + patient_id, + AnalysisType::Trends, + patient_id.to_string(), + prompt.system_prompt, + prompt.user_prompt_template, + sanitized_data, + model, + temperature, + max_tokens, + ) + .await?; + + // drain 流:累积全部输出,遇错 fail_analysis + let mut stream = std::pin::pin!(stream); + let mut full_content = String::new(); + while let Some(result) = stream.next().await { + match result { + Ok(chunk) => full_content.push_str(&chunk), + Err(e) => { + let _ = state + .analysis + .fail_analysis(analysis_id, e.to_string()) + .await; + return Err(e); + } + } + } + + let metadata = serde_json::json!({ "analysis_type": "trend", "source": "queue_worker" }); + state + .analysis + .complete_analysis(analysis_id, full_content.clone(), metadata.clone()) + .await?; + + // 用量记录(4 字符 ≈ 1 token 估算,对齐 SSE handler 逻辑) + let est_output_tokens = (full_content.len() as u32) / 4; + if let Err(e) = state + .usage + .log_usage( + tenant_id, + "queue_worker", + "", + "trend", + 0, + est_output_tokens, + 0, + 0, + false, + ) + .await + { + tracing::warn!(error = %e, "队列消费者记录用量失败"); + } + + // 后处理(解析建议、发布事件等)— 与 SSE handler 一致 + crate::service::post_process::post_process_analysis( + state, + analysis_id, + &full_content, + tenant_id, + patient_id, + system_user, + "trend", + metadata, + ) + .await; + + Ok(analysis_id) +} + +/// 解析 prompt.model_config + 租户默认配置,返回 (model, temperature, max_tokens)。 +/// +/// 与 `handler::resolve_model_config` 实现等价(独立复制避免跨模块可见性问题)。 +async fn resolve_model_config( + model_config: &serde_json::Value, + tenant_id: Uuid, + db: &sea_orm::DatabaseConnection, +) -> (String, f32, u32) { + let defaults = crate::config_resolver::load_ai_config(tenant_id, db).await; + let analysis = &defaults.analysis_defaults; + + let model = model_config + .get("model") + .and_then(|v| v.as_str()) + .unwrap_or(&analysis.model) + .to_string(); + let temperature = model_config + .get("temperature") + .and_then(|v| v.as_f64()) + .unwrap_or(analysis.temperature as f64) as f32; + let max_tokens = model_config + .get("max_tokens") + .and_then(|v| v.as_u64()) + .unwrap_or(analysis.max_tokens as u64) as u32; + + (model, temperature, max_tokens) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn idle_sleep_为10秒() { + assert_eq!(IDLE_SLEEP.as_secs(), 10); + } + + #[test] + fn process_outcome_枚举可用() { + let _a = Processed; + let _b = Idle; + } +} diff --git a/crates/erp-ai/src/service/mod.rs b/crates/erp-ai/src/service/mod.rs index bd02050..2cd71ac 100644 --- a/crates/erp-ai/src/service/mod.rs +++ b/crates/erp-ai/src/service/mod.rs @@ -1,5 +1,6 @@ pub mod analysis; pub mod analysis_queue; +pub mod analysis_worker; pub mod auto_analysis; pub mod cache; pub mod chat_message; diff --git a/crates/erp-server/src/main.rs b/crates/erp-server/src/main.rs index 0495034..163f179 100644 --- a/crates/erp-server/src/main.rs +++ b/crates/erp-server/src/main.rs @@ -644,6 +644,10 @@ async fn main() -> anyhow::Result<()> { erp_ai::service::auto_analysis::start_auto_analysis(ai_state.clone()); tracing::info!("Auto trend analysis scheduler started"); + // Start analysis queue worker (claims pending ai_analysis_queue jobs → analyzes → completes) + erp_ai::service::analysis_worker::start_analysis_worker(ai_state.clone()); + tracing::info!("AI analysis queue worker started"); + let cron_heartbeat = std::sync::Arc::new(std::sync::atomic::AtomicU64::new( std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) @@ -671,6 +675,11 @@ async fn main() -> anyhow::Result<()> { // Start background tasks with heartbeat tasks::start_event_cleanup(state.db.clone(), state.cron_heartbeat.clone()); tasks::start_pool_metrics(state.db.clone(), state.cron_heartbeat.clone()); + tasks::start_retry_dead_letters( + state.db.clone(), + state.event_bus.clone(), + state.cron_heartbeat.clone(), + ); // --- Build the router --- // diff --git a/crates/erp-server/src/tasks.rs b/crates/erp-server/src/tasks.rs index cbcefd5..100ccf6 100644 --- a/crates/erp-server/src/tasks.rs +++ b/crates/erp-server/src/tasks.rs @@ -1,11 +1,23 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; +use erp_core::events::{EventBus, retry_dead_letters}; + +fn touch_heartbeat(heartbeat: &Arc) { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + heartbeat.store(now, Ordering::Relaxed); +} + /// 启动事件清理后台任务。 /// /// 每日执行一次: /// - 调用 `cleanup_old_published_events()` 归档 >7 天的已发布事件 /// - 调用 `cleanup_old_processed_events()` 清理 >7 天的去重记录 -pub fn start_event_cleanup(db: sea_orm::DatabaseConnection) { +pub fn start_event_cleanup(db: sea_orm::DatabaseConnection, heartbeat: Arc) { tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(86400)); loop { @@ -13,6 +25,7 @@ pub fn start_event_cleanup(db: sea_orm::DatabaseConnection) { if let Err(e) = run_cleanup(&db).await { tracing::warn!(error = %e, "事件清理任务执行失败"); } + touch_heartbeat(&heartbeat); } }); tracing::info!("事件清理任务已启动(每 24 小时执行一次)"); @@ -52,13 +65,14 @@ async fn run_cleanup(db: &sea_orm::DatabaseConnection) -> Result<(), sea_orm::Db /// - `db_pool_connections_active` — 当前活跃连接数 /// - `db_pool_connections_idle` — 当前空闲连接数 /// - `eventbus_pending_total` — pending 状态的领域事件数 -pub fn start_pool_metrics(db: sea_orm::DatabaseConnection) { +pub fn start_pool_metrics(db: sea_orm::DatabaseConnection, heartbeat: Arc) { tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(30)); loop { interval.tick().await; sample_pool_metrics(&db).await; sample_eventbus_backlog(&db).await; + touch_heartbeat(&heartbeat); } }); tracing::info!("DB 连接池 + EventBus 积压指标采样已启动(每 30 秒采样一次)"); @@ -111,3 +125,40 @@ async fn sample_eventbus_backlog(db: &sea_orm::DatabaseConnection) { } } } + +/// 启动死信重试后台任务。 +/// +/// 每小时执行一次: +/// - 调用 `erp_core::events::retry_dead_letters()` 重试 `dead_letter_events` 中 +/// 未解决且未超过最大重试次数的失败事件(指数退避由 attempts + last_error 记录) +/// - 最大重试 5 次,超过则标记永久失败 +/// +/// 触碰「每个事件必须有消费者」铁律的兜底:业务关键链路(危急值告警/积分发放/ +/// 预约提醒/article 推送)的瞬时故障借此自动恢复,不再永久滞留死信表。 +pub fn start_retry_dead_letters( + db: sea_orm::DatabaseConnection, + bus: EventBus, + heartbeat: Arc, +) { + tokio::spawn(async move { + // 首次延迟 60s,避免与启动期 outbox relay 抢资源 + tokio::time::sleep(Duration::from_secs(60)).await; + let mut interval = tokio::time::interval(Duration::from_secs(3600)); + loop { + interval.tick().await; + match retry_dead_letters(&db, &bus, 5).await { + Ok(retried) if retried > 0 => { + tracing::info!(retried, "死信重试任务完成(已重试 N 条)"); + } + Ok(_) => { + tracing::debug!("死信重试任务完成(无待重试事件)"); + } + Err(e) => { + tracing::warn!(error = %e, "死信重试任务执行失败"); + } + } + touch_heartbeat(&heartbeat); + } + }); + tracing::info!("死信重试任务已启动(每 1 小时执行一次,最大重试 5 次)"); +}