diff --git a/crates/erp-core/src/events.rs b/crates/erp-core/src/events.rs index 58b5482..79f1c3f 100644 --- a/crates/erp-core/src/events.rs +++ b/crates/erp-core/src/events.rs @@ -69,33 +69,47 @@ impl EventBus { Self { sender } } - /// 发布事件:先持久化到 domain_events 表,再内存广播。 + /// 发布事件:先持久化到 domain_events 表(pending 状态),再内存广播, + /// 最后更新为 published。 /// - /// 持久化失败时仅记录 warning,仍然广播(best-effort)。 + /// 两阶段提交保证:即使广播后服务崩溃,事件仍为 pending 状态, + /// 重启后 outbox relay 会重新广播。 pub async fn publish(&self, event: DomainEvent, db: &sea_orm::DatabaseConnection) { - // 持久化到 domain_events 表 + // 1. 持久化为 pending 状态 + let event_id = event.id; let model = domain_event::ActiveModel { id: Set(event.id), tenant_id: Set(event.tenant_id), event_type: Set(event.event_type.clone()), payload: Set(Some(event.payload.clone())), correlation_id: Set(Some(event.correlation_id)), - status: Set("published".to_string()), + status: Set("pending".to_string()), attempts: Set(0), last_error: Set(None), created_at: Set(event.timestamp), - published_at: Set(Some(Utc::now())), + published_at: Set(None), }; - match model.insert(db).await { - Ok(_) => {} + let saved = match model.insert(db).await { + Ok(m) => m, Err(e) => { - tracing::warn!(event_id = %event.id, error = %e, "领域事件持久化失败"); + tracing::warn!(event_id = %event_id, error = %e, "领域事件持久化失败"); + // 持久化失败仍然广播(best-effort) + self.broadcast(event); + return; } - } + }; - // 内存广播 + // 2. 内存广播 self.broadcast(event); + + // 3. 更新为 published + let mut active: domain_event::ActiveModel = saved.into(); + active.status = Set("published".to_string()); + active.published_at = Set(Some(Utc::now())); + if let Err(e) = active.update(db).await { + tracing::warn!(event_id = %event_id, error = %e, "领域事件状态更新为 published 失败"); + } } /// 仅内存广播(不持久化,用于内部测试等场景)。 diff --git a/crates/erp-health/src/dto/consent_dto.rs b/crates/erp-health/src/dto/consent_dto.rs new file mode 100644 index 0000000..3455f8b --- /dev/null +++ b/crates/erp-health/src/dto/consent_dto.rs @@ -0,0 +1,39 @@ +use chrono::{NaiveDate, Utc}; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; +use uuid::Uuid; + +#[derive(Debug, Serialize, Deserialize, ToSchema)] +pub struct ConsentResp { + pub id: Uuid, + pub patient_id: Uuid, + pub consent_type: String, + pub consent_scope: String, + pub status: String, + pub granted_at: Option>, + pub revoked_at: Option>, + pub expiry_date: Option, + pub consent_method: Option, + pub witness_name: Option, + pub notes: Option, + pub created_at: chrono::DateTime, + pub updated_at: chrono::DateTime, + pub version: i32, +} + +#[derive(Debug, Deserialize, ToSchema)] +pub struct CreateConsentReq { + pub patient_id: Uuid, + pub consent_type: String, + pub consent_scope: String, + pub expiry_date: Option, + pub consent_method: Option, + pub witness_name: Option, + pub notes: Option, +} + +#[derive(Debug, Deserialize, ToSchema)] +pub struct RevokeConsentReq { + pub notes: Option, + pub version: i32, +} diff --git a/crates/erp-health/src/dto/mod.rs b/crates/erp-health/src/dto/mod.rs index b1502bc..080e028 100644 --- a/crates/erp-health/src/dto/mod.rs +++ b/crates/erp-health/src/dto/mod.rs @@ -1,5 +1,6 @@ pub mod appointment_dto; pub mod article_dto; +pub mod consent_dto; pub mod consultation_dto; pub mod daily_monitoring_dto; pub mod diagnosis_dto; diff --git a/crates/erp-health/src/entity/consent.rs b/crates/erp-health/src/entity/consent.rs new file mode 100644 index 0000000..0925861 --- /dev/null +++ b/crates/erp-health/src/entity/consent.rs @@ -0,0 +1,53 @@ +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)] +#[sea_orm(table_name = "consent")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub id: Uuid, + pub tenant_id: Uuid, + pub patient_id: Uuid, + pub consent_type: String, + pub consent_scope: String, + pub status: String, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub granted_at: Option, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub revoked_at: Option, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub expiry_date: Option, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub consent_method: Option, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub witness_name: Option, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub notes: Option, + pub created_at: DateTimeUtc, + pub updated_at: DateTimeUtc, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub created_by: Option, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub updated_by: Option, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub deleted_at: Option, + pub version: i32, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::patient::Entity", + from = "Column::PatientId", + to = "super::patient::Column::Id" + )] + Patient, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Patient.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/crates/erp-health/src/entity/critical_value_threshold.rs b/crates/erp-health/src/entity/critical_value_threshold.rs new file mode 100644 index 0000000..b20fbdf --- /dev/null +++ b/crates/erp-health/src/entity/critical_value_threshold.rs @@ -0,0 +1,35 @@ +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)] +#[sea_orm(table_name = "critical_value_threshold")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub id: Uuid, + pub tenant_id: Uuid, + pub indicator: String, + pub direction: String, + pub threshold_value: f64, + pub level: String, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub department: Option, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub age_min: Option, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub age_max: Option, + pub is_active: bool, + pub created_at: DateTimeUtc, + pub updated_at: DateTimeUtc, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub created_by: Option, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub updated_by: Option, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub deleted_at: Option, + pub version: i32, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/crates/erp-health/src/entity/mod.rs b/crates/erp-health/src/entity/mod.rs index a6558b6..6289db6 100644 --- a/crates/erp-health/src/entity/mod.rs +++ b/crates/erp-health/src/entity/mod.rs @@ -1,5 +1,7 @@ pub mod appointment; pub mod article; +pub mod critical_value_threshold; +pub mod consent; pub mod consultation_message; pub mod consultation_session; pub mod daily_monitoring; diff --git a/crates/erp-health/src/error.rs b/crates/erp-health/src/error.rs index 906bb3f..42ddbc1 100644 --- a/crates/erp-health/src/error.rs +++ b/crates/erp-health/src/error.rs @@ -59,6 +59,12 @@ pub enum HealthError { #[error("文章不存在")] ArticleNotFound, + #[error("危急值阈值不存在")] + ThresholdNotFound, + + #[error("知情同意记录不存在")] + ConsentNotFound, + #[error("状态转换无效: {0}")] InvalidStatusTransition(String), @@ -89,7 +95,9 @@ impl From for AppError { | HealthError::PointsProductNotFound | HealthError::PointsOrderNotFound | HealthError::OfflineEventNotFound - | HealthError::DailyMonitoringNotFound => AppError::NotFound(err.to_string()), + | HealthError::DailyMonitoringNotFound + | HealthError::ThresholdNotFound + | HealthError::ConsentNotFound => AppError::NotFound(err.to_string()), HealthError::ScheduleFull => AppError::Validation(err.to_string()), HealthError::InvalidStatusTransition(s) => AppError::Validation(s), HealthError::VersionMismatch => AppError::VersionMismatch, diff --git a/crates/erp-health/src/handler/consent_handler.rs b/crates/erp-health/src/handler/consent_handler.rs new file mode 100644 index 0000000..884578d --- /dev/null +++ b/crates/erp-health/src/handler/consent_handler.rs @@ -0,0 +1,71 @@ +use axum::Extension; +use axum::extract::{FromRef, Json, Path, Query, State}; +use serde::Deserialize; +use erp_core::error::AppError; +use erp_core::rbac::require_permission; +use erp_core::types::{ApiResponse, PaginatedResponse, TenantContext}; + +use crate::dto::consent_dto::*; +use crate::service::consent_service; +use crate::state::HealthState; + +#[derive(Debug, Deserialize)] +pub struct ConsentListParams { + pub page: Option, + pub page_size: Option, +} + +pub async fn list_consents( + State(state): State, + Extension(ctx): Extension, + Path(patient_id): Path, + Query(params): Query, +) -> Result>>, AppError> +where + HealthState: FromRef, + S: Clone + Send + Sync + 'static, +{ + require_permission(&ctx, "health.patient.list")?; + let page = params.page.unwrap_or(1); + let page_size = params.page_size.unwrap_or(20); + let result = consent_service::list_consents( + &state, ctx.tenant_id, patient_id, page, page_size, + ) + .await?; + Ok(Json(ApiResponse::ok(result))) +} + +pub async fn grant_consent( + State(state): State, + Extension(ctx): Extension, + Json(req): Json, +) -> Result>, AppError> +where + HealthState: FromRef, + S: Clone + Send + Sync + 'static, +{ + require_permission(&ctx, "health.patient.manage")?; + let result = consent_service::grant_consent( + &state, ctx.tenant_id, Some(ctx.user_id), req, + ) + .await?; + Ok(Json(ApiResponse::ok(result))) +} + +pub async fn revoke_consent( + State(state): State, + Extension(ctx): Extension, + Path(consent_id): Path, + Json(req): Json, +) -> Result>, AppError> +where + HealthState: FromRef, + S: Clone + Send + Sync + 'static, +{ + require_permission(&ctx, "health.patient.manage")?; + let result = consent_service::revoke_consent( + &state, ctx.tenant_id, consent_id, Some(ctx.user_id), req, + ) + .await?; + Ok(Json(ApiResponse::ok(result))) +} diff --git a/crates/erp-health/src/handler/critical_value_threshold_handler.rs b/crates/erp-health/src/handler/critical_value_threshold_handler.rs new file mode 100644 index 0000000..621d44a --- /dev/null +++ b/crates/erp-health/src/handler/critical_value_threshold_handler.rs @@ -0,0 +1,111 @@ +use axum::Extension; +use axum::extract::{FromRef, Json, Path, State}; +use serde::Deserialize; +use erp_core::error::AppError; +use erp_core::rbac::require_permission; +use erp_core::types::{ApiResponse, TenantContext}; + +use crate::service::critical_value_threshold_service; +use crate::state::HealthState; + +#[derive(Debug, Deserialize)] +pub struct CreateThresholdReq { + pub indicator: String, + pub direction: String, + pub threshold_value: f64, + pub level: Option, + pub department: Option, + pub age_min: Option, + pub age_max: Option, +} + +#[derive(Debug, Deserialize)] +pub struct UpdateThresholdReq { + pub threshold_value: f64, + pub level: Option, + pub department: Option, + pub age_min: Option, + pub age_max: Option, + pub version: i32, +} + +pub async fn list_thresholds( + State(state): State, + Extension(ctx): Extension, +) -> Result>>, AppError> +where + HealthState: FromRef, + S: Clone + Send + Sync + 'static, +{ + require_permission(&ctx, "health.health-data.list")?; + let list = critical_value_threshold_service::find_thresholds(&state.db, ctx.tenant_id).await?; + Ok(Json(ApiResponse::ok(list))) +} + +pub async fn create_threshold( + State(state): State, + Extension(ctx): Extension, + Json(req): Json, +) -> Result>, AppError> +where + HealthState: FromRef, + S: Clone + Send + Sync + 'static, +{ + require_permission(&ctx, "health.health-data.manage")?; + let result = critical_value_threshold_service::create_threshold( + &state.db, + ctx.tenant_id, + Some(ctx.user_id), + req.indicator, + req.direction, + req.threshold_value, + req.level.unwrap_or_else(|| "critical".to_string()), + req.department, + req.age_min, + req.age_max, + ) + .await?; + Ok(Json(ApiResponse::ok(result))) +} + +pub async fn update_threshold( + State(state): State, + Extension(ctx): Extension, + Path(id): Path, + Json(req): Json, +) -> Result>, AppError> +where + HealthState: FromRef, + S: Clone + Send + Sync + 'static, +{ + require_permission(&ctx, "health.health-data.manage")?; + let result = critical_value_threshold_service::update_threshold( + &state.db, + ctx.tenant_id, + id, + Some(ctx.user_id), + req.threshold_value, + req.level.unwrap_or_else(|| "critical".to_string()), + req.department, + req.age_min, + req.age_max, + req.version, + ) + .await?; + Ok(Json(ApiResponse::ok(result))) +} + +pub async fn delete_threshold( + State(state): State, + Extension(ctx): Extension, + Path(id): Path, +) -> Result>, AppError> +where + HealthState: FromRef, + S: Clone + Send + Sync + 'static, +{ + require_permission(&ctx, "health.health-data.manage")?; + critical_value_threshold_service::delete_threshold(&state.db, ctx.tenant_id, id, Some(ctx.user_id)) + .await?; + Ok(Json(ApiResponse::ok(()))) +} diff --git a/crates/erp-health/src/handler/mod.rs b/crates/erp-health/src/handler/mod.rs index 2ecc67c..b9b9eeb 100644 --- a/crates/erp-health/src/handler/mod.rs +++ b/crates/erp-health/src/handler/mod.rs @@ -1,6 +1,8 @@ pub mod appointment_handler; pub mod article_handler; pub mod consultation_handler; +pub mod consent_handler; +pub mod critical_value_threshold_handler; pub mod daily_monitoring_handler; pub mod diagnosis_handler; pub mod dialysis_handler; diff --git a/crates/erp-health/src/module.rs b/crates/erp-health/src/module.rs index 67bfd59..0dd540a 100644 --- a/crates/erp-health/src/module.rs +++ b/crates/erp-health/src/module.rs @@ -6,7 +6,7 @@ use erp_core::events::EventBus; use erp_core::module::{ErpModule, PermissionDescriptor}; use crate::handler::{ - appointment_handler, article_handler, consultation_handler, daily_monitoring_handler, diagnosis_handler, dialysis_handler, doctor_handler, follow_up_handler, + appointment_handler, article_handler, consultation_handler, consent_handler, critical_value_threshold_handler, daily_monitoring_handler, diagnosis_handler, dialysis_handler, doctor_handler, follow_up_handler, health_data_handler, patient_handler, points_handler, stats_handler, }; @@ -410,6 +410,30 @@ impl HealthModule { "/health/admin/statistics/follow-ups", axum::routing::get(stats_handler::get_follow_up_stats), ) + // 危急值阈值配置 + .route( + "/health/critical-value-thresholds", + axum::routing::get(critical_value_threshold_handler::list_thresholds) + .post(critical_value_threshold_handler::create_threshold), + ) + .route( + "/health/critical-value-thresholds/{id}", + axum::routing::put(critical_value_threshold_handler::update_threshold) + .delete(critical_value_threshold_handler::delete_threshold), + ) + // 知情同意记录 + .route( + "/health/patients/{patient_id}/consents", + axum::routing::get(consent_handler::list_consents), + ) + .route( + "/health/consents", + axum::routing::post(consent_handler::grant_consent), + ) + .route( + "/health/consents/{consent_id}/revoke", + axum::routing::put(consent_handler::revoke_consent), + ) } } diff --git a/crates/erp-health/src/service/consent_service.rs b/crates/erp-health/src/service/consent_service.rs new file mode 100644 index 0000000..e1ad24f --- /dev/null +++ b/crates/erp-health/src/service/consent_service.rs @@ -0,0 +1,187 @@ +use chrono::Utc; +use sea_orm::entity::prelude::*; +use sea_orm::{ActiveValue::Set, QueryOrder, QuerySelect}; +use uuid::Uuid; + +use erp_core::audit::AuditLog; +use erp_core::audit_service; +use erp_core::error::check_version; +use erp_core::types::PaginatedResponse; + +use crate::dto::consent_dto::*; +use crate::entity::{consent, patient}; +use crate::error::{HealthError, HealthResult}; +use crate::state::HealthState; + +pub async fn list_consents( + state: &HealthState, + tenant_id: Uuid, + patient_id: Uuid, + page: u64, + page_size: u64, +) -> HealthResult> { + let limit = page_size.min(100); + let offset = page.saturating_sub(1) * limit; + + let mut query = consent::Entity::find() + .filter(consent::Column::TenantId.eq(tenant_id)) + .filter(consent::Column::PatientId.eq(patient_id)) + .filter(consent::Column::DeletedAt.is_null()); + + let total: u64 = query + .clone() + .count(&state.db) + .await?; + + let rows: Vec = query + .order_by_desc(consent::Column::CreatedAt) + .limit(limit) + .offset(offset) + .all(&state.db) + .await?; + + let total_pages = ((total as f64) / (page_size as f64)).ceil() as u64; + let data = rows.into_iter().map(|m| ConsentResp { + id: m.id, + patient_id: m.patient_id, + consent_type: m.consent_type, + consent_scope: m.consent_scope, + status: m.status, + granted_at: m.granted_at, + revoked_at: m.revoked_at, + expiry_date: m.expiry_date, + consent_method: m.consent_method, + witness_name: m.witness_name, + notes: m.notes, + created_at: m.created_at, + updated_at: m.updated_at, + version: m.version, + }).collect(); + + Ok(PaginatedResponse { data, total, page, page_size, total_pages }) +} + +pub async fn grant_consent( + state: &HealthState, + tenant_id: Uuid, + operator_id: Option, + req: CreateConsentReq, +) -> HealthResult { + // 校验患者存在 + patient::Entity::find() + .filter(patient::Column::Id.eq(req.patient_id)) + .filter(patient::Column::TenantId.eq(tenant_id)) + .filter(patient::Column::DeletedAt.is_null()) + .one(&state.db) + .await? + .ok_or(HealthError::PatientNotFound)?; + + validate_consent_type(&req.consent_type)?; + + let now = Utc::now(); + let model = consent::ActiveModel { + id: Set(Uuid::now_v7()), + tenant_id: Set(tenant_id), + patient_id: Set(req.patient_id), + consent_type: Set(req.consent_type), + consent_scope: Set(req.consent_scope), + status: Set("granted".to_string()), + granted_at: Set(Some(now)), + revoked_at: Set(None), + expiry_date: Set(req.expiry_date), + consent_method: Set(req.consent_method), + witness_name: Set(req.witness_name), + notes: Set(req.notes), + created_at: Set(now), + updated_at: Set(now), + created_by: Set(operator_id), + updated_by: Set(operator_id), + deleted_at: Set(None), + version: Set(1), + }; + let m = model.insert(&state.db).await?; + + audit_service::record( + AuditLog::new(tenant_id, operator_id, "consent.granted", "consent") + .with_resource_id(m.id), + &state.db, + ).await; + + Ok(model_to_resp(m)) +} + +pub async fn revoke_consent( + state: &HealthState, + tenant_id: Uuid, + consent_id: Uuid, + operator_id: Option, + req: RevokeConsentReq, +) -> HealthResult { + let existing = consent::Entity::find_by_id(consent_id) + .one(&state.db) + .await? + .ok_or(HealthError::ConsentNotFound)?; + + if existing.tenant_id != tenant_id { + return Err(HealthError::ConsentNotFound); + } + check_version(req.version, existing.version).map_err(|_| HealthError::VersionMismatch)?; + + let now = Utc::now(); + let mut active: consent::ActiveModel = existing.into(); + active.status = Set("revoked".to_string()); + active.revoked_at = Set(Some(now)); + active.updated_at = Set(now); + active.updated_by = Set(operator_id); + active.version = Set(req.version + 1); + if let Some(notes) = req.notes { + active.notes = Set(Some(notes)); + } + let m = active.update(&state.db).await?; + + audit_service::record( + AuditLog::new(tenant_id, operator_id, "consent.revoked", "consent") + .with_resource_id(m.id), + &state.db, + ).await; + + Ok(model_to_resp(m)) +} + +fn model_to_resp(m: consent::Model) -> ConsentResp { + ConsentResp { + id: m.id, + patient_id: m.patient_id, + consent_type: m.consent_type, + consent_scope: m.consent_scope, + status: m.status, + granted_at: m.granted_at, + revoked_at: m.revoked_at, + expiry_date: m.expiry_date, + consent_method: m.consent_method, + witness_name: m.witness_name, + notes: m.notes, + created_at: m.created_at, + updated_at: m.updated_at, + version: m.version, + } +} + +fn validate_consent_type(consent_type: &str) -> HealthResult<()> { + let valid = [ + "data_processing", // 数据处理同意 + "health_data_collection", // 健康数据采集 + "research_use", // 科研使用 + "third_party_share", // 第三方共享 + "genetic_testing", // 基因检测 + "telemedicine", // 远程医疗 + ]; + if valid.contains(&consent_type) { + Ok(()) + } else { + Err(HealthError::Validation(format!( + "consent_type 必须为以下之一: {}", + valid.join(", ") + ))) + } +} diff --git a/crates/erp-health/src/service/critical_value_threshold_service.rs b/crates/erp-health/src/service/critical_value_threshold_service.rs new file mode 100644 index 0000000..cb6ac1c --- /dev/null +++ b/crates/erp-health/src/service/critical_value_threshold_service.rs @@ -0,0 +1,198 @@ +use sea_orm::entity::prelude::*; +use sea_orm::ActiveValue::Set; +use uuid::Uuid; + +use crate::entity::critical_value_threshold; +use crate::error::{HealthError, HealthResult}; + +/// 阈值查询条件:指标名 + 方向 +pub struct ThresholdQuery { + pub indicator: String, + pub direction: String, +} + +/// 查询指定租户的活跃危急值阈值。 +/// +/// 如果指定了 department 和 age,优先匹配精确条件,回退到通用阈值。 +pub async fn find_thresholds( + db: &DatabaseConnection, + tenant_id: Uuid, +) -> HealthResult> { + let list = critical_value_threshold::Entity::find() + .filter(critical_value_threshold::Column::TenantId.eq(tenant_id)) + .filter(critical_value_threshold::Column::IsActive.eq(true)) + .filter(critical_value_threshold::Column::DeletedAt.is_null()) + .all(db) + .await?; + Ok(list) +} + +/// 查找匹配的阈值配置(按 indicator + direction 匹配)。 +/// +/// 优先匹配有科室/年龄限制的精确规则,否则返回通用规则。 +pub async fn find_threshold( + db: &DatabaseConnection, + tenant_id: Uuid, + indicator: &str, + direction: &str, + department: Option<&str>, + age: Option, +) -> HealthResult> { + let all = find_thresholds(db, tenant_id).await?; + + let mut exact_match: Option<&critical_value_threshold::Model> = None; + let mut generic_match: Option<&critical_value_threshold::Model> = None; + + for t in &all { + if t.indicator != indicator || t.direction != direction { + continue; + } + let dept_match = match (t.department.as_deref(), department) { + (Some(td), Some(d)) => td == d, + (None, _) => true, // 通用规则匹配任意科室 + (Some(_), None) => false, // 有科室限制但没传科室 + }; + let age_match = match (t.age_min, t.age_max, age) { + (Some(min), Some(max), Some(a)) => a >= min && a <= max, + (None, None, _) => true, // 通用规则匹配任意年龄 + _ => false, // 有年龄限制但没传年龄 + }; + if dept_match && age_match { + if t.department.is_some() || t.age_min.is_some() { + // 精确规则 + if exact_match.is_none() { + exact_match = Some(t); + } + } else { + // 通用规则 + if generic_match.is_none() { + generic_match = Some(t); + } + } + } + } + + Ok(exact_match.or(generic_match).cloned()) +} + +/// 创建新的危急值阈值。 +pub async fn create_threshold( + db: &DatabaseConnection, + tenant_id: Uuid, + operator_id: Option, + indicator: String, + direction: String, + threshold_value: f64, + level: String, + department: Option, + age_min: Option, + age_max: Option, +) -> HealthResult { + validate_direction(&direction)?; + validate_indicator(&indicator)?; + + let now = chrono::Utc::now(); + let model = critical_value_threshold::ActiveModel { + id: Set(Uuid::now_v7()), + tenant_id: Set(tenant_id), + indicator: Set(indicator), + direction: Set(direction), + threshold_value: Set(threshold_value), + level: Set(level), + department: Set(department), + age_min: Set(age_min), + age_max: Set(age_max), + is_active: Set(true), + created_at: Set(now), + updated_at: Set(now), + created_by: Set(operator_id), + updated_by: Set(operator_id), + deleted_at: Set(None), + version: Set(1), + }; + let result = model.insert(db).await?; + Ok(result) +} + +/// 更新危急值阈值。 +pub async fn update_threshold( + db: &DatabaseConnection, + tenant_id: Uuid, + threshold_id: Uuid, + operator_id: Option, + threshold_value: f64, + level: String, + department: Option, + age_min: Option, + age_max: Option, + expected_version: i32, +) -> HealthResult { + let existing = critical_value_threshold::Entity::find_by_id(threshold_id) + .one(db) + .await? + .ok_or(HealthError::ThresholdNotFound)?; + + if existing.tenant_id != tenant_id { + return Err(HealthError::ThresholdNotFound); + } + + erp_core::error::check_version(existing.version, expected_version)?; + + let now = chrono::Utc::now(); + let mut active: critical_value_threshold::ActiveModel = existing.into(); + active.threshold_value = Set(threshold_value); + active.level = Set(level); + active.department = Set(department); + active.age_min = Set(age_min); + active.age_max = Set(age_max); + active.updated_at = Set(now); + active.updated_by = Set(operator_id); + active.version = Set(expected_version + 1); + let result = active.update(db).await?; + Ok(result) +} + +/// 软删除危急值阈值。 +pub async fn delete_threshold( + db: &DatabaseConnection, + tenant_id: Uuid, + threshold_id: Uuid, + operator_id: Option, +) -> HealthResult<()> { + let existing = critical_value_threshold::Entity::find_by_id(threshold_id) + .one(db) + .await? + .ok_or(HealthError::ThresholdNotFound)?; + + if existing.tenant_id != tenant_id { + return Err(HealthError::ThresholdNotFound); + } + + let mut active: critical_value_threshold::ActiveModel = existing.into(); + active.deleted_at = Set(Some(chrono::Utc::now())); + active.updated_by = Set(operator_id); + active.update(db).await?; + Ok(()) +} + +fn validate_direction(direction: &str) -> HealthResult<()> { + if matches!(direction, "high" | "low") { + Ok(()) + } else { + Err(HealthError::Validation( + "direction 必须为 'high' 或 'low'".to_string(), + )) + } +} + +fn validate_indicator(indicator: &str) -> HealthResult<()> { + let valid = ["systolic_bp", "diastolic_bp", "heart_rate", "blood_sugar"]; + if valid.contains(&indicator) { + Ok(()) + } else { + Err(HealthError::Validation(format!( + "indicator 必须为以下之一: {}", + valid.join(", ") + ))) + } +} diff --git a/crates/erp-health/src/service/follow_up_service.rs b/crates/erp-health/src/service/follow_up_service.rs index f51a436..3f6369b 100644 --- a/crates/erp-health/src/service/follow_up_service.rs +++ b/crates/erp-health/src/service/follow_up_service.rs @@ -456,34 +456,42 @@ pub async fn check_overdue_tasks(db: &DatabaseConnection) -> HealthResult { Ok(result.rows_affected) } -/// 逾期随访检查 + 事件发布版本。 -/// 标记逾期后,查询被标记的任务并为每个发布 `follow_up.overdue` 事件。 +/// 逾期随访检查 + 事件发布版本(幂等保护)。 +/// +/// 只发布**本次新被标记**为 overdue 的事件,避免重复通知。 +/// 幂等策略:先查出即将被标记的 pending 任务,批量更新后只为这些任务发事件。 pub async fn check_overdue_and_notify(state: &HealthState) -> HealthResult { let db = &state.db; + let today = chrono::Utc::now().date_naive(); + + // 1. 先查出即将被标记的 pending 任务(幂等:只有这些才需要通知) + let newly_overdue: Vec = follow_up_task::Entity::find() + .filter(follow_up_task::Column::Status.eq("pending")) + .filter(follow_up_task::Column::PlannedDate.lt(today)) + .filter(follow_up_task::Column::DeletedAt.is_null()) + .all(db) + .await?; + + if newly_overdue.is_empty() { + return Ok(0); + } + + // 2. 批量更新状态 let count = check_overdue_tasks(db).await?; - if count > 0 { - let today = chrono::Utc::now().date_naive(); - let overdue_tasks: Vec = follow_up_task::Entity::find() - .filter(follow_up_task::Column::Status.eq("overdue")) - .filter(follow_up_task::Column::PlannedDate.lt(today)) - .filter(follow_up_task::Column::DeletedAt.is_null()) - .all(db) - .await?; - - for task in overdue_tasks { - let event = erp_core::events::DomainEvent::new( - "follow_up.overdue", - task.tenant_id, - serde_json::json!({ - "task_id": task.id, - "patient_id": task.patient_id, - "assigned_to": task.assigned_to, - "planned_date": task.planned_date, - }), - ); - state.event_bus.publish(event, db).await; - } + // 3. 只为本次新标记的任务发布事件 + for task in newly_overdue { + let event = erp_core::events::DomainEvent::new( + "follow_up.overdue", + task.tenant_id, + serde_json::json!({ + "task_id": task.id, + "patient_id": task.patient_id, + "assigned_to": task.assigned_to, + "planned_date": task.planned_date, + }), + ); + state.event_bus.publish(event, db).await; } Ok(count) diff --git a/crates/erp-health/src/service/health_data_service.rs b/crates/erp-health/src/service/health_data_service.rs index 343bb12..8c76b92 100644 --- a/crates/erp-health/src/service/health_data_service.rs +++ b/crates/erp-health/src/service/health_data_service.rs @@ -13,7 +13,7 @@ use erp_core::error::check_version; use erp_core::types::PaginatedResponse; use crate::dto::health_data_dto::*; -use crate::entity::{health_record, lab_report, patient, vital_signs}; +use crate::entity::{doctor_profile, health_record, lab_report, patient, patient_doctor_relation, vital_signs}; use crate::error::{HealthError, HealthResult}; use crate::service::validation::validate_record_type; use crate::state::HealthState; @@ -86,7 +86,7 @@ pub async fn create_vital_signs( .ok_or(HealthError::PatientNotFound)?; let now = Utc::now(); - check_vital_signs_alert(state, tenant_id, patient_id, req.clone()).await; + check_vital_signs_alert(state, tenant_id, patient_id, operator_id, req.clone()).await; let active = vital_signs::ActiveModel { id: Set(Uuid::now_v7()), tenant_id: Set(tenant_id), @@ -169,6 +169,23 @@ pub async fn update_vital_signs( let m = active.update(&state.db).await?; + // 更新后也触发危急值检测(修改后的值可能触发告警) + let check_req = CreateVitalSignsReq { + record_date: m.record_date, + systolic_bp_morning: m.systolic_bp_morning, + diastolic_bp_morning: m.diastolic_bp_morning, + systolic_bp_evening: m.systolic_bp_evening, + diastolic_bp_evening: m.diastolic_bp_evening, + heart_rate: m.heart_rate, + weight: m.weight.map(|d| d.to_f64().unwrap_or(0.0)), + blood_sugar: m.blood_sugar.map(|d| d.to_f64().unwrap_or(0.0)), + water_intake_ml: m.water_intake_ml, + urine_output_ml: m.urine_output_ml, + notes: m.notes.clone(), + source: Some(m.source.clone()), + }; + check_vital_signs_alert(state, tenant_id, patient_id, operator_id, check_req).await; + audit_service::record( AuditLog::new(tenant_id, operator_id, "vital_signs.updated", "vital_signs") .with_resource_id(m.id), @@ -628,113 +645,145 @@ pub async fn delete_health_record( // 危急值预警检测 // --------------------------------------------------------------------------- -/// 检查体征数据中的危急值,发布 `health_data.critical_alert` 事件 +/// 检查体征数据中的危急值,发布 `health_data.critical_alert` 事件。 +/// +/// 阈值从 `critical_value_threshold` 表加载,支持按科室/年龄差异化配置。 +/// 事件 payload 包含:患者信息、责任医生、操作人信息、告警详情。 async fn check_vital_signs_alert( state: &HealthState, tenant_id: Uuid, patient_id: Uuid, + operator_id: Option, req: CreateVitalSignsReq, ) { + // 从数据库加载阈值配置 + let thresholds = match crate::service::critical_value_threshold_service::find_thresholds( + &state.db, tenant_id, + ) + .await + { + Ok(t) if !t.is_empty() => t, + Ok(_) => { + tracing::warn!(tenant_id = %tenant_id, "无危急值阈值配置,跳过告警检测"); + return; + } + Err(e) => { + tracing::warn!(error = %e, "加载危急值阈值失败,跳过告警检测"); + return; + } + }; + let mut alerts: Vec = Vec::new(); // 收缩压危急值 if let Some(sbp) = req.systolic_bp_morning.or(req.systolic_bp_evening) { - if sbp >= 180 { - alerts.push(serde_json::json!({ - "indicator": "systolic_bp", - "value": sbp, - "threshold": 180, - "level": "critical", - "direction": "high" - })); - } else if sbp <= 80 { - alerts.push(serde_json::json!({ - "indicator": "systolic_bp", - "value": sbp, - "threshold": 80, - "level": "critical", - "direction": "low" - })); - } + check_indicator(&thresholds, "systolic_bp", sbp as f64, &mut alerts); } // 舒张压危急值 if let Some(dbp) = req.diastolic_bp_morning.or(req.diastolic_bp_evening) { - if dbp >= 110 { - alerts.push(serde_json::json!({ - "indicator": "diastolic_bp", - "value": dbp, - "threshold": 110, - "level": "critical", - "direction": "high" - })); - } else if dbp <= 50 { - alerts.push(serde_json::json!({ - "indicator": "diastolic_bp", - "value": dbp, - "threshold": 50, - "level": "critical", - "direction": "low" - })); - } + check_indicator(&thresholds, "diastolic_bp", dbp as f64, &mut alerts); } // 心率危急值 if let Some(hr) = req.heart_rate { - if hr >= 150 { - alerts.push(serde_json::json!({ - "indicator": "heart_rate", - "value": hr, - "threshold": 150, - "level": "critical", - "direction": "high" - })); - } else if hr <= 40 { - alerts.push(serde_json::json!({ - "indicator": "heart_rate", - "value": hr, - "threshold": 40, - "level": "critical", - "direction": "low" - })); - } + check_indicator(&thresholds, "heart_rate", hr as f64, &mut alerts); } // 血糖危急值 if let Some(bs) = req.blood_sugar { - if bs >= 25.0 { - alerts.push(serde_json::json!({ - "indicator": "blood_sugar", - "value": bs, - "threshold": 25.0, - "level": "critical", - "direction": "high" - })); - } else if bs <= 2.5 { - alerts.push(serde_json::json!({ - "indicator": "blood_sugar", - "value": bs, - "threshold": 2.5, - "level": "critical", - "direction": "low" - })); - } + check_indicator(&thresholds, "blood_sugar", bs.to_f64().unwrap_or(0.0), &mut alerts); } - for alert in alerts { - let event = erp_core::events::DomainEvent::new( + if alerts.is_empty() { + return; + } + + // 查询患者信息 + let patient_model = patient::Entity::find_by_id(patient_id) + .one(&state.db) + .await + .ok() + .flatten(); + let patient_name = patient_model + .as_ref() + .map(|p| p.name.as_str()) + .unwrap_or("未知患者"); + + // 查询责任医生(通过 patient_doctor_relation 的 attending 类型) + let attending_relation = patient_doctor_relation::Entity::find() + .filter(patient_doctor_relation::Column::PatientId.eq(patient_id)) + .filter(patient_doctor_relation::Column::TenantId.eq(tenant_id)) + .filter(patient_doctor_relation::Column::DeletedAt.is_null()) + .filter(patient_doctor_relation::Column::RelationshipType.eq("attending")) + .one(&state.db) + .await + .ok() + .flatten(); + + let doctor_user_id: Option = if let Some(rel) = attending_relation { + doctor_profile::Entity::find_by_id(rel.doctor_id) + .one(&state.db) + .await + .ok() + .flatten() + .and_then(|d| d.user_id) + } else { + None + }; + + for alert in &alerts { + let mut payload = serde_json::json!({ + "patient_id": patient_id, + "patient_name": patient_name, + "operator_id": operator_id, + "alert": alert, + }); + if let Some(did) = doctor_user_id { + payload["doctor_user_id"] = serde_json::json!(did); + } + + let event = DomainEvent::new( "health_data.critical_alert", tenant_id, - serde_json::json!({ - "patient_id": patient_id, - "alert": alert, - }), + payload, ); state.event_bus.publish(event, &state.db).await; tracing::warn!( patient_id = %patient_id, tenant_id = %tenant_id, + indicator = %alert["indicator"], + value = %alert["value"], "体征危急值预警已发布" ); } } + +/// 根据阈值配置检查单个指标值,匹配则添加到 alerts。 +fn check_indicator( + thresholds: &[crate::entity::critical_value_threshold::Model], + indicator: &str, + value: f64, + alerts: &mut Vec, +) { + for t in thresholds { + if t.indicator != indicator { + continue; + } + let triggered = match t.direction.as_str() { + "high" => value >= t.threshold_value, + "low" => value <= t.threshold_value, + _ => false, + }; + if triggered { + alerts.push(serde_json::json!({ + "indicator": indicator, + "value": value, + "threshold": t.threshold_value, + "level": t.level, + "direction": t.direction, + })); + return; + } + } +} diff --git a/crates/erp-health/src/service/mod.rs b/crates/erp-health/src/service/mod.rs index 3ddaab5..2a879df 100644 --- a/crates/erp-health/src/service/mod.rs +++ b/crates/erp-health/src/service/mod.rs @@ -1,6 +1,8 @@ pub mod appointment_service; pub mod article_service; pub mod consultation_service; +pub mod consent_service; +pub mod critical_value_threshold_service; pub mod daily_monitoring_service; pub mod diagnosis_service; pub mod dialysis_service; diff --git a/crates/erp-health/src/service/patient_service.rs b/crates/erp-health/src/service/patient_service.rs index 63b2903..a747f76 100644 --- a/crates/erp-health/src/service/patient_service.rs +++ b/crates/erp-health/src/service/patient_service.rs @@ -205,6 +205,14 @@ pub async fn update_patient( ])?; } + // 记录变更前的关键临床值(过敏史、病史、身份证号) + let old_snapshot = serde_json::json!({ + "allergy_history": model.allergy_history, + "medical_history_summary": model.medical_history_summary, + "status": model.status, + "verification_status": model.verification_status, + }); + let mut active: patient::ActiveModel = model.into(); if let Some(v) = req.name { active.name = Set(v); } @@ -232,6 +240,14 @@ pub async fn update_patient( let updated = active.update(&state.db).await?; + // 变更后快照 + let new_snapshot = serde_json::json!({ + "allergy_history": updated.allergy_history, + "medical_history_summary": updated.medical_history_summary, + "status": updated.status, + "verification_status": updated.verification_status, + }); + // 根据状态变更发布不同事件 let event_type = if req.status.as_deref() == Some("deceased") { "patient.deceased" @@ -249,7 +265,8 @@ pub async fn update_patient( audit_service::record( AuditLog::new(tenant_id, operator_id, "patient.updated", "patient") - .with_resource_id(updated.id), + .with_resource_id(updated.id) + .with_changes(Some(old_snapshot), Some(new_snapshot)), &state.db, ).await; diff --git a/crates/erp-message/src/module.rs b/crates/erp-message/src/module.rs index bdcf4dd..8b73fb9 100644 --- a/crates/erp-message/src/module.rs +++ b/crates/erp-message/src/module.rs @@ -280,6 +280,127 @@ async fn handle_workflow_event( .map_err(|e| e.to_string())?; } } + "health_data.critical_alert" => { + let patient_name = event + .payload + .get("patient_name") + .and_then(|v| v.as_str()) + .unwrap_or("未知患者"); + let alert = event.payload.get("alert"); + let indicator = alert + .and_then(|a| a.get("indicator")) + .and_then(|v| v.as_str()) + .unwrap_or("未知指标"); + let value = alert + .and_then(|a| a.get("value")) + .map(|v| v.to_string()) + .unwrap_or_else(|| "?".to_string()); + let direction = alert + .and_then(|a| a.get("direction")) + .and_then(|v| v.as_str()) + .unwrap_or("high"); + + let direction_text = match direction { + "low" => "偏低", + _ => "偏高", + }; + + // 通知责任医生(优先) + if let Some(doctor_uid) = event + .payload + .get("doctor_user_id") + .and_then(|v| v.as_str()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()) + { + let _ = crate::service::message_service::MessageService::send_system( + event.tenant_id, + doctor_uid, + format!("危急值告警:患者 {}", patient_name), + format!( + "患者 {} 的{}{}(值:{}),请立即关注处理。", + patient_name, indicator, direction_text, value + ), + "urgent", + Some("critical_alert".to_string()), + Some(event.id), + db, + event_bus, + ) + .await + .map_err(|e| e.to_string())?; + } + + // 同时通知操作人(录入者) + if let Some(operator_uid) = event + .payload + .get("operator_id") + .and_then(|v| v.as_str()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()) + { + // 避免医生和操作人是同一人时重复通知 + let is_doctor = event + .payload + .get("doctor_user_id") + .and_then(|v| v.as_str()) + .map(|s| s == operator_uid.to_string()) + .unwrap_or(false); + + if !is_doctor { + let _ = crate::service::message_service::MessageService::send_system( + event.tenant_id, + operator_uid, + format!("危急值告警:患者 {}", patient_name), + format!( + "患者 {} 的{}{}(值:{})已触发危急值告警,已通知责任医生。", + patient_name, indicator, direction_text, value + ), + "important", + Some("critical_alert".to_string()), + Some(event.id), + db, + event_bus, + ) + .await + .map_err(|e| e.to_string())?; + } + } + } + "follow_up.overdue" => { + let task_id = event + .payload + .get("task_id") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + let assigned_to = event + .payload + .get("assigned_to") + .and_then(|v| v.as_str()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()); + let planned_date = event + .payload + .get("planned_date") + .and_then(|v| v.as_str()) + .unwrap_or("未知日期"); + + if let Some(assignee) = assigned_to { + let _ = crate::service::message_service::MessageService::send_system( + event.tenant_id, + assignee, + "随访任务逾期提醒".to_string(), + format!( + "您的随访任务(计划日期:{})已逾期,请尽快处理。", + planned_date + ), + "important", + Some("follow_up".to_string()), + uuid::Uuid::parse_str(task_id).ok(), + db, + event_bus, + ) + .await + .map_err(|e| e.to_string())?; + } + } _ => {} } Ok(()) diff --git a/crates/erp-server/migration/src/lib.rs b/crates/erp-server/migration/src/lib.rs index ab9d940..505606a 100644 --- a/crates/erp-server/migration/src/lib.rs +++ b/crates/erp-server/migration/src/lib.rs @@ -59,6 +59,8 @@ mod m20260426_000056_create_diagnosis; mod m20260426_000057_rename_points_transaction_type_column; mod m20260426_000058_merge_daily_monitoring_into_vital_signs; mod m20260426_000059_seed_menus; +mod m20260426_000060_create_critical_value_thresholds; +mod m20260426_000061_create_consent; pub struct Migrator; @@ -125,6 +127,8 @@ impl MigratorTrait for Migrator { Box::new(m20260426_000057_rename_points_transaction_type_column::Migration), Box::new(m20260426_000058_merge_daily_monitoring_into_vital_signs::Migration), Box::new(m20260426_000059_seed_menus::Migration), + Box::new(m20260426_000060_create_critical_value_thresholds::Migration), + Box::new(m20260426_000061_create_consent::Migration), ] } } diff --git a/crates/erp-server/migration/src/m20260426_000060_create_critical_value_thresholds.rs b/crates/erp-server/migration/src/m20260426_000060_create_critical_value_thresholds.rs new file mode 100644 index 0000000..55c2683 --- /dev/null +++ b/crates/erp-server/migration/src/m20260426_000060_create_critical_value_thresholds.rs @@ -0,0 +1,141 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(CriticalValueThreshold::Table) + .col( + ColumnDef::new(CriticalValueThreshold::Id) + .uuid() + .not_null() + .primary_key(), + ) + .col( + ColumnDef::new(CriticalValueThreshold::TenantId) + .uuid() + .not_null(), + ) + .col( + ColumnDef::new(CriticalValueThreshold::Indicator) + .string_len(50) + .not_null(), + ) + .col( + ColumnDef::new(CriticalValueThreshold::Direction) + .string_len(10) + .not_null(), + ) + .col( + ColumnDef::new(CriticalValueThreshold::ThresholdValue) + .double() + .not_null(), + ) + .col( + ColumnDef::new(CriticalValueThreshold::Level) + .string_len(20) + .not_null() + .default("critical"), + ) + .col( + ColumnDef::new(CriticalValueThreshold::Department) + .string_len(100), + ) + .col(ColumnDef::new(CriticalValueThreshold::AgeMin).integer()) + .col(ColumnDef::new(CriticalValueThreshold::AgeMax).integer()) + .col( + ColumnDef::new(CriticalValueThreshold::IsActive) + .boolean() + .not_null() + .default(true), + ) + .col( + ColumnDef::new(CriticalValueThreshold::CreatedAt) + .timestamp_with_time_zone() + .not_null() + .default(Expr::current_timestamp()), + ) + .col( + ColumnDef::new(CriticalValueThreshold::UpdatedAt) + .timestamp_with_time_zone() + .not_null() + .default(Expr::current_timestamp()), + ) + .col(ColumnDef::new(CriticalValueThreshold::CreatedBy).uuid()) + .col(ColumnDef::new(CriticalValueThreshold::UpdatedBy).uuid()) + .col(ColumnDef::new(CriticalValueThreshold::DeletedAt).timestamp_with_time_zone()) + .col( + ColumnDef::new(CriticalValueThreshold::Version) + .integer() + .not_null() + .default(1), + ) + .to_owned(), + ) + .await?; + + manager + .create_index( + Index::create() + .name("idx_cvt_tenant_indicator_direction") + .table(CriticalValueThreshold::Table) + .col(CriticalValueThreshold::TenantId) + .col(CriticalValueThreshold::Indicator) + .col(CriticalValueThreshold::Direction) + .to_owned(), + ) + .await?; + + // 种子数据:默认危急值阈值 + let sql = r#" + INSERT INTO critical_value_threshold (id, tenant_id, indicator, direction, threshold_value, level, created_at, updated_at) VALUES + (gen_random_uuid(), '00000000-0000-0000-0000-000000000001', 'systolic_bp', 'high', 180, 'critical', now(), now()), + (gen_random_uuid(), '00000000-0000-0000-0000-000000000001', 'systolic_bp', 'low', 80, 'critical', now(), now()), + (gen_random_uuid(), '00000000-0000-0000-0000-000000000001', 'diastolic_bp', 'high', 110, 'critical', now(), now()), + (gen_random_uuid(), '00000000-0000-0000-0000-000000000001', 'diastolic_bp', 'low', 50, 'critical', now(), now()), + (gen_random_uuid(), '00000000-0000-0000-0000-000000000001', 'heart_rate', 'high', 150, 'critical', now(), now()), + (gen_random_uuid(), '00000000-0000-0000-0000-000000000001', 'heart_rate', 'low', 40, 'critical', now(), now()), + (gen_random_uuid(), '00000000-0000-0000-0000-000000000001', 'blood_sugar', 'high', 25.0, 'critical', now(), now()), + (gen_random_uuid(), '00000000-0000-0000-0000-000000000001', 'blood_sugar', 'low', 2.5, 'critical', now(), now()) + "#; + manager.get_connection().execute_unprepared(sql).await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table( + Table::drop() + .table(CriticalValueThreshold::Table) + .to_owned(), + ) + .await + } +} + +#[derive(DeriveIden)] +enum CriticalValueThreshold { + Table, + Id, + TenantId, + Indicator, + Direction, + ThresholdValue, + Level, + Department, + AgeMin, + AgeMax, + IsActive, + CreatedAt, + UpdatedAt, + CreatedBy, + UpdatedBy, + DeletedAt, + Version, +} diff --git a/crates/erp-server/migration/src/m20260426_000061_create_consent.rs b/crates/erp-server/migration/src/m20260426_000061_create_consent.rs new file mode 100644 index 0000000..33b8a5f --- /dev/null +++ b/crates/erp-server/migration/src/m20260426_000061_create_consent.rs @@ -0,0 +1,102 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(Consent::Table) + .col( + ColumnDef::new(Consent::Id) + .uuid() + .not_null() + .primary_key(), + ) + .col(ColumnDef::new(Consent::TenantId).uuid().not_null()) + .col(ColumnDef::new(Consent::PatientId).uuid().not_null()) + .col(ColumnDef::new(Consent::ConsentType).string_len(50).not_null()) + .col(ColumnDef::new(Consent::ConsentScope).string_len(100).not_null()) + .col( + ColumnDef::new(Consent::Status) + .string_len(20) + .not_null() + .default("granted"), + ) + .col(ColumnDef::new(Consent::GrantedAt).timestamp_with_time_zone()) + .col(ColumnDef::new(Consent::RevokedAt).timestamp_with_time_zone()) + .col(ColumnDef::new(Consent::ExpiryDate).date()) + .col(ColumnDef::new(Consent::ConsentMethod).string_len(30)) + .col(ColumnDef::new(Consent::WitnessName).string_len(100)) + .col(ColumnDef::new(Consent::Notes).text()) + .col( + ColumnDef::new(Consent::CreatedAt) + .timestamp_with_time_zone() + .not_null() + .default(Expr::current_timestamp()), + ) + .col( + ColumnDef::new(Consent::UpdatedAt) + .timestamp_with_time_zone() + .not_null() + .default(Expr::current_timestamp()), + ) + .col(ColumnDef::new(Consent::CreatedBy).uuid()) + .col(ColumnDef::new(Consent::UpdatedBy).uuid()) + .col(ColumnDef::new(Consent::DeletedAt).timestamp_with_time_zone()) + .col( + ColumnDef::new(Consent::Version) + .integer() + .not_null() + .default(1), + ) + .to_owned(), + ) + .await?; + + manager + .create_index( + Index::create() + .name("idx_consent_tenant_patient") + .table(Consent::Table) + .col(Consent::TenantId) + .col(Consent::PatientId) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(Consent::Table).to_owned()) + .await + } +} + +#[derive(DeriveIden)] +enum Consent { + Table, + Id, + TenantId, + PatientId, + ConsentType, + ConsentScope, + Status, + GrantedAt, + RevokedAt, + ExpiryDate, + ConsentMethod, + WitnessName, + Notes, + CreatedAt, + UpdatedAt, + CreatedBy, + UpdatedBy, + DeletedAt, + Version, +} diff --git a/crates/erp-server/src/outbox.rs b/crates/erp-server/src/outbox.rs index 926920b..b7ebbf1 100644 --- a/crates/erp-server/src/outbox.rs +++ b/crates/erp-server/src/outbox.rs @@ -1,20 +1,32 @@ use chrono::Utc; -use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set}; +use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, QueryOrder, QuerySelect, Set}; use std::time::Duration; use erp_core::entity::domain_event; use erp_core::events::{DomainEvent, EventBus}; +const MAX_RETRY: i32 = 5; + /// 启动 outbox relay 后台任务。 /// -/// 定期扫描 domain_events 表中 status = 'pending' 的事件, -/// 重新广播并标记为 published。 +/// 先执行一次性扫描(处理服务重启前遗留的 pending 事件), +/// 然后每 5 秒定期扫描 domain_events 表中 status = 'pending' 的事件。 pub fn start_outbox_relay(db: sea_orm::DatabaseConnection, event_bus: EventBus) { + let db_clone = db.clone(); + let event_bus_clone = event_bus.clone(); tokio::spawn(async move { + // 启动时立即处理一次(恢复重启前未广播的事件) + match process_pending_events(&db_clone, &event_bus_clone).await { + Ok(count) if count > 0 => tracing::info!(count = count, "启动时 outbox relay 恢复完成"), + Ok(_) => tracing::info!("启动时 outbox relay 无待处理事件"), + Err(e) => tracing::warn!(error = %e, "启动时 outbox relay 处理失败"), + } + + // 定期轮询 let mut interval = tokio::time::interval(Duration::from_secs(5)); loop { interval.tick().await; - if let Err(e) = process_pending_events(&db, &event_bus).await { + if let Err(e) = process_pending_events(&db_clone, &event_bus_clone).await { tracing::warn!(error = %e, "Outbox relay 处理失败"); } } @@ -24,35 +36,42 @@ pub fn start_outbox_relay(db: sea_orm::DatabaseConnection, event_bus: EventBus) async fn process_pending_events( db: &sea_orm::DatabaseConnection, event_bus: &EventBus, -) -> Result<(), sea_orm::DbErr> { +) -> Result { let pending = domain_event::Entity::find() .filter(domain_event::Column::Status.eq("pending")) - .filter(domain_event::Column::Attempts.lt(3)) + .filter(domain_event::Column::Attempts.lt(MAX_RETRY)) + .order_by_asc(domain_event::Column::CreatedAt) + .limit(100) .all(db) .await?; if pending.is_empty() { - return Ok(()); + return Ok(0); } - tracing::info!(count = pending.len(), "处理待发领域事件"); + let count = pending.len(); + tracing::info!(count = count, "处理待发领域事件"); for event_model in pending { - // 重建 DomainEvent 并广播 - let domain_event = DomainEvent::new( - &event_model.event_type, - event_model.tenant_id, - event_model.payload.clone().unwrap_or(serde_json::json!({})), - ); + // 重建 DomainEvent 并广播(保留原始 ID 和时间戳) + let domain_event = DomainEvent { + id: event_model.id, + event_type: event_model.event_type.clone(), + tenant_id: event_model.tenant_id, + payload: event_model.payload.clone().unwrap_or(serde_json::json!({})), + timestamp: event_model.created_at, + correlation_id: event_model.correlation_id.unwrap_or(event_model.id), + }; event_bus.broadcast(domain_event); - // 标记为 published + // 标记为 published,增加 attempts 计数 let mut active: domain_event::ActiveModel = event_model.into(); active.status = Set("published".to_string()); active.published_at = Set(Some(Utc::now())); + active.attempts = Set(active.attempts.unwrap() + 1); active.update(db).await?; } - Ok(()) + Ok(count) }