From 0aab27295cb1f5257d2e56a8da572a2d31409382 Mon Sep 17 00:00:00 2001 From: iven Date: Tue, 28 Apr 2026 19:08:38 +0800 Subject: [PATCH] =?UTF-8?q?feat(ai):=20=E5=AE=9E=E7=8E=B0=20AI=20=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E6=A1=A5=E6=8E=A5=20=E2=80=94=204=20=E4=B8=AA=20Healt?= =?UTF-8?q?hDataProvider=20=E6=96=B9=E6=B3=95=E4=BB=8E=20stub=20=E6=9B=BF?= =?UTF-8?q?=E6=8D=A2=E4=B8=BA=E7=9C=9F=E5=AE=9E=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - get_lab_report: 查询 lab_report + patient,解析 JSON items 构造 LabReportDto - get_vital_signs: 查询 vital_signs 时间序列,按指标提取 8 种体征数据 - get_patient_summary: 聚合 patient + diagnosis + medication_record + health_record - get_full_report: 查询 health_record + 关联诊断和化验报告构造章节 - AiState 新增 health_provider 字段,erp-server 注入 HealthDataProviderImpl - 4 个 SSE handler 从 placeholder JSON 改为调用 provider + sanitizer 真实数据流 --- crates/erp-ai/src/handler/mod.rs | 101 +++-- crates/erp-ai/src/state.rs | 2 + crates/erp-health/src/health_provider_impl.rs | 389 ++++++++++++++++-- crates/erp-server/src/main.rs | 4 + 4 files changed, 419 insertions(+), 77 deletions(-) diff --git a/crates/erp-ai/src/handler/mod.rs b/crates/erp-ai/src/handler/mod.rs index d36d3a0..00d3597 100644 --- a/crates/erp-ai/src/handler/mod.rs +++ b/crates/erp-ai/src/handler/mod.rs @@ -1,6 +1,7 @@ use axum::extract::{Extension, FromRef, Path, Query, State}; use axum::response::sse::{Event, KeepAlive, Sse}; use axum::Json; +use erp_core::health_provider::TimeRange; use erp_core::rbac::require_permission; use erp_core::types::{ApiResponse, TenantContext}; use futures::StreamExt; @@ -35,6 +36,12 @@ where erp_core::error::AppError::Validation("report_id 必填".into()) })?; + let lab_dto = state + .health_provider + .get_lab_report(ctx.tenant_id, report_id) + .await?; + let sanitized_data = state.analysis.sanitizer.sanitize_lab_report(&lab_dto)?; + let prompt = state .prompt .get_active_prompt(ctx.tenant_id, "lab_report_interpretation") @@ -55,7 +62,7 @@ where report_id.to_string(), prompt.system_prompt, prompt.user_prompt_template, - serde_json::json!({"placeholder": true}), + sanitized_data, model, temperature, max_tokens, @@ -65,53 +72,7 @@ where let analysis_id_clone = analysis_id; let state_clone = state.clone(); - let sse_stream = async_stream::stream! { - let mut full_content = String::new(); - let mut index: u32 = 0; - - let mut stream = std::pin::pin!(stream); - while let Some(result) = stream.next().await { - match result { - Ok(chunk) => { - full_content.push_str(&chunk); - index += 1; - let event = AnalysisSseEvent::Chunk { - content: chunk, - index, - }; - let data = serde_json::to_string(&event).unwrap_or_default(); - yield Ok(Event::default().event("chunk").data(data)); - } - Err(e) => { - let event = AnalysisSseEvent::Error { - message: e.to_string(), - }; - let data = serde_json::to_string(&event).unwrap_or_default(); - yield Ok(Event::default().event("error").data(data)); - let _ = state_clone - .analysis - .fail_analysis(analysis_id_clone, e.to_string()) - .await; - return; - } - } - } - - // 完成后存储结果 - let metadata = serde_json::json!({"analysis_type": "lab_report"}); - let _ = state_clone - .analysis - .complete_analysis(analysis_id_clone, full_content, metadata) - .await; - - let done_event = AnalysisSseEvent::Done { - analysis_id: analysis_id_clone, - status: "completed".into(), - }; - let data = serde_json::to_string(&done_event).unwrap_or_default(); - yield Ok(Event::default().event("done").data(data)); - }; - + let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "lab_report"); Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default())) } @@ -129,6 +90,26 @@ where erp_core::error::AppError::Validation("patient_id 必填".into()) })?; + let metrics = body.metrics.unwrap_or_else(|| { + vec![ + "systolic_bp_morning".into(), + "diastolic_bp_morning".into(), + "heart_rate".into(), + "weight".into(), + "blood_sugar".into(), + ] + }); + let range = TimeRange { + start: chrono::Utc::now() - chrono::Duration::days(90), + end: chrono::Utc::now(), + }; + + let vital_dtos = state + .health_provider + .get_vital_signs(ctx.tenant_id, patient_id, &metrics, &range) + .await?; + let sanitized_data = state.analysis.sanitizer.sanitize_vital_signs(&vital_dtos)?; + let prompt = state .prompt .get_active_prompt(ctx.tenant_id, "health_trend_analysis") @@ -149,7 +130,7 @@ where patient_id.to_string(), prompt.system_prompt, prompt.user_prompt_template, - serde_json::json!({"placeholder": true}), + sanitized_data, model, temperature, max_tokens, @@ -177,6 +158,15 @@ where erp_core::error::AppError::Validation("patient_id 必填".into()) })?; + let summary_dto = state + .health_provider + .get_patient_summary(ctx.tenant_id, patient_id) + .await?; + let sanitized_data = state + .analysis + .sanitizer + .sanitize_patient_summary(&summary_dto)?; + let prompt = state .prompt .get_active_prompt(ctx.tenant_id, "personalized_checkup_plan") @@ -197,7 +187,7 @@ where patient_id.to_string(), prompt.system_prompt, prompt.user_prompt_template, - serde_json::json!({"placeholder": true}), + sanitized_data, model, temperature, max_tokens, @@ -225,6 +215,15 @@ where erp_core::error::AppError::Validation("report_id 必填".into()) })?; + let report_dto = state + .health_provider + .get_full_report(ctx.tenant_id, report_id) + .await?; + let sanitized_data = state + .analysis + .sanitizer + .sanitize_health_report(&report_dto)?; + let prompt = state .prompt .get_active_prompt(ctx.tenant_id, "report_summary_generation") @@ -245,7 +244,7 @@ where report_id.to_string(), prompt.system_prompt, prompt.user_prompt_template, - serde_json::json!({"placeholder": true}), + sanitized_data, model, temperature, max_tokens, diff --git a/crates/erp-ai/src/state.rs b/crates/erp-ai/src/state.rs index d6f7cb9..8f11351 100644 --- a/crates/erp-ai/src/state.rs +++ b/crates/erp-ai/src/state.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use erp_core::events::EventBus; +use erp_core::health_provider::HealthDataProvider; use sea_orm::DatabaseConnection; use crate::service::analysis::AnalysisService; @@ -14,4 +15,5 @@ pub struct AiState { pub analysis: Arc, pub prompt: Arc, pub usage: Arc, + pub health_provider: Arc, } diff --git a/crates/erp-health/src/health_provider_impl.rs b/crates/erp-health/src/health_provider_impl.rs index 6fa16dd..07ed3f0 100644 --- a/crates/erp-health/src/health_provider_impl.rs +++ b/crates/erp-health/src/health_provider_impl.rs @@ -1,61 +1,398 @@ use async_trait::async_trait; +use chrono::Datelike; use erp_core::error::{AppError, AppResult}; +use num_traits::ToPrimitive; use erp_core::health_provider::{ - HealthDataProvider, HealthReportDto, LabReportDto, PatientSummaryDto, TimeRange, VitalSignDto, + HealthDataProvider, HealthReportDto, LabItemDto, LabReportDto, PatientSummaryDto, + ReportSectionDto, TimeRange, VitalSignDto, }; +use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QueryOrder}; use uuid::Uuid; -/// # Experimental -/// -/// 此实现为渐进式开发中的 stub。所有方法当前返回 "尚未实现" 错误。 -/// 调用方不应在生产路径中依赖此实现。等 AI 集成需求明确后将渐进实现各方法。 -/// 参见: docs/superpowers/specs/2026-04-25-notification-realtime-architecture-design.md Phase E +use crate::entity::{diagnosis, health_record, lab_report, medication_record, patient, vital_signs}; + pub struct HealthDataProviderImpl { pub db: sea_orm::DatabaseConnection, } -macro_rules! stub_unimplemented { - ($method:ident) => { - Err(AppError::Internal(format!( - "HealthDataProvider::{} 尚未实现 — 此 trait 为 experimental,不应在生产路径中调用", - stringify!($method), - ))) +fn compute_age_group(birth_date: Option) -> String { + let Some(bd) = birth_date else { + return "未知".to_string(); }; + let age = (chrono::Utc::now().date_naive().year() - bd.year()) as i32; + match age { + a if a < 14 => "儿童", + a if a < 36 => "青年", + a if a < 56 => "中年", + _ => "老年", + } + .to_string() +} + +async fn find_patient( + db: &sea_orm::DatabaseConnection, + tenant_id: Uuid, + patient_id: Uuid, +) -> AppResult { + patient::Entity::find_by_id(patient_id) + .filter(patient::Column::TenantId.eq(tenant_id)) + .filter(patient::Column::DeletedAt.is_null()) + .one(db) + .await? + .ok_or_else(|| AppError::NotFound(format!("患者 {patient_id} 不存在"))) +} + +async fn find_lab_report( + db: &sea_orm::DatabaseConnection, + tenant_id: Uuid, + report_id: Uuid, +) -> AppResult { + lab_report::Entity::find_by_id(report_id) + .filter(lab_report::Column::TenantId.eq(tenant_id)) + .filter(lab_report::Column::DeletedAt.is_null()) + .one(db) + .await? + .ok_or_else(|| AppError::NotFound(format!("化验报告 {report_id} 不存在"))) +} + +fn parse_lab_items(items_json: &Option) -> Vec { + let Some(arr) = items_json.as_ref().and_then(|v| v.as_array()) else { + return vec![]; + }; + arr.iter() + .filter_map(|item| { + let name = item.get("name")?.as_str()?.to_string(); + let value = item.get("value").and_then(|v| v.as_f64()).unwrap_or(0.0); + let unit = item + .get("unit") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + let low = item + .get("reference_low") + .and_then(|v| v.as_f64()) + .map(|l| l.to_string()); + let high = item + .get("reference_high") + .and_then(|v| v.as_f64()) + .map(|h| h.to_string()); + let is_abnormal = item + .get("is_abnormal") + .and_then(|v| v.as_bool()) + .unwrap_or(false); + let reference_range = match (low, high) { + (Some(l), Some(h)) => format!("{l}-{h}"), + (Some(l), None) => format!(">={l}"), + (None, Some(h)) => format!("<={h}"), + _ => "-".to_string(), + }; + Some(LabItemDto { + name, + value, + unit, + reference_range, + is_abnormal, + }) + }) + .collect() +} + +fn report_type_to_department(report_type: &str) -> &str { + match report_type { + "kidney_function" => "肾内科", + "blood_routine" => "血液科", + "electrolyte" => "检验科", + "liver_function" => "肝胆科", + _ => "检验科", + } } #[async_trait] impl HealthDataProvider for HealthDataProviderImpl { async fn get_lab_report( &self, - _tenant_id: Uuid, - _report_id: Uuid, + tenant_id: Uuid, + report_id: Uuid, ) -> AppResult { - stub_unimplemented!(get_lab_report) + let report = find_lab_report(&self.db, tenant_id, report_id).await?; + let patient = find_patient(&self.db, tenant_id, report.patient_id).await?; + + Ok(LabReportDto { + age_group: compute_age_group(patient.birth_date), + sex: patient.gender.unwrap_or_else(|| "未知".to_string()), + department: report_type_to_department(&report.report_type).to_string(), + report_date: report.report_date.to_string(), + items: parse_lab_items(&report.items), + }) } async fn get_vital_signs( &self, - _tenant_id: Uuid, - _patient_id: Uuid, - _metrics: &[String], - _range: &TimeRange, + tenant_id: Uuid, + patient_id: Uuid, + metrics: &[String], + range: &TimeRange, ) -> AppResult> { - stub_unimplemented!(get_vital_signs) + let _ = find_patient(&self.db, tenant_id, patient_id).await?; + + let start_date = range.start.date_naive(); + let end_date = range.end.date_naive(); + + let records = vital_signs::Entity::find() + .filter(vital_signs::Column::TenantId.eq(tenant_id)) + .filter(vital_signs::Column::PatientId.eq(patient_id)) + .filter(vital_signs::Column::DeletedAt.is_null()) + .filter(vital_signs::Column::RecordDate.gte(start_date)) + .filter(vital_signs::Column::RecordDate.lte(end_date)) + .order_by_asc(vital_signs::Column::RecordDate) + .all(&self.db) + .await?; + + let metric_extractors: [(&str, Box Option>); 8] = [ + ( + "systolic_bp_morning", + Box::new(|r| r.systolic_bp_morning.map(|v| v as f64)), + ), + ( + "diastolic_bp_morning", + Box::new(|r| r.diastolic_bp_morning.map(|v| v as f64)), + ), + ( + "heart_rate", + Box::new(|r| r.heart_rate.map(|v| v as f64)), + ), + ( + "weight", + Box::new(|r| r.weight.map(|v| v.to_f64().unwrap_or(0.0))), + ), + ( + "blood_sugar", + Box::new(|r| r.blood_sugar.map(|v| v.to_f64().unwrap_or(0.0))), + ), + ( + "body_temperature", + Box::new(|r| r.body_temperature.map(|v| v.to_f64().unwrap_or(0.0))), + ), + ("spo2", Box::new(|r| r.spo2.map(|v| v as f64))), + ( + "urine_output_ml", + Box::new(|r| r.urine_output_ml.map(|v| v as f64)), + ), + ]; + + let mut result = Vec::new(); + + for (metric_name, extractor) in &metric_extractors { + if !metrics.is_empty() && !metrics.iter().any(|m| m == *metric_name) { + continue; + } + + let values: Vec<(String, f64)> = records + .iter() + .filter_map(|r| { + extractor(r).map(|v| (r.record_date.to_string(), v)) + }) + .collect(); + + if values.is_empty() { + continue; + } + + let unit = match *metric_name { + "systolic_bp_morning" | "diastolic_bp_morning" => "mmHg", + "heart_rate" => "bpm", + "weight" => "kg", + "blood_sugar" => "mmol/L", + "body_temperature" => "°C", + "spo2" => "%", + "urine_output_ml" => "ml", + _ => "", + }; + + result.push(VitalSignDto { + metric: metric_name.to_string(), + values, + unit: unit.to_string(), + }); + } + + Ok(result) } async fn get_patient_summary( &self, - _tenant_id: Uuid, - _patient_id: Uuid, + tenant_id: Uuid, + patient_id: Uuid, ) -> AppResult { - stub_unimplemented!(get_patient_summary) + let patient = find_patient(&self.db, tenant_id, patient_id).await?; + + let diagnoses: Vec = diagnosis::Entity::find() + .filter(diagnosis::Column::TenantId.eq(tenant_id)) + .filter(diagnosis::Column::PatientId.eq(patient_id)) + .filter(diagnosis::Column::DeletedAt.is_null()) + .filter(diagnosis::Column::Status.eq("active")) + .order_by_desc(diagnosis::Column::DiagnosedDate) + .all(&self.db) + .await? + .iter() + .map(|d| format!("{}({})", d.diagnosis_name, d.icd_code)) + .collect(); + + let medications: Vec = medication_record::Entity::find() + .filter(medication_record::Column::TenantId.eq(tenant_id)) + .filter(medication_record::Column::PatientId.eq(patient_id)) + .filter(medication_record::Column::DeletedAt.is_null()) + .filter(medication_record::Column::IsCurrent.eq(true)) + .all(&self.db) + .await? + .iter() + .map(|m| { + let mut s = m.medication_name.clone(); + if let Some(ref dosage) = m.dosage { + s.push_str(&format!(" {dosage}")); + } + s + }) + .collect(); + + let family_history = patient + .medical_history_summary + .as_ref() + .map(|h| { + h.split(';') + .chain(h.split(';')) + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect() + }) + .unwrap_or_default(); + + let last_checkup = health_record::Entity::find() + .filter(health_record::Column::TenantId.eq(tenant_id)) + .filter(health_record::Column::PatientId.eq(patient_id)) + .filter(health_record::Column::DeletedAt.is_null()) + .order_by_desc(health_record::Column::RecordDate) + .one(&self.db) + .await?; + + let last_checkup_date = last_checkup + .map(|r| r.record_date.to_string()) + .unwrap_or_else(|| "无".to_string()); + + Ok(PatientSummaryDto { + age_group: compute_age_group(patient.birth_date), + sex: patient.gender.unwrap_or_else(|| "未知".to_string()), + chronic_conditions: diagnoses, + medications, + family_history, + last_checkup_date, + }) } async fn get_full_report( &self, - _tenant_id: Uuid, - _report_id: Uuid, + tenant_id: Uuid, + report_id: Uuid, ) -> AppResult { - stub_unimplemented!(get_full_report) + let record = health_record::Entity::find_by_id(report_id) + .filter(health_record::Column::TenantId.eq(tenant_id)) + .filter(health_record::Column::DeletedAt.is_null()) + .one(&self.db) + .await? + .ok_or_else(|| AppError::NotFound(format!("健康报告 {report_id} 不存在")))?; + + let patient = find_patient(&self.db, tenant_id, record.patient_id).await?; + + let mut sections = Vec::new(); + + let findings: Vec = record + .overall_assessment + .as_ref() + .map(|a| { + a.split(';') + .chain(a.split(';')) + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect() + }) + .unwrap_or_default(); + + if !findings.is_empty() { + sections.push(ReportSectionDto { + title: "总体评估".to_string(), + findings, + abnormal_items: vec![], + }); + } + + let report_diagnoses = diagnosis::Entity::find() + .filter(diagnosis::Column::TenantId.eq(tenant_id)) + .filter(diagnosis::Column::PatientId.eq(record.patient_id)) + .filter(diagnosis::Column::DeletedAt.is_null()) + .filter( + diagnosis::Column::HealthRecordId + .eq(report_id) + .or(diagnosis::Column::Status.eq("active")), + ) + .all(&self.db) + .await?; + + if !report_diagnoses.is_empty() { + let (abnormal, findings): (Vec<_>, Vec<_>) = report_diagnoses + .iter() + .partition(|d| d.status == "active"); + sections.push(ReportSectionDto { + title: "诊断记录".to_string(), + findings: findings + .iter() + .map(|d| format!("{}({}) — {}", d.diagnosis_name, d.icd_code, d.diagnosed_date)) + .collect(), + abnormal_items: abnormal + .iter() + .map(|d| format!("{}({})", d.diagnosis_name, d.icd_code)) + .collect(), + }); + } + + let lab_reports = lab_report::Entity::find() + .filter(lab_report::Column::TenantId.eq(tenant_id)) + .filter(lab_report::Column::PatientId.eq(record.patient_id)) + .filter(lab_report::Column::DeletedAt.is_null()) + .filter( + lab_report::Column::ReportDate + .gte(record.record_date - chrono::Duration::days(30)), + ) + .filter(lab_report::Column::ReportDate.lte(record.record_date)) + .order_by_desc(lab_report::Column::ReportDate) + .all(&self.db) + .await?; + + for lr in &lab_reports { + let items = parse_lab_items(&lr.items); + let abnormal: Vec = items + .iter() + .filter(|i| i.is_abnormal) + .map(|i| format!("{} {}{}", i.name, i.value, i.unit)) + .collect(); + let findings: Vec = items + .iter() + .map(|i| format!("{}: {}{} ({})", i.name, i.value, i.unit, i.reference_range)) + .collect(); + if !findings.is_empty() { + sections.push(ReportSectionDto { + title: format!("化验报告 — {} ({})", lr.report_type, lr.report_date), + findings, + abnormal_items: abnormal, + }); + } + } + + Ok(HealthReportDto { + age_group: compute_age_group(patient.birth_date), + sex: patient.gender.unwrap_or_else(|| "未知".to_string()), + department: record.record_type.clone(), + report_date: record.record_date.to_string(), + sections, + }) } } diff --git a/crates/erp-server/src/main.rs b/crates/erp-server/src/main.rs index c2912f8..8e59d5a 100644 --- a/crates/erp-server/src/main.rs +++ b/crates/erp-server/src/main.rs @@ -467,12 +467,16 @@ async fn main() -> anyhow::Result<()> { ); let prompt = std::sync::Arc::new(erp_ai::service::prompt::PromptService::new(db.clone())); let usage = std::sync::Arc::new(erp_ai::service::usage::UsageService::new(db.clone())); + let health_provider = std::sync::Arc::new(erp_health::HealthDataProviderImpl { + db: db.clone(), + }); erp_ai::AiState { db: db.clone(), event_bus: event_bus.clone(), analysis, prompt, usage, + health_provider, } };