feat(health): P0 平台基座回顾 — 7项上线前必修
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled

P0-1: 危急值告警消费者 — health_data.critical_alert 事件推送给责任医护
P0-2: 危急值阈值可配置化 — 硬编码改为数据库配置(critical_value_threshold表),支持科室/年龄差异化
P0-3: daily_monitoring合并后告警验证 — update_vital_signs也触发危急值检测
P0-4: 随访逾期通知+幂等保护 — 只通知本次新标记的逾期任务,避免重复
P0-5: 知情同意记录(consent) — 新增实体/迁移/Service/Handler,PIPL合规
P0-6: 审计日志补全 — 患者更新记录前后值(过敏史/病史/状态变更)
P0-7: EventBus持久化增强 — 两阶段提交(pending→published)+启动时outbox relay恢复
This commit is contained in:
iven
2026-04-26 03:37:31 +08:00
parent e3177f262c
commit 4ab189283e
22 changed files with 1338 additions and 130 deletions

View File

@@ -69,33 +69,47 @@ impl EventBus {
Self { sender }
}
/// 发布事件:先持久化到 domain_events 表,再内存广播
/// 发布事件:先持久化到 domain_events 表pending 状态),再内存广播
/// 最后更新为 published。
///
/// 持久化失败时仅记录 warning仍然广播best-effort
/// 两阶段提交保证:即使广播后服务崩溃,事件仍为 pending 状态,
/// 重启后 outbox relay 会重新广播。
pub async fn publish(&self, event: DomainEvent, db: &sea_orm::DatabaseConnection) {
// 持久化到 domain_events 表
// 1. 持久化为 pending 状态
let event_id = event.id;
let model = domain_event::ActiveModel {
id: Set(event.id),
tenant_id: Set(event.tenant_id),
event_type: Set(event.event_type.clone()),
payload: Set(Some(event.payload.clone())),
correlation_id: Set(Some(event.correlation_id)),
status: Set("published".to_string()),
status: Set("pending".to_string()),
attempts: Set(0),
last_error: Set(None),
created_at: Set(event.timestamp),
published_at: Set(Some(Utc::now())),
published_at: Set(None),
};
match model.insert(db).await {
Ok(_) => {}
let saved = match model.insert(db).await {
Ok(m) => m,
Err(e) => {
tracing::warn!(event_id = %event.id, error = %e, "领域事件持久化失败");
tracing::warn!(event_id = %event_id, error = %e, "领域事件持久化失败");
// 持久化失败仍然广播best-effort
self.broadcast(event);
return;
}
}
};
// 内存广播
// 2. 内存广播
self.broadcast(event);
// 3. 更新为 published
let mut active: domain_event::ActiveModel = saved.into();
active.status = Set("published".to_string());
active.published_at = Set(Some(Utc::now()));
if let Err(e) = active.update(db).await {
tracing::warn!(event_id = %event_id, error = %e, "领域事件状态更新为 published 失败");
}
}
/// 仅内存广播(不持久化,用于内部测试等场景)。

View File

@@ -0,0 +1,39 @@
use chrono::{NaiveDate, Utc};
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use uuid::Uuid;
#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct ConsentResp {
pub id: Uuid,
pub patient_id: Uuid,
pub consent_type: String,
pub consent_scope: String,
pub status: String,
pub granted_at: Option<chrono::DateTime<Utc>>,
pub revoked_at: Option<chrono::DateTime<Utc>>,
pub expiry_date: Option<NaiveDate>,
pub consent_method: Option<String>,
pub witness_name: Option<String>,
pub notes: Option<String>,
pub created_at: chrono::DateTime<Utc>,
pub updated_at: chrono::DateTime<Utc>,
pub version: i32,
}
#[derive(Debug, Deserialize, ToSchema)]
pub struct CreateConsentReq {
pub patient_id: Uuid,
pub consent_type: String,
pub consent_scope: String,
pub expiry_date: Option<NaiveDate>,
pub consent_method: Option<String>,
pub witness_name: Option<String>,
pub notes: Option<String>,
}
#[derive(Debug, Deserialize, ToSchema)]
pub struct RevokeConsentReq {
pub notes: Option<String>,
pub version: i32,
}

View File

@@ -1,5 +1,6 @@
pub mod appointment_dto;
pub mod article_dto;
pub mod consent_dto;
pub mod consultation_dto;
pub mod daily_monitoring_dto;
pub mod diagnosis_dto;

View File

@@ -0,0 +1,53 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "consent")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub id: Uuid,
pub tenant_id: Uuid,
pub patient_id: Uuid,
pub consent_type: String,
pub consent_scope: String,
pub status: String,
#[sea_orm(skip_serializing_if = "Option::is_none")]
pub granted_at: Option<DateTimeUtc>,
#[sea_orm(skip_serializing_if = "Option::is_none")]
pub revoked_at: Option<DateTimeUtc>,
#[sea_orm(skip_serializing_if = "Option::is_none")]
pub expiry_date: Option<chrono::NaiveDate>,
#[sea_orm(skip_serializing_if = "Option::is_none")]
pub consent_method: Option<String>,
#[sea_orm(skip_serializing_if = "Option::is_none")]
pub witness_name: Option<String>,
#[sea_orm(skip_serializing_if = "Option::is_none")]
pub notes: Option<String>,
pub created_at: DateTimeUtc,
pub updated_at: DateTimeUtc,
#[sea_orm(skip_serializing_if = "Option::is_none")]
pub created_by: Option<Uuid>,
#[sea_orm(skip_serializing_if = "Option::is_none")]
pub updated_by: Option<Uuid>,
#[sea_orm(skip_serializing_if = "Option::is_none")]
pub deleted_at: Option<DateTimeUtc>,
pub version: i32,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::patient::Entity",
from = "Column::PatientId",
to = "super::patient::Column::Id"
)]
Patient,
}
impl Related<super::patient::Entity> for Entity {
fn to() -> RelationDef {
Relation::Patient.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

View File

@@ -0,0 +1,35 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "critical_value_threshold")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub id: Uuid,
pub tenant_id: Uuid,
pub indicator: String,
pub direction: String,
pub threshold_value: f64,
pub level: String,
#[sea_orm(skip_serializing_if = "Option::is_none")]
pub department: Option<String>,
#[sea_orm(skip_serializing_if = "Option::is_none")]
pub age_min: Option<i32>,
#[sea_orm(skip_serializing_if = "Option::is_none")]
pub age_max: Option<i32>,
pub is_active: bool,
pub created_at: DateTimeUtc,
pub updated_at: DateTimeUtc,
#[sea_orm(skip_serializing_if = "Option::is_none")]
pub created_by: Option<Uuid>,
#[sea_orm(skip_serializing_if = "Option::is_none")]
pub updated_by: Option<Uuid>,
#[sea_orm(skip_serializing_if = "Option::is_none")]
pub deleted_at: Option<DateTimeUtc>,
pub version: i32,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

View File

@@ -1,5 +1,7 @@
pub mod appointment;
pub mod article;
pub mod critical_value_threshold;
pub mod consent;
pub mod consultation_message;
pub mod consultation_session;
pub mod daily_monitoring;

View File

@@ -59,6 +59,12 @@ pub enum HealthError {
#[error("文章不存在")]
ArticleNotFound,
#[error("危急值阈值不存在")]
ThresholdNotFound,
#[error("知情同意记录不存在")]
ConsentNotFound,
#[error("状态转换无效: {0}")]
InvalidStatusTransition(String),
@@ -89,7 +95,9 @@ impl From<HealthError> for AppError {
| HealthError::PointsProductNotFound
| HealthError::PointsOrderNotFound
| HealthError::OfflineEventNotFound
| HealthError::DailyMonitoringNotFound => AppError::NotFound(err.to_string()),
| HealthError::DailyMonitoringNotFound
| HealthError::ThresholdNotFound
| HealthError::ConsentNotFound => AppError::NotFound(err.to_string()),
HealthError::ScheduleFull => AppError::Validation(err.to_string()),
HealthError::InvalidStatusTransition(s) => AppError::Validation(s),
HealthError::VersionMismatch => AppError::VersionMismatch,

View File

@@ -0,0 +1,71 @@
use axum::Extension;
use axum::extract::{FromRef, Json, Path, Query, State};
use serde::Deserialize;
use erp_core::error::AppError;
use erp_core::rbac::require_permission;
use erp_core::types::{ApiResponse, PaginatedResponse, TenantContext};
use crate::dto::consent_dto::*;
use crate::service::consent_service;
use crate::state::HealthState;
#[derive(Debug, Deserialize)]
pub struct ConsentListParams {
pub page: Option<u64>,
pub page_size: Option<u64>,
}
pub async fn list_consents<S>(
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Path(patient_id): Path<uuid::Uuid>,
Query(params): Query<ConsentListParams>,
) -> Result<Json<ApiResponse<PaginatedResponse<ConsentResp>>>, AppError>
where
HealthState: FromRef<S>,
S: Clone + Send + Sync + 'static,
{
require_permission(&ctx, "health.patient.list")?;
let page = params.page.unwrap_or(1);
let page_size = params.page_size.unwrap_or(20);
let result = consent_service::list_consents(
&state, ctx.tenant_id, patient_id, page, page_size,
)
.await?;
Ok(Json(ApiResponse::ok(result)))
}
pub async fn grant_consent<S>(
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Json(req): Json<CreateConsentReq>,
) -> Result<Json<ApiResponse<ConsentResp>>, AppError>
where
HealthState: FromRef<S>,
S: Clone + Send + Sync + 'static,
{
require_permission(&ctx, "health.patient.manage")?;
let result = consent_service::grant_consent(
&state, ctx.tenant_id, Some(ctx.user_id), req,
)
.await?;
Ok(Json(ApiResponse::ok(result)))
}
pub async fn revoke_consent<S>(
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Path(consent_id): Path<uuid::Uuid>,
Json(req): Json<RevokeConsentReq>,
) -> Result<Json<ApiResponse<ConsentResp>>, AppError>
where
HealthState: FromRef<S>,
S: Clone + Send + Sync + 'static,
{
require_permission(&ctx, "health.patient.manage")?;
let result = consent_service::revoke_consent(
&state, ctx.tenant_id, consent_id, Some(ctx.user_id), req,
)
.await?;
Ok(Json(ApiResponse::ok(result)))
}

View File

@@ -0,0 +1,111 @@
use axum::Extension;
use axum::extract::{FromRef, Json, Path, State};
use serde::Deserialize;
use erp_core::error::AppError;
use erp_core::rbac::require_permission;
use erp_core::types::{ApiResponse, TenantContext};
use crate::service::critical_value_threshold_service;
use crate::state::HealthState;
#[derive(Debug, Deserialize)]
pub struct CreateThresholdReq {
pub indicator: String,
pub direction: String,
pub threshold_value: f64,
pub level: Option<String>,
pub department: Option<String>,
pub age_min: Option<i32>,
pub age_max: Option<i32>,
}
#[derive(Debug, Deserialize)]
pub struct UpdateThresholdReq {
pub threshold_value: f64,
pub level: Option<String>,
pub department: Option<String>,
pub age_min: Option<i32>,
pub age_max: Option<i32>,
pub version: i32,
}
pub async fn list_thresholds<S>(
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
) -> Result<Json<ApiResponse<Vec<crate::entity::critical_value_threshold::Model>>>, AppError>
where
HealthState: FromRef<S>,
S: Clone + Send + Sync + 'static,
{
require_permission(&ctx, "health.health-data.list")?;
let list = critical_value_threshold_service::find_thresholds(&state.db, ctx.tenant_id).await?;
Ok(Json(ApiResponse::ok(list)))
}
pub async fn create_threshold<S>(
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Json(req): Json<CreateThresholdReq>,
) -> Result<Json<ApiResponse<crate::entity::critical_value_threshold::Model>>, AppError>
where
HealthState: FromRef<S>,
S: Clone + Send + Sync + 'static,
{
require_permission(&ctx, "health.health-data.manage")?;
let result = critical_value_threshold_service::create_threshold(
&state.db,
ctx.tenant_id,
Some(ctx.user_id),
req.indicator,
req.direction,
req.threshold_value,
req.level.unwrap_or_else(|| "critical".to_string()),
req.department,
req.age_min,
req.age_max,
)
.await?;
Ok(Json(ApiResponse::ok(result)))
}
pub async fn update_threshold<S>(
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Path(id): Path<uuid::Uuid>,
Json(req): Json<UpdateThresholdReq>,
) -> Result<Json<ApiResponse<crate::entity::critical_value_threshold::Model>>, AppError>
where
HealthState: FromRef<S>,
S: Clone + Send + Sync + 'static,
{
require_permission(&ctx, "health.health-data.manage")?;
let result = critical_value_threshold_service::update_threshold(
&state.db,
ctx.tenant_id,
id,
Some(ctx.user_id),
req.threshold_value,
req.level.unwrap_or_else(|| "critical".to_string()),
req.department,
req.age_min,
req.age_max,
req.version,
)
.await?;
Ok(Json(ApiResponse::ok(result)))
}
pub async fn delete_threshold<S>(
State(state): State<HealthState>,
Extension(ctx): Extension<TenantContext>,
Path(id): Path<uuid::Uuid>,
) -> Result<Json<ApiResponse<()>>, AppError>
where
HealthState: FromRef<S>,
S: Clone + Send + Sync + 'static,
{
require_permission(&ctx, "health.health-data.manage")?;
critical_value_threshold_service::delete_threshold(&state.db, ctx.tenant_id, id, Some(ctx.user_id))
.await?;
Ok(Json(ApiResponse::ok(())))
}

View File

@@ -1,6 +1,8 @@
pub mod appointment_handler;
pub mod article_handler;
pub mod consultation_handler;
pub mod consent_handler;
pub mod critical_value_threshold_handler;
pub mod daily_monitoring_handler;
pub mod diagnosis_handler;
pub mod dialysis_handler;

View File

@@ -6,7 +6,7 @@ use erp_core::events::EventBus;
use erp_core::module::{ErpModule, PermissionDescriptor};
use crate::handler::{
appointment_handler, article_handler, consultation_handler, daily_monitoring_handler, diagnosis_handler, dialysis_handler, doctor_handler, follow_up_handler,
appointment_handler, article_handler, consultation_handler, consent_handler, critical_value_threshold_handler, daily_monitoring_handler, diagnosis_handler, dialysis_handler, doctor_handler, follow_up_handler,
health_data_handler, patient_handler, points_handler, stats_handler,
};
@@ -410,6 +410,30 @@ impl HealthModule {
"/health/admin/statistics/follow-ups",
axum::routing::get(stats_handler::get_follow_up_stats),
)
// 危急值阈值配置
.route(
"/health/critical-value-thresholds",
axum::routing::get(critical_value_threshold_handler::list_thresholds)
.post(critical_value_threshold_handler::create_threshold),
)
.route(
"/health/critical-value-thresholds/{id}",
axum::routing::put(critical_value_threshold_handler::update_threshold)
.delete(critical_value_threshold_handler::delete_threshold),
)
// 知情同意记录
.route(
"/health/patients/{patient_id}/consents",
axum::routing::get(consent_handler::list_consents),
)
.route(
"/health/consents",
axum::routing::post(consent_handler::grant_consent),
)
.route(
"/health/consents/{consent_id}/revoke",
axum::routing::put(consent_handler::revoke_consent),
)
}
}

View File

@@ -0,0 +1,187 @@
use chrono::Utc;
use sea_orm::entity::prelude::*;
use sea_orm::{ActiveValue::Set, QueryOrder, QuerySelect};
use uuid::Uuid;
use erp_core::audit::AuditLog;
use erp_core::audit_service;
use erp_core::error::check_version;
use erp_core::types::PaginatedResponse;
use crate::dto::consent_dto::*;
use crate::entity::{consent, patient};
use crate::error::{HealthError, HealthResult};
use crate::state::HealthState;
pub async fn list_consents(
state: &HealthState,
tenant_id: Uuid,
patient_id: Uuid,
page: u64,
page_size: u64,
) -> HealthResult<PaginatedResponse<ConsentResp>> {
let limit = page_size.min(100);
let offset = page.saturating_sub(1) * limit;
let mut query = consent::Entity::find()
.filter(consent::Column::TenantId.eq(tenant_id))
.filter(consent::Column::PatientId.eq(patient_id))
.filter(consent::Column::DeletedAt.is_null());
let total: u64 = query
.clone()
.count(&state.db)
.await?;
let rows: Vec<consent::Model> = query
.order_by_desc(consent::Column::CreatedAt)
.limit(limit)
.offset(offset)
.all(&state.db)
.await?;
let total_pages = ((total as f64) / (page_size as f64)).ceil() as u64;
let data = rows.into_iter().map(|m| ConsentResp {
id: m.id,
patient_id: m.patient_id,
consent_type: m.consent_type,
consent_scope: m.consent_scope,
status: m.status,
granted_at: m.granted_at,
revoked_at: m.revoked_at,
expiry_date: m.expiry_date,
consent_method: m.consent_method,
witness_name: m.witness_name,
notes: m.notes,
created_at: m.created_at,
updated_at: m.updated_at,
version: m.version,
}).collect();
Ok(PaginatedResponse { data, total, page, page_size, total_pages })
}
pub async fn grant_consent(
state: &HealthState,
tenant_id: Uuid,
operator_id: Option<Uuid>,
req: CreateConsentReq,
) -> HealthResult<ConsentResp> {
// 校验患者存在
patient::Entity::find()
.filter(patient::Column::Id.eq(req.patient_id))
.filter(patient::Column::TenantId.eq(tenant_id))
.filter(patient::Column::DeletedAt.is_null())
.one(&state.db)
.await?
.ok_or(HealthError::PatientNotFound)?;
validate_consent_type(&req.consent_type)?;
let now = Utc::now();
let model = consent::ActiveModel {
id: Set(Uuid::now_v7()),
tenant_id: Set(tenant_id),
patient_id: Set(req.patient_id),
consent_type: Set(req.consent_type),
consent_scope: Set(req.consent_scope),
status: Set("granted".to_string()),
granted_at: Set(Some(now)),
revoked_at: Set(None),
expiry_date: Set(req.expiry_date),
consent_method: Set(req.consent_method),
witness_name: Set(req.witness_name),
notes: Set(req.notes),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(operator_id),
updated_by: Set(operator_id),
deleted_at: Set(None),
version: Set(1),
};
let m = model.insert(&state.db).await?;
audit_service::record(
AuditLog::new(tenant_id, operator_id, "consent.granted", "consent")
.with_resource_id(m.id),
&state.db,
).await;
Ok(model_to_resp(m))
}
pub async fn revoke_consent(
state: &HealthState,
tenant_id: Uuid,
consent_id: Uuid,
operator_id: Option<Uuid>,
req: RevokeConsentReq,
) -> HealthResult<ConsentResp> {
let existing = consent::Entity::find_by_id(consent_id)
.one(&state.db)
.await?
.ok_or(HealthError::ConsentNotFound)?;
if existing.tenant_id != tenant_id {
return Err(HealthError::ConsentNotFound);
}
check_version(req.version, existing.version).map_err(|_| HealthError::VersionMismatch)?;
let now = Utc::now();
let mut active: consent::ActiveModel = existing.into();
active.status = Set("revoked".to_string());
active.revoked_at = Set(Some(now));
active.updated_at = Set(now);
active.updated_by = Set(operator_id);
active.version = Set(req.version + 1);
if let Some(notes) = req.notes {
active.notes = Set(Some(notes));
}
let m = active.update(&state.db).await?;
audit_service::record(
AuditLog::new(tenant_id, operator_id, "consent.revoked", "consent")
.with_resource_id(m.id),
&state.db,
).await;
Ok(model_to_resp(m))
}
fn model_to_resp(m: consent::Model) -> ConsentResp {
ConsentResp {
id: m.id,
patient_id: m.patient_id,
consent_type: m.consent_type,
consent_scope: m.consent_scope,
status: m.status,
granted_at: m.granted_at,
revoked_at: m.revoked_at,
expiry_date: m.expiry_date,
consent_method: m.consent_method,
witness_name: m.witness_name,
notes: m.notes,
created_at: m.created_at,
updated_at: m.updated_at,
version: m.version,
}
}
fn validate_consent_type(consent_type: &str) -> HealthResult<()> {
let valid = [
"data_processing", // 数据处理同意
"health_data_collection", // 健康数据采集
"research_use", // 科研使用
"third_party_share", // 第三方共享
"genetic_testing", // 基因检测
"telemedicine", // 远程医疗
];
if valid.contains(&consent_type) {
Ok(())
} else {
Err(HealthError::Validation(format!(
"consent_type 必须为以下之一: {}",
valid.join(", ")
)))
}
}

View File

@@ -0,0 +1,198 @@
use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;
use uuid::Uuid;
use crate::entity::critical_value_threshold;
use crate::error::{HealthError, HealthResult};
/// 阈值查询条件:指标名 + 方向
pub struct ThresholdQuery {
pub indicator: String,
pub direction: String,
}
/// 查询指定租户的活跃危急值阈值。
///
/// 如果指定了 department 和 age优先匹配精确条件回退到通用阈值。
pub async fn find_thresholds(
db: &DatabaseConnection,
tenant_id: Uuid,
) -> HealthResult<Vec<critical_value_threshold::Model>> {
let list = critical_value_threshold::Entity::find()
.filter(critical_value_threshold::Column::TenantId.eq(tenant_id))
.filter(critical_value_threshold::Column::IsActive.eq(true))
.filter(critical_value_threshold::Column::DeletedAt.is_null())
.all(db)
.await?;
Ok(list)
}
/// 查找匹配的阈值配置(按 indicator + direction 匹配)。
///
/// 优先匹配有科室/年龄限制的精确规则,否则返回通用规则。
pub async fn find_threshold(
db: &DatabaseConnection,
tenant_id: Uuid,
indicator: &str,
direction: &str,
department: Option<&str>,
age: Option<i32>,
) -> HealthResult<Option<critical_value_threshold::Model>> {
let all = find_thresholds(db, tenant_id).await?;
let mut exact_match: Option<&critical_value_threshold::Model> = None;
let mut generic_match: Option<&critical_value_threshold::Model> = None;
for t in &all {
if t.indicator != indicator || t.direction != direction {
continue;
}
let dept_match = match (t.department.as_deref(), department) {
(Some(td), Some(d)) => td == d,
(None, _) => true, // 通用规则匹配任意科室
(Some(_), None) => false, // 有科室限制但没传科室
};
let age_match = match (t.age_min, t.age_max, age) {
(Some(min), Some(max), Some(a)) => a >= min && a <= max,
(None, None, _) => true, // 通用规则匹配任意年龄
_ => false, // 有年龄限制但没传年龄
};
if dept_match && age_match {
if t.department.is_some() || t.age_min.is_some() {
// 精确规则
if exact_match.is_none() {
exact_match = Some(t);
}
} else {
// 通用规则
if generic_match.is_none() {
generic_match = Some(t);
}
}
}
}
Ok(exact_match.or(generic_match).cloned())
}
/// 创建新的危急值阈值。
pub async fn create_threshold(
db: &DatabaseConnection,
tenant_id: Uuid,
operator_id: Option<Uuid>,
indicator: String,
direction: String,
threshold_value: f64,
level: String,
department: Option<String>,
age_min: Option<i32>,
age_max: Option<i32>,
) -> HealthResult<critical_value_threshold::Model> {
validate_direction(&direction)?;
validate_indicator(&indicator)?;
let now = chrono::Utc::now();
let model = critical_value_threshold::ActiveModel {
id: Set(Uuid::now_v7()),
tenant_id: Set(tenant_id),
indicator: Set(indicator),
direction: Set(direction),
threshold_value: Set(threshold_value),
level: Set(level),
department: Set(department),
age_min: Set(age_min),
age_max: Set(age_max),
is_active: Set(true),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(operator_id),
updated_by: Set(operator_id),
deleted_at: Set(None),
version: Set(1),
};
let result = model.insert(db).await?;
Ok(result)
}
/// 更新危急值阈值。
pub async fn update_threshold(
db: &DatabaseConnection,
tenant_id: Uuid,
threshold_id: Uuid,
operator_id: Option<Uuid>,
threshold_value: f64,
level: String,
department: Option<String>,
age_min: Option<i32>,
age_max: Option<i32>,
expected_version: i32,
) -> HealthResult<critical_value_threshold::Model> {
let existing = critical_value_threshold::Entity::find_by_id(threshold_id)
.one(db)
.await?
.ok_or(HealthError::ThresholdNotFound)?;
if existing.tenant_id != tenant_id {
return Err(HealthError::ThresholdNotFound);
}
erp_core::error::check_version(existing.version, expected_version)?;
let now = chrono::Utc::now();
let mut active: critical_value_threshold::ActiveModel = existing.into();
active.threshold_value = Set(threshold_value);
active.level = Set(level);
active.department = Set(department);
active.age_min = Set(age_min);
active.age_max = Set(age_max);
active.updated_at = Set(now);
active.updated_by = Set(operator_id);
active.version = Set(expected_version + 1);
let result = active.update(db).await?;
Ok(result)
}
/// 软删除危急值阈值。
pub async fn delete_threshold(
db: &DatabaseConnection,
tenant_id: Uuid,
threshold_id: Uuid,
operator_id: Option<Uuid>,
) -> HealthResult<()> {
let existing = critical_value_threshold::Entity::find_by_id(threshold_id)
.one(db)
.await?
.ok_or(HealthError::ThresholdNotFound)?;
if existing.tenant_id != tenant_id {
return Err(HealthError::ThresholdNotFound);
}
let mut active: critical_value_threshold::ActiveModel = existing.into();
active.deleted_at = Set(Some(chrono::Utc::now()));
active.updated_by = Set(operator_id);
active.update(db).await?;
Ok(())
}
fn validate_direction(direction: &str) -> HealthResult<()> {
if matches!(direction, "high" | "low") {
Ok(())
} else {
Err(HealthError::Validation(
"direction 必须为 'high' 或 'low'".to_string(),
))
}
}
fn validate_indicator(indicator: &str) -> HealthResult<()> {
let valid = ["systolic_bp", "diastolic_bp", "heart_rate", "blood_sugar"];
if valid.contains(&indicator) {
Ok(())
} else {
Err(HealthError::Validation(format!(
"indicator 必须为以下之一: {}",
valid.join(", ")
)))
}
}

View File

@@ -456,34 +456,42 @@ pub async fn check_overdue_tasks(db: &DatabaseConnection) -> HealthResult<u64> {
Ok(result.rows_affected)
}
/// 逾期随访检查 + 事件发布版本。
/// 标记逾期后,查询被标记的任务并为每个发布 `follow_up.overdue` 事件。
/// 逾期随访检查 + 事件发布版本(幂等保护)
///
/// 只发布**本次新被标记**为 overdue 的事件,避免重复通知。
/// 幂等策略:先查出即将被标记的 pending 任务,批量更新后只为这些任务发事件。
pub async fn check_overdue_and_notify(state: &HealthState) -> HealthResult<u64> {
let db = &state.db;
let today = chrono::Utc::now().date_naive();
// 1. 先查出即将被标记的 pending 任务(幂等:只有这些才需要通知)
let newly_overdue: Vec<follow_up_task::Model> = follow_up_task::Entity::find()
.filter(follow_up_task::Column::Status.eq("pending"))
.filter(follow_up_task::Column::PlannedDate.lt(today))
.filter(follow_up_task::Column::DeletedAt.is_null())
.all(db)
.await?;
if newly_overdue.is_empty() {
return Ok(0);
}
// 2. 批量更新状态
let count = check_overdue_tasks(db).await?;
if count > 0 {
let today = chrono::Utc::now().date_naive();
let overdue_tasks: Vec<follow_up_task::Model> = follow_up_task::Entity::find()
.filter(follow_up_task::Column::Status.eq("overdue"))
.filter(follow_up_task::Column::PlannedDate.lt(today))
.filter(follow_up_task::Column::DeletedAt.is_null())
.all(db)
.await?;
for task in overdue_tasks {
let event = erp_core::events::DomainEvent::new(
"follow_up.overdue",
task.tenant_id,
serde_json::json!({
"task_id": task.id,
"patient_id": task.patient_id,
"assigned_to": task.assigned_to,
"planned_date": task.planned_date,
}),
);
state.event_bus.publish(event, db).await;
}
// 3. 只为本次新标记的任务发布事件
for task in newly_overdue {
let event = erp_core::events::DomainEvent::new(
"follow_up.overdue",
task.tenant_id,
serde_json::json!({
"task_id": task.id,
"patient_id": task.patient_id,
"assigned_to": task.assigned_to,
"planned_date": task.planned_date,
}),
);
state.event_bus.publish(event, db).await;
}
Ok(count)

View File

@@ -13,7 +13,7 @@ use erp_core::error::check_version;
use erp_core::types::PaginatedResponse;
use crate::dto::health_data_dto::*;
use crate::entity::{health_record, lab_report, patient, vital_signs};
use crate::entity::{doctor_profile, health_record, lab_report, patient, patient_doctor_relation, vital_signs};
use crate::error::{HealthError, HealthResult};
use crate::service::validation::validate_record_type;
use crate::state::HealthState;
@@ -86,7 +86,7 @@ pub async fn create_vital_signs(
.ok_or(HealthError::PatientNotFound)?;
let now = Utc::now();
check_vital_signs_alert(state, tenant_id, patient_id, req.clone()).await;
check_vital_signs_alert(state, tenant_id, patient_id, operator_id, req.clone()).await;
let active = vital_signs::ActiveModel {
id: Set(Uuid::now_v7()),
tenant_id: Set(tenant_id),
@@ -169,6 +169,23 @@ pub async fn update_vital_signs(
let m = active.update(&state.db).await?;
// 更新后也触发危急值检测(修改后的值可能触发告警)
let check_req = CreateVitalSignsReq {
record_date: m.record_date,
systolic_bp_morning: m.systolic_bp_morning,
diastolic_bp_morning: m.diastolic_bp_morning,
systolic_bp_evening: m.systolic_bp_evening,
diastolic_bp_evening: m.diastolic_bp_evening,
heart_rate: m.heart_rate,
weight: m.weight.map(|d| d.to_f64().unwrap_or(0.0)),
blood_sugar: m.blood_sugar.map(|d| d.to_f64().unwrap_or(0.0)),
water_intake_ml: m.water_intake_ml,
urine_output_ml: m.urine_output_ml,
notes: m.notes.clone(),
source: Some(m.source.clone()),
};
check_vital_signs_alert(state, tenant_id, patient_id, operator_id, check_req).await;
audit_service::record(
AuditLog::new(tenant_id, operator_id, "vital_signs.updated", "vital_signs")
.with_resource_id(m.id),
@@ -628,113 +645,145 @@ pub async fn delete_health_record(
// 危急值预警检测
// ---------------------------------------------------------------------------
/// 检查体征数据中的危急值,发布 `health_data.critical_alert` 事件
/// 检查体征数据中的危急值,发布 `health_data.critical_alert` 事件
///
/// 阈值从 `critical_value_threshold` 表加载,支持按科室/年龄差异化配置。
/// 事件 payload 包含:患者信息、责任医生、操作人信息、告警详情。
async fn check_vital_signs_alert(
state: &HealthState,
tenant_id: Uuid,
patient_id: Uuid,
operator_id: Option<Uuid>,
req: CreateVitalSignsReq,
) {
// 从数据库加载阈值配置
let thresholds = match crate::service::critical_value_threshold_service::find_thresholds(
&state.db, tenant_id,
)
.await
{
Ok(t) if !t.is_empty() => t,
Ok(_) => {
tracing::warn!(tenant_id = %tenant_id, "无危急值阈值配置,跳过告警检测");
return;
}
Err(e) => {
tracing::warn!(error = %e, "加载危急值阈值失败,跳过告警检测");
return;
}
};
let mut alerts: Vec<serde_json::Value> = Vec::new();
// 收缩压危急值
if let Some(sbp) = req.systolic_bp_morning.or(req.systolic_bp_evening) {
if sbp >= 180 {
alerts.push(serde_json::json!({
"indicator": "systolic_bp",
"value": sbp,
"threshold": 180,
"level": "critical",
"direction": "high"
}));
} else if sbp <= 80 {
alerts.push(serde_json::json!({
"indicator": "systolic_bp",
"value": sbp,
"threshold": 80,
"level": "critical",
"direction": "low"
}));
}
check_indicator(&thresholds, "systolic_bp", sbp as f64, &mut alerts);
}
// 舒张压危急值
if let Some(dbp) = req.diastolic_bp_morning.or(req.diastolic_bp_evening) {
if dbp >= 110 {
alerts.push(serde_json::json!({
"indicator": "diastolic_bp",
"value": dbp,
"threshold": 110,
"level": "critical",
"direction": "high"
}));
} else if dbp <= 50 {
alerts.push(serde_json::json!({
"indicator": "diastolic_bp",
"value": dbp,
"threshold": 50,
"level": "critical",
"direction": "low"
}));
}
check_indicator(&thresholds, "diastolic_bp", dbp as f64, &mut alerts);
}
// 心率危急值
if let Some(hr) = req.heart_rate {
if hr >= 150 {
alerts.push(serde_json::json!({
"indicator": "heart_rate",
"value": hr,
"threshold": 150,
"level": "critical",
"direction": "high"
}));
} else if hr <= 40 {
alerts.push(serde_json::json!({
"indicator": "heart_rate",
"value": hr,
"threshold": 40,
"level": "critical",
"direction": "low"
}));
}
check_indicator(&thresholds, "heart_rate", hr as f64, &mut alerts);
}
// 血糖危急值
if let Some(bs) = req.blood_sugar {
if bs >= 25.0 {
alerts.push(serde_json::json!({
"indicator": "blood_sugar",
"value": bs,
"threshold": 25.0,
"level": "critical",
"direction": "high"
}));
} else if bs <= 2.5 {
alerts.push(serde_json::json!({
"indicator": "blood_sugar",
"value": bs,
"threshold": 2.5,
"level": "critical",
"direction": "low"
}));
}
check_indicator(&thresholds, "blood_sugar", bs.to_f64().unwrap_or(0.0), &mut alerts);
}
for alert in alerts {
let event = erp_core::events::DomainEvent::new(
if alerts.is_empty() {
return;
}
// 查询患者信息
let patient_model = patient::Entity::find_by_id(patient_id)
.one(&state.db)
.await
.ok()
.flatten();
let patient_name = patient_model
.as_ref()
.map(|p| p.name.as_str())
.unwrap_or("未知患者");
// 查询责任医生(通过 patient_doctor_relation 的 attending 类型)
let attending_relation = patient_doctor_relation::Entity::find()
.filter(patient_doctor_relation::Column::PatientId.eq(patient_id))
.filter(patient_doctor_relation::Column::TenantId.eq(tenant_id))
.filter(patient_doctor_relation::Column::DeletedAt.is_null())
.filter(patient_doctor_relation::Column::RelationshipType.eq("attending"))
.one(&state.db)
.await
.ok()
.flatten();
let doctor_user_id: Option<Uuid> = if let Some(rel) = attending_relation {
doctor_profile::Entity::find_by_id(rel.doctor_id)
.one(&state.db)
.await
.ok()
.flatten()
.and_then(|d| d.user_id)
} else {
None
};
for alert in &alerts {
let mut payload = serde_json::json!({
"patient_id": patient_id,
"patient_name": patient_name,
"operator_id": operator_id,
"alert": alert,
});
if let Some(did) = doctor_user_id {
payload["doctor_user_id"] = serde_json::json!(did);
}
let event = DomainEvent::new(
"health_data.critical_alert",
tenant_id,
serde_json::json!({
"patient_id": patient_id,
"alert": alert,
}),
payload,
);
state.event_bus.publish(event, &state.db).await;
tracing::warn!(
patient_id = %patient_id,
tenant_id = %tenant_id,
indicator = %alert["indicator"],
value = %alert["value"],
"体征危急值预警已发布"
);
}
}
/// 根据阈值配置检查单个指标值,匹配则添加到 alerts。
fn check_indicator(
thresholds: &[crate::entity::critical_value_threshold::Model],
indicator: &str,
value: f64,
alerts: &mut Vec<serde_json::Value>,
) {
for t in thresholds {
if t.indicator != indicator {
continue;
}
let triggered = match t.direction.as_str() {
"high" => value >= t.threshold_value,
"low" => value <= t.threshold_value,
_ => false,
};
if triggered {
alerts.push(serde_json::json!({
"indicator": indicator,
"value": value,
"threshold": t.threshold_value,
"level": t.level,
"direction": t.direction,
}));
return;
}
}
}

View File

@@ -1,6 +1,8 @@
pub mod appointment_service;
pub mod article_service;
pub mod consultation_service;
pub mod consent_service;
pub mod critical_value_threshold_service;
pub mod daily_monitoring_service;
pub mod diagnosis_service;
pub mod dialysis_service;

View File

@@ -205,6 +205,14 @@ pub async fn update_patient(
])?;
}
// 记录变更前的关键临床值(过敏史、病史、身份证号)
let old_snapshot = serde_json::json!({
"allergy_history": model.allergy_history,
"medical_history_summary": model.medical_history_summary,
"status": model.status,
"verification_status": model.verification_status,
});
let mut active: patient::ActiveModel = model.into();
if let Some(v) = req.name { active.name = Set(v); }
@@ -232,6 +240,14 @@ pub async fn update_patient(
let updated = active.update(&state.db).await?;
// 变更后快照
let new_snapshot = serde_json::json!({
"allergy_history": updated.allergy_history,
"medical_history_summary": updated.medical_history_summary,
"status": updated.status,
"verification_status": updated.verification_status,
});
// 根据状态变更发布不同事件
let event_type = if req.status.as_deref() == Some("deceased") {
"patient.deceased"
@@ -249,7 +265,8 @@ pub async fn update_patient(
audit_service::record(
AuditLog::new(tenant_id, operator_id, "patient.updated", "patient")
.with_resource_id(updated.id),
.with_resource_id(updated.id)
.with_changes(Some(old_snapshot), Some(new_snapshot)),
&state.db,
).await;

View File

@@ -280,6 +280,127 @@ async fn handle_workflow_event(
.map_err(|e| e.to_string())?;
}
}
"health_data.critical_alert" => {
let patient_name = event
.payload
.get("patient_name")
.and_then(|v| v.as_str())
.unwrap_or("未知患者");
let alert = event.payload.get("alert");
let indicator = alert
.and_then(|a| a.get("indicator"))
.and_then(|v| v.as_str())
.unwrap_or("未知指标");
let value = alert
.and_then(|a| a.get("value"))
.map(|v| v.to_string())
.unwrap_or_else(|| "?".to_string());
let direction = alert
.and_then(|a| a.get("direction"))
.and_then(|v| v.as_str())
.unwrap_or("high");
let direction_text = match direction {
"low" => "偏低",
_ => "偏高",
};
// 通知责任医生(优先)
if let Some(doctor_uid) = event
.payload
.get("doctor_user_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok())
{
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
doctor_uid,
format!("危急值告警:患者 {}", patient_name),
format!(
"患者 {}{}{}(值:{}),请立即关注处理。",
patient_name, indicator, direction_text, value
),
"urgent",
Some("critical_alert".to_string()),
Some(event.id),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
// 同时通知操作人(录入者)
if let Some(operator_uid) = event
.payload
.get("operator_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok())
{
// 避免医生和操作人是同一人时重复通知
let is_doctor = event
.payload
.get("doctor_user_id")
.and_then(|v| v.as_str())
.map(|s| s == operator_uid.to_string())
.unwrap_or(false);
if !is_doctor {
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
operator_uid,
format!("危急值告警:患者 {}", patient_name),
format!(
"患者 {}{}{}(值:{})已触发危急值告警,已通知责任医生。",
patient_name, indicator, direction_text, value
),
"important",
Some("critical_alert".to_string()),
Some(event.id),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
}
"follow_up.overdue" => {
let task_id = event
.payload
.get("task_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let assigned_to = event
.payload
.get("assigned_to")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let planned_date = event
.payload
.get("planned_date")
.and_then(|v| v.as_str())
.unwrap_or("未知日期");
if let Some(assignee) = assigned_to {
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
assignee,
"随访任务逾期提醒".to_string(),
format!(
"您的随访任务(计划日期:{})已逾期,请尽快处理。",
planned_date
),
"important",
Some("follow_up".to_string()),
uuid::Uuid::parse_str(task_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
_ => {}
}
Ok(())

View File

@@ -59,6 +59,8 @@ mod m20260426_000056_create_diagnosis;
mod m20260426_000057_rename_points_transaction_type_column;
mod m20260426_000058_merge_daily_monitoring_into_vital_signs;
mod m20260426_000059_seed_menus;
mod m20260426_000060_create_critical_value_thresholds;
mod m20260426_000061_create_consent;
pub struct Migrator;
@@ -125,6 +127,8 @@ impl MigratorTrait for Migrator {
Box::new(m20260426_000057_rename_points_transaction_type_column::Migration),
Box::new(m20260426_000058_merge_daily_monitoring_into_vital_signs::Migration),
Box::new(m20260426_000059_seed_menus::Migration),
Box::new(m20260426_000060_create_critical_value_thresholds::Migration),
Box::new(m20260426_000061_create_consent::Migration),
]
}
}

View File

@@ -0,0 +1,141 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(CriticalValueThreshold::Table)
.col(
ColumnDef::new(CriticalValueThreshold::Id)
.uuid()
.not_null()
.primary_key(),
)
.col(
ColumnDef::new(CriticalValueThreshold::TenantId)
.uuid()
.not_null(),
)
.col(
ColumnDef::new(CriticalValueThreshold::Indicator)
.string_len(50)
.not_null(),
)
.col(
ColumnDef::new(CriticalValueThreshold::Direction)
.string_len(10)
.not_null(),
)
.col(
ColumnDef::new(CriticalValueThreshold::ThresholdValue)
.double()
.not_null(),
)
.col(
ColumnDef::new(CriticalValueThreshold::Level)
.string_len(20)
.not_null()
.default("critical"),
)
.col(
ColumnDef::new(CriticalValueThreshold::Department)
.string_len(100),
)
.col(ColumnDef::new(CriticalValueThreshold::AgeMin).integer())
.col(ColumnDef::new(CriticalValueThreshold::AgeMax).integer())
.col(
ColumnDef::new(CriticalValueThreshold::IsActive)
.boolean()
.not_null()
.default(true),
)
.col(
ColumnDef::new(CriticalValueThreshold::CreatedAt)
.timestamp_with_time_zone()
.not_null()
.default(Expr::current_timestamp()),
)
.col(
ColumnDef::new(CriticalValueThreshold::UpdatedAt)
.timestamp_with_time_zone()
.not_null()
.default(Expr::current_timestamp()),
)
.col(ColumnDef::new(CriticalValueThreshold::CreatedBy).uuid())
.col(ColumnDef::new(CriticalValueThreshold::UpdatedBy).uuid())
.col(ColumnDef::new(CriticalValueThreshold::DeletedAt).timestamp_with_time_zone())
.col(
ColumnDef::new(CriticalValueThreshold::Version)
.integer()
.not_null()
.default(1),
)
.to_owned(),
)
.await?;
manager
.create_index(
Index::create()
.name("idx_cvt_tenant_indicator_direction")
.table(CriticalValueThreshold::Table)
.col(CriticalValueThreshold::TenantId)
.col(CriticalValueThreshold::Indicator)
.col(CriticalValueThreshold::Direction)
.to_owned(),
)
.await?;
// 种子数据:默认危急值阈值
let sql = r#"
INSERT INTO critical_value_threshold (id, tenant_id, indicator, direction, threshold_value, level, created_at, updated_at) VALUES
(gen_random_uuid(), '00000000-0000-0000-0000-000000000001', 'systolic_bp', 'high', 180, 'critical', now(), now()),
(gen_random_uuid(), '00000000-0000-0000-0000-000000000001', 'systolic_bp', 'low', 80, 'critical', now(), now()),
(gen_random_uuid(), '00000000-0000-0000-0000-000000000001', 'diastolic_bp', 'high', 110, 'critical', now(), now()),
(gen_random_uuid(), '00000000-0000-0000-0000-000000000001', 'diastolic_bp', 'low', 50, 'critical', now(), now()),
(gen_random_uuid(), '00000000-0000-0000-0000-000000000001', 'heart_rate', 'high', 150, 'critical', now(), now()),
(gen_random_uuid(), '00000000-0000-0000-0000-000000000001', 'heart_rate', 'low', 40, 'critical', now(), now()),
(gen_random_uuid(), '00000000-0000-0000-0000-000000000001', 'blood_sugar', 'high', 25.0, 'critical', now(), now()),
(gen_random_uuid(), '00000000-0000-0000-0000-000000000001', 'blood_sugar', 'low', 2.5, 'critical', now(), now())
"#;
manager.get_connection().execute_unprepared(sql).await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(
Table::drop()
.table(CriticalValueThreshold::Table)
.to_owned(),
)
.await
}
}
#[derive(DeriveIden)]
enum CriticalValueThreshold {
Table,
Id,
TenantId,
Indicator,
Direction,
ThresholdValue,
Level,
Department,
AgeMin,
AgeMax,
IsActive,
CreatedAt,
UpdatedAt,
CreatedBy,
UpdatedBy,
DeletedAt,
Version,
}

View File

@@ -0,0 +1,102 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(Consent::Table)
.col(
ColumnDef::new(Consent::Id)
.uuid()
.not_null()
.primary_key(),
)
.col(ColumnDef::new(Consent::TenantId).uuid().not_null())
.col(ColumnDef::new(Consent::PatientId).uuid().not_null())
.col(ColumnDef::new(Consent::ConsentType).string_len(50).not_null())
.col(ColumnDef::new(Consent::ConsentScope).string_len(100).not_null())
.col(
ColumnDef::new(Consent::Status)
.string_len(20)
.not_null()
.default("granted"),
)
.col(ColumnDef::new(Consent::GrantedAt).timestamp_with_time_zone())
.col(ColumnDef::new(Consent::RevokedAt).timestamp_with_time_zone())
.col(ColumnDef::new(Consent::ExpiryDate).date())
.col(ColumnDef::new(Consent::ConsentMethod).string_len(30))
.col(ColumnDef::new(Consent::WitnessName).string_len(100))
.col(ColumnDef::new(Consent::Notes).text())
.col(
ColumnDef::new(Consent::CreatedAt)
.timestamp_with_time_zone()
.not_null()
.default(Expr::current_timestamp()),
)
.col(
ColumnDef::new(Consent::UpdatedAt)
.timestamp_with_time_zone()
.not_null()
.default(Expr::current_timestamp()),
)
.col(ColumnDef::new(Consent::CreatedBy).uuid())
.col(ColumnDef::new(Consent::UpdatedBy).uuid())
.col(ColumnDef::new(Consent::DeletedAt).timestamp_with_time_zone())
.col(
ColumnDef::new(Consent::Version)
.integer()
.not_null()
.default(1),
)
.to_owned(),
)
.await?;
manager
.create_index(
Index::create()
.name("idx_consent_tenant_patient")
.table(Consent::Table)
.col(Consent::TenantId)
.col(Consent::PatientId)
.to_owned(),
)
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(Consent::Table).to_owned())
.await
}
}
#[derive(DeriveIden)]
enum Consent {
Table,
Id,
TenantId,
PatientId,
ConsentType,
ConsentScope,
Status,
GrantedAt,
RevokedAt,
ExpiryDate,
ConsentMethod,
WitnessName,
Notes,
CreatedAt,
UpdatedAt,
CreatedBy,
UpdatedBy,
DeletedAt,
Version,
}

View File

@@ -1,20 +1,32 @@
use chrono::Utc;
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set};
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, QueryOrder, QuerySelect, Set};
use std::time::Duration;
use erp_core::entity::domain_event;
use erp_core::events::{DomainEvent, EventBus};
const MAX_RETRY: i32 = 5;
/// 启动 outbox relay 后台任务。
///
/// 定期扫描 domain_events 表中 status = 'pending' 的事件,
/// 重新广播并标记为 published
/// 先执行一次性扫描(处理服务重启前遗留的 pending 事件
/// 然后每 5 秒定期扫描 domain_events 表中 status = 'pending' 的事件
pub fn start_outbox_relay(db: sea_orm::DatabaseConnection, event_bus: EventBus) {
let db_clone = db.clone();
let event_bus_clone = event_bus.clone();
tokio::spawn(async move {
// 启动时立即处理一次(恢复重启前未广播的事件)
match process_pending_events(&db_clone, &event_bus_clone).await {
Ok(count) if count > 0 => tracing::info!(count = count, "启动时 outbox relay 恢复完成"),
Ok(_) => tracing::info!("启动时 outbox relay 无待处理事件"),
Err(e) => tracing::warn!(error = %e, "启动时 outbox relay 处理失败"),
}
// 定期轮询
let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
interval.tick().await;
if let Err(e) = process_pending_events(&db, &event_bus).await {
if let Err(e) = process_pending_events(&db_clone, &event_bus_clone).await {
tracing::warn!(error = %e, "Outbox relay 处理失败");
}
}
@@ -24,35 +36,42 @@ pub fn start_outbox_relay(db: sea_orm::DatabaseConnection, event_bus: EventBus)
async fn process_pending_events(
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
) -> Result<(), sea_orm::DbErr> {
) -> Result<usize, sea_orm::DbErr> {
let pending = domain_event::Entity::find()
.filter(domain_event::Column::Status.eq("pending"))
.filter(domain_event::Column::Attempts.lt(3))
.filter(domain_event::Column::Attempts.lt(MAX_RETRY))
.order_by_asc(domain_event::Column::CreatedAt)
.limit(100)
.all(db)
.await?;
if pending.is_empty() {
return Ok(());
return Ok(0);
}
tracing::info!(count = pending.len(), "处理待发领域事件");
let count = pending.len();
tracing::info!(count = count, "处理待发领域事件");
for event_model in pending {
// 重建 DomainEvent 并广播
let domain_event = DomainEvent::new(
&event_model.event_type,
event_model.tenant_id,
event_model.payload.clone().unwrap_or(serde_json::json!({})),
);
// 重建 DomainEvent 并广播(保留原始 ID 和时间戳)
let domain_event = DomainEvent {
id: event_model.id,
event_type: event_model.event_type.clone(),
tenant_id: event_model.tenant_id,
payload: event_model.payload.clone().unwrap_or(serde_json::json!({})),
timestamp: event_model.created_at,
correlation_id: event_model.correlation_id.unwrap_or(event_model.id),
};
event_bus.broadcast(domain_event);
// 标记为 published
// 标记为 published,增加 attempts 计数
let mut active: domain_event::ActiveModel = event_model.into();
active.status = Set("published".to_string());
active.published_at = Set(Some(Utc::now()));
active.attempts = Set(active.attempts.unwrap() + 1);
active.update(db).await?;
}
Ok(())
Ok(count)
}