feat(ai): 实现 AI 数据桥接 — 4 个 HealthDataProvider 方法从 stub 替换为真实查询
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled

- get_lab_report: 查询 lab_report + patient,解析 JSON items 构造 LabReportDto
- get_vital_signs: 查询 vital_signs 时间序列,按指标提取 8 种体征数据
- get_patient_summary: 聚合 patient + diagnosis + medication_record + health_record
- get_full_report: 查询 health_record + 关联诊断和化验报告构造章节
- AiState 新增 health_provider 字段,erp-server 注入 HealthDataProviderImpl
- 4 个 SSE handler 从 placeholder JSON 改为调用 provider + sanitizer 真实数据流
This commit is contained in:
iven
2026-04-28 19:08:38 +08:00
parent ace04ee56d
commit 0aab27295c
4 changed files with 419 additions and 77 deletions

View File

@@ -1,6 +1,7 @@
use axum::extract::{Extension, FromRef, Path, Query, State};
use axum::response::sse::{Event, KeepAlive, Sse};
use axum::Json;
use erp_core::health_provider::TimeRange;
use erp_core::rbac::require_permission;
use erp_core::types::{ApiResponse, TenantContext};
use futures::StreamExt;
@@ -35,6 +36,12 @@ where
erp_core::error::AppError::Validation("report_id 必填".into())
})?;
let lab_dto = state
.health_provider
.get_lab_report(ctx.tenant_id, report_id)
.await?;
let sanitized_data = state.analysis.sanitizer.sanitize_lab_report(&lab_dto)?;
let prompt = state
.prompt
.get_active_prompt(ctx.tenant_id, "lab_report_interpretation")
@@ -55,7 +62,7 @@ where
report_id.to_string(),
prompt.system_prompt,
prompt.user_prompt_template,
serde_json::json!({"placeholder": true}),
sanitized_data,
model,
temperature,
max_tokens,
@@ -65,53 +72,7 @@ where
let analysis_id_clone = analysis_id;
let state_clone = state.clone();
let sse_stream = async_stream::stream! {
let mut full_content = String::new();
let mut index: u32 = 0;
let mut stream = std::pin::pin!(stream);
while let Some(result) = stream.next().await {
match result {
Ok(chunk) => {
full_content.push_str(&chunk);
index += 1;
let event = AnalysisSseEvent::Chunk {
content: chunk,
index,
};
let data = serde_json::to_string(&event).unwrap_or_default();
yield Ok(Event::default().event("chunk").data(data));
}
Err(e) => {
let event = AnalysisSseEvent::Error {
message: e.to_string(),
};
let data = serde_json::to_string(&event).unwrap_or_default();
yield Ok(Event::default().event("error").data(data));
let _ = state_clone
.analysis
.fail_analysis(analysis_id_clone, e.to_string())
.await;
return;
}
}
}
// 完成后存储结果
let metadata = serde_json::json!({"analysis_type": "lab_report"});
let _ = state_clone
.analysis
.complete_analysis(analysis_id_clone, full_content, metadata)
.await;
let done_event = AnalysisSseEvent::Done {
analysis_id: analysis_id_clone,
status: "completed".into(),
};
let data = serde_json::to_string(&done_event).unwrap_or_default();
yield Ok(Event::default().event("done").data(data));
};
let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "lab_report");
Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
}
@@ -129,6 +90,26 @@ where
erp_core::error::AppError::Validation("patient_id 必填".into())
})?;
let metrics = body.metrics.unwrap_or_else(|| {
vec![
"systolic_bp_morning".into(),
"diastolic_bp_morning".into(),
"heart_rate".into(),
"weight".into(),
"blood_sugar".into(),
]
});
let range = TimeRange {
start: chrono::Utc::now() - chrono::Duration::days(90),
end: chrono::Utc::now(),
};
let vital_dtos = state
.health_provider
.get_vital_signs(ctx.tenant_id, patient_id, &metrics, &range)
.await?;
let sanitized_data = state.analysis.sanitizer.sanitize_vital_signs(&vital_dtos)?;
let prompt = state
.prompt
.get_active_prompt(ctx.tenant_id, "health_trend_analysis")
@@ -149,7 +130,7 @@ where
patient_id.to_string(),
prompt.system_prompt,
prompt.user_prompt_template,
serde_json::json!({"placeholder": true}),
sanitized_data,
model,
temperature,
max_tokens,
@@ -177,6 +158,15 @@ where
erp_core::error::AppError::Validation("patient_id 必填".into())
})?;
let summary_dto = state
.health_provider
.get_patient_summary(ctx.tenant_id, patient_id)
.await?;
let sanitized_data = state
.analysis
.sanitizer
.sanitize_patient_summary(&summary_dto)?;
let prompt = state
.prompt
.get_active_prompt(ctx.tenant_id, "personalized_checkup_plan")
@@ -197,7 +187,7 @@ where
patient_id.to_string(),
prompt.system_prompt,
prompt.user_prompt_template,
serde_json::json!({"placeholder": true}),
sanitized_data,
model,
temperature,
max_tokens,
@@ -225,6 +215,15 @@ where
erp_core::error::AppError::Validation("report_id 必填".into())
})?;
let report_dto = state
.health_provider
.get_full_report(ctx.tenant_id, report_id)
.await?;
let sanitized_data = state
.analysis
.sanitizer
.sanitize_health_report(&report_dto)?;
let prompt = state
.prompt
.get_active_prompt(ctx.tenant_id, "report_summary_generation")
@@ -245,7 +244,7 @@ where
report_id.to_string(),
prompt.system_prompt,
prompt.user_prompt_template,
serde_json::json!({"placeholder": true}),
sanitized_data,
model,
temperature,
max_tokens,

View File

@@ -1,6 +1,7 @@
use std::sync::Arc;
use erp_core::events::EventBus;
use erp_core::health_provider::HealthDataProvider;
use sea_orm::DatabaseConnection;
use crate::service::analysis::AnalysisService;
@@ -14,4 +15,5 @@ pub struct AiState {
pub analysis: Arc<AnalysisService>,
pub prompt: Arc<PromptService>,
pub usage: Arc<UsageService>,
pub health_provider: Arc<dyn HealthDataProvider>,
}

View File

@@ -1,61 +1,398 @@
use async_trait::async_trait;
use chrono::Datelike;
use erp_core::error::{AppError, AppResult};
use num_traits::ToPrimitive;
use erp_core::health_provider::{
HealthDataProvider, HealthReportDto, LabReportDto, PatientSummaryDto, TimeRange, VitalSignDto,
HealthDataProvider, HealthReportDto, LabItemDto, LabReportDto, PatientSummaryDto,
ReportSectionDto, TimeRange, VitalSignDto,
};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QueryOrder};
use uuid::Uuid;
/// # Experimental
///
/// 此实现为渐进式开发中的 stub。所有方法当前返回 "尚未实现" 错误。
/// 调用方不应在生产路径中依赖此实现。等 AI 集成需求明确后将渐进实现各方法。
/// 参见: docs/superpowers/specs/2026-04-25-notification-realtime-architecture-design.md Phase E
use crate::entity::{diagnosis, health_record, lab_report, medication_record, patient, vital_signs};
pub struct HealthDataProviderImpl {
pub db: sea_orm::DatabaseConnection,
}
macro_rules! stub_unimplemented {
($method:ident) => {
Err(AppError::Internal(format!(
"HealthDataProvider::{} 尚未实现 — 此 trait 为 experimental不应在生产路径中调用",
stringify!($method),
)))
fn compute_age_group(birth_date: Option<chrono::NaiveDate>) -> String {
let Some(bd) = birth_date else {
return "未知".to_string();
};
let age = (chrono::Utc::now().date_naive().year() - bd.year()) as i32;
match age {
a if a < 14 => "儿童",
a if a < 36 => "青年",
a if a < 56 => "中年",
_ => "老年",
}
.to_string()
}
async fn find_patient(
db: &sea_orm::DatabaseConnection,
tenant_id: Uuid,
patient_id: Uuid,
) -> AppResult<patient::Model> {
patient::Entity::find_by_id(patient_id)
.filter(patient::Column::TenantId.eq(tenant_id))
.filter(patient::Column::DeletedAt.is_null())
.one(db)
.await?
.ok_or_else(|| AppError::NotFound(format!("患者 {patient_id} 不存在")))
}
async fn find_lab_report(
db: &sea_orm::DatabaseConnection,
tenant_id: Uuid,
report_id: Uuid,
) -> AppResult<lab_report::Model> {
lab_report::Entity::find_by_id(report_id)
.filter(lab_report::Column::TenantId.eq(tenant_id))
.filter(lab_report::Column::DeletedAt.is_null())
.one(db)
.await?
.ok_or_else(|| AppError::NotFound(format!("化验报告 {report_id} 不存在")))
}
fn parse_lab_items(items_json: &Option<serde_json::Value>) -> Vec<LabItemDto> {
let Some(arr) = items_json.as_ref().and_then(|v| v.as_array()) else {
return vec![];
};
arr.iter()
.filter_map(|item| {
let name = item.get("name")?.as_str()?.to_string();
let value = item.get("value").and_then(|v| v.as_f64()).unwrap_or(0.0);
let unit = item
.get("unit")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let low = item
.get("reference_low")
.and_then(|v| v.as_f64())
.map(|l| l.to_string());
let high = item
.get("reference_high")
.and_then(|v| v.as_f64())
.map(|h| h.to_string());
let is_abnormal = item
.get("is_abnormal")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let reference_range = match (low, high) {
(Some(l), Some(h)) => format!("{l}-{h}"),
(Some(l), None) => format!(">={l}"),
(None, Some(h)) => format!("<={h}"),
_ => "-".to_string(),
};
Some(LabItemDto {
name,
value,
unit,
reference_range,
is_abnormal,
})
})
.collect()
}
fn report_type_to_department(report_type: &str) -> &str {
match report_type {
"kidney_function" => "肾内科",
"blood_routine" => "血液科",
"electrolyte" => "检验科",
"liver_function" => "肝胆科",
_ => "检验科",
}
}
#[async_trait]
impl HealthDataProvider for HealthDataProviderImpl {
async fn get_lab_report(
&self,
_tenant_id: Uuid,
_report_id: Uuid,
tenant_id: Uuid,
report_id: Uuid,
) -> AppResult<LabReportDto> {
stub_unimplemented!(get_lab_report)
let report = find_lab_report(&self.db, tenant_id, report_id).await?;
let patient = find_patient(&self.db, tenant_id, report.patient_id).await?;
Ok(LabReportDto {
age_group: compute_age_group(patient.birth_date),
sex: patient.gender.unwrap_or_else(|| "未知".to_string()),
department: report_type_to_department(&report.report_type).to_string(),
report_date: report.report_date.to_string(),
items: parse_lab_items(&report.items),
})
}
async fn get_vital_signs(
&self,
_tenant_id: Uuid,
_patient_id: Uuid,
_metrics: &[String],
_range: &TimeRange,
tenant_id: Uuid,
patient_id: Uuid,
metrics: &[String],
range: &TimeRange,
) -> AppResult<Vec<VitalSignDto>> {
stub_unimplemented!(get_vital_signs)
let _ = find_patient(&self.db, tenant_id, patient_id).await?;
let start_date = range.start.date_naive();
let end_date = range.end.date_naive();
let records = vital_signs::Entity::find()
.filter(vital_signs::Column::TenantId.eq(tenant_id))
.filter(vital_signs::Column::PatientId.eq(patient_id))
.filter(vital_signs::Column::DeletedAt.is_null())
.filter(vital_signs::Column::RecordDate.gte(start_date))
.filter(vital_signs::Column::RecordDate.lte(end_date))
.order_by_asc(vital_signs::Column::RecordDate)
.all(&self.db)
.await?;
let metric_extractors: [(&str, Box<dyn Fn(&vital_signs::Model) -> Option<f64>>); 8] = [
(
"systolic_bp_morning",
Box::new(|r| r.systolic_bp_morning.map(|v| v as f64)),
),
(
"diastolic_bp_morning",
Box::new(|r| r.diastolic_bp_morning.map(|v| v as f64)),
),
(
"heart_rate",
Box::new(|r| r.heart_rate.map(|v| v as f64)),
),
(
"weight",
Box::new(|r| r.weight.map(|v| v.to_f64().unwrap_or(0.0))),
),
(
"blood_sugar",
Box::new(|r| r.blood_sugar.map(|v| v.to_f64().unwrap_or(0.0))),
),
(
"body_temperature",
Box::new(|r| r.body_temperature.map(|v| v.to_f64().unwrap_or(0.0))),
),
("spo2", Box::new(|r| r.spo2.map(|v| v as f64))),
(
"urine_output_ml",
Box::new(|r| r.urine_output_ml.map(|v| v as f64)),
),
];
let mut result = Vec::new();
for (metric_name, extractor) in &metric_extractors {
if !metrics.is_empty() && !metrics.iter().any(|m| m == *metric_name) {
continue;
}
let values: Vec<(String, f64)> = records
.iter()
.filter_map(|r| {
extractor(r).map(|v| (r.record_date.to_string(), v))
})
.collect();
if values.is_empty() {
continue;
}
let unit = match *metric_name {
"systolic_bp_morning" | "diastolic_bp_morning" => "mmHg",
"heart_rate" => "bpm",
"weight" => "kg",
"blood_sugar" => "mmol/L",
"body_temperature" => "°C",
"spo2" => "%",
"urine_output_ml" => "ml",
_ => "",
};
result.push(VitalSignDto {
metric: metric_name.to_string(),
values,
unit: unit.to_string(),
});
}
Ok(result)
}
async fn get_patient_summary(
&self,
_tenant_id: Uuid,
_patient_id: Uuid,
tenant_id: Uuid,
patient_id: Uuid,
) -> AppResult<PatientSummaryDto> {
stub_unimplemented!(get_patient_summary)
let patient = find_patient(&self.db, tenant_id, patient_id).await?;
let diagnoses: Vec<String> = diagnosis::Entity::find()
.filter(diagnosis::Column::TenantId.eq(tenant_id))
.filter(diagnosis::Column::PatientId.eq(patient_id))
.filter(diagnosis::Column::DeletedAt.is_null())
.filter(diagnosis::Column::Status.eq("active"))
.order_by_desc(diagnosis::Column::DiagnosedDate)
.all(&self.db)
.await?
.iter()
.map(|d| format!("{}({})", d.diagnosis_name, d.icd_code))
.collect();
let medications: Vec<String> = medication_record::Entity::find()
.filter(medication_record::Column::TenantId.eq(tenant_id))
.filter(medication_record::Column::PatientId.eq(patient_id))
.filter(medication_record::Column::DeletedAt.is_null())
.filter(medication_record::Column::IsCurrent.eq(true))
.all(&self.db)
.await?
.iter()
.map(|m| {
let mut s = m.medication_name.clone();
if let Some(ref dosage) = m.dosage {
s.push_str(&format!(" {dosage}"));
}
s
})
.collect();
let family_history = patient
.medical_history_summary
.as_ref()
.map(|h| {
h.split('')
.chain(h.split(';'))
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect()
})
.unwrap_or_default();
let last_checkup = health_record::Entity::find()
.filter(health_record::Column::TenantId.eq(tenant_id))
.filter(health_record::Column::PatientId.eq(patient_id))
.filter(health_record::Column::DeletedAt.is_null())
.order_by_desc(health_record::Column::RecordDate)
.one(&self.db)
.await?;
let last_checkup_date = last_checkup
.map(|r| r.record_date.to_string())
.unwrap_or_else(|| "".to_string());
Ok(PatientSummaryDto {
age_group: compute_age_group(patient.birth_date),
sex: patient.gender.unwrap_or_else(|| "未知".to_string()),
chronic_conditions: diagnoses,
medications,
family_history,
last_checkup_date,
})
}
async fn get_full_report(
&self,
_tenant_id: Uuid,
_report_id: Uuid,
tenant_id: Uuid,
report_id: Uuid,
) -> AppResult<HealthReportDto> {
stub_unimplemented!(get_full_report)
let record = health_record::Entity::find_by_id(report_id)
.filter(health_record::Column::TenantId.eq(tenant_id))
.filter(health_record::Column::DeletedAt.is_null())
.one(&self.db)
.await?
.ok_or_else(|| AppError::NotFound(format!("健康报告 {report_id} 不存在")))?;
let patient = find_patient(&self.db, tenant_id, record.patient_id).await?;
let mut sections = Vec::new();
let findings: Vec<String> = record
.overall_assessment
.as_ref()
.map(|a| {
a.split('')
.chain(a.split(';'))
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect()
})
.unwrap_or_default();
if !findings.is_empty() {
sections.push(ReportSectionDto {
title: "总体评估".to_string(),
findings,
abnormal_items: vec![],
});
}
let report_diagnoses = diagnosis::Entity::find()
.filter(diagnosis::Column::TenantId.eq(tenant_id))
.filter(diagnosis::Column::PatientId.eq(record.patient_id))
.filter(diagnosis::Column::DeletedAt.is_null())
.filter(
diagnosis::Column::HealthRecordId
.eq(report_id)
.or(diagnosis::Column::Status.eq("active")),
)
.all(&self.db)
.await?;
if !report_diagnoses.is_empty() {
let (abnormal, findings): (Vec<_>, Vec<_>) = report_diagnoses
.iter()
.partition(|d| d.status == "active");
sections.push(ReportSectionDto {
title: "诊断记录".to_string(),
findings: findings
.iter()
.map(|d| format!("{}({}) — {}", d.diagnosis_name, d.icd_code, d.diagnosed_date))
.collect(),
abnormal_items: abnormal
.iter()
.map(|d| format!("{}({})", d.diagnosis_name, d.icd_code))
.collect(),
});
}
let lab_reports = lab_report::Entity::find()
.filter(lab_report::Column::TenantId.eq(tenant_id))
.filter(lab_report::Column::PatientId.eq(record.patient_id))
.filter(lab_report::Column::DeletedAt.is_null())
.filter(
lab_report::Column::ReportDate
.gte(record.record_date - chrono::Duration::days(30)),
)
.filter(lab_report::Column::ReportDate.lte(record.record_date))
.order_by_desc(lab_report::Column::ReportDate)
.all(&self.db)
.await?;
for lr in &lab_reports {
let items = parse_lab_items(&lr.items);
let abnormal: Vec<String> = items
.iter()
.filter(|i| i.is_abnormal)
.map(|i| format!("{} {}{}", i.name, i.value, i.unit))
.collect();
let findings: Vec<String> = items
.iter()
.map(|i| format!("{}: {}{} ({})", i.name, i.value, i.unit, i.reference_range))
.collect();
if !findings.is_empty() {
sections.push(ReportSectionDto {
title: format!("化验报告 — {} ({})", lr.report_type, lr.report_date),
findings,
abnormal_items: abnormal,
});
}
}
Ok(HealthReportDto {
age_group: compute_age_group(patient.birth_date),
sex: patient.gender.unwrap_or_else(|| "未知".to_string()),
department: record.record_type.clone(),
report_date: record.record_date.to_string(),
sections,
})
}
}

View File

@@ -467,12 +467,16 @@ async fn main() -> anyhow::Result<()> {
);
let prompt = std::sync::Arc::new(erp_ai::service::prompt::PromptService::new(db.clone()));
let usage = std::sync::Arc::new(erp_ai::service::usage::UsageService::new(db.clone()));
let health_provider = std::sync::Arc::new(erp_health::HealthDataProviderImpl {
db: db.clone(),
});
erp_ai::AiState {
db: db.clone(),
event_bus: event_bus.clone(),
analysis,
prompt,
usage,
health_provider,
}
};