diff --git a/crates/erp-health/src/entity/patient_doctor_relation.rs b/crates/erp-health/src/entity/patient_doctor_relation.rs index ef0e0e1..bfb4912 100644 --- a/crates/erp-health/src/entity/patient_doctor_relation.rs +++ b/crates/erp-health/src/entity/patient_doctor_relation.rs @@ -18,6 +18,7 @@ pub struct Model { pub updated_by: Option, #[sea_orm(skip_serializing_if = "Option::is_none")] pub deleted_at: Option, + pub version: i32, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/crates/erp-health/src/entity/patient_tag_relation.rs b/crates/erp-health/src/entity/patient_tag_relation.rs index 737fdc2..2e410c4 100644 --- a/crates/erp-health/src/entity/patient_tag_relation.rs +++ b/crates/erp-health/src/entity/patient_tag_relation.rs @@ -17,6 +17,7 @@ pub struct Model { pub updated_by: Option, #[sea_orm(skip_serializing_if = "Option::is_none")] pub deleted_at: Option, + pub version: i32, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/crates/erp-health/src/handler/consultation_handler.rs b/crates/erp-health/src/handler/consultation_handler.rs index a1e7bd4..66db058 100644 --- a/crates/erp-health/src/handler/consultation_handler.rs +++ b/crates/erp-health/src/handler/consultation_handler.rs @@ -35,7 +35,6 @@ pub struct CloseSessionReq { #[derive(Debug, serde::Deserialize, utoipa::ToSchema)] pub struct CreateConsultationMessageReq { pub session_id: Uuid, - pub sender_id: Uuid, pub sender_role: String, pub content_type: Option, pub content: String, @@ -135,7 +134,7 @@ where require_permission(&ctx, "health.consultation.manage")?; let msg_req = CreateMessageReq { session_id: req.session_id, - sender_id: req.sender_id, + sender_id: ctx.user_id, sender_role: req.sender_role, content_type: req.content_type, content: req.content, diff --git a/crates/erp-health/src/service/appointment_service.rs b/crates/erp-health/src/service/appointment_service.rs index 6c634ae..9109ae4 100644 --- a/crates/erp-health/src/service/appointment_service.rs +++ b/crates/erp-health/src/service/appointment_service.rs @@ -187,7 +187,7 @@ pub async fn update_appointment_status( // 取消时释放排班名额(带下限保护) if req.status == "cancelled" { if let Some(did) = model.doctor_id { - let _ = doctor_schedule::Entity::update_many() + let release_result = doctor_schedule::Entity::update_many() .col_expr( doctor_schedule::Column::CurrentAppointments, Expr::col(doctor_schedule::Column::CurrentAppointments).sub(1), @@ -200,6 +200,9 @@ pub async fn update_appointment_status( .filter(Expr::col(doctor_schedule::Column::CurrentAppointments).gt(0)) .exec(&state.db) .await; + if let Err(e) = release_result { + tracing::error!(error = %e, "取消预约时释放排班名额失败"); + } } } @@ -279,6 +282,16 @@ pub async fn create_schedule( let now = Utc::now(); let period_type = req.period_type.unwrap_or_else(|| "am".to_string()); validate_period_type(&period_type)?; + + // H-6: 校验医生存在 + doctor_profile::Entity::find() + .filter(doctor_profile::Column::Id.eq(req.doctor_id)) + .filter(doctor_profile::Column::TenantId.eq(tenant_id)) + .filter(doctor_profile::Column::DeletedAt.is_null()) + .one(&state.db) + .await? + .ok_or(HealthError::DoctorNotFound)?; + let active = doctor_schedule::ActiveModel { id: Set(Uuid::now_v7()), tenant_id: Set(tenant_id), @@ -356,6 +369,12 @@ pub async fn calendar_view( end_date: chrono::NaiveDate, doctor_id: Option, ) -> HealthResult> { + // H-3: 限制日期范围跨度最多 90 天 + let max_span = chrono::Duration::days(90); + if end_date - start_date > max_span { + return Err(HealthError::Validation("日历查询范围不能超过 90 天".to_string())); + } + let mut query = doctor_schedule::Entity::find() .filter(doctor_schedule::Column::TenantId.eq(tenant_id)) .filter(doctor_schedule::Column::DeletedAt.is_null()) diff --git a/crates/erp-health/src/service/article_service.rs b/crates/erp-health/src/service/article_service.rs index 2cfb7c8..8bf41b7 100644 --- a/crates/erp-health/src/service/article_service.rs +++ b/crates/erp-health/src/service/article_service.rs @@ -65,7 +65,7 @@ pub async fn get_article( .filter(article::Column::PublishedAt.is_not_null()) .one(&state.db) .await? - .ok_or(HealthError::HealthRecordNotFound)?; + .ok_or(HealthError::ArticleNotFound)?; Ok(model_to_resp(model)) } diff --git a/crates/erp-health/src/service/consultation_service.rs b/crates/erp-health/src/service/consultation_service.rs index b488d7f..0611d90 100644 --- a/crates/erp-health/src/service/consultation_service.rs +++ b/crates/erp-health/src/service/consultation_service.rs @@ -181,6 +181,7 @@ pub async fn export_sessions( let models = query .order_by_desc(consultation_session::Column::CreatedAt) + .limit(10000) .all(&state.db) .await?; @@ -278,20 +279,34 @@ pub async fn create_message( let m = active.insert(&state.db).await?; // 更新会话的 last_message_at 和未读计数,waiting→active 自动触发 - let mut session_active: consultation_session::ActiveModel = session.into(); - session_active.last_message_at = Set(Some(now)); + // 使用 CAS 防止并发发消息时丢失 unread_count 更新 + let expected_version = session.version; + let mut cas = consultation_session::Entity::update_many() + .col_expr(consultation_session::Column::LastMessageAt, Expr::value(Some(now))) + .col_expr(consultation_session::Column::UpdatedAt, Expr::value(now)) + .col_expr(consultation_session::Column::Version, Expr::col(consultation_session::Column::Version).add(1)) + .filter(consultation_session::Column::Id.eq(req.session_id)) + .filter(consultation_session::Column::TenantId.eq(tenant_id)) + .filter(consultation_session::Column::Version.eq(expected_version)); + if should_activate { - session_active.status = Set("active".to_string()); + cas = cas.col_expr(consultation_session::Column::Status, Expr::value("active".to_string())); } - // 根据发送者角色更新对方的 unread_count if is_patient { - session_active.unread_count_doctor = Set(session_active.unread_count_doctor.unwrap() + 1); + cas = cas.col_expr( + consultation_session::Column::UnreadCountDoctor, + Expr::col(consultation_session::Column::UnreadCountDoctor).add(1), + ); } else { - session_active.unread_count_patient = Set(session_active.unread_count_patient.unwrap() + 1); + cas = cas.col_expr( + consultation_session::Column::UnreadCountPatient, + Expr::col(consultation_session::Column::UnreadCountPatient).add(1), + ); + } + let cas_result = cas.exec(&state.db).await?; + if cas_result.rows_affected == 0 { + return Err(HealthError::VersionMismatch); } - session_active.updated_at = Set(now); - session_active.version = Set(session_active.version.unwrap() + 1); - session_active.update(&state.db).await?; Ok(MessageResp { id: m.id, session_id: m.session_id, sender_id: m.sender_id, diff --git a/crates/erp-health/src/service/doctor_service.rs b/crates/erp-health/src/service/doctor_service.rs index 0e58f3a..7bbd923 100644 --- a/crates/erp-health/src/service/doctor_service.rs +++ b/crates/erp-health/src/service/doctor_service.rs @@ -31,13 +31,12 @@ pub async fn list_doctors( .filter(doctor_profile::Column::DeletedAt.is_null()); if let Some(ref s) = search { - let pattern = format!("%{}%", s); query = query.filter( Condition::any() - .add(doctor_profile::Column::Name.contains(&pattern)) - .add(doctor_profile::Column::LicenseNumber.contains(&pattern)) - .add(doctor_profile::Column::Department.contains(&pattern)) - .add(doctor_profile::Column::Specialty.contains(&pattern)), + .add(doctor_profile::Column::Name.contains(s)) + .add(doctor_profile::Column::LicenseNumber.contains(s)) + .add(doctor_profile::Column::Department.contains(s)) + .add(doctor_profile::Column::Specialty.contains(s)), ); } if let Some(ref d) = department { diff --git a/crates/erp-health/src/service/follow_up_service.rs b/crates/erp-health/src/service/follow_up_service.rs index c64d75d..7223d97 100644 --- a/crates/erp-health/src/service/follow_up_service.rs +++ b/crates/erp-health/src/service/follow_up_service.rs @@ -3,7 +3,7 @@ use chrono::Utc; use erp_core::events::DomainEvent; use sea_orm::entity::prelude::*; -use sea_orm::{ActiveValue::Set, QueryOrder, QuerySelect}; +use sea_orm::{ActiveValue::Set, QueryOrder, QuerySelect, TransactionTrait}; use uuid::Uuid; use erp_core::error::check_version; @@ -206,6 +206,9 @@ pub async fn create_record( let now = Utc::now(); + // 事务包裹:插入记录 + 更新任务状态 + 创建后续任务 + let txn = state.db.begin().await?; + let record_active = follow_up_record::ActiveModel { id: Set(Uuid::now_v7()), tenant_id: Set(tenant_id), @@ -223,17 +226,17 @@ pub async fn create_record( deleted_at: Set(None), version: Set(1), }; - let record = record_active.insert(&state.db).await?; + let record = record_active.insert(&txn).await?; + let task_patient_id = task.patient_id; + let task_assigned_to = task.assigned_to; + let task_follow_up_type = task.follow_up_type.clone(); let mut task_active: follow_up_task::ActiveModel = task.into(); - let task_patient_id = task_active.patient_id.clone().unwrap(); - let task_assigned_to = task_active.assigned_to.clone().unwrap(); - let task_follow_up_type = task_active.follow_up_type.clone().unwrap(); task_active.status = Set("completed".to_string()); task_active.updated_at = Set(now); task_active.updated_by = Set(operator_id); task_active.version = Set(task_active.version.unwrap() + 1); - task_active.update(&state.db).await?; + task_active.update(&txn).await?; // 当 next_follow_up_date 不为空时,自动创建后续随访任务 if let Some(next_date) = req.next_follow_up_date { @@ -254,9 +257,11 @@ pub async fn create_record( deleted_at: Set(None), version: Set(1), }; - new_task.insert(&state.db).await?; + new_task.insert(&txn).await?; } + txn.commit().await?; + let event = DomainEvent::new( "follow_up.completed", tenant_id, diff --git a/crates/erp-health/src/service/patient_service.rs b/crates/erp-health/src/service/patient_service.rs index ecc8a70..095f5ef 100644 --- a/crates/erp-health/src/service/patient_service.rs +++ b/crates/erp-health/src/service/patient_service.rs @@ -12,6 +12,7 @@ use erp_core::types::PaginatedResponse; use crate::dto::patient_dto::*; use crate::entity::patient; use crate::entity::patient_family_member; +use crate::entity::patient_tag; use crate::entity::patient_tag_relation; use crate::entity::patient_doctor_relation; use crate::error::{HealthError, HealthResult}; @@ -52,11 +53,10 @@ pub async fn list_patients( .filter(patient::Column::DeletedAt.is_null()); if let Some(ref search) = search { - let pattern = format!("%{}%", search); query = query.filter( Condition::any() - .add(patient::Column::Name.contains(&pattern)) - .add(patient::Column::IdNumber.contains(&pattern)), + .add(patient::Column::Name.contains(search)) + .add(patient::Column::IdNumber.contains(search)), ); } @@ -131,7 +131,7 @@ pub async fn create_patient( let event = DomainEvent::new( "patient.created", tenant_id, - serde_json::json!({ "patient_id": model.id, "name": model.name }), + serde_json::json!({ "patient_id": model.id }), ); state.event_bus.publish(event, &state.db).await; @@ -257,6 +257,19 @@ pub async fn manage_patient_tags( // 确认患者存在 find_patient(&state.db, tenant_id, patient_id).await?; + // H-1: 校验所有 tag_ids 属于当前租户 + if !req.tag_ids.is_empty() { + let valid_count = patient_tag::Entity::find() + .filter(patient_tag::Column::TenantId.eq(tenant_id)) + .filter(patient_tag::Column::Id.is_in(req.tag_ids.iter().copied())) + .filter(patient_tag::Column::DeletedAt.is_null()) + .count(&state.db) + .await?; + if valid_count != req.tag_ids.len() as u64 { + return Err(HealthError::Validation("部分标签不存在或不属于当前租户".to_string())); + } + } + let now = Utc::now(); // 软删除旧的关联 @@ -287,6 +300,7 @@ pub async fn manage_patient_tags( created_by: Set(operator_id), updated_by: Set(operator_id), deleted_at: Set(None), + version: Set(1), }; rel.insert(&state.db).await?; } @@ -519,6 +533,18 @@ pub async fn assign_doctor( ) -> HealthResult<()> { find_patient(&state.db, tenant_id, patient_id).await?; + // H-2: 检查是否已存在相同的未删除关联 + let existing = patient_doctor_relation::Entity::find() + .filter(patient_doctor_relation::Column::TenantId.eq(tenant_id)) + .filter(patient_doctor_relation::Column::PatientId.eq(patient_id)) + .filter(patient_doctor_relation::Column::DoctorId.eq(doctor_id)) + .filter(patient_doctor_relation::Column::DeletedAt.is_null()) + .one(&state.db) + .await?; + if existing.is_some() { + return Err(HealthError::Validation("该医生已关联此患者".to_string())); + } + let now = Utc::now(); let active = patient_doctor_relation::ActiveModel { id: Set(Uuid::now_v7()), @@ -531,6 +557,7 @@ pub async fn assign_doctor( created_by: Set(operator_id), updated_by: Set(operator_id), deleted_at: Set(None), + version: Set(1), }; active.insert(&state.db).await?; Ok(()) diff --git a/crates/erp-server/migration/src/lib.rs b/crates/erp-server/migration/src/lib.rs index 2f1a855..767b291 100644 --- a/crates/erp-server/migration/src/lib.rs +++ b/crates/erp-server/migration/src/lib.rs @@ -45,6 +45,7 @@ mod m20260423_000042_create_health_tables; mod m20260423_000043_create_wechat_users; mod m20260423_000044_create_articles; mod m20260424_000045_health_indexes; +mod m20260424_000046_health_constraints_fix; pub struct Migrator; @@ -97,6 +98,7 @@ impl MigratorTrait for Migrator { Box::new(m20260423_000043_create_wechat_users::Migration), Box::new(m20260423_000044_create_articles::Migration), Box::new(m20260424_000045_health_indexes::Migration), + Box::new(m20260424_000046_health_constraints_fix::Migration), ] } } diff --git a/crates/erp-server/migration/src/m20260424_000046_health_constraints_fix.rs b/crates/erp-server/migration/src/m20260424_000046_health_constraints_fix.rs new file mode 100644 index 0000000..81e9d5a --- /dev/null +++ b/crates/erp-server/migration/src/m20260424_000046_health_constraints_fix.rs @@ -0,0 +1,94 @@ +use sea_orm_migration::prelude::*; + +/// 迁移 000046: 修复唯一索引软删除兼容 + 关联表添加 version + 补充索引/FK +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let conn = manager.get_connection(); + + // C-4: patient.id_number 唯一索引 — 重建为 partial index WHERE deleted_at IS NULL + conn.execute_unprepared("DROP INDEX IF EXISTS idx_patient_tenant_id_number").await?; + conn.execute_unprepared( + "CREATE UNIQUE INDEX idx_patient_tenant_id_number ON patient (tenant_id, id_number) WHERE deleted_at IS NULL AND id_number IS NOT NULL" + ).await?; + + // C-5: patient_tag.name 唯一索引 — 重建为 partial index WHERE deleted_at IS NULL + conn.execute_unprepared("DROP INDEX IF EXISTS idx_patient_tag_tenant_name_unique").await?; + conn.execute_unprepared( + "CREATE UNIQUE INDEX idx_patient_tag_tenant_name_unique ON patient_tag (tenant_id, name) WHERE deleted_at IS NULL" + ).await?; + + // C-6: doctor_schedule 唯一索引 — 重建为 partial index,修正列选择为 (tenant_id, doctor_id, schedule_date, period_type) + conn.execute_unprepared("DROP INDEX IF EXISTS idx_doctor_schedule_unique_slot").await?; + conn.execute_unprepared( + "CREATE UNIQUE INDEX idx_doctor_schedule_unique_slot ON doctor_schedule (tenant_id, doctor_id, schedule_date, period_type) WHERE deleted_at IS NULL" + ).await?; + + // H-5: patient_tag_relation 添加 version 列 + conn.execute_unprepared( + "ALTER TABLE patient_tag_relation ADD COLUMN IF NOT EXISTS version integer NOT NULL DEFAULT 1" + ).await?; + + // H-5: patient_doctor_relation 添加 version 列 + conn.execute_unprepared( + "ALTER TABLE patient_doctor_relation ADD COLUMN IF NOT EXISTS version integer NOT NULL DEFAULT 1" + ).await?; + + // H-8: follow_up_task.related_appointment_id 添加 FK 约束 + conn.execute_unprepared( + "ALTER TABLE follow_up_task DROP CONSTRAINT IF EXISTS fk_follow_up_task_appointment" + ).await?; + conn.execute_unprepared( + "ALTER TABLE follow_up_task ADD CONSTRAINT fk_follow_up_task_appointment \ + FOREIGN KEY (related_appointment_id) REFERENCES appointment(id) ON DELETE SET NULL" + ).await?; + + // M-6: lab_report 添加 (tenant_id, report_type) 索引 + conn.execute_unprepared( + "CREATE INDEX IF NOT EXISTS idx_lab_report_tenant_type ON lab_report (tenant_id, report_type)" + ).await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let conn = manager.get_connection(); + + // 恢复原始索引(非 partial) + conn.execute_unprepared("DROP INDEX IF EXISTS idx_patient_tenant_id_number").await?; + conn.execute_unprepared( + "CREATE UNIQUE INDEX idx_patient_tenant_id_number ON patient (tenant_id, id_number)" + ).await?; + + conn.execute_unprepared("DROP INDEX IF EXISTS idx_patient_tag_tenant_name_unique").await?; + conn.execute_unprepared( + "CREATE UNIQUE INDEX idx_patient_tag_tenant_name_unique ON patient_tag (tenant_id, name)" + ).await?; + + conn.execute_unprepared("DROP INDEX IF EXISTS idx_doctor_schedule_unique_slot").await?; + conn.execute_unprepared( + "CREATE UNIQUE INDEX idx_doctor_schedule_unique_slot ON doctor_schedule (tenant_id, doctor_id, schedule_date, start_time)" + ).await?; + + conn.execute_unprepared( + "ALTER TABLE patient_tag_relation DROP COLUMN IF EXISTS version" + ).await?; + + conn.execute_unprepared( + "ALTER TABLE patient_doctor_relation DROP COLUMN IF EXISTS version" + ).await?; + + conn.execute_unprepared( + "ALTER TABLE follow_up_task DROP CONSTRAINT IF EXISTS fk_follow_up_task_appointment" + ).await?; + + conn.execute_unprepared( + "DROP INDEX IF EXISTS idx_lab_report_tenant_type" + ).await?; + + Ok(()) + } +}