From 975d699e4248b60a6477fc73198fea4309f8633c Mon Sep 17 00:00:00 2001 From: iven Date: Mon, 4 May 2026 02:43:32 +0800 Subject: [PATCH] =?UTF-8?q?feat(health):=20=E5=91=8A=E8=AD=A6=E9=99=8D?= =?UTF-8?q?=E5=99=AA=E9=9B=86=E6=88=90=20alert=5Fengine=20+=20OAuth=20serv?= =?UTF-8?q?ice=20=E7=BC=96=E8=AF=91=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - alert_engine: create_alert_and_notify 调用 noise_reducer,升级严重度+suppressed标记 - oauth/service: 修复 OsRng import + ActiveModel move 问题 - fhir/handler: linter 补全完整实现 --- crates/erp-health/src/fhir/handler.rs | 626 +++++++++++++++--- crates/erp-health/src/oauth/service.rs | 394 +++++++++++ crates/erp-health/src/service/alert_engine.rs | 45 +- 3 files changed, 953 insertions(+), 112 deletions(-) create mode 100644 crates/erp-health/src/oauth/service.rs diff --git a/crates/erp-health/src/fhir/handler.rs b/crates/erp-health/src/fhir/handler.rs index d9fb798..0c4e03c 100644 --- a/crates/erp-health/src/fhir/handler.rs +++ b/crates/erp-health/src/fhir/handler.rs @@ -1,13 +1,21 @@ use axum::extract::{FromRef, Path, Query, State}; use axum::response::IntoResponse; +use axum::Extension; use axum::Json; +use sea_orm::*; use serde::Deserialize; use uuid::Uuid; +use erp_core::error::AppError; +use erp_core::rbac::require_permission; +use erp_core::types::TenantContext; + +use crate::fhir::converter; +use crate::fhir::types::{category_to_device_types, loinc_to_device_type}; use crate::state::HealthState; /// GET /fhir/R4/metadata — FHIR CapabilityStatement -pub async fn capability_statement() -> Result +pub async fn capability_statement() -> Result where HealthState: FromRef, S: Clone + Send + Sync + 'static, @@ -42,187 +50,603 @@ where } #[derive(Debug, Deserialize)] -pub struct SearchParams { +pub struct FhirSearchParams { + #[serde(rename = "_id")] + pub id: Option, + #[serde(rename = "_count")] + pub count: Option, + #[serde(rename = "_offset")] + pub offset: Option, pub patient: Option, pub category: Option, - #[serde(rename = "_count")] - pub count: Option, - #[serde(rename = "_offset")] - pub offset: Option, + pub code: Option, + pub date: Option, + pub name: Option, + pub identifier: Option, + pub status: Option, } // ── Patient ──────────────────────────────────────────────────────────── pub async fn search_patients( - State(_state): State, - Query(_params): Query, -) -> Result { + State(state): State, + Extension(ctx): Extension, + Query(params): Query, +) -> Result { + require_permission(&ctx, "health.patient.list")?; + + let mut query = crate::entity::patient::Entity::find() + .filter(crate::entity::patient::Column::TenantId.eq(ctx.tenant_id)) + .filter(crate::entity::patient::Column::DeletedAt.is_null()); + + if let Some(ref id) = params.id { + let uid = Uuid::parse_str(id).map_err(|_| AppError::Validation("Invalid patient id".into()))?; + query = query.filter(crate::entity::patient::Column::Id.eq(uid)); + } + if let Some(ref name) = params.name { + query = query.filter(crate::entity::patient::Column::Name.contains(name)); + } + if let Some(ref identifier) = params.identifier { + query = query.filter(crate::entity::patient::Column::IdNumber.contains(identifier)); + } + + let limit = params.count.unwrap_or(20).min(100); + let offset = params.offset.unwrap_or(0); + let patients = query + .limit(limit) + .offset(offset) + .all(&state.db) + .await?; + + let entries: Vec = patients.iter() + .map(|p| serde_json::json!({"resource": converter::patient_to_fhir(p)})) + .collect(); + Ok(Json(serde_json::json!({ - "resourceType": "Bundle", "type": "searchset", "total": 0, "entry": [] + "resourceType": "Bundle", + "type": "searchset", + "total": entries.len(), + "entry": entries, }))) } pub async fn get_patient( - State(_state): State, - Path(_id): Path, -) -> Result { - Ok(Json(serde_json::json!({ - "resourceType": "OperationOutcome", - "issue": [{"severity": "information", "code": "not-found", "diagnostics": "Patient not implemented yet"}] - }))) -} + State(state): State, + Extension(ctx): Extension, + Path(id): Path, +) -> Result { + require_permission(&ctx, "health.patient.list")?; -pub async fn patient_everything( - State(_state): State, - Path(_id): Path, -) -> Result { - Ok(Json(serde_json::json!({ - "resourceType": "Bundle", "type": "searchset", "total": 0, "entry": [] - }))) + let patient = crate::entity::patient::Entity::find_by_id(id) + .one(&state.db) + .await? + .ok_or_else(|| AppError::NotFound("Patient not found".into()))?; + + if patient.tenant_id != ctx.tenant_id { + return Err(AppError::Forbidden("Access denied".into())); + } + + Ok(Json(converter::patient_to_fhir(&patient))) } // ── Observation ──────────────────────────────────────────────────────── pub async fn search_observations( - State(_state): State, - Query(_params): Query, -) -> Result { - Ok(Json(serde_json::json!({ - "resourceType": "Bundle", "type": "searchset", "total": 0, "entry": [] - }))) -} + State(state): State, + Extension(ctx): Extension, + Query(params): Query, +) -> Result { + require_permission(&ctx, "health.device-readings.list")?; + + let mut query = crate::entity::device_readings::Entity::find() + .filter(crate::entity::device_readings::Column::TenantId.eq(ctx.tenant_id)) + .filter(crate::entity::device_readings::Column::DeletedAt.is_null()); + + if let Some(ref patient_id) = params.patient { + let uid = Uuid::parse_str(patient_id) + .map_err(|_| AppError::Validation("Invalid patient id".into()))?; + query = query.filter(crate::entity::device_readings::Column::PatientId.eq(uid)); + } + if let Some(ref code) = params.code { + if let Some(dt) = loinc_to_device_type(code) { + query = query.filter(crate::entity::device_readings::Column::DeviceType.eq(dt)); + } + } + if let Some(ref category) = params.category { + let types = category_to_device_types(category); + if !types.is_empty() { + query = query.filter( + crate::entity::device_readings::Column::DeviceType.is_in(types) + ); + } + } + if let Some(ref date) = params.date { + if let Some(after) = date.strip_prefix("gt") { + if let Ok(dt) = after.parse::>() { + query = query.filter( + crate::entity::device_readings::Column::MeasuredAt.gt(dt) + ); + } + } else if let Some(before) = date.strip_prefix("lt") { + if let Ok(dt) = before.parse::>() { + query = query.filter( + crate::entity::device_readings::Column::MeasuredAt.lt(dt) + ); + } + } else if let Ok(dt) = date.parse::>() { + let start = dt.date_naive().and_hms_opt(0, 0, 0).unwrap().and_utc(); + let end = dt.date_naive().and_hms_opt(23, 59, 59).unwrap().and_utc(); + query = query.filter( + crate::entity::device_readings::Column::MeasuredAt.between(start, end) + ); + } + } + + let limit = params.count.unwrap_or(50).min(200); + let readings = query + .order_by_desc(crate::entity::device_readings::Column::MeasuredAt) + .limit(limit) + .all(&state.db) + .await?; + + let mut entries = Vec::new(); + for reading in &readings { + for obs in converter::device_reading_to_fhir_observations(reading) { + entries.push(serde_json::json!({"resource": obs})); + } + } -pub async fn observation_lastn( - State(_state): State, - Query(_params): Query, -) -> Result { Ok(Json(serde_json::json!({ - "resourceType": "Bundle", "type": "searchset", "total": 0, "entry": [] + "resourceType": "Bundle", + "type": "searchset", + "total": entries.len(), + "entry": entries, }))) } // ── Device ───────────────────────────────────────────────────────────── pub async fn search_devices( - State(_state): State, - Query(_params): Query, -) -> Result { + State(state): State, + Extension(ctx): Extension, + Query(params): Query, +) -> Result { + require_permission(&ctx, "health.devices.list")?; + + let mut query = crate::entity::patient_devices::Entity::find() + .filter(crate::entity::patient_devices::Column::TenantId.eq(ctx.tenant_id)) + .filter(crate::entity::patient_devices::Column::DeletedAt.is_null()); + + if let Some(ref patient_id) = params.patient { + let uid = Uuid::parse_str(patient_id) + .map_err(|_| AppError::Validation("Invalid patient id".into()))?; + query = query.filter(crate::entity::patient_devices::Column::PatientId.eq(uid)); + } + + let limit = params.count.unwrap_or(50).min(200); + let devices = query.limit(limit).all(&state.db).await?; + + let entries: Vec = devices.iter() + .map(|d| serde_json::json!({"resource": converter::patient_device_to_fhir(d)})) + .collect(); + Ok(Json(serde_json::json!({ - "resourceType": "Bundle", "type": "searchset", "total": 0, "entry": [] + "resourceType": "Bundle", + "type": "searchset", + "total": entries.len(), + "entry": entries, }))) } pub async fn get_device( - State(_state): State, - Path(_id): Path, -) -> Result { - Ok(Json(serde_json::json!({ - "resourceType": "OperationOutcome", - "issue": [{"severity": "information", "code": "not-found", "diagnostics": "Device not implemented yet"}] - }))) + State(state): State, + Extension(ctx): Extension, + Path(id): Path, +) -> Result { + require_permission(&ctx, "health.devices.list")?; + + let device = crate::entity::patient_devices::Entity::find_by_id(id) + .one(&state.db) + .await? + .ok_or_else(|| AppError::NotFound("Device not found".into()))?; + + if device.tenant_id != ctx.tenant_id { + return Err(AppError::Forbidden("Access denied".into())); + } + + Ok(Json(converter::patient_device_to_fhir(&device))) } // ── Practitioner ─────────────────────────────────────────────────────── pub async fn search_practitioners( - State(_state): State, - Query(_params): Query, -) -> Result { + State(state): State, + Extension(ctx): Extension, + Query(params): Query, +) -> Result { + require_permission(&ctx, "health.doctor.list")?; + + let mut query = crate::entity::doctor_profile::Entity::find() + .filter(crate::entity::doctor_profile::Column::TenantId.eq(ctx.tenant_id)) + .filter(crate::entity::doctor_profile::Column::DeletedAt.is_null()); + + if let Some(ref name) = params.name { + query = query.filter(crate::entity::doctor_profile::Column::Name.contains(name)); + } + + let limit = params.count.unwrap_or(50).min(200); + let doctors = query.limit(limit).all(&state.db).await?; + + let entries: Vec = doctors.iter() + .map(|d| serde_json::json!({"resource": converter::doctor_to_fhir(d)})) + .collect(); + Ok(Json(serde_json::json!({ - "resourceType": "Bundle", "type": "searchset", "total": 0, "entry": [] + "resourceType": "Bundle", + "type": "searchset", + "total": entries.len(), + "entry": entries, }))) } pub async fn get_practitioner( - State(_state): State, - Path(_id): Path, -) -> Result { - Ok(Json(serde_json::json!({ - "resourceType": "OperationOutcome", - "issue": [{"severity": "information", "code": "not-found", "diagnostics": "Practitioner not implemented yet"}] - }))) + State(state): State, + Extension(ctx): Extension, + Path(id): Path, +) -> Result { + require_permission(&ctx, "health.doctor.list")?; + + let doctor = crate::entity::doctor_profile::Entity::find_by_id(id) + .one(&state.db) + .await? + .ok_or_else(|| AppError::NotFound("Practitioner not found".into()))?; + + if doctor.tenant_id != ctx.tenant_id { + return Err(AppError::Forbidden("Access denied".into())); + } + + Ok(Json(converter::doctor_to_fhir(&doctor))) } // ── Appointment ──────────────────────────────────────────────────────── pub async fn search_appointments( - State(_state): State, - Query(_params): Query, -) -> Result { + State(state): State, + Extension(ctx): Extension, + Query(params): Query, +) -> Result { + require_permission(&ctx, "health.appointment.list")?; + + let mut query = crate::entity::appointment::Entity::find() + .filter(crate::entity::appointment::Column::TenantId.eq(ctx.tenant_id)) + .filter(crate::entity::appointment::Column::DeletedAt.is_null()); + + if let Some(ref patient_id) = params.patient { + let uid = Uuid::parse_str(patient_id) + .map_err(|_| AppError::Validation("Invalid patient id".into()))?; + query = query.filter(crate::entity::appointment::Column::PatientId.eq(uid)); + } + if let Some(ref status) = params.status { + query = query.filter(crate::entity::appointment::Column::Status.eq(status)); + } + + let limit = params.count.unwrap_or(50).min(200); + let appointments = query + .order_by_desc(crate::entity::appointment::Column::AppointmentDate) + .limit(limit) + .all(&state.db) + .await?; + + let entries: Vec = appointments.iter() + .map(|a| serde_json::json!({"resource": converter::appointment_to_fhir(a)})) + .collect(); + Ok(Json(serde_json::json!({ - "resourceType": "Bundle", "type": "searchset", "total": 0, "entry": [] + "resourceType": "Bundle", + "type": "searchset", + "total": entries.len(), + "entry": entries, }))) } pub async fn get_appointment( - State(_state): State, - Path(_id): Path, -) -> Result { - Ok(Json(serde_json::json!({ - "resourceType": "OperationOutcome", - "issue": [{"severity": "information", "code": "not-found", "diagnostics": "Appointment not implemented yet"}] - }))) + State(state): State, + Extension(ctx): Extension, + Path(id): Path, +) -> Result { + require_permission(&ctx, "health.appointment.list")?; + + let appointment = crate::entity::appointment::Entity::find_by_id(id) + .one(&state.db) + .await? + .ok_or_else(|| AppError::NotFound("Appointment not found".into()))?; + + if appointment.tenant_id != ctx.tenant_id { + return Err(AppError::Forbidden("Access denied".into())); + } + + Ok(Json(converter::appointment_to_fhir(&appointment))) } // ── DiagnosticReport ─────────────────────────────────────────────────── pub async fn search_diagnostic_reports( - State(_state): State, - Query(_params): Query, -) -> Result { + State(state): State, + Extension(ctx): Extension, + Query(params): Query, +) -> Result { + require_permission(&ctx, "health.health-data.list")?; + + let mut query = crate::entity::lab_report::Entity::find() + .filter(crate::entity::lab_report::Column::TenantId.eq(ctx.tenant_id)) + .filter(crate::entity::lab_report::Column::DeletedAt.is_null()); + + if let Some(ref patient_id) = params.patient { + let uid = Uuid::parse_str(patient_id) + .map_err(|_| AppError::Validation("Invalid patient id".into()))?; + query = query.filter(crate::entity::lab_report::Column::PatientId.eq(uid)); + } + if let Some(ref code) = params.code { + query = query.filter(crate::entity::lab_report::Column::ReportType.eq(code)); + } + if let Some(ref status) = params.status { + query = query.filter(crate::entity::lab_report::Column::Status.eq(status)); + } + + let limit = params.count.unwrap_or(50).min(200); + let reports = query + .order_by_desc(crate::entity::lab_report::Column::ReportDate) + .limit(limit) + .all(&state.db) + .await?; + + let entries: Vec = reports.iter() + .map(|r| serde_json::json!({"resource": converter::lab_report_to_fhir(r)})) + .collect(); + Ok(Json(serde_json::json!({ - "resourceType": "Bundle", "type": "searchset", "total": 0, "entry": [] + "resourceType": "Bundle", + "type": "searchset", + "total": entries.len(), + "entry": entries, }))) } pub async fn get_diagnostic_report( - State(_state): State, - Path(_id): Path, -) -> Result { - Ok(Json(serde_json::json!({ - "resourceType": "OperationOutcome", - "issue": [{"severity": "information", "code": "not-found", "diagnostics": "DiagnosticReport not implemented yet"}] - }))) + State(state): State, + Extension(ctx): Extension, + Path(id): Path, +) -> Result { + require_permission(&ctx, "health.health-data.list")?; + + let report = crate::entity::lab_report::Entity::find_by_id(id) + .one(&state.db) + .await? + .ok_or_else(|| AppError::NotFound("DiagnosticReport not found".into()))?; + + if report.tenant_id != ctx.tenant_id { + return Err(AppError::Forbidden("Access denied".into())); + } + + Ok(Json(converter::lab_report_to_fhir(&report))) } // ── Encounter ────────────────────────────────────────────────────────── pub async fn search_encounters( - State(_state): State, - Query(_params): Query, -) -> Result { + State(state): State, + Extension(ctx): Extension, + Query(params): Query, +) -> Result { + require_permission(&ctx, "health.consultation.list")?; + + let mut query = crate::entity::consultation_session::Entity::find() + .filter(crate::entity::consultation_session::Column::TenantId.eq(ctx.tenant_id)) + .filter(crate::entity::consultation_session::Column::DeletedAt.is_null()); + + if let Some(ref patient_id) = params.patient { + let uid = Uuid::parse_str(patient_id) + .map_err(|_| AppError::Validation("Invalid patient id".into()))?; + query = query.filter(crate::entity::consultation_session::Column::PatientId.eq(uid)); + } + if let Some(ref status) = params.status { + query = query.filter(crate::entity::consultation_session::Column::Status.eq(status)); + } + + let limit = params.count.unwrap_or(50).min(200); + let sessions = query + .order_by_desc(crate::entity::consultation_session::Column::CreatedAt) + .limit(limit) + .all(&state.db) + .await?; + + let entries: Vec = sessions.iter() + .map(|s| serde_json::json!({"resource": converter::consultation_to_fhir(s)})) + .collect(); + Ok(Json(serde_json::json!({ - "resourceType": "Bundle", "type": "searchset", "total": 0, "entry": [] + "resourceType": "Bundle", + "type": "searchset", + "total": entries.len(), + "entry": entries, }))) } pub async fn get_encounter( - State(_state): State, - Path(_id): Path, -) -> Result { - Ok(Json(serde_json::json!({ - "resourceType": "OperationOutcome", - "issue": [{"severity": "information", "code": "not-found", "diagnostics": "Encounter not implemented yet"}] - }))) + State(state): State, + Extension(ctx): Extension, + Path(id): Path, +) -> Result { + require_permission(&ctx, "health.consultation.list")?; + + let session = crate::entity::consultation_session::Entity::find_by_id(id) + .one(&state.db) + .await? + .ok_or_else(|| AppError::NotFound("Encounter not found".into()))?; + + if session.tenant_id != ctx.tenant_id { + return Err(AppError::Forbidden("Access denied".into())); + } + + Ok(Json(converter::consultation_to_fhir(&session))) } // ── Task ─────────────────────────────────────────────────────────────── pub async fn search_tasks( - State(_state): State, - Query(_params): Query, -) -> Result { + State(state): State, + Extension(ctx): Extension, + Query(params): Query, +) -> Result { + require_permission(&ctx, "health.follow-up.list")?; + + let mut query = crate::entity::follow_up_task::Entity::find() + .filter(crate::entity::follow_up_task::Column::TenantId.eq(ctx.tenant_id)) + .filter(crate::entity::follow_up_task::Column::DeletedAt.is_null()); + + if let Some(ref patient_id) = params.patient { + let uid = Uuid::parse_str(patient_id) + .map_err(|_| AppError::Validation("Invalid patient id".into()))?; + query = query.filter(crate::entity::follow_up_task::Column::PatientId.eq(uid)); + } + if let Some(ref status) = params.status { + query = query.filter(crate::entity::follow_up_task::Column::Status.eq(status)); + } + + let limit = params.count.unwrap_or(50).min(200); + let tasks = query + .order_by_desc(crate::entity::follow_up_task::Column::PlannedDate) + .limit(limit) + .all(&state.db) + .await?; + + let entries: Vec = tasks.iter() + .map(|t| serde_json::json!({"resource": converter::follow_up_to_fhir(t)})) + .collect(); + Ok(Json(serde_json::json!({ - "resourceType": "Bundle", "type": "searchset", "total": 0, "entry": [] + "resourceType": "Bundle", + "type": "searchset", + "total": entries.len(), + "entry": entries, }))) } pub async fn get_task( - State(_state): State, - Path(_id): Path, -) -> Result { + State(state): State, + Extension(ctx): Extension, + Path(id): Path, +) -> Result { + require_permission(&ctx, "health.follow-up.list")?; + + let task = crate::entity::follow_up_task::Entity::find_by_id(id) + .one(&state.db) + .await? + .ok_or_else(|| AppError::NotFound("Task not found".into()))?; + + if task.tenant_id != ctx.tenant_id { + return Err(AppError::Forbidden("Access denied".into())); + } + + Ok(Json(converter::follow_up_to_fhir(&task))) +} + +// ── $everything ──────────────────────────────────────────────────────── + +/// GET /fhir/R4/Patient/{id}/$everything +pub async fn patient_everything( + State(state): State, + Extension(ctx): Extension, + Path(id): Path, +) -> Result { + require_permission(&ctx, "health.patient.list")?; + + let patient = crate::entity::patient::Entity::find_by_id(id) + .one(&state.db) + .await? + .ok_or_else(|| AppError::NotFound("Patient not found".into()))?; + + if patient.tenant_id != ctx.tenant_id { + return Err(AppError::Forbidden("Access denied".into())); + } + + let mut entries = Vec::new(); + + // 1. Patient + entries.push(serde_json::json!({ + "resource": converter::patient_to_fhir(&patient), + "fullUrl": format!("https://hms.local/fhir/R4/Patient/{}", id), + })); + + // 2. Observations(设备读数) + let readings = crate::entity::device_readings::Entity::find() + .filter(crate::entity::device_readings::Column::PatientId.eq(id)) + .filter(crate::entity::device_readings::Column::DeletedAt.is_null()) + .limit(200) + .all(&state.db) + .await?; + for r in &readings { + for obs in converter::device_reading_to_fhir_observations(r) { + entries.push(serde_json::json!({"resource": obs})); + } + } + + // 3. Devices + let devices = crate::entity::patient_devices::Entity::find() + .filter(crate::entity::patient_devices::Column::PatientId.eq(id)) + .filter(crate::entity::patient_devices::Column::DeletedAt.is_null()) + .all(&state.db) + .await?; + for d in &devices { + entries.push(serde_json::json!({"resource": converter::patient_device_to_fhir(d)})); + } + + // 4. Encounters(咨询会话) + let consultations = crate::entity::consultation_session::Entity::find() + .filter(crate::entity::consultation_session::Column::PatientId.eq(id)) + .filter(crate::entity::consultation_session::Column::DeletedAt.is_null()) + .all(&state.db) + .await?; + for c in &consultations { + entries.push(serde_json::json!({"resource": converter::consultation_to_fhir(c)})); + } + + // 5. Appointments + let appointments = crate::entity::appointment::Entity::find() + .filter(crate::entity::appointment::Column::PatientId.eq(id)) + .filter(crate::entity::appointment::Column::DeletedAt.is_null()) + .all(&state.db) + .await?; + for a in &appointments { + entries.push(serde_json::json!({"resource": converter::appointment_to_fhir(a)})); + } + + // 6. Tasks(随访任务) + let tasks = crate::entity::follow_up_task::Entity::find() + .filter(crate::entity::follow_up_task::Column::PatientId.eq(id)) + .filter(crate::entity::follow_up_task::Column::DeletedAt.is_null()) + .limit(50) + .all(&state.db) + .await?; + for t in &tasks { + entries.push(serde_json::json!({"resource": converter::follow_up_to_fhir(t)})); + } + + // 7. DiagnosticReports(化验报告) + let reports = crate::entity::lab_report::Entity::find() + .filter(crate::entity::lab_report::Column::PatientId.eq(id)) + .filter(crate::entity::lab_report::Column::DeletedAt.is_null()) + .limit(50) + .all(&state.db) + .await?; + for r in &reports { + entries.push(serde_json::json!({"resource": converter::lab_report_to_fhir(r)})); + } + Ok(Json(serde_json::json!({ - "resourceType": "OperationOutcome", - "issue": [{"severity": "information", "code": "not-found", "diagnostics": "Task not implemented yet"}] + "resourceType": "Bundle", + "type": "collection", + "total": entries.len(), + "entry": entries, }))) } diff --git a/crates/erp-health/src/oauth/service.rs b/crates/erp-health/src/oauth/service.rs new file mode 100644 index 0000000..1412681 --- /dev/null +++ b/crates/erp-health/src/oauth/service.rs @@ -0,0 +1,394 @@ +use argon2::{ + Argon2, + password_hash::{PasswordHash, PasswordHasher, PasswordVerifier, SaltString, rand_core::RngCore}, +}; +use chrono::Utc; +use sea_orm::{ + ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, Set, +}; +use uuid::Uuid; + +use crate::entity::api_client; +use crate::oauth::dto::*; +use crate::oauth::error::{OAuthError, OAuthResult}; +use crate::oauth::middleware::ClientCredentialsClaims; + +const ALLOWED_SCOPES: &[&str] = &[ + "Patient.read", + "Observation.read", + "Device.read", + "DiagnosticReport.read", + "Encounter.read", + "Practitioner.read", + "Appointment.read", + "Task.read", +]; + +fn validate_scopes(requested: &[String]) -> OAuthResult> { + for scope in requested { + if !ALLOWED_SCOPES.contains(&scope.as_str()) { + return Err(OAuthError::InvalidScope); + } + } + Ok(requested.to_vec()) +} + +fn generate_client_id() -> String { + use argon2::password_hash::rand_core::OsRng; + let mut bytes = [0u8; 16]; + OsRng.fill_bytes(&mut bytes); + hex::encode(bytes) +} + +fn generate_client_secret() -> OAuthResult<(String, String)> { + use argon2::password_hash::rand_core::OsRng; + let mut bytes = [0u8; 32]; + OsRng.fill_bytes(&mut bytes); + let plain = hex::encode(bytes); + + let salt = SaltString::generate(&mut OsRng); + let hash = Argon2::default() + .hash_password(plain.as_bytes(), &salt) + .map_err(|e| OAuthError::HashError(e.to_string()))?; + + Ok((plain, hash.to_string())) +} + +fn verify_client_secret(plain: &str, hash: &str) -> OAuthResult { + let parsed = PasswordHash::new(hash).map_err(|e| OAuthError::HashError(e.to_string()))?; + Ok(Argon2::default() + .verify_password(plain.as_bytes(), &parsed) + .is_ok()) +} + +pub struct OAuthService; + +impl OAuthService { + /// Client Credentials Grant — 验证客户端并签发 JWT + pub async fn token( + db: &DatabaseConnection, + req: &TokenRequest, + jwt_secret: &str, + ) -> OAuthResult { + if req.grant_type != "client_credentials" { + return Err(OAuthError::UnsupportedGrantType); + } + + let client = api_client::Entity::find() + .filter(api_client::Column::ClientId.eq(&req.client_id)) + .filter(api_client::Column::DeletedAt.is_null()) + .one(db) + .await? + .ok_or(OAuthError::InvalidClient)?; + + if !client.is_active { + return Err(OAuthError::ClientInactive); + } + + if !verify_client_secret(&req.client_secret, &client.client_secret_hash)? { + return Err(OAuthError::InvalidClient); + } + + let granted_scopes = if let Some(ref scope_str) = req.scope { + let requested: Vec = scope_str + .split(' ') + .filter(|s| !s.is_empty()) + .map(|s| s.to_string()) + .collect(); + validate_scopes(&requested)?; + + let allowed: Vec = + serde_json::from_value(client.scopes.clone()).unwrap_or_default(); + for s in &requested { + if !allowed.contains(s) { + return Err(OAuthError::InvalidScope); + } + } + requested + } else { + serde_json::from_value(client.scopes.clone()).unwrap_or_default() + }; + + let claims = ClientCredentialsClaims { + sub: client.id, + tid: client.tenant_id, + scopes: granted_scopes.clone(), + allowed_patient_ids: client + .allowed_patient_ids + .as_ref() + .and_then(|v| serde_json::from_value(v.clone()).ok()), + rate_limit_per_minute: client.rate_limit_per_minute, + exp: Utc::now().timestamp() + client.token_lifetime_seconds as i64, + iat: Utc::now().timestamp(), + token_type: "client_credentials".to_string(), + }; + + let header = jsonwebtoken::Header::default(); + let token = jsonwebtoken::encode( + &header, + &claims, + &jsonwebtoken::EncodingKey::from_secret(jwt_secret.as_bytes()), + )?; + + Ok(TokenResponse { + access_token: token, + token_type: "Bearer".to_string(), + expires_in: client.token_lifetime_seconds as i64, + scope: granted_scopes.join(" "), + }) + } + + /// 创建新的 API 客户端 + pub async fn create_client( + db: &DatabaseConnection, + tenant_id: Uuid, + req: &CreateApiClientReq, + created_by: Uuid, + ) -> OAuthResult { + let scopes = validate_scopes(&req.scopes)?; + + let client_id = generate_client_id(); + let (secret_plain, secret_hash) = generate_client_secret()?; + + let allowed_patient_ids_json = req + .allowed_patient_ids + .as_ref() + .map(|ids| serde_json::to_value(ids).unwrap_or(serde_json::Value::Null)); + + let now = Utc::now(); + let active_model = api_client::ActiveModel { + id: Set(Uuid::now_v7()), + tenant_id: Set(tenant_id), + client_id: Set(client_id.clone()), + client_secret_hash: Set(secret_hash), + client_name: Set(req.client_name.clone()), + scopes: Set(serde_json::to_value(&scopes).unwrap_or(serde_json::Value::Array(vec![]))), + allowed_patient_ids: Set(allowed_patient_ids_json), + rate_limit_per_minute: Set(req.rate_limit_per_minute), + is_active: Set(true), + token_lifetime_seconds: Set(req.token_lifetime_seconds), + created_at: Set(now.into()), + updated_at: Set(now.into()), + created_by: Set(Some(created_by)), + updated_by: Set(None), + deleted_at: Set(None), + version: Set(1), + }; + + let model = active_model.insert(db).await?; + + Ok(ApiClientResp { + id: model.id.to_string(), + tenant_id: model.tenant_id.to_string(), + client_id, + client_secret: secret_plain, + client_name: model.client_name, + scopes, + allowed_patient_ids: req.allowed_patient_ids.clone(), + rate_limit_per_minute: model.rate_limit_per_minute, + is_active: model.is_active, + token_lifetime_seconds: model.token_lifetime_seconds, + created_at: model.created_at.to_rfc3339(), + }) + } + + /// 列出租户下的 API 客户端 + pub async fn list_clients( + db: &DatabaseConnection, + tenant_id: Uuid, + ) -> OAuthResult> { + let clients = api_client::Entity::find() + .filter(api_client::Column::TenantId.eq(tenant_id)) + .filter(api_client::Column::DeletedAt.is_null()) + .all(db) + .await?; + + Ok(clients + .into_iter() + .map(|c| ApiClientListItem { + id: c.id.to_string(), + client_id: c.client_id, + client_name: c.client_name, + scopes: serde_json::from_value(c.scopes).unwrap_or_default(), + rate_limit_per_minute: c.rate_limit_per_minute, + is_active: c.is_active, + token_lifetime_seconds: c.token_lifetime_seconds, + created_at: c.created_at.to_rfc3339(), + }) + .collect()) + } + + /// 更新 API 客户端 + pub async fn update_client( + db: &DatabaseConnection, + tenant_id: Uuid, + client_id: Uuid, + req: &UpdateApiClientReq, + updated_by: Uuid, + ) -> OAuthResult { + let client = api_client::Entity::find_by_id(client_id) + .one(db) + .await? + .ok_or(OAuthError::ClientNotFound)?; + + if client.tenant_id != tenant_id { + return Err(OAuthError::ClientNotFound); + } + + if client.version != req.version { + return Err(OAuthError::DbError("版本冲突".into())); + } + + let scopes = if let Some(ref s) = req.scopes { + validate_scopes(s)?; + serde_json::to_value(s).unwrap_or(serde_json::Value::Array(vec![])) + } else { + client.scopes.clone() + }; + + let mut active: api_client::ActiveModel = client.into(); + if let Some(ref name) = req.client_name { + active.client_name = Set(name.clone()); + } + if req.scopes.is_some() { + active.scopes = Set(scopes); + } + if req.allowed_patient_ids.is_some() { + let ids_json = req.allowed_patient_ids.as_ref().unwrap().as_ref().map( + |ids| serde_json::to_value(ids).unwrap_or(serde_json::Value::Null), + ); + active.allowed_patient_ids = Set(ids_json); + } + if let Some(rl) = req.rate_limit_per_minute { + active.rate_limit_per_minute = Set(rl); + } + if let Some(active_flag) = req.is_active { + active.is_active = Set(active_flag); + } + if let Some(tl) = req.token_lifetime_seconds { + active.token_lifetime_seconds = Set(tl); + } + active.updated_by = Set(Some(updated_by)); + active.updated_at = Set(Utc::now().into()); + active.version = Set(active.version.unwrap() + 1); + + let model = active.update(db).await?; + + Ok(ApiClientListItem { + id: model.id.to_string(), + client_id: model.client_id, + client_name: model.client_name, + scopes: serde_json::from_value(model.scopes).unwrap_or_default(), + rate_limit_per_minute: model.rate_limit_per_minute, + is_active: model.is_active, + token_lifetime_seconds: model.token_lifetime_seconds, + created_at: model.created_at.to_rfc3339(), + }) + } + + /// 软删除 API 客户端 + pub async fn delete_client( + db: &DatabaseConnection, + tenant_id: Uuid, + client_id: Uuid, + ) -> OAuthResult<()> { + let client = api_client::Entity::find_by_id(client_id) + .one(db) + .await? + .ok_or(OAuthError::ClientNotFound)?; + + if client.tenant_id != tenant_id { + return Err(OAuthError::ClientNotFound); + } + + let mut active: api_client::ActiveModel = client.into(); + active.deleted_at = Set(Some(Utc::now().into())); + active.update(db).await?; + + Ok(()) + } + + /// 重新生成 client_secret + pub async fn regenerate_secret( + db: &DatabaseConnection, + tenant_id: Uuid, + client_id: Uuid, + ) -> OAuthResult<(String, String)> { + let client = api_client::Entity::find_by_id(client_id) + .one(db) + .await? + .ok_or(OAuthError::ClientNotFound)?; + + if client.tenant_id != tenant_id { + return Err(OAuthError::ClientNotFound); + } + + let (plain, hash) = generate_client_secret()?; + + let mut active: api_client::ActiveModel = client.into(); + let id = active.id.clone().unwrap().to_string(); + active.client_secret_hash = Set(hash); + active.updated_at = Set(Utc::now().into()); + active.version = Set(active.version.clone().unwrap() + 1); + active.update(db).await?; + + Ok((id, plain)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn validate_scopes_accepts_valid() { + let scopes = vec!["Patient.read".into(), "Observation.read".into()]; + assert!(validate_scopes(&scopes).is_ok()); + } + + #[test] + fn validate_scopes_rejects_invalid() { + let scopes = vec!["Patient.write".into()]; + assert!(validate_scopes(&scopes).is_err()); + } + + #[test] + fn validate_scopes_accepts_empty() { + let scopes: Vec = vec![]; + let result = validate_scopes(&scopes); + assert!(result.is_ok()); + assert!(result.unwrap().is_empty()); + } + + #[test] + fn generate_client_id_is_32_hex_chars() { + let id = generate_client_id(); + assert_eq!(id.len(), 32); + assert!(id.chars().all(|c| c.is_ascii_hexdigit())); + } + + #[test] + fn generate_client_secret_produces_valid_hash() { + let (plain, hash) = generate_client_secret().unwrap(); + assert_eq!(plain.len(), 64); + assert!(hash.starts_with("$argon2")); + assert!(verify_client_secret(&plain, &hash).unwrap()); + } + + #[test] + fn verify_client_secret_rejects_wrong() { + let (plain, hash) = generate_client_secret().unwrap(); + assert!(!verify_client_secret("wrong_secret", &hash).unwrap()); + } + + #[test] + fn token_request_dto_constructable() { + let req = TokenRequest { + grant_type: "authorization_code".into(), + client_id: "test".into(), + client_secret: "test".into(), + scope: None, + }; + assert_eq!(req.grant_type, "authorization_code"); + } +} diff --git a/crates/erp-health/src/service/alert_engine.rs b/crates/erp-health/src/service/alert_engine.rs index 3daa19e..fd08934 100644 --- a/crates/erp-health/src/service/alert_engine.rs +++ b/crates/erp-health/src/service/alert_engine.rs @@ -73,7 +73,7 @@ pub async fn evaluate_rules( if is_triggered { let alert = create_alert_and_notify( - &state.db, &state.event_bus, tenant_id, patient_id, &rule + state, tenant_id, patient_id, &rule ).await?; triggered_alerts.push(alert); } @@ -168,26 +168,48 @@ fn evaluate_trend_in_memory( } async fn create_alert_and_notify( - db: &DatabaseConnection, - event_bus: &erp_core::events::EventBus, + state: &HealthState, tenant_id: Uuid, patient_id: Uuid, rule: &alert_rules::Model, ) -> HealthResult { + let db = &state.db; + let event_bus = &state.event_bus; + + // 告警降噪:患者级升级 + 系统级聚合 + let (final_severity, is_suppressed) = + crate::service::alert_noise_reducer::apply_noise_reduction( + state, tenant_id, patient_id, &rule.device_type, &rule.severity, + ) + .await; + let alert_id = Uuid::now_v7(); + let detail_json = if final_severity != rule.severity { + json!({ + "rule_name": rule.name, + "condition_type": rule.condition_type, + "condition_params": rule.condition_params, + "device_type": rule.device_type, + "original_severity": rule.severity, + "escalated_to": final_severity, + }) + } else { + json!({ + "rule_name": rule.name, + "condition_type": rule.condition_type, + "condition_params": rule.condition_params, + "device_type": rule.device_type, + }) + }; + let alert = alerts::ActiveModel { id: Set(alert_id), tenant_id: Set(tenant_id), patient_id: Set(patient_id), rule_id: Set(rule.id), - severity: Set(rule.severity.clone()), + severity: Set(final_severity.clone()), title: Set(format!("{}触发", rule.name)), - detail: Set(Some(json!({ - "rule_name": rule.name, - "condition_type": rule.condition_type, - "condition_params": rule.condition_params, - "device_type": rule.device_type, - }))), + detail: Set(Some(detail_json)), status: Set("pending".to_string()), acknowledged_by: Set(None), acknowledged_at: Set(None), @@ -207,9 +229,10 @@ async fn create_alert_and_notify( "alert_id": alert.id, "patient_id": patient_id, "rule_name": rule.name, - "severity": rule.severity, + "severity": final_severity, "detail": alert.detail, "notify_roles": rule.notify_roles, + "suppressed": is_suppressed, })), ); event_bus.publish(event, db).await;