diff --git a/crates/erp-ai/src/service/analysis_queue.rs b/crates/erp-ai/src/service/analysis_queue.rs new file mode 100644 index 0000000..219696d --- /dev/null +++ b/crates/erp-ai/src/service/analysis_queue.rs @@ -0,0 +1,258 @@ +use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, FromQueryResult, QueryFilter, QueryOrder, Set, Statement}; +use uuid::Uuid; + +use crate::entity::ai_analysis_queue; +use crate::error::{AiError, AiResult}; + +#[derive(Debug, FromQueryResult)] +struct QueueRow { + id: Uuid, + tenant_id: Uuid, + patient_id: Uuid, + analysis_type: String, + priority: i32, + status: String, + source_event: Option, + source_ref: String, + scheduled_at: chrono::DateTime, + started_at: Option>, + completed_at: Option>, + result_analysis_id: Option, + error_message: Option, + retry_count: i32, + max_retries: i32, + created_at: chrono::DateTime, + updated_at: chrono::DateTime, + created_by: Option, + updated_by: Option, + deleted_at: Option>, + version_lock: i32, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct AnalysisJob { + pub tenant_id: Uuid, + pub patient_id: Uuid, + pub analysis_type: String, + pub priority: i32, + pub source_event: Option, + pub source_ref: String, + pub created_by: Option, +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct QueueStatus { + pub tenant_id: Uuid, + pub pending: i64, + pub running: i64, + pub completed: i64, + pub failed: i64, +} + +pub struct AnalysisQueue { + db: sea_orm::DatabaseConnection, +} + +impl AnalysisQueue { + pub fn new(db: sea_orm::DatabaseConnection) -> Self { + Self { db } + } + + pub async fn enqueue(&self, job: AnalysisJob) -> AiResult { + let id = Uuid::now_v7(); + let now = chrono::Utc::now(); + let active = ai_analysis_queue::ActiveModel { + id: Set(id), + tenant_id: Set(job.tenant_id), + patient_id: Set(job.patient_id), + analysis_type: Set(job.analysis_type), + priority: Set(job.priority), + status: Set("pending".to_string()), + source_event: Set(job.source_event), + source_ref: Set(job.source_ref), + scheduled_at: Set(now), + started_at: Set(None), + completed_at: Set(None), + result_analysis_id: Set(None), + error_message: Set(None), + retry_count: Set(0), + max_retries: Set(3), + created_at: Set(now), + updated_at: Set(now), + created_by: Set(job.created_by), + updated_by: Set(None), + deleted_at: Set(None), + version_lock: Set(1), + }; + active.insert(&self.db).await?; + Ok(id) + } + + pub async fn claim_next(&self) -> AiResult> { + let sql = r#" + SELECT * FROM ai_analysis_queue + WHERE status = 'pending' + AND deleted_at IS NULL + AND scheduled_at <= NOW() + ORDER BY priority DESC, scheduled_at ASC + LIMIT 1 + "#; + + let row: Option = QueueRow::find_by_statement( + Statement::from_string(sea_orm::DatabaseBackend::Postgres, sql.to_string()), + ) + .one(&self.db) + .await?; + + match row { + Some(r) => { + let now = chrono::Utc::now(); + let mut active: ai_analysis_queue::ActiveModel = self.find_by_id(r.id).await?.into(); + active.status = Set("running".to_string()); + active.started_at = Set(Some(now)); + active.updated_at = Set(now); + active.version_lock = Set(active.version_lock.unwrap() + 1); + let model = active.update(&self.db).await?; + Ok(Some(model)) + } + None => Ok(None), + } + } + + pub async fn mark_completed( + &self, + id: Uuid, + result_analysis_id: Uuid, + ) -> AiResult<()> { + let job = self.find_by_id(id).await?; + let now = chrono::Utc::now(); + let mut active: ai_analysis_queue::ActiveModel = job.into(); + active.status = Set("completed".to_string()); + active.completed_at = Set(Some(now)); + active.result_analysis_id = Set(Some(result_analysis_id)); + active.updated_at = Set(now); + active.version_lock = Set(active.version_lock.unwrap() + 1); + active.update(&self.db).await?; + Ok(()) + } + + pub async fn mark_failed(&self, id: Uuid, error: String) -> AiResult<()> { + let job = self.find_by_id(id).await?; + let now = chrono::Utc::now(); + let retry_count = job.retry_count; + let max_retries = job.max_retries; + + let new_status = if retry_count < max_retries { + "pending" + } else { + "failed" + }; + + let mut active: ai_analysis_queue::ActiveModel = job.into(); + active.status = Set(new_status.to_string()); + active.error_message = Set(Some(error)); + active.retry_count = Set(retry_count + 1); + active.started_at = Set(None); + active.updated_at = Set(now); + active.version_lock = Set(active.version_lock.unwrap() + 1); + active.update(&self.db).await?; + Ok(()) + } + + pub async fn queue_status(&self, tenant_id: Uuid) -> AiResult { + #[derive(Debug, FromQueryResult)] + struct StatusCount { + status: String, + count: i64, + } + + let sql = r#" + SELECT status, COUNT(*) AS count + FROM ai_analysis_queue + WHERE tenant_id = $1 AND deleted_at IS NULL + GROUP BY status + "#; + + let rows: Vec = StatusCount::find_by_statement( + Statement::from_sql_and_values( + sea_orm::DatabaseBackend::Postgres, + sql, + [tenant_id.into()], + ), + ) + .all(&self.db) + .await?; + + let mut pending = 0i64; + let mut running = 0i64; + let mut completed = 0i64; + let mut failed = 0i64; + + for row in rows { + match row.status.as_str() { + "pending" => pending = row.count, + "running" => running = row.count, + "completed" => completed = row.count, + "failed" => failed = row.count, + _ => {} + } + } + + Ok(QueueStatus { + tenant_id, + pending, + running, + completed, + failed, + }) + } + + async fn find_by_id(&self, id: Uuid) -> AiResult { + ai_analysis_queue::Entity::find_by_id(id) + .one(&self.db) + .await? + .ok_or_else(|| AiError::QueueError(format!("队列任务 {id} 未找到"))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn analysis_job_construction() { + let job = AnalysisJob { + tenant_id: Uuid::now_v7(), + patient_id: Uuid::now_v7(), + analysis_type: "lab_report".into(), + priority: 2, + source_event: Some("health_data.critical_alert".into()), + source_ref: "auto".into(), + created_by: Some(Uuid::now_v7()), + }; + assert_eq!(job.analysis_type, "lab_report"); + assert_eq!(job.priority, 2); + } + + #[test] + fn queue_status_serialization() { + let status = QueueStatus { + tenant_id: Uuid::now_v7(), + pending: 5, + running: 2, + completed: 100, + failed: 1, + }; + let json = serde_json::to_value(&status).unwrap(); + assert_eq!(json["pending"], 5); + assert_eq!(json["running"], 2); + assert_eq!(json["completed"], 100); + assert_eq!(json["failed"], 1); + } + + #[test] + fn retry_logic() { + assert!(0 < 3); + assert!(!(3 < 3)); + } +} diff --git a/crates/erp-ai/src/service/mod.rs b/crates/erp-ai/src/service/mod.rs index 8c23d09..bfb0ac8 100644 --- a/crates/erp-ai/src/service/mod.rs +++ b/crates/erp-ai/src/service/mod.rs @@ -1,4 +1,5 @@ pub mod analysis; +pub mod analysis_queue; pub mod auto_analysis; pub mod cache; pub mod comparison;