feat(ai): 实现 AnalysisQueue 服务
支持 enqueue/claim_next/mark_completed/mark_failed 状态机 失败自动重试(retry_count < max_retries → pending),queue_status 聚合查询
This commit is contained in:
258
crates/erp-ai/src/service/analysis_queue.rs
Normal file
258
crates/erp-ai/src/service/analysis_queue.rs
Normal file
@@ -0,0 +1,258 @@
|
||||
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, FromQueryResult, QueryFilter, QueryOrder, Set, Statement};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::entity::ai_analysis_queue;
|
||||
use crate::error::{AiError, AiResult};
|
||||
|
||||
#[derive(Debug, FromQueryResult)]
|
||||
struct QueueRow {
|
||||
id: Uuid,
|
||||
tenant_id: Uuid,
|
||||
patient_id: Uuid,
|
||||
analysis_type: String,
|
||||
priority: i32,
|
||||
status: String,
|
||||
source_event: Option<String>,
|
||||
source_ref: String,
|
||||
scheduled_at: chrono::DateTime<chrono::Utc>,
|
||||
started_at: Option<chrono::DateTime<chrono::Utc>>,
|
||||
completed_at: Option<chrono::DateTime<chrono::Utc>>,
|
||||
result_analysis_id: Option<Uuid>,
|
||||
error_message: Option<String>,
|
||||
retry_count: i32,
|
||||
max_retries: i32,
|
||||
created_at: chrono::DateTime<chrono::Utc>,
|
||||
updated_at: chrono::DateTime<chrono::Utc>,
|
||||
created_by: Option<Uuid>,
|
||||
updated_by: Option<Uuid>,
|
||||
deleted_at: Option<chrono::DateTime<chrono::Utc>>,
|
||||
version_lock: i32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub struct AnalysisJob {
|
||||
pub tenant_id: Uuid,
|
||||
pub patient_id: Uuid,
|
||||
pub analysis_type: String,
|
||||
pub priority: i32,
|
||||
pub source_event: Option<String>,
|
||||
pub source_ref: String,
|
||||
pub created_by: Option<Uuid>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, serde::Serialize)]
|
||||
pub struct QueueStatus {
|
||||
pub tenant_id: Uuid,
|
||||
pub pending: i64,
|
||||
pub running: i64,
|
||||
pub completed: i64,
|
||||
pub failed: i64,
|
||||
}
|
||||
|
||||
pub struct AnalysisQueue {
|
||||
db: sea_orm::DatabaseConnection,
|
||||
}
|
||||
|
||||
impl AnalysisQueue {
|
||||
pub fn new(db: sea_orm::DatabaseConnection) -> Self {
|
||||
Self { db }
|
||||
}
|
||||
|
||||
pub async fn enqueue(&self, job: AnalysisJob) -> AiResult<Uuid> {
|
||||
let id = Uuid::now_v7();
|
||||
let now = chrono::Utc::now();
|
||||
let active = ai_analysis_queue::ActiveModel {
|
||||
id: Set(id),
|
||||
tenant_id: Set(job.tenant_id),
|
||||
patient_id: Set(job.patient_id),
|
||||
analysis_type: Set(job.analysis_type),
|
||||
priority: Set(job.priority),
|
||||
status: Set("pending".to_string()),
|
||||
source_event: Set(job.source_event),
|
||||
source_ref: Set(job.source_ref),
|
||||
scheduled_at: Set(now),
|
||||
started_at: Set(None),
|
||||
completed_at: Set(None),
|
||||
result_analysis_id: Set(None),
|
||||
error_message: Set(None),
|
||||
retry_count: Set(0),
|
||||
max_retries: Set(3),
|
||||
created_at: Set(now),
|
||||
updated_at: Set(now),
|
||||
created_by: Set(job.created_by),
|
||||
updated_by: Set(None),
|
||||
deleted_at: Set(None),
|
||||
version_lock: Set(1),
|
||||
};
|
||||
active.insert(&self.db).await?;
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
pub async fn claim_next(&self) -> AiResult<Option<ai_analysis_queue::Model>> {
|
||||
let sql = r#"
|
||||
SELECT * FROM ai_analysis_queue
|
||||
WHERE status = 'pending'
|
||||
AND deleted_at IS NULL
|
||||
AND scheduled_at <= NOW()
|
||||
ORDER BY priority DESC, scheduled_at ASC
|
||||
LIMIT 1
|
||||
"#;
|
||||
|
||||
let row: Option<QueueRow> = QueueRow::find_by_statement(
|
||||
Statement::from_string(sea_orm::DatabaseBackend::Postgres, sql.to_string()),
|
||||
)
|
||||
.one(&self.db)
|
||||
.await?;
|
||||
|
||||
match row {
|
||||
Some(r) => {
|
||||
let now = chrono::Utc::now();
|
||||
let mut active: ai_analysis_queue::ActiveModel = self.find_by_id(r.id).await?.into();
|
||||
active.status = Set("running".to_string());
|
||||
active.started_at = Set(Some(now));
|
||||
active.updated_at = Set(now);
|
||||
active.version_lock = Set(active.version_lock.unwrap() + 1);
|
||||
let model = active.update(&self.db).await?;
|
||||
Ok(Some(model))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn mark_completed(
|
||||
&self,
|
||||
id: Uuid,
|
||||
result_analysis_id: Uuid,
|
||||
) -> AiResult<()> {
|
||||
let job = self.find_by_id(id).await?;
|
||||
let now = chrono::Utc::now();
|
||||
let mut active: ai_analysis_queue::ActiveModel = job.into();
|
||||
active.status = Set("completed".to_string());
|
||||
active.completed_at = Set(Some(now));
|
||||
active.result_analysis_id = Set(Some(result_analysis_id));
|
||||
active.updated_at = Set(now);
|
||||
active.version_lock = Set(active.version_lock.unwrap() + 1);
|
||||
active.update(&self.db).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn mark_failed(&self, id: Uuid, error: String) -> AiResult<()> {
|
||||
let job = self.find_by_id(id).await?;
|
||||
let now = chrono::Utc::now();
|
||||
let retry_count = job.retry_count;
|
||||
let max_retries = job.max_retries;
|
||||
|
||||
let new_status = if retry_count < max_retries {
|
||||
"pending"
|
||||
} else {
|
||||
"failed"
|
||||
};
|
||||
|
||||
let mut active: ai_analysis_queue::ActiveModel = job.into();
|
||||
active.status = Set(new_status.to_string());
|
||||
active.error_message = Set(Some(error));
|
||||
active.retry_count = Set(retry_count + 1);
|
||||
active.started_at = Set(None);
|
||||
active.updated_at = Set(now);
|
||||
active.version_lock = Set(active.version_lock.unwrap() + 1);
|
||||
active.update(&self.db).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn queue_status(&self, tenant_id: Uuid) -> AiResult<QueueStatus> {
|
||||
#[derive(Debug, FromQueryResult)]
|
||||
struct StatusCount {
|
||||
status: String,
|
||||
count: i64,
|
||||
}
|
||||
|
||||
let sql = r#"
|
||||
SELECT status, COUNT(*) AS count
|
||||
FROM ai_analysis_queue
|
||||
WHERE tenant_id = $1 AND deleted_at IS NULL
|
||||
GROUP BY status
|
||||
"#;
|
||||
|
||||
let rows: Vec<StatusCount> = StatusCount::find_by_statement(
|
||||
Statement::from_sql_and_values(
|
||||
sea_orm::DatabaseBackend::Postgres,
|
||||
sql,
|
||||
[tenant_id.into()],
|
||||
),
|
||||
)
|
||||
.all(&self.db)
|
||||
.await?;
|
||||
|
||||
let mut pending = 0i64;
|
||||
let mut running = 0i64;
|
||||
let mut completed = 0i64;
|
||||
let mut failed = 0i64;
|
||||
|
||||
for row in rows {
|
||||
match row.status.as_str() {
|
||||
"pending" => pending = row.count,
|
||||
"running" => running = row.count,
|
||||
"completed" => completed = row.count,
|
||||
"failed" => failed = row.count,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(QueueStatus {
|
||||
tenant_id,
|
||||
pending,
|
||||
running,
|
||||
completed,
|
||||
failed,
|
||||
})
|
||||
}
|
||||
|
||||
async fn find_by_id(&self, id: Uuid) -> AiResult<ai_analysis_queue::Model> {
|
||||
ai_analysis_queue::Entity::find_by_id(id)
|
||||
.one(&self.db)
|
||||
.await?
|
||||
.ok_or_else(|| AiError::QueueError(format!("队列任务 {id} 未找到")))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn analysis_job_construction() {
|
||||
let job = AnalysisJob {
|
||||
tenant_id: Uuid::now_v7(),
|
||||
patient_id: Uuid::now_v7(),
|
||||
analysis_type: "lab_report".into(),
|
||||
priority: 2,
|
||||
source_event: Some("health_data.critical_alert".into()),
|
||||
source_ref: "auto".into(),
|
||||
created_by: Some(Uuid::now_v7()),
|
||||
};
|
||||
assert_eq!(job.analysis_type, "lab_report");
|
||||
assert_eq!(job.priority, 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn queue_status_serialization() {
|
||||
let status = QueueStatus {
|
||||
tenant_id: Uuid::now_v7(),
|
||||
pending: 5,
|
||||
running: 2,
|
||||
completed: 100,
|
||||
failed: 1,
|
||||
};
|
||||
let json = serde_json::to_value(&status).unwrap();
|
||||
assert_eq!(json["pending"], 5);
|
||||
assert_eq!(json["running"], 2);
|
||||
assert_eq!(json["completed"], 100);
|
||||
assert_eq!(json["failed"], 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn retry_logic() {
|
||||
assert!(0 < 3);
|
||||
assert!(!(3 < 3));
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,5 @@
|
||||
pub mod analysis;
|
||||
pub mod analysis_queue;
|
||||
pub mod auto_analysis;
|
||||
pub mod cache;
|
||||
pub mod comparison;
|
||||
|
||||
Reference in New Issue
Block a user