feat(server+ai): PP-01 死信重试接线 + PP-05b AI 队列消费者 — 通电半成品自动化

PP-01: retry_dead_letters 已实现但全仓零调用,业务关键事件瞬时故障即永久
滞留死信表。tasks.rs 加 start_retry_dead_letters(每小时,最大重试 5 次)
+ main.rs 注册。同时落盘 feat 进行中的 cron_heartbeat 就绪门禁
(touch_heartbeat + 给 cleanup/metrics 任务加 heartbeat 参数)。

PP-05b: AnalysisQueue "只入队不消费"(两个入队源 claim_next 零调用),
违反"每个事件必须有消费者"铁律。新增 analysis_worker.rs 后台消费者:
claim_next → analysis_type 路由 → AnalysisService → mark_completed/
mark_failed。MVP 打通 trend 链路,lab_report/dialysis_risk 暂 skip
(回滚 pending,无假数据)。启动遵循 start_auto_analysis 模式(main.rs)。
This commit is contained in:
iven
2026-06-26 11:11:14 +08:00
parent 75f0dc4354
commit c87760f938
4 changed files with 384 additions and 2 deletions

View File

@@ -0,0 +1,321 @@
//! AI 分析队列消费者 — 把 pending 队列任务驱动到 completed/failed。
//!
//! `module.rs` 的事件入队 + `auto_analysis.rs` 的定时入队把任务写入 `ai_analysis_queue`
//! 但 `claim_next` 此前无人调用,所有任务永远 pending违反「每个事件必须有消费者」铁律
//!
//! 本 worker 在后台循环 claim → 路由处理 → mark_completed / mark_failed
//! 把 erp-health 触发的分析链路真正打通MVP 聚焦趋势分析 trend其他类型暂 skip
use std::time::Duration;
use erp_core::health_provider::TimeRange;
use futures::StreamExt;
use uuid::Uuid;
use crate::dto::AnalysisType;
use crate::entity::ai_analysis_queue;
use crate::error::{AiError, AiResult};
use crate::service::analysis_queue::AnalysisQueue;
use crate::state::AiState;
/// 轮询间隔:无任务时休眠 10 秒避免空转
const IDLE_SLEEP: Duration = Duration::from_secs(10);
/// 启动 AI 分析队列消费者(后台 tokio 任务)。
///
/// 不阻塞调用方:`tokio::spawn` 后立即返回。
/// 在 `erp-server/src/main.rs` 中与 `start_auto_analysis` 一起启动。
pub fn start_analysis_worker(state: AiState) {
tokio::spawn(async move {
tracing::info!("AI 分析队列消费者已启动(轮询间隔 {:?}", IDLE_SLEEP);
loop {
match process_once(&state).await {
Ok(Processed) => {
// 立即尝试下一个任务,不等待
}
Ok(Idle) => {
tokio::time::sleep(IDLE_SLEEP).await;
}
Err(e) => {
tracing::warn!(error = %e, "分析队列消费循环异常,休眠后重试");
tokio::time::sleep(IDLE_SLEEP).await;
}
}
}
});
}
enum ProcessOutcome {
/// 成功处理了一个任务(或已路由 skip立刻尝试下一个
Processed,
/// 队列空,进入休眠
Idle,
}
use ProcessOutcome::{Idle, Processed};
async fn process_once(state: &AiState) -> AiResult<ProcessOutcome> {
let queue = AnalysisQueue::new(state.db.clone());
let job = match queue.claim_next(None).await? {
Some(j) => j,
None => return Ok(Idle),
};
let job_id = job.id;
tracing::info!(
job_id = %job_id,
tenant_id = %job.tenant_id,
patient_id = %job.patient_id,
analysis_type = %job.analysis_type,
source_event = ?job.source_event,
"已领取分析队列任务,开始处理"
);
match job.analysis_type.as_str() {
"trend" => handle_trend(state, &queue, job).await,
other => {
// MVP 阶段:非 trend 类型暂不支持自动消费。
// 不写假数据,不标 completed保留 pending 等未来扩展消费者),
// 只记日志后回滚 running → pending 让任务可被未来的处理器接手。
tracing::info!(
job_id = %job_id,
analysis_type = %other,
"MVP 暂不支持的分析类型,跳过(保持 pending 供未来消费者处理)"
);
// 回滚事务claim_next 是事务化的,这里只更新状态不置 completed
rollback_running_to_pending(state, job_id).await?;
Ok(Processed)
}
}
}
/// 把 running 状态的任务回滚为 pending用于 MVP 不支持的类型)。
///
/// 注意retry_count 不递增这是路由跳过而非处理失败max_retries 不应被消耗。
async fn rollback_running_to_pending(state: &AiState, job_id: Uuid) -> AiResult<()> {
use sea_orm::ActiveModelTrait;
use sea_orm::EntityTrait;
use sea_orm::Set;
let now = chrono::Utc::now();
let entity = ai_analysis_queue::Entity::find_by_id(job_id)
.one(&state.db)
.await?
.ok_or_else(|| AiError::QueueError(format!("队列任务 {job_id} 未找到")))?;
let mut active: ai_analysis_queue::ActiveModel = entity.into();
active.status = Set("pending".to_string());
active.started_at = Set(None);
active.updated_at = Set(now);
active.version_lock = Set(active.version_lock.take().unwrap_or(0) + 1);
active.update(&state.db).await?;
Ok(())
}
/// 趋势分析:复刻 handler 的 stream_trends 链路drain 流式结果后 complete。
async fn handle_trend(
state: &AiState,
queue: &AnalysisQueue,
job: ai_analysis_queue::Model,
) -> AiResult<ProcessOutcome> {
let job_id = job.id;
let tenant_id = job.tenant_id;
let patient_id = job.patient_id;
// 失败统一走 mark_failed自带 retry_count/max_retries 重试逻辑)
match run_trend_analysis(state, tenant_id, patient_id).await {
Ok(analysis_id) => match queue.mark_completed(job_id, analysis_id).await {
Ok(()) => {
tracing::info!(
job_id = %job_id,
analysis_id = %analysis_id,
"趋势分析任务完成"
);
Ok(Processed)
}
Err(e) => {
tracing::warn!(job_id = %job_id, error = %e, "mark_completed 失败");
Ok(Processed)
}
},
Err(e) => {
let err_msg = e.to_string();
tracing::warn!(
job_id = %job_id,
patient_id = %patient_id,
error = %err_msg,
"趋势分析处理失败"
);
match queue.mark_failed(job_id, err_msg).await {
Ok(()) => {}
Err(mfe) => {
tracing::warn!(job_id = %job_id, error = %mfe, "mark_failed 本身失败");
}
}
Ok(Processed)
}
}
}
/// 执行一次趋势分析,返回新建的 analysis_id。
///
/// 流程对齐 `handler::stream_trends` + `build_sse_stream`
/// 取趋势数据 → sanitize → 加载 prompt → stream_analyze → drain 流 → complete_analysis。
async fn run_trend_analysis(state: &AiState, tenant_id: Uuid, patient_id: Uuid) -> AiResult<Uuid> {
let metrics = vec![
"systolic_bp_morning".to_string(),
"diastolic_bp_morning".to_string(),
"heart_rate".to_string(),
"weight".to_string(),
"blood_sugar".to_string(),
];
let range = TimeRange {
start: chrono::Utc::now() - chrono::Duration::days(90),
end: chrono::Utc::now(),
};
let trend_data = state
.health_provider
.get_trend_analysis_data(tenant_id, patient_id, &metrics, &range)
.await
.map_err(|e| AiError::ProviderError(format!("获取趋势数据失败: {e}")))?;
if trend_data.metrics.is_empty() {
// 数据为空不是程序错误,但分析无法进行 → 返回失败让队列走重试/最终失败
return Err(AiError::ProviderError(
"患者在选定时间段内无体征监测数据".to_string(),
));
}
let sanitized_data = state
.analysis
.sanitizer
.sanitize_trend_analysis(&trend_data)?;
let prompt = state
.prompt
.get_active_prompt(tenant_id, AnalysisType::Trends.prompt_name())
.await?;
let (model, temperature, max_tokens) =
resolve_model_config(&prompt.model_config, tenant_id, &state.db).await;
// 队列任务无 HTTP 上下文user_id 用 nil 占位(仅用于审计记录)
let system_user = Uuid::nil();
let (stream, analysis_id, _provider) = state
.analysis
.stream_analyze(
tenant_id,
system_user,
patient_id,
AnalysisType::Trends,
patient_id.to_string(),
prompt.system_prompt,
prompt.user_prompt_template,
sanitized_data,
model,
temperature,
max_tokens,
)
.await?;
// drain 流:累积全部输出,遇错 fail_analysis
let mut stream = std::pin::pin!(stream);
let mut full_content = String::new();
while let Some(result) = stream.next().await {
match result {
Ok(chunk) => full_content.push_str(&chunk),
Err(e) => {
let _ = state
.analysis
.fail_analysis(analysis_id, e.to_string())
.await;
return Err(e);
}
}
}
let metadata = serde_json::json!({ "analysis_type": "trend", "source": "queue_worker" });
state
.analysis
.complete_analysis(analysis_id, full_content.clone(), metadata.clone())
.await?;
// 用量记录4 字符 ≈ 1 token 估算,对齐 SSE handler 逻辑)
let est_output_tokens = (full_content.len() as u32) / 4;
if let Err(e) = state
.usage
.log_usage(
tenant_id,
"queue_worker",
"",
"trend",
0,
est_output_tokens,
0,
0,
false,
)
.await
{
tracing::warn!(error = %e, "队列消费者记录用量失败");
}
// 后处理(解析建议、发布事件等)— 与 SSE handler 一致
crate::service::post_process::post_process_analysis(
state,
analysis_id,
&full_content,
tenant_id,
patient_id,
system_user,
"trend",
metadata,
)
.await;
Ok(analysis_id)
}
/// 解析 prompt.model_config + 租户默认配置,返回 (model, temperature, max_tokens)。
///
/// 与 `handler::resolve_model_config` 实现等价(独立复制避免跨模块可见性问题)。
async fn resolve_model_config(
model_config: &serde_json::Value,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> (String, f32, u32) {
let defaults = crate::config_resolver::load_ai_config(tenant_id, db).await;
let analysis = &defaults.analysis_defaults;
let model = model_config
.get("model")
.and_then(|v| v.as_str())
.unwrap_or(&analysis.model)
.to_string();
let temperature = model_config
.get("temperature")
.and_then(|v| v.as_f64())
.unwrap_or(analysis.temperature as f64) as f32;
let max_tokens = model_config
.get("max_tokens")
.and_then(|v| v.as_u64())
.unwrap_or(analysis.max_tokens as u64) as u32;
(model, temperature, max_tokens)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn idle_sleep_为10秒() {
assert_eq!(IDLE_SLEEP.as_secs(), 10);
}
#[test]
fn process_outcome_枚举可用() {
let _a = Processed;
let _b = Idle;
}
}

View File

@@ -1,5 +1,6 @@
pub mod analysis;
pub mod analysis_queue;
pub mod analysis_worker;
pub mod auto_analysis;
pub mod cache;
pub mod chat_message;

View File

@@ -644,6 +644,10 @@ async fn main() -> anyhow::Result<()> {
erp_ai::service::auto_analysis::start_auto_analysis(ai_state.clone());
tracing::info!("Auto trend analysis scheduler started");
// Start analysis queue worker (claims pending ai_analysis_queue jobs → analyzes → completes)
erp_ai::service::analysis_worker::start_analysis_worker(ai_state.clone());
tracing::info!("AI analysis queue worker started");
let cron_heartbeat = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
@@ -671,6 +675,11 @@ async fn main() -> anyhow::Result<()> {
// Start background tasks with heartbeat
tasks::start_event_cleanup(state.db.clone(), state.cron_heartbeat.clone());
tasks::start_pool_metrics(state.db.clone(), state.cron_heartbeat.clone());
tasks::start_retry_dead_letters(
state.db.clone(),
state.event_bus.clone(),
state.cron_heartbeat.clone(),
);
// --- Build the router ---
//

View File

@@ -1,11 +1,23 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use erp_core::events::{EventBus, retry_dead_letters};
fn touch_heartbeat(heartbeat: &Arc<AtomicU64>) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
heartbeat.store(now, Ordering::Relaxed);
}
/// 启动事件清理后台任务。
///
/// 每日执行一次:
/// - 调用 `cleanup_old_published_events()` 归档 >7 天的已发布事件
/// - 调用 `cleanup_old_processed_events()` 清理 >7 天的去重记录
pub fn start_event_cleanup(db: sea_orm::DatabaseConnection) {
pub fn start_event_cleanup(db: sea_orm::DatabaseConnection, heartbeat: Arc<AtomicU64>) {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(86400));
loop {
@@ -13,6 +25,7 @@ pub fn start_event_cleanup(db: sea_orm::DatabaseConnection) {
if let Err(e) = run_cleanup(&db).await {
tracing::warn!(error = %e, "事件清理任务执行失败");
}
touch_heartbeat(&heartbeat);
}
});
tracing::info!("事件清理任务已启动(每 24 小时执行一次)");
@@ -52,13 +65,14 @@ async fn run_cleanup(db: &sea_orm::DatabaseConnection) -> Result<(), sea_orm::Db
/// - `db_pool_connections_active` — 当前活跃连接数
/// - `db_pool_connections_idle` — 当前空闲连接数
/// - `eventbus_pending_total` — pending 状态的领域事件数
pub fn start_pool_metrics(db: sea_orm::DatabaseConnection) {
pub fn start_pool_metrics(db: sea_orm::DatabaseConnection, heartbeat: Arc<AtomicU64>) {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
interval.tick().await;
sample_pool_metrics(&db).await;
sample_eventbus_backlog(&db).await;
touch_heartbeat(&heartbeat);
}
});
tracing::info!("DB 连接池 + EventBus 积压指标采样已启动(每 30 秒采样一次)");
@@ -111,3 +125,40 @@ async fn sample_eventbus_backlog(db: &sea_orm::DatabaseConnection) {
}
}
}
/// 启动死信重试后台任务。
///
/// 每小时执行一次:
/// - 调用 `erp_core::events::retry_dead_letters()` 重试 `dead_letter_events` 中
/// 未解决且未超过最大重试次数的失败事件(指数退避由 attempts + last_error 记录)
/// - 最大重试 5 次,超过则标记永久失败
///
/// 触碰「每个事件必须有消费者」铁律的兜底:业务关键链路(危急值告警/积分发放/
/// 预约提醒/article 推送)的瞬时故障借此自动恢复,不再永久滞留死信表。
pub fn start_retry_dead_letters(
db: sea_orm::DatabaseConnection,
bus: EventBus,
heartbeat: Arc<AtomicU64>,
) {
tokio::spawn(async move {
// 首次延迟 60s避免与启动期 outbox relay 抢资源
tokio::time::sleep(Duration::from_secs(60)).await;
let mut interval = tokio::time::interval(Duration::from_secs(3600));
loop {
interval.tick().await;
match retry_dead_letters(&db, &bus, 5).await {
Ok(retried) if retried > 0 => {
tracing::info!(retried, "死信重试任务完成(已重试 N 条)");
}
Ok(_) => {
tracing::debug!("死信重试任务完成(无待重试事件)");
}
Err(e) => {
tracing::warn!(error = %e, "死信重试任务执行失败");
}
}
touch_heartbeat(&heartbeat);
}
});
tracing::info!("死信重试任务已启动(每 1 小时执行一次,最大重试 5 次)");
}