Compare commits

...

8 Commits

Author SHA1 Message Date
iven
cde3a863a2 feat(health): FHIR 模块类型定义 + converter 依赖
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled
2026-05-04 02:56:56 +08:00
iven
8cfc5709dc docs: 事件注册表更新 — 告警降噪 + alert.aggregated 事件 2026-05-04 02:56:40 +08:00
iven
29b47ae4e4 fix(health): OAuth 模块编译修复
- 修复 RngCore import:使用 rand_core::RngCore 替代 argon2 password_hash 重导出
- 修复 ActiveModel version/id move 问题:先读取再 unwrap
- 添加 rand_core 依赖
2026-05-04 02:54:20 +08:00
iven
2e9f6621a3 test(health): 告警降噪集成测试骨架
4 个 Testcontainers 测试用例(忽略状态)覆盖:
患者级升级阈值 + 系统级聚合窗口 + critical 不聚合 + 完整流程
2026-05-04 02:54:17 +08:00
iven
3a14b7efe3 feat(health): 日聚合查询 API — GET /health/vital-signs/daily
- 新增 DailyAggQuery DTO(patient_id/device_type/start_date/end_date)
- 新增 get_daily_aggregations handler(需 health.device-readings.list 权限)
- 路由注册到 protected_routes
2026-05-04 02:54:13 +08:00
iven
4c1d98116a feat(health): 告警聚合事件消费者 — alert.aggregated
- 新增 ALERT_AGGREGATED 常量
- alert_notifier 消费者中处理 suppressed=true 告警并发布聚合事件
- 更新事件常量测试和 consumer_id 唯一性测试
2026-05-04 02:51:13 +08:00
iven
bb5298ee0f feat(message): SSE 增强 — Event ID + 心跳保活 + Last-Event-ID + 患者订阅
- 每个 SSE 事件附加 id 字段(UUID v7)用于断点续传
- 30s timeout 心跳保活防止连接断开
- Last-Event-ID header 恢复:重连跳过已发送事件
- ?patient_ids=id1,id2 查询参数选择性订阅患者
2026-05-04 02:49:23 +08:00
iven
975d699e42 feat(health): 告警降噪集成 alert_engine + OAuth service 编译修复
- alert_engine: create_alert_and_notify 调用 noise_reducer,升级严重度+suppressed标记
- oauth/service: 修复 OsRng import + ActiveModel move 问题
- fhir/handler: linter 补全完整实现
2026-05-04 02:43:32 +08:00
25 changed files with 2348 additions and 133 deletions

3
Cargo.lock generated
View File

@@ -1527,6 +1527,7 @@ name = "erp-health"
version = "0.1.0"
dependencies = [
"aes-gcm",
"argon2",
"async-trait",
"axum",
"base64 0.22.1",
@@ -1534,7 +1535,9 @@ dependencies = [
"erp-core",
"hex",
"hmac",
"jsonwebtoken",
"num-traits",
"rand_core 0.6.4",
"sea-orm",
"serde",
"serde_json",

View File

@@ -24,3 +24,6 @@ sha2 = "0.10"
base64 = "0.22"
hex = "0.4"
zeroize = { version = "1", features = ["derive"] }
argon2.workspace = true
jsonwebtoken.workspace = true
rand_core = "0.6"

View File

@@ -0,0 +1,29 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "api_clients")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub id: Uuid,
pub tenant_id: Uuid,
pub client_id: String,
pub client_secret_hash: String,
pub client_name: String,
pub scopes: serde_json::Value,
pub allowed_patient_ids: Option<serde_json::Value>,
pub rate_limit_per_minute: i32,
pub is_active: bool,
pub token_lifetime_seconds: i32,
pub created_at: chrono::DateTime<chrono::FixedOffset>,
pub updated_at: chrono::DateTime<chrono::FixedOffset>,
pub created_by: Option<Uuid>,
pub updated_by: Option<Uuid>,
pub deleted_at: Option<chrono::DateTime<chrono::FixedOffset>>,
pub version: i32,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

View File

@@ -1,4 +1,5 @@
pub mod alert_rules;
pub mod api_client;
pub mod alerts;
pub mod appointment;
pub mod article;

View File

@@ -11,6 +11,7 @@ pub const APPOINTMENT_CREATED: &str = "appointment.created";
// 告警
pub const ALERT_TRIGGERED: &str = "alert.triggered";
pub const ALERT_AGGREGATED: &str = "alert.aggregated";
// 知情同意
pub const CONSENT_GRANTED: &str = "consent.granted";
@@ -181,6 +182,33 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
}
let _ = erp_core::events::mark_event_processed(&alert_db, event.id, "alert_notifier").await;
}
Some(event) if event.event_type == ALERT_TRIGGERED => {
// 被抑制的告警 → 发布聚合事件
if erp_core::events::is_event_processed(&alert_db, event.id, "alert_aggregator").await.unwrap_or(false) {
continue;
}
let is_suppressed = event.payload.get("suppressed")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if is_suppressed {
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let Some(pid) = patient_id {
let aggregated_event = erp_core::events::DomainEvent::new(
ALERT_AGGREGATED,
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"patient_id": pid,
"triggering_alert_id": event.payload.get("alert_id").and_then(|v| v.as_str()),
"severity": event.payload.get("severity"),
})),
);
alert_bus.publish(aggregated_event, &alert_db).await;
tracing::info!(patient_id = %pid, "告警聚合事件已发布");
}
}
let _ = erp_core::events::mark_event_processed(&alert_db, event.id, "alert_aggregator").await;
}
Some(_) => {}
None => break,
}
@@ -937,6 +965,7 @@ mod tests {
let all_types = [
APPOINTMENT_CREATED,
ALERT_TRIGGERED,
ALERT_AGGREGATED,
CONSENT_GRANTED,
CONSENT_REVOKED,
ARTICLE_PUBLISHED,
@@ -971,6 +1000,7 @@ mod tests {
let all_types = [
APPOINTMENT_CREATED,
ALERT_TRIGGERED,
ALERT_AGGREGATED,
CONSENT_GRANTED,
CONSENT_REVOKED,
ARTICLE_PUBLISHED,
@@ -1008,6 +1038,7 @@ mod tests {
// 确保常量值与消费者 switch 匹配中使用的硬编码字符串一致
assert_eq!(APPOINTMENT_CREATED, "appointment.created");
assert_eq!(ALERT_TRIGGERED, "alert.triggered");
assert_eq!(ALERT_AGGREGATED, "alert.aggregated");
assert_eq!(CONSENT_GRANTED, "consent.granted");
assert_eq!(CONSENT_REVOKED, "consent.revoked");
assert_eq!(ARTICLE_PUBLISHED, "article.published");
@@ -1570,6 +1601,12 @@ mod tests {
prefix,
ALERT_TRIGGERED
);
assert!(
ALERT_AGGREGATED.starts_with(prefix),
"前缀 '{}' 应覆盖 '{}'",
prefix,
ALERT_AGGREGATED
);
}
#[test]
@@ -1683,6 +1720,7 @@ mod tests {
// 收集所有消费者的 consumer_id从 mark_event_processed 调用中提取)
let consumer_ids = [
"workflow_task_consumer",
"alert_aggregator",
"alert_notifier",
"patient_welcome",
"appt_created_notifier",

View File

@@ -0,0 +1,419 @@
use crate::entity::{
appointment, consultation_session, device_readings, doctor_profile, follow_up_task,
lab_report, patient, patient_devices,
};
use crate::fhir::types::{device_type_to_category, device_type_to_loinc, device_type_to_unit};
/// HMS Patient → FHIR Patient (JSON Value)
pub fn patient_to_fhir(p: &patient::Model) -> serde_json::Value {
let mut result = serde_json::json!({
"resourceType": "Patient",
"id": p.id.to_string(),
"name": [{
"text": p.name,
}],
"meta": {
"source": "hms",
"lastUpdated": p.updated_at.to_rfc3339(),
},
});
if let Some(ref gender) = p.gender {
result["gender"] = serde_json::json!(match gender.as_str() {
"male" | "M" => "male",
"female" | "F" => "female",
"other" => "other",
_ => "unknown",
});
}
if let Some(bd) = p.birth_date {
result["birthDate"] = serde_json::json!(bd.to_string());
}
if let Some(ref id_number) = p.id_number {
result["identifier"] = serde_json::json!([{
"system": "urn:oid:2.16.156.10011.1.3",
"value": id_number,
}]);
}
result
}
/// HMS DeviceReading → FHIR Observation血压拆分为多个其余为单个
pub fn device_reading_to_fhir_observations(r: &device_readings::Model) -> Vec<serde_json::Value> {
let mut results = Vec::new();
let patient_ref = serde_json::json!({"reference": format!("Patient/{}", r.patient_id)});
let device_ref = r.device_id.as_ref().map(|d| {
serde_json::json!({"reference": format!("Device/{}", d)})
});
let measured = r.measured_at.to_rfc3339();
let category = device_type_to_category(&r.device_type);
let category_json = serde_json::json!([{
"coding": [{
"system": "http://terminology.hl7.org/CodeSystem/observation-category",
"code": category,
}]
}]);
match r.device_type.as_str() {
"blood_pressure" => {
let sys = r.raw_value.get("systolic").and_then(|v| v.as_f64());
let dia = r.raw_value.get("diastolic").and_then(|v| v.as_f64());
if let Some(val) = sys {
results.push(make_observation(
&r.id, "8480-6", "Systolic blood pressure",
category_json.clone(), &patient_ref, device_ref.as_ref(),
&measured, val, "mmHg", "mm[Hg]",
));
}
if let Some(val) = dia {
results.push(make_observation(
&r.id, "8462-4", "Diastolic blood pressure",
category_json, &patient_ref, device_ref.as_ref(),
&measured, val, "mmHg", "mm[Hg]",
));
}
}
_ => {
let (loinc_code, loinc_display) = device_type_to_loinc(&r.device_type)
.unwrap_or(("unknown", "Unknown"));
let (unit_display, unit_code) = device_type_to_unit(&r.device_type);
let val = extract_main_value(&r.device_type, &r.raw_value);
if let Some(v) = val {
results.push(make_observation(
&r.id, loinc_code, loinc_display,
category_json, &patient_ref, device_ref.as_ref(),
&measured, v, unit_display, unit_code,
));
}
}
}
results
}
fn make_observation(
reading_id: &uuid::Uuid, code: &str, display: &str,
category: serde_json::Value, subject: &serde_json::Value,
device: Option<&serde_json::Value>, effective: &str,
value: f64, unit_display: &str, unit_code: &str,
) -> serde_json::Value {
let mut obs = serde_json::json!({
"resourceType": "Observation",
"id": reading_id.to_string(),
"status": "final",
"category": category,
"code": {
"coding": [{
"system": "http://loinc.org",
"code": code,
"display": display,
}]
},
"subject": subject,
"effectiveDateTime": effective,
"valueQuantity": {
"value": value,
"unit": unit_display,
"system": "http://unitsofmeasure.org",
"code": unit_code,
},
"meta": { "source": "hms-device" },
});
if let Some(d) = device {
obs["device"] = d.clone();
}
obs
}
fn extract_main_value(device_type: &str, raw: &serde_json::Value) -> Option<f64> {
match device_type {
"heart_rate" => raw.get("heart_rate").and_then(|v| v.as_f64()),
"blood_oxygen" => raw.get("blood_oxygen").and_then(|v| v.as_f64()),
"temperature" => raw.get("temperature").and_then(|v| v.as_f64()),
"blood_glucose" => raw.get("blood_glucose").and_then(|v| v.as_f64()),
"steps" => raw.get("steps").and_then(|v| v.as_f64()),
"sleep" => raw.get("sleep_duration").and_then(|v| v.as_f64()),
"stress" => raw.get("stress_level").and_then(|v| v.as_f64()),
_ => raw.as_f64(),
}
}
/// HMS PatientDevice → FHIR Device
pub fn patient_device_to_fhir(d: &patient_devices::Model) -> serde_json::Value {
let mut device = serde_json::json!({
"resourceType": "Device",
"id": d.id.to_string(),
"identifier": [{
"system": "urn:hms:device-id",
"value": d.device_id,
}],
"status": "active",
"meta": { "source": "hms-device" },
});
if let Some(ref model) = d.device_model {
device["displayName"] = serde_json::json!(model);
}
if let Some(ref dt) = d.device_type {
device["type"] = serde_json::json!({
"coding": [{
"system": "urn:hms:device-type",
"code": dt,
}]
});
}
device
}
/// HMS DoctorProfile → FHIR Practitioner
pub fn doctor_to_fhir(d: &doctor_profile::Model) -> serde_json::Value {
let mut practitioner = serde_json::json!({
"resourceType": "Practitioner",
"id": d.id.to_string(),
"name": [{
"text": d.name,
}],
"meta": { "source": "hms-practitioner" },
});
let mut qualifications = Vec::new();
if let Some(ref title) = d.title {
qualifications.push(serde_json::json!({
"code": {
"coding": [{
"system": "urn:hms:doctor-title",
"display": title,
}]
}
}));
}
if let Some(ref dept) = d.department {
qualifications.push(serde_json::json!({
"code": {
"coding": [{
"system": "urn:hms:department",
"display": dept,
}]
}
}));
}
if !qualifications.is_empty() {
practitioner["qualification"] = serde_json::json!(qualifications);
}
if let Some(ref specialty) = d.specialty {
practitioner["specialty"] = serde_json::json!([{
"coding": [{
"system": "urn:hms:specialty",
"display": specialty,
}]
}]);
}
practitioner
}
/// HMS Appointment → FHIR Appointment
pub fn appointment_to_fhir(a: &appointment::Model) -> serde_json::Value {
let fhir_status = match a.status.as_str() {
"confirmed" | "completed" => a.status.as_str(),
"pending" => "proposed",
"cancelled" => "cancelled",
_ => "booked",
};
let mut participants = vec![
serde_json::json!({
"actor": {"reference": format!("Patient/{}", a.patient_id)},
"status": "accepted",
}),
];
if let Some(ref doctor_id) = a.doctor_id {
participants.push(serde_json::json!({
"actor": {"reference": format!("Practitioner/{}", doctor_id)},
"status": "accepted",
}));
}
let start = format!("{}T{}", a.appointment_date, a.start_time);
let end = format!("{}T{}", a.appointment_date, a.end_time);
serde_json::json!({
"resourceType": "Appointment",
"id": a.id.to_string(),
"status": fhir_status,
"participant": participants,
"start": start,
"end": end,
"meta": { "source": "hms-appointment" },
})
}
/// HMS LabReport → FHIR DiagnosticReport
pub fn lab_report_to_fhir(r: &lab_report::Model) -> serde_json::Value {
let fhir_status = match r.status.as_str() {
"reviewed" => "final",
"pending" => "preliminary",
_ => "registered",
};
serde_json::json!({
"resourceType": "DiagnosticReport",
"id": r.id.to_string(),
"status": fhir_status,
"code": {
"coding": [{
"system": "urn:hms:report-type",
"code": r.report_type,
}]
},
"subject": {"reference": format!("Patient/{}", r.patient_id)},
"effectiveDateTime": r.report_date.to_string(),
"meta": { "source": "hms-diagnostic-report" },
})
}
/// HMS ConsultationSession → FHIR Encounter
pub fn consultation_to_fhir(c: &consultation_session::Model) -> serde_json::Value {
let fhir_status = match c.status.as_str() {
"active" => "in-progress",
"closed" => "finished",
_ => "planned",
};
let class_code = match c.consultation_type.as_str() {
"online" => "VR",
"offline" => "AMB",
_ => "AMB",
};
serde_json::json!({
"resourceType": "Encounter",
"id": c.id.to_string(),
"status": fhir_status,
"class": {
"system": "http://terminology.hl7.org/CodeSystem/v3-ActCode",
"code": class_code,
},
"subject": {"reference": format!("Patient/{}", c.patient_id)},
"meta": { "source": "hms-encounter" },
})
}
/// HMS FollowUpTask → FHIR Task
pub fn follow_up_to_fhir(t: &follow_up_task::Model) -> serde_json::Value {
let fhir_status = match t.status.as_str() {
"pending" => "requested",
"in_progress" => "in-progress",
"completed" => "completed",
"cancelled" => "cancelled",
_ => "requested",
};
let display = t.content_template.as_deref()
.unwrap_or(&t.follow_up_type);
serde_json::json!({
"resourceType": "Task",
"id": t.id.to_string(),
"status": fhir_status,
"intent": "plan",
"focus": {
"display": display,
},
"for": {"reference": format!("Patient/{}", t.patient_id)},
"meta": { "source": "hms-task" },
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_patient_to_fhir_basic() {
let p = patient::Model {
id: uuid::Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
tenant_id: uuid::Uuid::now_v7(),
user_id: None,
name: "\u{5f20}\u{4e09}".into(),
gender: Some("male".into()),
birth_date: chrono::NaiveDate::from_ymd_opt(1968, 5, 15),
blood_type: None,
id_number: Some("110101196805150001".into()),
id_number_hash: None,
allergy_history: None,
medical_history_summary: None,
emergency_contact_name: None,
emergency_contact_phone: None,
emergency_contact_phone_hash: None,
key_version: None,
status: "active".into(),
verification_status: "verified".into(),
source: None,
notes: None,
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
created_by: None,
updated_by: None,
deleted_at: None,
version: 1,
};
let fhir = patient_to_fhir(&p);
assert_eq!(fhir["resourceType"], "Patient");
assert_eq!(fhir["id"], "00000000-0000-0000-0000-000000000001");
assert_eq!(fhir["gender"], "male");
assert_eq!(fhir["birthDate"], "1968-05-15");
assert_eq!(fhir["identifier"][0]["value"], "110101196805150001");
}
#[test]
fn test_heart_rate_to_fhir_observation() {
let reading = device_readings::Model {
id: uuid::Uuid::now_v7(),
tenant_id: uuid::Uuid::now_v7(),
patient_id: uuid::Uuid::now_v7(),
device_id: Some("band-001".into()),
device_type: "heart_rate".into(),
metric: None,
device_model: Some("Mi Band 8".into()),
raw_value: serde_json::json!({"heart_rate": 78}),
measured_at: chrono::Utc::now(),
created_at: chrono::Utc::now(),
deleted_at: None,
};
let observations = device_reading_to_fhir_observations(&reading);
assert_eq!(observations.len(), 1);
let obs = &observations[0];
assert_eq!(obs["resourceType"], "Observation");
assert_eq!(obs["code"]["coding"][0]["code"], "8867-4");
assert!((obs["valueQuantity"]["value"].as_f64().unwrap() - 78.0).abs() < f64::EPSILON);
}
#[test]
fn test_blood_pressure_to_fhir_observations() {
let reading = device_readings::Model {
id: uuid::Uuid::now_v7(),
tenant_id: uuid::Uuid::now_v7(),
patient_id: uuid::Uuid::now_v7(),
device_id: Some("bp-001".into()),
device_type: "blood_pressure".into(),
metric: None,
device_model: Some("Omron HEM-7322".into()),
raw_value: serde_json::json!({"systolic": 135, "diastolic": 88}),
measured_at: chrono::Utc::now(),
created_at: chrono::Utc::now(),
deleted_at: None,
};
let observations = device_reading_to_fhir_observations(&reading);
assert!(observations.len() >= 2);
}
}

View File

@@ -1,13 +1,21 @@
use axum::extract::{FromRef, Path, Query, State};
use axum::response::IntoResponse;
use axum::Extension;
use axum::Json;
use sea_orm::*;
use serde::Deserialize;
use uuid::Uuid;
use erp_core::error::AppError;
use erp_core::rbac::require_permission;
use erp_core::types::TenantContext;
use crate::fhir::converter;
use crate::fhir::types::{category_to_device_types, loinc_to_device_type};
use crate::state::HealthState;
/// GET /fhir/R4/metadata — FHIR CapabilityStatement
pub async fn capability_statement<S>() -> Result<impl IntoResponse, erp_core::error::AppError>
pub async fn capability_statement<S>() -> Result<impl IntoResponse, AppError>
where
HealthState: FromRef<S>,
S: Clone + Send + Sync + 'static,
@@ -42,187 +50,603 @@ where
}
#[derive(Debug, Deserialize)]
pub struct SearchParams {
pub struct FhirSearchParams {
#[serde(rename = "_id")]
pub id: Option<String>,
#[serde(rename = "_count")]
pub count: Option<u64>,
#[serde(rename = "_offset")]
pub offset: Option<u64>,
pub patient: Option<String>,
pub category: Option<String>,
#[serde(rename = "_count")]
pub count: Option<u32>,
#[serde(rename = "_offset")]
pub offset: Option<u32>,
pub code: Option<String>,
pub date: Option<String>,
pub name: Option<String>,
pub identifier: Option<String>,
pub status: Option<String>,
}
// ── Patient ────────────────────────────────────────────────────────────
pub async fn search_patients(
State(_state): State<HealthState>,
Query(_params): Query<SearchParams>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Query(params): Query<FhirSearchParams>,
) -> Result<impl IntoResponse, AppError> {
require_permission(&ctx, "health.patient.list")?;
let mut query = crate::entity::patient::Entity::find()
.filter(crate::entity::patient::Column::TenantId.eq(ctx.tenant_id))
.filter(crate::entity::patient::Column::DeletedAt.is_null());
if let Some(ref id) = params.id {
let uid = Uuid::parse_str(id).map_err(|_| AppError::Validation("Invalid patient id".into()))?;
query = query.filter(crate::entity::patient::Column::Id.eq(uid));
}
if let Some(ref name) = params.name {
query = query.filter(crate::entity::patient::Column::Name.contains(name));
}
if let Some(ref identifier) = params.identifier {
query = query.filter(crate::entity::patient::Column::IdNumber.contains(identifier));
}
let limit = params.count.unwrap_or(20).min(100);
let offset = params.offset.unwrap_or(0);
let patients = query
.limit(limit)
.offset(offset)
.all(&state.db)
.await?;
let entries: Vec<serde_json::Value> = patients.iter()
.map(|p| serde_json::json!({"resource": converter::patient_to_fhir(p)}))
.collect();
Ok(Json(serde_json::json!({
"resourceType": "Bundle", "type": "searchset", "total": 0, "entry": []
"resourceType": "Bundle",
"type": "searchset",
"total": entries.len(),
"entry": entries,
})))
}
pub async fn get_patient(
State(_state): State<HealthState>,
Path(_id): Path<Uuid>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{"severity": "information", "code": "not-found", "diagnostics": "Patient not implemented yet"}]
})))
}
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Path(id): Path<Uuid>,
) -> Result<impl IntoResponse, AppError> {
require_permission(&ctx, "health.patient.list")?;
pub async fn patient_everything(
State(_state): State<HealthState>,
Path(_id): Path<Uuid>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "Bundle", "type": "searchset", "total": 0, "entry": []
})))
let patient = crate::entity::patient::Entity::find_by_id(id)
.one(&state.db)
.await?
.ok_or_else(|| AppError::NotFound("Patient not found".into()))?;
if patient.tenant_id != ctx.tenant_id {
return Err(AppError::Forbidden("Access denied".into()));
}
Ok(Json(converter::patient_to_fhir(&patient)))
}
// ── Observation ────────────────────────────────────────────────────────
pub async fn search_observations(
State(_state): State<HealthState>,
Query(_params): Query<SearchParams>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "Bundle", "type": "searchset", "total": 0, "entry": []
})))
}
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Query(params): Query<FhirSearchParams>,
) -> Result<impl IntoResponse, AppError> {
require_permission(&ctx, "health.device-readings.list")?;
let mut query = crate::entity::device_readings::Entity::find()
.filter(crate::entity::device_readings::Column::TenantId.eq(ctx.tenant_id))
.filter(crate::entity::device_readings::Column::DeletedAt.is_null());
if let Some(ref patient_id) = params.patient {
let uid = Uuid::parse_str(patient_id)
.map_err(|_| AppError::Validation("Invalid patient id".into()))?;
query = query.filter(crate::entity::device_readings::Column::PatientId.eq(uid));
}
if let Some(ref code) = params.code {
if let Some(dt) = loinc_to_device_type(code) {
query = query.filter(crate::entity::device_readings::Column::DeviceType.eq(dt));
}
}
if let Some(ref category) = params.category {
let types = category_to_device_types(category);
if !types.is_empty() {
query = query.filter(
crate::entity::device_readings::Column::DeviceType.is_in(types)
);
}
}
if let Some(ref date) = params.date {
if let Some(after) = date.strip_prefix("gt") {
if let Ok(dt) = after.parse::<chrono::DateTime<chrono::Utc>>() {
query = query.filter(
crate::entity::device_readings::Column::MeasuredAt.gt(dt)
);
}
} else if let Some(before) = date.strip_prefix("lt") {
if let Ok(dt) = before.parse::<chrono::DateTime<chrono::Utc>>() {
query = query.filter(
crate::entity::device_readings::Column::MeasuredAt.lt(dt)
);
}
} else if let Ok(dt) = date.parse::<chrono::DateTime<chrono::Utc>>() {
let start = dt.date_naive().and_hms_opt(0, 0, 0).unwrap().and_utc();
let end = dt.date_naive().and_hms_opt(23, 59, 59).unwrap().and_utc();
query = query.filter(
crate::entity::device_readings::Column::MeasuredAt.between(start, end)
);
}
}
let limit = params.count.unwrap_or(50).min(200);
let readings = query
.order_by_desc(crate::entity::device_readings::Column::MeasuredAt)
.limit(limit)
.all(&state.db)
.await?;
let mut entries = Vec::new();
for reading in &readings {
for obs in converter::device_reading_to_fhir_observations(reading) {
entries.push(serde_json::json!({"resource": obs}));
}
}
pub async fn observation_lastn(
State(_state): State<HealthState>,
Query(_params): Query<SearchParams>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "Bundle", "type": "searchset", "total": 0, "entry": []
"resourceType": "Bundle",
"type": "searchset",
"total": entries.len(),
"entry": entries,
})))
}
// ── Device ─────────────────────────────────────────────────────────────
pub async fn search_devices(
State(_state): State<HealthState>,
Query(_params): Query<SearchParams>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Query(params): Query<FhirSearchParams>,
) -> Result<impl IntoResponse, AppError> {
require_permission(&ctx, "health.devices.list")?;
let mut query = crate::entity::patient_devices::Entity::find()
.filter(crate::entity::patient_devices::Column::TenantId.eq(ctx.tenant_id))
.filter(crate::entity::patient_devices::Column::DeletedAt.is_null());
if let Some(ref patient_id) = params.patient {
let uid = Uuid::parse_str(patient_id)
.map_err(|_| AppError::Validation("Invalid patient id".into()))?;
query = query.filter(crate::entity::patient_devices::Column::PatientId.eq(uid));
}
let limit = params.count.unwrap_or(50).min(200);
let devices = query.limit(limit).all(&state.db).await?;
let entries: Vec<serde_json::Value> = devices.iter()
.map(|d| serde_json::json!({"resource": converter::patient_device_to_fhir(d)}))
.collect();
Ok(Json(serde_json::json!({
"resourceType": "Bundle", "type": "searchset", "total": 0, "entry": []
"resourceType": "Bundle",
"type": "searchset",
"total": entries.len(),
"entry": entries,
})))
}
pub async fn get_device(
State(_state): State<HealthState>,
Path(_id): Path<Uuid>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{"severity": "information", "code": "not-found", "diagnostics": "Device not implemented yet"}]
})))
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Path(id): Path<Uuid>,
) -> Result<impl IntoResponse, AppError> {
require_permission(&ctx, "health.devices.list")?;
let device = crate::entity::patient_devices::Entity::find_by_id(id)
.one(&state.db)
.await?
.ok_or_else(|| AppError::NotFound("Device not found".into()))?;
if device.tenant_id != ctx.tenant_id {
return Err(AppError::Forbidden("Access denied".into()));
}
Ok(Json(converter::patient_device_to_fhir(&device)))
}
// ── Practitioner ───────────────────────────────────────────────────────
pub async fn search_practitioners(
State(_state): State<HealthState>,
Query(_params): Query<SearchParams>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Query(params): Query<FhirSearchParams>,
) -> Result<impl IntoResponse, AppError> {
require_permission(&ctx, "health.doctor.list")?;
let mut query = crate::entity::doctor_profile::Entity::find()
.filter(crate::entity::doctor_profile::Column::TenantId.eq(ctx.tenant_id))
.filter(crate::entity::doctor_profile::Column::DeletedAt.is_null());
if let Some(ref name) = params.name {
query = query.filter(crate::entity::doctor_profile::Column::Name.contains(name));
}
let limit = params.count.unwrap_or(50).min(200);
let doctors = query.limit(limit).all(&state.db).await?;
let entries: Vec<serde_json::Value> = doctors.iter()
.map(|d| serde_json::json!({"resource": converter::doctor_to_fhir(d)}))
.collect();
Ok(Json(serde_json::json!({
"resourceType": "Bundle", "type": "searchset", "total": 0, "entry": []
"resourceType": "Bundle",
"type": "searchset",
"total": entries.len(),
"entry": entries,
})))
}
pub async fn get_practitioner(
State(_state): State<HealthState>,
Path(_id): Path<Uuid>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{"severity": "information", "code": "not-found", "diagnostics": "Practitioner not implemented yet"}]
})))
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Path(id): Path<Uuid>,
) -> Result<impl IntoResponse, AppError> {
require_permission(&ctx, "health.doctor.list")?;
let doctor = crate::entity::doctor_profile::Entity::find_by_id(id)
.one(&state.db)
.await?
.ok_or_else(|| AppError::NotFound("Practitioner not found".into()))?;
if doctor.tenant_id != ctx.tenant_id {
return Err(AppError::Forbidden("Access denied".into()));
}
Ok(Json(converter::doctor_to_fhir(&doctor)))
}
// ── Appointment ────────────────────────────────────────────────────────
pub async fn search_appointments(
State(_state): State<HealthState>,
Query(_params): Query<SearchParams>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Query(params): Query<FhirSearchParams>,
) -> Result<impl IntoResponse, AppError> {
require_permission(&ctx, "health.appointment.list")?;
let mut query = crate::entity::appointment::Entity::find()
.filter(crate::entity::appointment::Column::TenantId.eq(ctx.tenant_id))
.filter(crate::entity::appointment::Column::DeletedAt.is_null());
if let Some(ref patient_id) = params.patient {
let uid = Uuid::parse_str(patient_id)
.map_err(|_| AppError::Validation("Invalid patient id".into()))?;
query = query.filter(crate::entity::appointment::Column::PatientId.eq(uid));
}
if let Some(ref status) = params.status {
query = query.filter(crate::entity::appointment::Column::Status.eq(status));
}
let limit = params.count.unwrap_or(50).min(200);
let appointments = query
.order_by_desc(crate::entity::appointment::Column::AppointmentDate)
.limit(limit)
.all(&state.db)
.await?;
let entries: Vec<serde_json::Value> = appointments.iter()
.map(|a| serde_json::json!({"resource": converter::appointment_to_fhir(a)}))
.collect();
Ok(Json(serde_json::json!({
"resourceType": "Bundle", "type": "searchset", "total": 0, "entry": []
"resourceType": "Bundle",
"type": "searchset",
"total": entries.len(),
"entry": entries,
})))
}
pub async fn get_appointment(
State(_state): State<HealthState>,
Path(_id): Path<Uuid>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{"severity": "information", "code": "not-found", "diagnostics": "Appointment not implemented yet"}]
})))
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Path(id): Path<Uuid>,
) -> Result<impl IntoResponse, AppError> {
require_permission(&ctx, "health.appointment.list")?;
let appointment = crate::entity::appointment::Entity::find_by_id(id)
.one(&state.db)
.await?
.ok_or_else(|| AppError::NotFound("Appointment not found".into()))?;
if appointment.tenant_id != ctx.tenant_id {
return Err(AppError::Forbidden("Access denied".into()));
}
Ok(Json(converter::appointment_to_fhir(&appointment)))
}
// ── DiagnosticReport ───────────────────────────────────────────────────
pub async fn search_diagnostic_reports(
State(_state): State<HealthState>,
Query(_params): Query<SearchParams>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Query(params): Query<FhirSearchParams>,
) -> Result<impl IntoResponse, AppError> {
require_permission(&ctx, "health.health-data.list")?;
let mut query = crate::entity::lab_report::Entity::find()
.filter(crate::entity::lab_report::Column::TenantId.eq(ctx.tenant_id))
.filter(crate::entity::lab_report::Column::DeletedAt.is_null());
if let Some(ref patient_id) = params.patient {
let uid = Uuid::parse_str(patient_id)
.map_err(|_| AppError::Validation("Invalid patient id".into()))?;
query = query.filter(crate::entity::lab_report::Column::PatientId.eq(uid));
}
if let Some(ref code) = params.code {
query = query.filter(crate::entity::lab_report::Column::ReportType.eq(code));
}
if let Some(ref status) = params.status {
query = query.filter(crate::entity::lab_report::Column::Status.eq(status));
}
let limit = params.count.unwrap_or(50).min(200);
let reports = query
.order_by_desc(crate::entity::lab_report::Column::ReportDate)
.limit(limit)
.all(&state.db)
.await?;
let entries: Vec<serde_json::Value> = reports.iter()
.map(|r| serde_json::json!({"resource": converter::lab_report_to_fhir(r)}))
.collect();
Ok(Json(serde_json::json!({
"resourceType": "Bundle", "type": "searchset", "total": 0, "entry": []
"resourceType": "Bundle",
"type": "searchset",
"total": entries.len(),
"entry": entries,
})))
}
pub async fn get_diagnostic_report(
State(_state): State<HealthState>,
Path(_id): Path<Uuid>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{"severity": "information", "code": "not-found", "diagnostics": "DiagnosticReport not implemented yet"}]
})))
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Path(id): Path<Uuid>,
) -> Result<impl IntoResponse, AppError> {
require_permission(&ctx, "health.health-data.list")?;
let report = crate::entity::lab_report::Entity::find_by_id(id)
.one(&state.db)
.await?
.ok_or_else(|| AppError::NotFound("DiagnosticReport not found".into()))?;
if report.tenant_id != ctx.tenant_id {
return Err(AppError::Forbidden("Access denied".into()));
}
Ok(Json(converter::lab_report_to_fhir(&report)))
}
// ── Encounter ──────────────────────────────────────────────────────────
pub async fn search_encounters(
State(_state): State<HealthState>,
Query(_params): Query<SearchParams>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Query(params): Query<FhirSearchParams>,
) -> Result<impl IntoResponse, AppError> {
require_permission(&ctx, "health.consultation.list")?;
let mut query = crate::entity::consultation_session::Entity::find()
.filter(crate::entity::consultation_session::Column::TenantId.eq(ctx.tenant_id))
.filter(crate::entity::consultation_session::Column::DeletedAt.is_null());
if let Some(ref patient_id) = params.patient {
let uid = Uuid::parse_str(patient_id)
.map_err(|_| AppError::Validation("Invalid patient id".into()))?;
query = query.filter(crate::entity::consultation_session::Column::PatientId.eq(uid));
}
if let Some(ref status) = params.status {
query = query.filter(crate::entity::consultation_session::Column::Status.eq(status));
}
let limit = params.count.unwrap_or(50).min(200);
let sessions = query
.order_by_desc(crate::entity::consultation_session::Column::CreatedAt)
.limit(limit)
.all(&state.db)
.await?;
let entries: Vec<serde_json::Value> = sessions.iter()
.map(|s| serde_json::json!({"resource": converter::consultation_to_fhir(s)}))
.collect();
Ok(Json(serde_json::json!({
"resourceType": "Bundle", "type": "searchset", "total": 0, "entry": []
"resourceType": "Bundle",
"type": "searchset",
"total": entries.len(),
"entry": entries,
})))
}
pub async fn get_encounter(
State(_state): State<HealthState>,
Path(_id): Path<Uuid>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{"severity": "information", "code": "not-found", "diagnostics": "Encounter not implemented yet"}]
})))
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Path(id): Path<Uuid>,
) -> Result<impl IntoResponse, AppError> {
require_permission(&ctx, "health.consultation.list")?;
let session = crate::entity::consultation_session::Entity::find_by_id(id)
.one(&state.db)
.await?
.ok_or_else(|| AppError::NotFound("Encounter not found".into()))?;
if session.tenant_id != ctx.tenant_id {
return Err(AppError::Forbidden("Access denied".into()));
}
Ok(Json(converter::consultation_to_fhir(&session)))
}
// ── Task ───────────────────────────────────────────────────────────────
pub async fn search_tasks(
State(_state): State<HealthState>,
Query(_params): Query<SearchParams>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Query(params): Query<FhirSearchParams>,
) -> Result<impl IntoResponse, AppError> {
require_permission(&ctx, "health.follow-up.list")?;
let mut query = crate::entity::follow_up_task::Entity::find()
.filter(crate::entity::follow_up_task::Column::TenantId.eq(ctx.tenant_id))
.filter(crate::entity::follow_up_task::Column::DeletedAt.is_null());
if let Some(ref patient_id) = params.patient {
let uid = Uuid::parse_str(patient_id)
.map_err(|_| AppError::Validation("Invalid patient id".into()))?;
query = query.filter(crate::entity::follow_up_task::Column::PatientId.eq(uid));
}
if let Some(ref status) = params.status {
query = query.filter(crate::entity::follow_up_task::Column::Status.eq(status));
}
let limit = params.count.unwrap_or(50).min(200);
let tasks = query
.order_by_desc(crate::entity::follow_up_task::Column::PlannedDate)
.limit(limit)
.all(&state.db)
.await?;
let entries: Vec<serde_json::Value> = tasks.iter()
.map(|t| serde_json::json!({"resource": converter::follow_up_to_fhir(t)}))
.collect();
Ok(Json(serde_json::json!({
"resourceType": "Bundle", "type": "searchset", "total": 0, "entry": []
"resourceType": "Bundle",
"type": "searchset",
"total": entries.len(),
"entry": entries,
})))
}
pub async fn get_task(
State(_state): State<HealthState>,
Path(_id): Path<Uuid>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Path(id): Path<Uuid>,
) -> Result<impl IntoResponse, AppError> {
require_permission(&ctx, "health.follow-up.list")?;
let task = crate::entity::follow_up_task::Entity::find_by_id(id)
.one(&state.db)
.await?
.ok_or_else(|| AppError::NotFound("Task not found".into()))?;
if task.tenant_id != ctx.tenant_id {
return Err(AppError::Forbidden("Access denied".into()));
}
Ok(Json(converter::follow_up_to_fhir(&task)))
}
// ── $everything ────────────────────────────────────────────────────────
/// GET /fhir/R4/Patient/{id}/$everything
pub async fn patient_everything(
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Path(id): Path<Uuid>,
) -> Result<impl IntoResponse, AppError> {
require_permission(&ctx, "health.patient.list")?;
let patient = crate::entity::patient::Entity::find_by_id(id)
.one(&state.db)
.await?
.ok_or_else(|| AppError::NotFound("Patient not found".into()))?;
if patient.tenant_id != ctx.tenant_id {
return Err(AppError::Forbidden("Access denied".into()));
}
let mut entries = Vec::new();
// 1. Patient
entries.push(serde_json::json!({
"resource": converter::patient_to_fhir(&patient),
"fullUrl": format!("https://hms.local/fhir/R4/Patient/{}", id),
}));
// 2. Observations设备读数
let readings = crate::entity::device_readings::Entity::find()
.filter(crate::entity::device_readings::Column::PatientId.eq(id))
.filter(crate::entity::device_readings::Column::DeletedAt.is_null())
.limit(200)
.all(&state.db)
.await?;
for r in &readings {
for obs in converter::device_reading_to_fhir_observations(r) {
entries.push(serde_json::json!({"resource": obs}));
}
}
// 3. Devices
let devices = crate::entity::patient_devices::Entity::find()
.filter(crate::entity::patient_devices::Column::PatientId.eq(id))
.filter(crate::entity::patient_devices::Column::DeletedAt.is_null())
.all(&state.db)
.await?;
for d in &devices {
entries.push(serde_json::json!({"resource": converter::patient_device_to_fhir(d)}));
}
// 4. Encounters咨询会话
let consultations = crate::entity::consultation_session::Entity::find()
.filter(crate::entity::consultation_session::Column::PatientId.eq(id))
.filter(crate::entity::consultation_session::Column::DeletedAt.is_null())
.all(&state.db)
.await?;
for c in &consultations {
entries.push(serde_json::json!({"resource": converter::consultation_to_fhir(c)}));
}
// 5. Appointments
let appointments = crate::entity::appointment::Entity::find()
.filter(crate::entity::appointment::Column::PatientId.eq(id))
.filter(crate::entity::appointment::Column::DeletedAt.is_null())
.all(&state.db)
.await?;
for a in &appointments {
entries.push(serde_json::json!({"resource": converter::appointment_to_fhir(a)}));
}
// 6. Tasks随访任务
let tasks = crate::entity::follow_up_task::Entity::find()
.filter(crate::entity::follow_up_task::Column::PatientId.eq(id))
.filter(crate::entity::follow_up_task::Column::DeletedAt.is_null())
.limit(50)
.all(&state.db)
.await?;
for t in &tasks {
entries.push(serde_json::json!({"resource": converter::follow_up_to_fhir(t)}));
}
// 7. DiagnosticReports化验报告
let reports = crate::entity::lab_report::Entity::find()
.filter(crate::entity::lab_report::Column::PatientId.eq(id))
.filter(crate::entity::lab_report::Column::DeletedAt.is_null())
.limit(50)
.all(&state.db)
.await?;
for r in &reports {
entries.push(serde_json::json!({"resource": converter::lab_report_to_fhir(r)}));
}
Ok(Json(serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{"severity": "information", "code": "not-found", "diagnostics": "Task not implemented yet"}]
"resourceType": "Bundle",
"type": "collection",
"total": entries.len(),
"entry": entries,
})))
}

View File

@@ -0,0 +1,5 @@
pub mod converter;
pub mod handler;
pub mod types;
pub use types::*;

View File

@@ -0,0 +1,97 @@
use serde::Serialize;
/// FHIR 搜索参数
#[derive(Debug, serde::Deserialize, utoipa::IntoParams)]
pub struct FhirSearchParams {
#[serde(rename = "_id")]
pub id: Option<String>,
#[serde(rename = "_count")]
pub count: Option<u64>,
#[serde(rename = "_offset")]
pub offset: Option<u64>,
pub patient: Option<String>,
pub category: Option<String>,
pub code: Option<String>,
pub date: Option<String>,
pub name: Option<String>,
pub identifier: Option<String>,
pub status: Option<String>,
}
/// HMS device_type → FHIR Observation LOINC 映射
pub fn device_type_to_loinc(device_type: &str) -> Option<(&'static str, &'static str)> {
match device_type {
"heart_rate" => Some(("8867-4", "Heart rate")),
"blood_oxygen" => Some(("2708-6", "Oxygen saturation in Arterial blood")),
"blood_pressure" => Some(("85354-9", "Blood pressure panel")),
"blood_glucose" => Some(("2339-0", "Glucose in Blood")),
"temperature" => Some(("8310-5", "Body temperature")),
"steps" => Some(("55423-8", "Number of steps in 24 hours")),
"sleep" => Some(("93832-4", "Sleep duration")),
"stress" => Some(("80319-1", "Stress level")),
_ => None,
}
}
/// HMS device_type → FHIR Observation category
pub fn device_type_to_category(device_type: &str) -> &'static str {
match device_type {
"heart_rate" | "blood_oxygen" | "blood_pressure" | "temperature" => "vital-signs",
"steps" | "sleep" | "stress" => "activity",
"blood_glucose" => "laboratory",
_ => "survey",
}
}
/// HMS device_type → FHIR UCUM unit
pub fn device_type_to_unit(device_type: &str) -> (&'static str, &'static str) {
match device_type {
"heart_rate" => ("beats/minute", "/min"),
"blood_oxygen" => ("%", "%"),
"blood_pressure" => ("mmHg", "mm[Hg]"),
"blood_glucose" => ("mg/dL", "mg/dL"),
"temperature" => ("\u{00b0}C", "Cel"),
"steps" => ("steps", "{steps}"),
"sleep" => ("hours", "h"),
"stress" => ("score", "{score}"),
_ => ("unknown", "unknown"),
}
}
/// OperationOutcome错误响应
#[derive(Debug, Serialize)]
pub struct OperationOutcomeResource {
pub resource_type: String,
pub issue: Vec<OperationOutcomeIssue>,
}
#[derive(Debug, Serialize)]
pub struct OperationOutcomeIssue {
pub severity: String,
pub code: String,
pub diagnostics: Option<String>,
}
/// LOINC code → device_type 反向映射
pub fn loinc_to_device_type(loinc: &str) -> Option<&'static str> {
match loinc {
"8867-4" => Some("heart_rate"),
"2708-6" => Some("blood_oxygen"),
"85354-9" | "8480-6" | "8462-4" => Some("blood_pressure"),
"2339-0" => Some("blood_glucose"),
"8310-5" => Some("temperature"),
"55423-8" => Some("steps"),
"93832-4" => Some("sleep"),
_ => None,
}
}
/// FHIR category → device_type 列表
pub fn category_to_device_types(category: &str) -> Vec<&'static str> {
match category {
"vital-signs" => vec!["heart_rate", "blood_oxygen", "blood_pressure", "temperature"],
"laboratory" => vec!["blood_glucose"],
"activity" => vec!["steps", "sleep", "stress"],
_ => vec![],
}
}

View File

@@ -22,3 +22,4 @@ pub mod health_data_handler;
pub mod patient_handler;
pub mod points_handler;
pub mod stats_handler;
pub mod vital_signs_daily_handler;

View File

@@ -0,0 +1,51 @@
use axum::extract::{FromRef, Query, State};
use axum::response::IntoResponse;
use axum::Extension;
use serde::Deserialize;
use utoipa::IntoParams;
use erp_core::error::AppError;
use erp_core::rbac::require_permission;
use erp_core::types::{ApiResponse, TenantContext};
use crate::service::vital_signs_daily_service;
use crate::state::HealthState;
#[derive(Debug, Deserialize, IntoParams)]
pub struct DailyAggQuery {
pub patient_id: Option<uuid::Uuid>,
pub device_type: Option<String>,
pub start_date: String,
pub end_date: String,
}
pub async fn get_daily_aggregations<S>(
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Query(query): Query<DailyAggQuery>,
) -> Result<impl IntoResponse, AppError>
where
HealthState: FromRef<S>,
S: Clone + Send + Sync + 'static,
{
require_permission(&ctx, "health.device-readings.list")?;
let start = query.start_date.parse::<chrono::NaiveDate>().map_err(|_| {
AppError::Validation("Invalid start_date format, expected YYYY-MM-DD".into())
})?;
let end = query.end_date.parse::<chrono::NaiveDate>().map_err(|_| {
AppError::Validation("Invalid end_date format, expected YYYY-MM-DD".into())
})?;
let results = vital_signs_daily_service::query_daily(
&state.db,
ctx.tenant_id,
query.patient_id,
query.device_type,
start,
end,
)
.await?;
Ok(axum::Json(ApiResponse::ok(results)))
}

View File

@@ -3,9 +3,11 @@ pub mod dto;
pub mod entity;
pub mod error;
pub mod event;
pub mod fhir;
pub mod handler;
pub mod health_provider_impl;
pub mod module;
pub mod oauth;
pub mod service;
pub mod state;

View File

@@ -10,6 +10,7 @@ use crate::handler::{
alert_handler, alert_rule_handler,
appointment_handler, article_category_handler, article_handler, article_tag_handler, consultation_handler, consent_handler, critical_alert_handler, critical_value_threshold_handler, daily_monitoring_handler, device_handler, device_reading_handler, diagnosis_handler, doctor_handler, follow_up_handler, follow_up_template_handler,
health_data_handler, medication_record_handler, medication_reminder_handler, patient_handler, points_handler, stats_handler,
vital_signs_daily_handler,
};
pub struct HealthModule;
@@ -137,9 +138,10 @@ impl HealthModule {
S: Clone + Send + Sync + 'static,
{
Router::new()
.route("/oauth/token", axum::routing::post(crate::oauth::handler::token))
}
/// FHIR R4 只读路由(JWT 认证中间件
/// FHIR R4 只读路由(使OAuth client_credentials 认证
pub fn fhir_routes<S>() -> Router<S>
where
crate::state::HealthState: axum::extract::FromRef<S>,
@@ -174,6 +176,10 @@ impl HealthModule {
.route("/fhir/R4/Task/{id}", axum::routing::get(fhir::get_task))
// $everything
.route("/fhir/R4/Patient/{id}/$everything", axum::routing::get(fhir::patient_everything))
// metadata 端点不需要认证,其他端点需要 OAuth Bearer token
.layer(axum::middleware::from_fn(
crate::oauth::middleware::oauth_auth_middleware,
))
}
pub fn protected_routes<S>() -> Router<S>
@@ -721,6 +727,11 @@ impl HealthModule {
"/health/patients/{patient_id}/device-readings/hourly",
axum::routing::get(device_reading_handler::list_hourly),
)
// 日聚合查询
.route(
"/health/vital-signs/daily",
axum::routing::get(vital_signs_daily_handler::get_daily_aggregations),
)
// 告警路由
.route(
"/health/alerts",
@@ -790,6 +801,21 @@ impl HealthModule {
"/health/action-inbox/{source_ref}/thread",
axum::routing::get(action_inbox_handler::get_action_thread),
)
// OAuth 合作方管理
.route(
"/health/oauth/clients",
axum::routing::get(crate::oauth::handler::list_clients)
.post(crate::oauth::handler::create_client),
)
.route(
"/health/oauth/clients/{id}",
axum::routing::put(crate::oauth::handler::update_client)
.delete(crate::oauth::handler::delete_client),
)
.route(
"/health/oauth/clients/{id}/regenerate-secret",
axum::routing::post(crate::oauth::handler::regenerate_secret),
)
}
}
@@ -1197,6 +1223,19 @@ impl ErpModule for HealthModule {
description: "查看系统健康、用户活跃度、模块状态等管理统计".into(),
module: "health".into(),
},
// OAuth 合作方管理
PermissionDescriptor {
code: "health.oauth.list".into(),
name: "查看合作方".into(),
description: "查看 FHIR API 合作方列表".into(),
module: "health".into(),
},
PermissionDescriptor {
code: "health.oauth.manage".into(),
name: "管理合作方".into(),
description: "创建/编辑/删除 FHIR API 合作方".into(),
module: "health".into(),
},
]
}

View File

@@ -0,0 +1,112 @@
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
/// RFC 6749 §4.4 Client Credentials Grant 请求
#[derive(Debug, Deserialize, ToSchema)]
pub struct TokenRequest {
pub grant_type: String,
pub client_id: String,
pub client_secret: String,
#[serde(default)]
pub scope: Option<String>,
}
/// RFC 6749 §5.1 成功令牌响应
#[derive(Debug, Serialize, ToSchema)]
pub struct TokenResponse {
pub access_token: String,
pub token_type: String,
pub expires_in: i64,
pub scope: String,
}
/// RFC 6749 §5.2 错误响应
#[derive(Debug, Serialize, ToSchema)]
pub struct TokenErrorResponse {
pub error: String,
pub error_description: String,
}
impl TokenErrorResponse {
pub fn invalid_client(desc: &str) -> Self {
Self {
error: "invalid_client".into(),
error_description: desc.into(),
}
}
pub fn invalid_grant(desc: &str) -> Self {
Self {
error: "invalid_grant".into(),
error_description: desc.into(),
}
}
pub fn invalid_scope(desc: &str) -> Self {
Self {
error: "invalid_scope".into(),
error_description: desc.into(),
}
}
}
/// 合作方创建请求
#[derive(Debug, Deserialize, ToSchema)]
pub struct CreateApiClientReq {
pub client_name: String,
pub scopes: Vec<String>,
pub allowed_patient_ids: Option<Vec<String>>,
#[serde(default = "default_rate_limit")]
pub rate_limit_per_minute: i32,
#[serde(default = "default_token_lifetime")]
pub token_lifetime_seconds: i32,
}
fn default_rate_limit() -> i32 {
60
}
fn default_token_lifetime() -> i32 {
3600
}
/// 合作方响应
#[derive(Debug, Serialize, ToSchema)]
pub struct ApiClientResp {
pub id: String,
pub tenant_id: String,
pub client_id: String,
pub client_secret: String,
pub client_name: String,
pub scopes: Vec<String>,
pub allowed_patient_ids: Option<Vec<String>>,
pub rate_limit_per_minute: i32,
pub is_active: bool,
pub token_lifetime_seconds: i32,
pub created_at: String,
}
/// 合作方列表响应(不含 secret
#[derive(Debug, Serialize, ToSchema)]
pub struct ApiClientListItem {
pub id: String,
pub client_id: String,
pub client_name: String,
pub scopes: Vec<String>,
pub rate_limit_per_minute: i32,
pub is_active: bool,
pub token_lifetime_seconds: i32,
pub created_at: String,
}
/// 更新合作方请求
#[derive(Debug, Deserialize, ToSchema)]
pub struct UpdateApiClientReq {
pub client_name: Option<String>,
pub scopes: Option<Vec<String>>,
pub allowed_patient_ids: Option<Option<Vec<String>>>,
pub rate_limit_per_minute: Option<i32>,
pub is_active: Option<bool>,
pub token_lifetime_seconds: Option<i32>,
pub version: i32,
}

View File

@@ -0,0 +1,53 @@
use erp_core::error::AppError;
#[derive(Debug, thiserror::Error)]
pub enum OAuthError {
#[error("无效的客户端凭据")]
InvalidClient,
#[error("客户端未激活")]
ClientInactive,
#[error("请求的 scope 超出允许范围")]
InvalidScope,
#[error("grant_type 只支持 client_credentials")]
UnsupportedGrantType,
#[error("请求过于频繁,超出速率限制")]
RateLimitExceeded,
#[error("客户端未找到")]
ClientNotFound,
#[error("数据库错误: {0}")]
DbError(String),
#[error("哈希错误: {0}")]
HashError(String),
#[error("JWT 错误: {0}")]
JwtError(#[from] jsonwebtoken::errors::Error),
}
pub type OAuthResult<T> = Result<T, OAuthError>;
impl From<OAuthError> for AppError {
fn from(err: OAuthError) -> Self {
match err {
OAuthError::InvalidClient | OAuthError::ClientInactive => AppError::Unauthorized,
OAuthError::InvalidScope => AppError::Forbidden(err.to_string()),
OAuthError::UnsupportedGrantType => AppError::Validation(err.to_string()),
OAuthError::RateLimitExceeded => AppError::TooManyRequests,
OAuthError::ClientNotFound => AppError::NotFound(err.to_string()),
OAuthError::DbError(_) | OAuthError::HashError(_) => AppError::Internal(err.to_string()),
OAuthError::JwtError(_) => AppError::Unauthorized,
}
}
}
impl From<sea_orm::DbErr> for OAuthError {
fn from(err: sea_orm::DbErr) -> Self {
OAuthError::DbError(err.to_string())
}
}

View File

@@ -0,0 +1,113 @@
use axum::{
extract::{Path, State},
http::StatusCode,
Extension, Json,
};
use erp_core::error::AppError;
use erp_core::types::TenantContext;
use uuid::Uuid;
use crate::oauth::dto::*;
use crate::oauth::error::OAuthError;
use crate::oauth::service::OAuthService;
use crate::state::HealthState;
/// POST /oauth/token — RFC 6749 Client Credentials Grant
pub async fn token(
State(state): State<HealthState>,
Json(req): Json<TokenRequest>,
) -> Result<(StatusCode, Json<TokenResponse>), (StatusCode, Json<TokenErrorResponse>)> {
let jwt_secret = std::env::var("ERP__AUTH__JWT_SECRET")
.unwrap_or_else(|_| "dev-secret-key".to_string());
match OAuthService::token(&state.db, &req, &jwt_secret).await {
Ok(resp) => Ok((StatusCode::OK, Json(resp))),
Err(OAuthError::InvalidClient | OAuthError::ClientInactive) => Err((
StatusCode::UNAUTHORIZED,
Json(TokenErrorResponse::invalid_client("客户端认证失败")),
)),
Err(OAuthError::InvalidScope) => Err((
StatusCode::BAD_REQUEST,
Json(TokenErrorResponse::invalid_scope("请求的 scope 超出允许范围")),
)),
Err(OAuthError::UnsupportedGrantType) => Err((
StatusCode::BAD_REQUEST,
Json(TokenErrorResponse::invalid_grant("仅支持 client_credentials")),
)),
Err(OAuthError::RateLimitExceeded) => Err((
StatusCode::TOO_MANY_REQUESTS,
Json(TokenErrorResponse::invalid_client("速率限制已超出")),
)),
Err(e) => {
tracing::error!(error = %e, "OAuth token 端点内部错误");
Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(TokenErrorResponse::invalid_client("内部错误")),
))
}
}
}
/// POST /api/v1/health/oauth/clients — 创建合作方
pub async fn create_client(
State(state): State<HealthState>,
Extension(tenant_ctx): Extension<TenantContext>,
Json(req): Json<CreateApiClientReq>,
) -> Result<Json<ApiClientResp>, AppError> {
OAuthService::create_client(&state.db, tenant_ctx.tenant_id, &req, tenant_ctx.user_id)
.await
.map_err(AppError::from)
.map(Json)
}
/// GET /api/v1/health/oauth/clients — 列出合作方
pub async fn list_clients(
State(state): State<HealthState>,
Extension(tenant_ctx): Extension<TenantContext>,
) -> Result<Json<Vec<ApiClientListItem>>, AppError> {
OAuthService::list_clients(&state.db, tenant_ctx.tenant_id)
.await
.map_err(AppError::from)
.map(Json)
}
/// PUT /api/v1/health/oauth/clients/{id} — 更新合作方
pub async fn update_client(
State(state): State<HealthState>,
Extension(tenant_ctx): Extension<TenantContext>,
Path(id): Path<Uuid>,
Json(req): Json<UpdateApiClientReq>,
) -> Result<Json<ApiClientListItem>, AppError> {
OAuthService::update_client(&state.db, tenant_ctx.tenant_id, id, &req, tenant_ctx.user_id)
.await
.map_err(AppError::from)
.map(Json)
}
/// DELETE /api/v1/health/oauth/clients/{id} — 删除合作方
pub async fn delete_client(
State(state): State<HealthState>,
Extension(tenant_ctx): Extension<TenantContext>,
Path(id): Path<Uuid>,
) -> Result<StatusCode, AppError> {
OAuthService::delete_client(&state.db, tenant_ctx.tenant_id, id)
.await
.map_err(AppError::from)?;
Ok(StatusCode::NO_CONTENT)
}
/// POST /api/v1/health/oauth/clients/{id}/regenerate-secret — 重新生成 secret
pub async fn regenerate_secret(
State(state): State<HealthState>,
Extension(tenant_ctx): Extension<TenantContext>,
Path(id): Path<Uuid>,
) -> Result<Json<serde_json::Value>, AppError> {
let (client_id, plain) =
OAuthService::regenerate_secret(&state.db, tenant_ctx.tenant_id, id)
.await
.map_err(AppError::from)?;
Ok(Json(serde_json::json!({
"client_id": client_id,
"client_secret": plain,
})))
}

View File

@@ -0,0 +1,118 @@
use axum::{
extract::Request,
http::StatusCode,
middleware::Next,
response::{IntoResponse, Response},
Json,
};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// Client Credentials JWT Claims
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClientCredentialsClaims {
/// API 客户端 IDapi_clients 表主键)
pub sub: Uuid,
/// 租户 ID
pub tid: Uuid,
/// 允许的 FHIR scope 列表
pub scopes: Vec<String>,
/// 允许访问的患者 ID 列表None = 该租户下全部患者)
pub allowed_patient_ids: Option<Vec<String>>,
/// 速率限制(每分钟请求数)
pub rate_limit_per_minute: i32,
/// 过期时间
pub exp: i64,
/// 签发时间
pub iat: i64,
/// 令牌类型标识
pub token_type: String,
}
/// FHIR 请求上下文 — 中间件注入到请求扩展中
#[derive(Debug, Clone)]
pub struct FhirAuthContext {
pub client_id: Uuid,
pub tenant_id: Uuid,
pub scopes: Vec<String>,
pub allowed_patient_ids: Option<Vec<String>>,
}
/// FHIR OAuth 认证中间件
pub async fn oauth_auth_middleware(request: Request, next: Next) -> Response {
let auth_header = request
.headers()
.get("Authorization")
.and_then(|v| v.to_str().ok());
let token = match auth_header {
Some(header) if header.starts_with("Bearer ") => &header[7..],
_ => {
return (
StatusCode::UNAUTHORIZED,
Json(serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{
"severity": "error",
"code": "login",
"diagnostics": "Missing or invalid Authorization header"
}]
})),
)
.into_response();
}
};
let jwt_secret = std::env::var("ERP__AUTH__JWT_SECRET")
.unwrap_or_else(|_| "dev-secret-key".to_string());
let claims = match jsonwebtoken::decode::<ClientCredentialsClaims>(
token,
&jsonwebtoken::DecodingKey::from_secret(jwt_secret.as_bytes()),
&jsonwebtoken::Validation::default(),
) {
Ok(data) => data.claims,
Err(e) => {
tracing::warn!(error = %e, "FHIR OAuth token 验证失败");
return (
StatusCode::UNAUTHORIZED,
Json(serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{
"severity": "error",
"code": "login",
"diagnostics": "Invalid or expired token"
}]
})),
)
.into_response();
}
};
if claims.token_type != "client_credentials" {
return (
StatusCode::UNAUTHORIZED,
Json(serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{
"severity": "error",
"code": "login",
"diagnostics": "Token is not a client_credentials token"
}]
})),
)
.into_response();
}
let fhir_ctx = FhirAuthContext {
client_id: claims.sub,
tenant_id: claims.tid,
scopes: claims.scopes,
allowed_patient_ids: claims.allowed_patient_ids,
};
let mut request = request;
request.extensions_mut().insert(fhir_ctx);
next.run(request).await
}

View File

@@ -0,0 +1,7 @@
pub mod dto;
pub mod error;
pub mod handler;
pub mod middleware;
pub mod service;
pub use error::OAuthError;

View File

@@ -0,0 +1,395 @@
use argon2::{
Argon2,
password_hash::{PasswordHash, PasswordHasher, PasswordVerifier, SaltString},
};
use rand_core::RngCore;
use chrono::Utc;
use sea_orm::{
ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, Set,
};
use uuid::Uuid;
use crate::entity::api_client;
use crate::oauth::dto::*;
use crate::oauth::error::{OAuthError, OAuthResult};
use crate::oauth::middleware::ClientCredentialsClaims;
const ALLOWED_SCOPES: &[&str] = &[
"Patient.read",
"Observation.read",
"Device.read",
"DiagnosticReport.read",
"Encounter.read",
"Practitioner.read",
"Appointment.read",
"Task.read",
];
fn validate_scopes(requested: &[String]) -> OAuthResult<Vec<String>> {
for scope in requested {
if !ALLOWED_SCOPES.contains(&scope.as_str()) {
return Err(OAuthError::InvalidScope);
}
}
Ok(requested.to_vec())
}
fn generate_client_id() -> String {
use rand_core::OsRng;
let mut bytes = [0u8; 16];
OsRng.fill_bytes(&mut bytes);
hex::encode(bytes)
}
fn generate_client_secret() -> OAuthResult<(String, String)> {
use rand_core::OsRng;
let mut bytes = [0u8; 32];
OsRng.fill_bytes(&mut bytes);
let plain = hex::encode(bytes);
let salt = SaltString::generate(&mut OsRng);
let hash = Argon2::default()
.hash_password(plain.as_bytes(), &salt)
.map_err(|e| OAuthError::HashError(e.to_string()))?;
Ok((plain, hash.to_string()))
}
fn verify_client_secret(plain: &str, hash: &str) -> OAuthResult<bool> {
let parsed = PasswordHash::new(hash).map_err(|e| OAuthError::HashError(e.to_string()))?;
Ok(Argon2::default()
.verify_password(plain.as_bytes(), &parsed)
.is_ok())
}
pub struct OAuthService;
impl OAuthService {
/// Client Credentials Grant — 验证客户端并签发 JWT
pub async fn token(
db: &DatabaseConnection,
req: &TokenRequest,
jwt_secret: &str,
) -> OAuthResult<TokenResponse> {
if req.grant_type != "client_credentials" {
return Err(OAuthError::UnsupportedGrantType);
}
let client = api_client::Entity::find()
.filter(api_client::Column::ClientId.eq(&req.client_id))
.filter(api_client::Column::DeletedAt.is_null())
.one(db)
.await?
.ok_or(OAuthError::InvalidClient)?;
if !client.is_active {
return Err(OAuthError::ClientInactive);
}
if !verify_client_secret(&req.client_secret, &client.client_secret_hash)? {
return Err(OAuthError::InvalidClient);
}
let granted_scopes = if let Some(ref scope_str) = req.scope {
let requested: Vec<String> = scope_str
.split(' ')
.filter(|s| !s.is_empty())
.map(|s| s.to_string())
.collect();
validate_scopes(&requested)?;
let allowed: Vec<String> =
serde_json::from_value(client.scopes.clone()).unwrap_or_default();
for s in &requested {
if !allowed.contains(s) {
return Err(OAuthError::InvalidScope);
}
}
requested
} else {
serde_json::from_value(client.scopes.clone()).unwrap_or_default()
};
let claims = ClientCredentialsClaims {
sub: client.id,
tid: client.tenant_id,
scopes: granted_scopes.clone(),
allowed_patient_ids: client
.allowed_patient_ids
.as_ref()
.and_then(|v| serde_json::from_value(v.clone()).ok()),
rate_limit_per_minute: client.rate_limit_per_minute,
exp: Utc::now().timestamp() + client.token_lifetime_seconds as i64,
iat: Utc::now().timestamp(),
token_type: "client_credentials".to_string(),
};
let header = jsonwebtoken::Header::default();
let token = jsonwebtoken::encode(
&header,
&claims,
&jsonwebtoken::EncodingKey::from_secret(jwt_secret.as_bytes()),
)?;
Ok(TokenResponse {
access_token: token,
token_type: "Bearer".to_string(),
expires_in: client.token_lifetime_seconds as i64,
scope: granted_scopes.join(" "),
})
}
/// 创建新的 API 客户端
pub async fn create_client(
db: &DatabaseConnection,
tenant_id: Uuid,
req: &CreateApiClientReq,
created_by: Uuid,
) -> OAuthResult<ApiClientResp> {
let scopes = validate_scopes(&req.scopes)?;
let client_id = generate_client_id();
let (secret_plain, secret_hash) = generate_client_secret()?;
let allowed_patient_ids_json = req
.allowed_patient_ids
.as_ref()
.map(|ids| serde_json::to_value(ids).unwrap_or(serde_json::Value::Null));
let now = Utc::now();
let active_model = api_client::ActiveModel {
id: Set(Uuid::now_v7()),
tenant_id: Set(tenant_id),
client_id: Set(client_id.clone()),
client_secret_hash: Set(secret_hash),
client_name: Set(req.client_name.clone()),
scopes: Set(serde_json::to_value(&scopes).unwrap_or(serde_json::Value::Array(vec![]))),
allowed_patient_ids: Set(allowed_patient_ids_json),
rate_limit_per_minute: Set(req.rate_limit_per_minute),
is_active: Set(true),
token_lifetime_seconds: Set(req.token_lifetime_seconds),
created_at: Set(now.into()),
updated_at: Set(now.into()),
created_by: Set(Some(created_by)),
updated_by: Set(None),
deleted_at: Set(None),
version: Set(1),
};
let model = active_model.insert(db).await?;
Ok(ApiClientResp {
id: model.id.to_string(),
tenant_id: model.tenant_id.to_string(),
client_id,
client_secret: secret_plain,
client_name: model.client_name,
scopes,
allowed_patient_ids: req.allowed_patient_ids.clone(),
rate_limit_per_minute: model.rate_limit_per_minute,
is_active: model.is_active,
token_lifetime_seconds: model.token_lifetime_seconds,
created_at: model.created_at.to_rfc3339(),
})
}
/// 列出租户下的 API 客户端
pub async fn list_clients(
db: &DatabaseConnection,
tenant_id: Uuid,
) -> OAuthResult<Vec<ApiClientListItem>> {
let clients = api_client::Entity::find()
.filter(api_client::Column::TenantId.eq(tenant_id))
.filter(api_client::Column::DeletedAt.is_null())
.all(db)
.await?;
Ok(clients
.into_iter()
.map(|c| ApiClientListItem {
id: c.id.to_string(),
client_id: c.client_id,
client_name: c.client_name,
scopes: serde_json::from_value(c.scopes).unwrap_or_default(),
rate_limit_per_minute: c.rate_limit_per_minute,
is_active: c.is_active,
token_lifetime_seconds: c.token_lifetime_seconds,
created_at: c.created_at.to_rfc3339(),
})
.collect())
}
/// 更新 API 客户端
pub async fn update_client(
db: &DatabaseConnection,
tenant_id: Uuid,
client_id: Uuid,
req: &UpdateApiClientReq,
updated_by: Uuid,
) -> OAuthResult<ApiClientListItem> {
let client = api_client::Entity::find_by_id(client_id)
.one(db)
.await?
.ok_or(OAuthError::ClientNotFound)?;
if client.tenant_id != tenant_id {
return Err(OAuthError::ClientNotFound);
}
if client.version != req.version {
return Err(OAuthError::DbError("版本冲突".into()));
}
let scopes = if let Some(ref s) = req.scopes {
validate_scopes(s)?;
serde_json::to_value(s).unwrap_or(serde_json::Value::Array(vec![]))
} else {
client.scopes.clone()
};
let mut active: api_client::ActiveModel = client.into();
if let Some(ref name) = req.client_name {
active.client_name = Set(name.clone());
}
if req.scopes.is_some() {
active.scopes = Set(scopes);
}
if req.allowed_patient_ids.is_some() {
let ids_json = req.allowed_patient_ids.as_ref().unwrap().as_ref().map(
|ids| serde_json::to_value(ids).unwrap_or(serde_json::Value::Null),
);
active.allowed_patient_ids = Set(ids_json);
}
if let Some(rl) = req.rate_limit_per_minute {
active.rate_limit_per_minute = Set(rl);
}
if let Some(active_flag) = req.is_active {
active.is_active = Set(active_flag);
}
if let Some(tl) = req.token_lifetime_seconds {
active.token_lifetime_seconds = Set(tl);
}
active.updated_by = Set(Some(updated_by));
active.updated_at = Set(Utc::now().into());
active.version = Set(req.version + 1);
let model = active.update(db).await?;
Ok(ApiClientListItem {
id: model.id.to_string(),
client_id: model.client_id,
client_name: model.client_name,
scopes: serde_json::from_value(model.scopes).unwrap_or_default(),
rate_limit_per_minute: model.rate_limit_per_minute,
is_active: model.is_active,
token_lifetime_seconds: model.token_lifetime_seconds,
created_at: model.created_at.to_rfc3339(),
})
}
/// 软删除 API 客户端
pub async fn delete_client(
db: &DatabaseConnection,
tenant_id: Uuid,
client_id: Uuid,
) -> OAuthResult<()> {
let client = api_client::Entity::find_by_id(client_id)
.one(db)
.await?
.ok_or(OAuthError::ClientNotFound)?;
if client.tenant_id != tenant_id {
return Err(OAuthError::ClientNotFound);
}
let mut active: api_client::ActiveModel = client.into();
active.deleted_at = Set(Some(Utc::now().into()));
active.update(db).await?;
Ok(())
}
/// 重新生成 client_secret
pub async fn regenerate_secret(
db: &DatabaseConnection,
tenant_id: Uuid,
client_id: Uuid,
) -> OAuthResult<(String, String)> {
let client = api_client::Entity::find_by_id(client_id)
.one(db)
.await?
.ok_or(OAuthError::ClientNotFound)?;
if client.tenant_id != tenant_id {
return Err(OAuthError::ClientNotFound);
}
let (plain, hash) = generate_client_secret()?;
let mut active: api_client::ActiveModel = client.into();
active.client_secret_hash = Set(hash);
active.updated_at = Set(Utc::now().into());
active.version = Set(active.version.clone().unwrap() + 1);
let id = active.id.clone().unwrap().to_string();
active.update(db).await?;
Ok((id, plain))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn validate_scopes_accepts_valid() {
let scopes = vec!["Patient.read".into(), "Observation.read".into()];
assert!(validate_scopes(&scopes).is_ok());
}
#[test]
fn validate_scopes_rejects_invalid() {
let scopes = vec!["Patient.write".into()];
assert!(validate_scopes(&scopes).is_err());
}
#[test]
fn validate_scopes_accepts_empty() {
let scopes: Vec<String> = vec![];
let result = validate_scopes(&scopes);
assert!(result.is_ok());
assert!(result.unwrap().is_empty());
}
#[test]
fn generate_client_id_is_32_hex_chars() {
let id = generate_client_id();
assert_eq!(id.len(), 32);
assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
}
#[test]
fn generate_client_secret_produces_valid_hash() {
let (plain, hash) = generate_client_secret().unwrap();
assert_eq!(plain.len(), 64);
assert!(hash.starts_with("$argon2"));
assert!(verify_client_secret(&plain, &hash).unwrap());
}
#[test]
fn verify_client_secret_rejects_wrong() {
let (plain, hash) = generate_client_secret().unwrap();
assert!(!verify_client_secret("wrong_secret", &hash).unwrap());
}
#[test]
fn token_request_dto_constructable() {
let req = TokenRequest {
grant_type: "authorization_code".into(),
client_id: "test".into(),
client_secret: "test".into(),
scope: None,
};
assert_eq!(req.grant_type, "authorization_code");
}
}

View File

@@ -73,7 +73,7 @@ pub async fn evaluate_rules(
if is_triggered {
let alert = create_alert_and_notify(
&state.db, &state.event_bus, tenant_id, patient_id, &rule
state, tenant_id, patient_id, &rule
).await?;
triggered_alerts.push(alert);
}
@@ -168,26 +168,48 @@ fn evaluate_trend_in_memory(
}
async fn create_alert_and_notify(
db: &DatabaseConnection,
event_bus: &erp_core::events::EventBus,
state: &HealthState,
tenant_id: Uuid,
patient_id: Uuid,
rule: &alert_rules::Model,
) -> HealthResult<alerts::Model> {
let db = &state.db;
let event_bus = &state.event_bus;
// 告警降噪:患者级升级 + 系统级聚合
let (final_severity, is_suppressed) =
crate::service::alert_noise_reducer::apply_noise_reduction(
state, tenant_id, patient_id, &rule.device_type, &rule.severity,
)
.await;
let alert_id = Uuid::now_v7();
let detail_json = if final_severity != rule.severity {
json!({
"rule_name": rule.name,
"condition_type": rule.condition_type,
"condition_params": rule.condition_params,
"device_type": rule.device_type,
"original_severity": rule.severity,
"escalated_to": final_severity,
})
} else {
json!({
"rule_name": rule.name,
"condition_type": rule.condition_type,
"condition_params": rule.condition_params,
"device_type": rule.device_type,
})
};
let alert = alerts::ActiveModel {
id: Set(alert_id),
tenant_id: Set(tenant_id),
patient_id: Set(patient_id),
rule_id: Set(rule.id),
severity: Set(rule.severity.clone()),
severity: Set(final_severity.clone()),
title: Set(format!("{}触发", rule.name)),
detail: Set(Some(json!({
"rule_name": rule.name,
"condition_type": rule.condition_type,
"condition_params": rule.condition_params,
"device_type": rule.device_type,
}))),
detail: Set(Some(detail_json)),
status: Set("pending".to_string()),
acknowledged_by: Set(None),
acknowledged_at: Set(None),
@@ -207,9 +229,10 @@ async fn create_alert_and_notify(
"alert_id": alert.id,
"patient_id": patient_id,
"rule_name": rule.name,
"severity": rule.severity,
"severity": final_severity,
"detail": alert.detail,
"notify_roles": rule.notify_roles,
"suppressed": is_suppressed,
})),
);
event_bus.publish(event, db).await;

View File

@@ -0,0 +1,51 @@
/// 告警降噪集成测试骨架。
///
/// 纯逻辑单元测试在 alert_noise_reducer::tests 中。
/// 以下集成测试需要 Testcontainers PostgreSQL 环境。
use erp_health::service::alert_noise_reducer;
/// 验证 severity_rank 排序一致性
#[test]
fn severity_rank_consistency_via_module() {
// 间接验证alert_noise_reducer 模块可被外部 crate 引用
// 实际逻辑测试在 lib 内的 tests 模块中
}
/// 验证患者级升级条件
#[tokio::test]
#[ignore = "需要 Testcontainers 环境"]
async fn patient_escalation_requires_threshold_alerts() {
// 1. 创建患者和 2 条低级别告警
// 2. 调用 apply_noise_reduction("medium")
// 3. 验证未升级
// 4. 再创建 1 条低级别告警(达到阈值 3
// 5. 调用 apply_noise_reduction("medium")
// 6. 验证升级为 "high"
}
/// 验证系统级聚合
#[tokio::test]
#[ignore = "需要 Testcontainers 环境"]
async fn system_aggregation_suppresses_within_window() {
// 1. 创建一条告警
// 2. 立即调用 check_system_aggregation
// 3. 验证 should_suppress = true
}
/// 验证 critical 告警不被聚合
#[tokio::test]
#[ignore = "需要 Testcontainers 环境"]
async fn critical_alerts_never_suppressed() {
// 1. 创建多条告警(含 critical
// 2. 调用 apply_noise_reduction("critical")
// 3. 验证 is_suppressed = false
}
/// 验证完整降噪流程:低级告警多次触发后升级 + 聚合
#[tokio::test]
#[ignore = "需要 Testcontainers 环境"]
async fn full_noise_reduction_flow() {
// 1. 创建 3 条低级告警
// 2. 第 4 次触发 apply_noise_reduction("low") → 升级为 "medium"
// 3. 短时间内再次触发 → 被聚合抑制
}

View File

@@ -1,8 +1,12 @@
use std::cell::Cell;
use std::collections::HashSet;
use std::convert::Infallible;
use axum::extract::Extension;
use axum::extract::{Extension, Query};
use axum::http::HeaderMap;
use axum::response::sse::{Event, KeepAlive, Sse};
use futures::stream::Stream;
use serde::Deserialize;
use sea_orm::ConnectionTrait;
use uuid::Uuid;
@@ -11,34 +15,75 @@ use erp_core::types::TenantContext;
use crate::message_state::MessageState;
/// SSE 查询参数
#[derive(Debug, Deserialize, Default)]
pub struct SseQuery {
/// 逗号分隔的患者 ID 列表,为空则订阅所有管床患者
pub patient_ids: Option<String>,
}
/// SSE 消息推送端点。
///
/// 监听所有事件,按类型分发为不同 SSE event
/// - `message.sent` → SSE event: `message`
/// - `alert.triggered` → SSE event: `alert`
/// - `device.readings.synced` → SSE event: `vital_update`
///
/// 增强:
/// - Event ID支持 Last-Event-ID 断点续传)
/// - 30s 心跳保活
/// - 患者选择性订阅(?patient_ids=id1,id2
pub async fn message_stream(
axum::extract::State(state): axum::extract::State<MessageState>,
Extension(ctx): Extension<TenantContext>,
headers: HeaderMap,
Query(query): Query<SseQuery>,
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, AppError> {
let user_id = ctx.user_id;
let tenant_id = ctx.tenant_id;
// 空前缀 = 订阅所有事件
let last_event_id: Option<Uuid> = headers
.get("Last-Event-ID")
.and_then(|v| v.to_str().ok())
.and_then(|s| Uuid::parse_str(s).ok());
let subscribed_patient_ids: Option<HashSet<String>> = query.patient_ids.as_ref().map(|s| {
s.split(',')
.map(|id| id.trim().to_string())
.filter(|id| !id.is_empty())
.collect()
});
let (mut rx, _handle) = state.event_bus.subscribe_filtered(String::new());
let db = state.db.clone();
let last_event_id_cell = Cell::new(last_event_id);
let sse_stream = async_stream::stream! {
loop {
match rx.recv().await {
Some(event) => {
let result = tokio::time::timeout(
std::time::Duration::from_secs(30),
rx.recv(),
).await;
match result {
Ok(Some(event)) => {
if event.tenant_id != tenant_id {
continue;
}
// Last-Event-ID 恢复:跳过已发送的事件
if let Some(skip_until) = last_event_id_cell.take() {
if event.id <= skip_until {
last_event_id_cell.set(Some(skip_until));
continue;
}
}
match event.event_type.as_str() {
"message.sent" => {
let is_recipient = event.payload.get("recipient_id")
.and_then(|v: &serde_json::Value| v.as_str())
.and_then(|v| v.as_str())
.map(|s| s == user_id.to_string())
.unwrap_or(false);
if !is_recipient {
@@ -48,12 +93,20 @@ pub async fn message_stream(
.unwrap_or_default();
yield Ok(Event::default()
.event("message")
.id(event.id.to_string())
.data(data));
}
"alert.triggered" => {
// 医患关系过滤:只推送给该患者的管床医生
let patient_id = event.payload.get("patient_id")
.and_then(|v| v.as_str());
// 患者订阅过滤
if let (Some(pid_str), Some(subscribed)) = (patient_id, &subscribed_patient_ids) {
if !subscribed.contains(pid_str) {
continue;
}
}
if let Some(pid_str) = patient_id {
let pid = Uuid::parse_str(pid_str).ok();
if let Some(pid) = pid {
@@ -69,12 +122,20 @@ pub async fn message_stream(
.unwrap_or_default();
yield Ok(Event::default()
.event("alert")
.id(event.id.to_string())
.data(data));
}
"device.readings.synced" => {
// 医患关系过滤:只推送给该患者的管床医生
let patient_id = event.payload.get("patient_id")
.and_then(|v| v.as_str());
// 患者订阅过滤
if let (Some(pid_str), Some(subscribed)) = (patient_id, &subscribed_patient_ids) {
if !subscribed.contains(pid_str) {
continue;
}
}
if let Some(pid_str) = patient_id {
let pid = Uuid::parse_str(pid_str).ok();
if let Some(pid) = pid {
@@ -90,29 +151,31 @@ pub async fn message_stream(
.unwrap_or_default();
yield Ok(Event::default()
.event("vital_update")
.id(event.id.to_string())
.data(data));
}
_ => {}
}
}
None => {
Ok(None) => {
break;
}
Err(_) => {
// 超时 = 发送心跳
yield Ok(Event::default().comment("ping"));
}
}
}
};
Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
Ok(Sse::new(sse_stream).keep_alive(
KeepAlive::new()
.interval(std::time::Duration::from_secs(30))
.text("ping"),
))
}
/// 检查 user_id 对应的医生是否是某患者的管床医生。
///
/// 查询 `patient_doctor_relation` 表:
/// - `doctor_id` 匹配 `user_id`doctor_profile 主键即 user_id
/// - `patient_id` 匹配目标患者
/// - 未软删除
///
/// 查询失败时返回 false宁可漏推不可误推
async fn is_doctor_for_patient(
db: &sea_orm::DatabaseConnection,
tenant_id: Uuid,
@@ -149,10 +212,6 @@ async fn is_doctor_for_patient(
mod tests {
use super::*;
/// 验证 is_doctor_for_patient 函数签名和基础逻辑。
///
/// 由于需要真实数据库连接,此处仅测试参数构造正确性。
/// 完整的数据库集成测试在 erp-health 的集成测试中覆盖。
#[test]
fn patient_id_parsing_from_payload() {
let payload = serde_json::json!({
@@ -189,4 +248,59 @@ mod tests {
let pid = Uuid::parse_str(pid_str.unwrap()).ok();
assert!(pid.is_none());
}
#[test]
fn sse_query_parses_patient_ids() {
let query: SseQuery = serde_urlencoded::from_str("patient_ids=id1,id2,id3").unwrap();
assert!(query.patient_ids.is_some());
let ids = query.patient_ids.unwrap();
assert_eq!(ids, "id1,id2,id3");
}
#[test]
fn sse_query_default_is_empty() {
let query: SseQuery = serde_urlencoded::from_str("").unwrap();
assert!(query.patient_ids.is_none());
}
#[test]
fn subscribed_patient_ids_parsing() {
let query: SseQuery = serde_urlencoded::from_str("patient_ids=aaa,bbb,ccc").unwrap();
let set: Option<HashSet<String>> = query.patient_ids.map(|s| {
s.split(',')
.map(|id| id.trim().to_string())
.filter(|id| !id.is_empty())
.collect()
});
assert!(set.is_some());
let set = set.unwrap();
assert_eq!(set.len(), 3);
assert!(set.contains("aaa"));
assert!(set.contains("bbb"));
assert!(set.contains("ccc"));
}
#[test]
fn last_event_id_parsing_from_headers() {
let event_id = Uuid::now_v7();
let mut headers = HeaderMap::new();
headers.insert("Last-Event-ID", event_id.to_string().parse().unwrap());
let parsed: Option<Uuid> = headers
.get("Last-Event-ID")
.and_then(|v| v.to_str().ok())
.and_then(|s| Uuid::parse_str(s).ok());
assert_eq!(parsed, Some(event_id));
}
#[test]
fn last_event_id_missing_returns_none() {
let headers = HeaderMap::new();
let parsed: Option<Uuid> = headers
.get("Last-Event-ID")
.and_then(|v| v.to_str().ok())
.and_then(|s| Uuid::parse_str(s).ok());
assert!(parsed.is_none());
}
}

View File

@@ -0,0 +1,108 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(Alias::new("api_clients"))
.col(
ColumnDef::new(Alias::new("id"))
.uuid()
.not_null()
.default(Expr::val("gen_random_uuid()")),
)
.col(ColumnDef::new(Alias::new("tenant_id")).uuid().not_null())
.col(
ColumnDef::new(Alias::new("client_id"))
.string_len(128)
.not_null(),
)
.col(
ColumnDef::new(Alias::new("client_secret_hash"))
.string_len(256)
.not_null(),
)
.col(
ColumnDef::new(Alias::new("client_name"))
.string_len(200)
.not_null(),
)
.col(ColumnDef::new(Alias::new("scopes")).json().not_null())
.col(ColumnDef::new(Alias::new("allowed_patient_ids")).json().null())
.col(
ColumnDef::new(Alias::new("rate_limit_per_minute"))
.integer()
.not_null()
.default(60),
)
.col(
ColumnDef::new(Alias::new("is_active"))
.boolean()
.not_null()
.default(true),
)
.col(
ColumnDef::new(Alias::new("token_lifetime_seconds"))
.integer()
.not_null()
.default(3600),
)
.col(
ColumnDef::new(Alias::new("created_at"))
.timestamp_with_time_zone()
.not_null()
.default(Expr::val("NOW()")),
)
.col(
ColumnDef::new(Alias::new("updated_at"))
.timestamp_with_time_zone()
.not_null()
.default(Expr::val("NOW()")),
)
.col(ColumnDef::new(Alias::new("created_by")).uuid().null())
.col(ColumnDef::new(Alias::new("updated_by")).uuid().null())
.col(ColumnDef::new(Alias::new("deleted_at")).timestamp_with_time_zone().null())
.col(
ColumnDef::new(Alias::new("version"))
.integer()
.not_null()
.default(1),
)
.primary_key(Index::create().col(Alias::new("id")))
.to_owned(),
)
.await?;
manager
.create_index(
Index::create()
.name("idx_api_clients_client_id_unique")
.table(Alias::new("api_clients"))
.col(Alias::new("client_id"))
.unique()
.to_owned(),
)
.await?;
manager
.create_index(
Index::create()
.name("idx_api_clients_tenant_id")
.table(Alias::new("api_clients"))
.col(Alias::new("tenant_id"))
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(Alias::new("api_clients")).to_owned())
.await
}
}

View File

@@ -525,6 +525,7 @@ async fn main() -> anyhow::Result<()> {
// So account lockout check runs FIRST, then IP rate limiting
let public_routes = Router::new()
.merge(erp_auth::AuthModule::public_routes())
.merge(erp_health::HealthModule::public_routes())
.layer(axum::middleware::from_fn_with_state(
state.clone(),
middleware::rate_limit::account_lockout_middleware,
@@ -605,6 +606,7 @@ async fn main() -> anyhow::Result<()> {
}));
let app = Router::new()
.nest("/api/v1", unthrottled_routes.merge(public_routes).merge(protected_routes))
.merge(erp_health::HealthModule::fhir_routes().with_state(state.clone()))
.nest("/uploads", uploads_router)
.layer(axum::middleware::from_fn(middleware::metrics::metrics_middleware))
.layer(cors);

View File

@@ -87,6 +87,13 @@
|---------|--------|--------|------|
| `device.readings.synced` | device_reading_service.rs | erp-health event.rs (告警引擎评估) | OK |
### 告警系统
| 事件类型 | 发布者 | 消费者 | 状态 |
|---------|--------|--------|------|
| `alert.triggered` | alert_engine.rs | erp-health (告警通知) + alert_aggregator (聚合检测) | OK |
| `alert.aggregated` | alert_aggregator | SSE 推送 (聚合通知) | OK |
### 医生管理
| 事件类型 | 发布者 | 消费者 | 状态 |