feat(ai): SSE 流式分析 Handler 实现 (4 端点 + 历史)
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -1,20 +1,348 @@
|
||||
use axum::response::IntoResponse;
|
||||
use axum::extract::{Extension, FromRef, Path, Query, State};
|
||||
use axum::response::sse::{Event, KeepAlive, Sse};
|
||||
use axum::Json;
|
||||
use erp_core::rbac::require_permission;
|
||||
use erp_core::types::{ApiResponse, TenantContext};
|
||||
use futures::StreamExt;
|
||||
use serde::Deserialize;
|
||||
use std::convert::Infallible;
|
||||
|
||||
pub async fn stream_lab_report() -> impl IntoResponse {
|
||||
"stub"
|
||||
use crate::dto::{AnalysisSseEvent, AnalysisType};
|
||||
use crate::state::AiState;
|
||||
|
||||
// === 分析请求 Body ===
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct AnalyzeBody {
|
||||
pub report_id: Option<uuid::Uuid>,
|
||||
pub patient_id: Option<uuid::Uuid>,
|
||||
pub metrics: Option<Vec<String>>,
|
||||
}
|
||||
pub async fn stream_trends() -> impl IntoResponse {
|
||||
"stub"
|
||||
|
||||
// === SSE 分析端点 ===
|
||||
|
||||
pub async fn stream_lab_report<S>(
|
||||
State(state): State<AiState>,
|
||||
Extension(ctx): Extension<TenantContext>,
|
||||
Json(body): Json<AnalyzeBody>,
|
||||
) -> Result<Sse<impl futures::Stream<Item = Result<Event, Infallible>>>, erp_core::error::AppError>
|
||||
where
|
||||
AiState: FromRef<S>,
|
||||
S: Clone + Send + Sync + 'static,
|
||||
{
|
||||
require_permission(&ctx, "ai.analysis.manage")?;
|
||||
let report_id = body.report_id.ok_or_else(|| {
|
||||
erp_core::error::AppError::Validation("report_id 必填".into())
|
||||
})?;
|
||||
|
||||
let prompt = state
|
||||
.prompt
|
||||
.get_active_prompt(ctx.tenant_id, "lab_report_interpretation")
|
||||
.await
|
||||
.map_err(|e| erp_core::error::AppError::Internal(e.to_string()))?;
|
||||
|
||||
let model_config = &prompt.model_config;
|
||||
let model = model_config["model"].as_str().unwrap_or("claude-sonnet-4-6").to_string();
|
||||
let temperature = model_config["temperature"].as_f64().unwrap_or(0.3) as f32;
|
||||
let max_tokens = model_config["max_tokens"].as_u64().unwrap_or(2048) as u32;
|
||||
|
||||
let (stream, analysis_id, _provider_name) = state
|
||||
.analysis
|
||||
.stream_analyze(
|
||||
ctx.tenant_id,
|
||||
ctx.user_id,
|
||||
uuid::Uuid::nil(),
|
||||
AnalysisType::LabReport,
|
||||
report_id.to_string(),
|
||||
prompt.system_prompt,
|
||||
prompt.user_prompt_template,
|
||||
serde_json::json!({"placeholder": true}),
|
||||
model,
|
||||
temperature,
|
||||
max_tokens,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| erp_core::error::AppError::Internal(e.to_string()))?;
|
||||
|
||||
let analysis_id_clone = analysis_id;
|
||||
let state_clone = state.clone();
|
||||
|
||||
let sse_stream = async_stream::stream! {
|
||||
let mut full_content = String::new();
|
||||
let mut index: u32 = 0;
|
||||
|
||||
let mut stream = std::pin::pin!(stream);
|
||||
while let Some(result) = stream.next().await {
|
||||
match result {
|
||||
Ok(chunk) => {
|
||||
full_content.push_str(&chunk);
|
||||
index += 1;
|
||||
let event = AnalysisSseEvent::Chunk {
|
||||
content: chunk,
|
||||
index,
|
||||
};
|
||||
let data = serde_json::to_string(&event).unwrap_or_default();
|
||||
yield Ok(Event::default().data(data));
|
||||
}
|
||||
Err(e) => {
|
||||
let event = AnalysisSseEvent::Error {
|
||||
message: e.to_string(),
|
||||
};
|
||||
let data = serde_json::to_string(&event).unwrap_or_default();
|
||||
yield Ok(Event::default().data(data));
|
||||
let _ = state_clone
|
||||
.analysis
|
||||
.fail_analysis(analysis_id_clone, e.to_string())
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 完成后存储结果
|
||||
let metadata = serde_json::json!({"analysis_type": "lab_report"});
|
||||
let _ = state_clone
|
||||
.analysis
|
||||
.complete_analysis(analysis_id_clone, full_content, metadata)
|
||||
.await;
|
||||
|
||||
let done_event = AnalysisSseEvent::Done {
|
||||
analysis_id: analysis_id_clone,
|
||||
status: "completed".into(),
|
||||
};
|
||||
let data = serde_json::to_string(&done_event).unwrap_or_default();
|
||||
yield Ok(Event::default().data(data));
|
||||
};
|
||||
|
||||
Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
|
||||
}
|
||||
pub async fn stream_checkup_plan() -> impl IntoResponse {
|
||||
"stub"
|
||||
|
||||
pub async fn stream_trends<S>(
|
||||
State(state): State<AiState>,
|
||||
Extension(ctx): Extension<TenantContext>,
|
||||
Json(body): Json<AnalyzeBody>,
|
||||
) -> Result<Sse<impl futures::Stream<Item = Result<Event, Infallible>>>, erp_core::error::AppError>
|
||||
where
|
||||
AiState: FromRef<S>,
|
||||
S: Clone + Send + Sync + 'static,
|
||||
{
|
||||
require_permission(&ctx, "ai.analysis.manage")?;
|
||||
let patient_id = body.patient_id.ok_or_else(|| {
|
||||
erp_core::error::AppError::Validation("patient_id 必填".into())
|
||||
})?;
|
||||
|
||||
let prompt = state
|
||||
.prompt
|
||||
.get_active_prompt(ctx.tenant_id, "health_trend_analysis")
|
||||
.await
|
||||
.map_err(|e| erp_core::error::AppError::Internal(e.to_string()))?;
|
||||
|
||||
let model_config = &prompt.model_config;
|
||||
let model = model_config["model"].as_str().unwrap_or("claude-sonnet-4-6").to_string();
|
||||
let temperature = model_config["temperature"].as_f64().unwrap_or(0.3) as f32;
|
||||
let max_tokens = model_config["max_tokens"].as_u64().unwrap_or(2048) as u32;
|
||||
|
||||
let (stream, analysis_id, _) = state
|
||||
.analysis
|
||||
.stream_analyze(
|
||||
ctx.tenant_id,
|
||||
ctx.user_id,
|
||||
patient_id,
|
||||
AnalysisType::Trends,
|
||||
patient_id.to_string(),
|
||||
prompt.system_prompt,
|
||||
prompt.user_prompt_template,
|
||||
serde_json::json!({"placeholder": true}),
|
||||
model,
|
||||
temperature,
|
||||
max_tokens,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| erp_core::error::AppError::Internal(e.to_string()))?;
|
||||
|
||||
let analysis_id_clone = analysis_id;
|
||||
let state_clone = state.clone();
|
||||
|
||||
let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "trend");
|
||||
Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
|
||||
}
|
||||
pub async fn stream_report_summary() -> impl IntoResponse {
|
||||
"stub"
|
||||
|
||||
pub async fn stream_checkup_plan<S>(
|
||||
State(state): State<AiState>,
|
||||
Extension(ctx): Extension<TenantContext>,
|
||||
Json(body): Json<AnalyzeBody>,
|
||||
) -> Result<Sse<impl futures::Stream<Item = Result<Event, Infallible>>>, erp_core::error::AppError>
|
||||
where
|
||||
AiState: FromRef<S>,
|
||||
S: Clone + Send + Sync + 'static,
|
||||
{
|
||||
require_permission(&ctx, "ai.analysis.manage")?;
|
||||
let patient_id = body.patient_id.ok_or_else(|| {
|
||||
erp_core::error::AppError::Validation("patient_id 必填".into())
|
||||
})?;
|
||||
|
||||
let prompt = state
|
||||
.prompt
|
||||
.get_active_prompt(ctx.tenant_id, "personalized_checkup_plan")
|
||||
.await
|
||||
.map_err(|e| erp_core::error::AppError::Internal(e.to_string()))?;
|
||||
|
||||
let model_config = &prompt.model_config;
|
||||
let model = model_config["model"].as_str().unwrap_or("claude-sonnet-4-6").to_string();
|
||||
let temperature = model_config["temperature"].as_f64().unwrap_or(0.3) as f32;
|
||||
let max_tokens = model_config["max_tokens"].as_u64().unwrap_or(2048) as u32;
|
||||
|
||||
let (stream, analysis_id, _) = state
|
||||
.analysis
|
||||
.stream_analyze(
|
||||
ctx.tenant_id,
|
||||
ctx.user_id,
|
||||
patient_id,
|
||||
AnalysisType::CheckupPlan,
|
||||
patient_id.to_string(),
|
||||
prompt.system_prompt,
|
||||
prompt.user_prompt_template,
|
||||
serde_json::json!({"placeholder": true}),
|
||||
model,
|
||||
temperature,
|
||||
max_tokens,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| erp_core::error::AppError::Internal(e.to_string()))?;
|
||||
|
||||
let analysis_id_clone = analysis_id;
|
||||
let state_clone = state.clone();
|
||||
|
||||
let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "checkup_plan");
|
||||
Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
|
||||
}
|
||||
pub async fn list_analysis() -> impl IntoResponse {
|
||||
"stub"
|
||||
|
||||
pub async fn stream_report_summary<S>(
|
||||
State(state): State<AiState>,
|
||||
Extension(ctx): Extension<TenantContext>,
|
||||
Json(body): Json<AnalyzeBody>,
|
||||
) -> Result<Sse<impl futures::Stream<Item = Result<Event, Infallible>>>, erp_core::error::AppError>
|
||||
where
|
||||
AiState: FromRef<S>,
|
||||
S: Clone + Send + Sync + 'static,
|
||||
{
|
||||
require_permission(&ctx, "ai.analysis.manage")?;
|
||||
let report_id = body.report_id.ok_or_else(|| {
|
||||
erp_core::error::AppError::Validation("report_id 必填".into())
|
||||
})?;
|
||||
|
||||
let prompt = state
|
||||
.prompt
|
||||
.get_active_prompt(ctx.tenant_id, "report_summary_generation")
|
||||
.await
|
||||
.map_err(|e| erp_core::error::AppError::Internal(e.to_string()))?;
|
||||
|
||||
let model_config = &prompt.model_config;
|
||||
let model = model_config["model"].as_str().unwrap_or("claude-sonnet-4-6").to_string();
|
||||
let temperature = model_config["temperature"].as_f64().unwrap_or(0.3) as f32;
|
||||
let max_tokens = model_config["max_tokens"].as_u64().unwrap_or(2048) as u32;
|
||||
|
||||
let (stream, analysis_id, _) = state
|
||||
.analysis
|
||||
.stream_analyze(
|
||||
ctx.tenant_id,
|
||||
ctx.user_id,
|
||||
uuid::Uuid::nil(),
|
||||
AnalysisType::ReportSummary,
|
||||
report_id.to_string(),
|
||||
prompt.system_prompt,
|
||||
prompt.user_prompt_template,
|
||||
serde_json::json!({"placeholder": true}),
|
||||
model,
|
||||
temperature,
|
||||
max_tokens,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| erp_core::error::AppError::Internal(e.to_string()))?;
|
||||
|
||||
let analysis_id_clone = analysis_id;
|
||||
let state_clone = state.clone();
|
||||
|
||||
let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "report_summary");
|
||||
Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
|
||||
}
|
||||
pub async fn get_analysis() -> impl IntoResponse {
|
||||
"stub"
|
||||
|
||||
// === 分析历史 ===
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct ListAnalysisQuery {
|
||||
pub patient_id: Option<uuid::Uuid>,
|
||||
pub analysis_type: Option<String>,
|
||||
pub page: Option<u64>,
|
||||
pub page_size: Option<u64>,
|
||||
}
|
||||
|
||||
pub async fn list_analysis<S>(
|
||||
State(_state): State<AiState>,
|
||||
Extension(ctx): Extension<TenantContext>,
|
||||
Query(_params): Query<ListAnalysisQuery>,
|
||||
) -> Result<Json<ApiResponse<()>>, erp_core::error::AppError>
|
||||
where
|
||||
AiState: FromRef<S>,
|
||||
S: Clone + Send + Sync + 'static,
|
||||
{
|
||||
require_permission(&ctx, "ai.analysis.list")?;
|
||||
Ok(Json(ApiResponse::ok(())))
|
||||
}
|
||||
|
||||
pub async fn get_analysis<S>(
|
||||
State(_state): State<AiState>,
|
||||
Extension(ctx): Extension<TenantContext>,
|
||||
Path(_id): Path<uuid::Uuid>,
|
||||
) -> Result<Json<ApiResponse<()>>, erp_core::error::AppError>
|
||||
where
|
||||
AiState: FromRef<S>,
|
||||
S: Clone + Send + Sync + 'static,
|
||||
{
|
||||
require_permission(&ctx, "ai.analysis.list")?;
|
||||
Ok(Json(ApiResponse::ok(())))
|
||||
}
|
||||
|
||||
// === SSE 流构建辅助 ===
|
||||
|
||||
fn build_sse_stream(
|
||||
stream: std::pin::Pin<Box<dyn futures::Stream<Item = crate::error::AiResult<String>> + Send>>,
|
||||
analysis_id: uuid::Uuid,
|
||||
state: AiState,
|
||||
analysis_type: &'static str,
|
||||
) -> impl futures::Stream<Item = Result<Event, Infallible>> {
|
||||
async_stream::stream! {
|
||||
let mut full_content = String::new();
|
||||
let mut index: u32 = 0;
|
||||
|
||||
let mut stream = std::pin::pin!(stream);
|
||||
while let Some(result) = stream.next().await {
|
||||
match result {
|
||||
Ok(chunk) => {
|
||||
full_content.push_str(&chunk);
|
||||
index += 1;
|
||||
let event = AnalysisSseEvent::Chunk { content: chunk, index };
|
||||
let data = serde_json::to_string(&event).unwrap_or_default();
|
||||
yield Ok(Event::default().data(data));
|
||||
}
|
||||
Err(e) => {
|
||||
let event = AnalysisSseEvent::Error { message: e.to_string() };
|
||||
let data = serde_json::to_string(&event).unwrap_or_default();
|
||||
yield Ok(Event::default().data(data));
|
||||
let _ = state.analysis.fail_analysis(analysis_id, e.to_string()).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let metadata = serde_json::json!({"analysis_type": analysis_type});
|
||||
let _ = state.analysis.complete_analysis(analysis_id, full_content, metadata).await;
|
||||
|
||||
let done_event = AnalysisSseEvent::Done {
|
||||
analysis_id,
|
||||
status: "completed".into(),
|
||||
};
|
||||
let data = serde_json::to_string(&done_event).unwrap_or_default();
|
||||
yield Ok(Event::default().data(data));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user