Files
hms/crates/erp-health/src/service/consultation_service.rs
iven fa9278590d
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled
refactor(dialysis): 透析模块拆分为独立 erp-dialysis crate
- 创建 erp-dialysis crate(DialysisState + DialysisError + DialysisModule)
- 迁移 2 Entity + 2 Service + 2 Handler + 2 DTO 共 8 个文件
- Entity 移除跨 crate patient Relation(FK 列保留)
- Service 内联 validation 逻辑,移除 patient 存在性检查(FK 约束保证)
- erp-health 的 stats/consultation 中 dialysis 查询改为 raw SQL
- ReviewLabReportReq 从 dialysis_dto 移至 health_data_dto(正确归属)
- workspace 全量编译通过
2026-04-28 12:37:23 +08:00

620 lines
23 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//! 咨询管理 Service — 会话管理、消息收发、会话关闭、导出
use chrono::Utc;
use erp_core::audit::AuditLog;
use erp_core::audit_service;
use erp_core::events::DomainEvent;
use sea_orm::entity::prelude::*;
use sea_orm::{ActiveValue::Set, Condition, QueryOrder, QuerySelect, TransactionTrait};
use uuid::Uuid;
use erp_core::error::check_version;
use erp_core::types::PaginatedResponse;
use crate::dto::consultation_dto::*;
use crate::entity::{consultation_message, consultation_session, doctor_profile, patient};
use crate::error::{HealthError, HealthResult};
use crate::service::validation::{validate_sender_role, validate_content_type, validate_consultation_type};
use crate::state::HealthState;
use erp_core::crypto as pii;
// ---------------------------------------------------------------------------
// 咨询会话
// ---------------------------------------------------------------------------
/// Maps a stored consultation session row onto the API response type.
///
/// `patient_name` and `doctor_name` are always emitted as `None` here; the
/// row itself carries no display names.
fn model_to_session_resp(m: consultation_session::Model) -> SessionResp {
    // Pull out exactly the columns the response exposes; the remaining
    // bookkeeping columns are dropped.
    let consultation_session::Model {
        id,
        patient_id,
        doctor_id,
        consultation_type,
        status,
        last_message_at,
        unread_count_patient,
        unread_count_doctor,
        created_at,
        updated_at,
        version,
        ..
    } = m;

    SessionResp {
        id,
        patient_id,
        doctor_id,
        patient_name: None,
        doctor_name: None,
        consultation_type,
        status,
        last_message_at,
        unread_count_patient,
        unread_count_doctor,
        created_at,
        updated_at,
        version,
    }
}
pub async fn create_session(
state: &HealthState,
tenant_id: Uuid,
operator_id: Option<Uuid>,
req: CreateSessionReq,
) -> HealthResult<SessionResp> {
let now = Utc::now();
// 校验患者存在
patient::Entity::find()
.filter(patient::Column::Id.eq(req.patient_id))
.filter(patient::Column::TenantId.eq(tenant_id))
.filter(patient::Column::DeletedAt.is_null())
.one(&state.db)
.await?
.ok_or(HealthError::PatientNotFound)?;
let consultation_type = req.consultation_type.unwrap_or_else(|| "customer_service".to_string());
validate_consultation_type(&consultation_type)?;
let active = consultation_session::ActiveModel {
id: Set(Uuid::now_v7()),
tenant_id: Set(tenant_id),
patient_id: Set(req.patient_id),
doctor_id: Set(req.doctor_id),
consultation_type: Set(consultation_type),
status: Set("waiting".to_string()),
last_message_at: Set(None),
unread_count_patient: Set(0),
unread_count_doctor: Set(0),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(operator_id),
updated_by: Set(operator_id),
deleted_at: Set(None),
version: Set(1),
};
let m = active.insert(&state.db).await?;
let event = DomainEvent::new(
crate::event::CONSULTATION_OPENED,
tenant_id,
erp_core::events::build_event_payload(serde_json::json!({ "session_id": m.id, "patient_id": m.patient_id })),
);
state.event_bus.publish(event, &state.db).await;
audit_service::record(
AuditLog::new(tenant_id, operator_id, "consultation.opened", "consultation")
.with_resource_id(m.id),
&state.db,
).await;
Ok(model_to_session_resp(m))
}
/// 获取单个咨询会话
pub async fn get_session(
state: &HealthState,
tenant_id: Uuid,
session_id: Uuid,
) -> HealthResult<SessionResp> {
let model = consultation_session::Entity::find()
.filter(consultation_session::Column::Id.eq(session_id))
.filter(consultation_session::Column::TenantId.eq(tenant_id))
.filter(consultation_session::Column::DeletedAt.is_null())
.one(&state.db)
.await?
.ok_or(HealthError::ConsultationNotFound)?;
Ok(model_to_session_resp(model))
}
/// Lists consultation sessions for a tenant, newest first, with pagination.
///
/// # Parameters
/// - `page`: 1-based page number; `0` is treated like page 1.
/// - `page_size`: requested page size, capped at 100. The capped value is what
///   the response reports as `page_size`.
/// - `status`, `patient_id`, `doctor_id`: optional equality filters.
///
/// Soft-deleted sessions are excluded. `total` is the number of rows matching
/// the filters, independent of paging.
///
/// # Errors
/// Returns a database error if the count or the page query fails.
pub async fn list_sessions(
    state: &HealthState,
    tenant_id: Uuid,
    page: u64,
    page_size: u64,
    status: Option<String>,
    patient_id: Option<Uuid>,
    doctor_id: Option<Uuid>,
) -> HealthResult<PaginatedResponse<SessionResp>> {
    let limit = page_size.min(100);
    // Saturating multiply: a huge caller-supplied `page` must not overflow
    // (panic in debug builds, wrap to a bogus offset in release builds).
    let offset = page.saturating_sub(1).saturating_mul(limit);

    let mut query = consultation_session::Entity::find()
        .filter(consultation_session::Column::TenantId.eq(tenant_id))
        .filter(consultation_session::Column::DeletedAt.is_null());
    if let Some(s) = status {
        query = query.filter(consultation_session::Column::Status.eq(s));
    }
    if let Some(pid) = patient_id {
        query = query.filter(consultation_session::Column::PatientId.eq(pid));
    }
    if let Some(did) = doctor_id {
        query = query.filter(consultation_session::Column::DoctorId.eq(did));
    }

    let total = query.clone().count(&state.db).await?;
    let models = query
        .order_by_desc(consultation_session::Column::CreatedAt)
        .offset(offset)
        .limit(limit)
        .all(&state.db)
        .await?;

    // `max(1)` avoids a division by zero when the caller asked for page_size 0.
    let total_pages = total.div_ceil(limit.max(1));
    let data = models.into_iter().map(model_to_session_resp).collect();
    Ok(PaginatedResponse { data, total, page, page_size: limit, total_pages })
}
/// Closes a consultation session that is currently `"active"` or `"waiting"`.
///
/// The caller passes the version it last saw; `check_version` compares it with
/// the stored version and yields the next one. On success the status becomes
/// `"closed"`, a `CONSULTATION_CLOSED` domain event is published, and an audit
/// entry is recorded.
///
/// # Errors
/// - `HealthError::ConsultationNotFound` when the session does not exist, is
///   soft-deleted, or is not in an open state.
/// - `HealthError::VersionMismatch` when `check_version` rejects
///   `expected_version`.
/// - Database errors from the lookup or the update.
pub async fn close_session(
    state: &HealthState,
    tenant_id: Uuid,
    session_id: Uuid,
    operator_id: Option<Uuid>,
    expected_version: i32,
) -> HealthResult<SessionResp> {
    // Only sessions that are still open can be closed.
    let current = consultation_session::Entity::find()
        .filter(consultation_session::Column::Id.eq(session_id))
        .filter(consultation_session::Column::TenantId.eq(tenant_id))
        .filter(consultation_session::Column::DeletedAt.is_null())
        .filter(consultation_session::Column::Status.is_in(["active", "waiting"]))
        .one(&state.db)
        .await?
        .ok_or(HealthError::ConsultationNotFound)?;

    // Any version-check failure is reported uniformly as a mismatch.
    let bumped_version = match check_version(expected_version, current.version) {
        Ok(v) => v,
        Err(_) => return Err(HealthError::VersionMismatch),
    };

    let mut pending: consultation_session::ActiveModel = current.into();
    pending.status = Set("closed".to_string());
    pending.updated_at = Set(Utc::now());
    pending.updated_by = Set(operator_id);
    pending.version = Set(bumped_version);
    let closed = pending.update(&state.db).await?;

    let payload = erp_core::events::build_event_payload(
        serde_json::json!({ "session_id": closed.id, "patient_id": closed.patient_id }),
    );
    let closed_event = DomainEvent::new(crate::event::CONSULTATION_CLOSED, tenant_id, payload);
    state.event_bus.publish(closed_event, &state.db).await;

    let audit_entry = AuditLog::new(tenant_id, operator_id, "consultation.closed", "consultation")
        .with_resource_id(closed.id);
    audit_service::record(audit_entry, &state.db).await;

    Ok(model_to_session_resp(closed))
}
pub async fn export_sessions(
state: &HealthState,
tenant_id: Uuid,
status: Option<String>,
patient_id: Option<Uuid>,
doctor_id: Option<Uuid>,
page: Option<u64>,
page_size: Option<u64>,
) -> HealthResult<PaginatedResponse<SessionResp>> {
let limit = page_size.unwrap_or(100).min(500);
let page_num = page.unwrap_or(1);
let offset = page_num.saturating_sub(1) * limit;
let mut query = consultation_session::Entity::find()
.filter(consultation_session::Column::TenantId.eq(tenant_id))
.filter(consultation_session::Column::DeletedAt.is_null());
if let Some(ref s) = status { query = query.filter(consultation_session::Column::Status.eq(s)); }
if let Some(pid) = patient_id { query = query.filter(consultation_session::Column::PatientId.eq(pid)); }
if let Some(did) = doctor_id { query = query.filter(consultation_session::Column::DoctorId.eq(did)); }
let total = query.clone().count(&state.db).await?;
let models = query
.order_by_desc(consultation_session::Column::CreatedAt)
.offset(offset)
.limit(limit)
.all(&state.db)
.await?;
// 批量查询 patient_name 和 doctor_name
let patient_ids: std::collections::HashSet<Uuid> = models.iter().map(|m| m.patient_id).collect();
let doctor_ids: std::collections::HashSet<Uuid> = models.iter().filter_map(|m| m.doctor_id).collect();
let patient_names: std::collections::HashMap<Uuid, String> = if !patient_ids.is_empty() {
patient::Entity::find()
.filter(patient::Column::Id.is_in(patient_ids))
.filter(patient::Column::TenantId.eq(tenant_id))
.all(&state.db)
.await?
.into_iter()
.map(|p| (p.id, p.name))
.collect()
} else {
std::collections::HashMap::new()
};
let doctor_names: std::collections::HashMap<Uuid, String> = if !doctor_ids.is_empty() {
doctor_profile::Entity::find()
.filter(doctor_profile::Column::Id.is_in(doctor_ids))
.filter(doctor_profile::Column::TenantId.eq(tenant_id))
.all(&state.db)
.await?
.into_iter()
.map(|d| (d.id, d.name))
.collect()
} else {
std::collections::HashMap::new()
};
let total_pages = total.div_ceil(limit.max(1));
let data = models.into_iter().map(|m| {
let mut resp = model_to_session_resp(m.clone());
resp.patient_name = patient_names.get(&m.patient_id).cloned();
resp.doctor_name = m.doctor_id.and_then(|did| doctor_names.get(&did).cloned());
resp
}).collect();
Ok(PaginatedResponse { data, total, page: page_num, page_size: limit, total_pages })
}
// ---------------------------------------------------------------------------
// 咨询消息
// ---------------------------------------------------------------------------
/// Lists the messages of a consultation session, oldest first, with pagination.
///
/// # Parameters
/// - `page`: 1-based page number; `0` is treated like page 1.
/// - `page_size`: requested page size, capped at 100.
/// - `after_id`: when set, only messages whose id is greater than this value
///   are returned (incremental polling). Paging still applies to that set.
///
/// Each message's stored content is decrypted with the current KEK. If
/// decryption fails, the stored value is returned unchanged.
///
/// # Errors
/// Returns a database error if the count or the page query fails.
pub async fn list_messages(
    state: &HealthState,
    tenant_id: Uuid,
    session_id: Uuid,
    page: u64,
    page_size: u64,
    after_id: Option<Uuid>,
) -> HealthResult<PaginatedResponse<MessageResp>> {
    let limit = page_size.min(100);
    // Saturating multiply: a huge caller-supplied `page` must not overflow.
    let offset = page.saturating_sub(1).saturating_mul(limit);

    let mut query = consultation_message::Entity::find()
        .filter(consultation_message::Column::TenantId.eq(tenant_id))
        .filter(consultation_message::Column::SessionId.eq(session_id))
        .filter(consultation_message::Column::DeletedAt.is_null());
    // after_id mode: only messages with an id greater than the given one.
    if let Some(aid) = after_id {
        query = query.filter(consultation_message::Column::Id.gt(aid));
    }

    // Count once; issuing the same COUNT twice is a wasted round trip.
    let total = query.clone().count(&state.db).await?;
    let models = query
        .order_by_asc(consultation_message::Column::CreatedAt)
        .offset(offset)
        .limit(limit)
        .all(&state.db)
        .await?;

    // `max(1)` avoids a division by zero when the caller asked for page_size 0.
    let total_pages = total.div_ceil(limit.max(1));
    let kek = state.crypto.kek();
    let data = models
        .into_iter()
        .map(|m| {
            // Fall back to the stored value when decryption fails.
            let content = pii::decrypt(kek, &m.content).unwrap_or(m.content);
            MessageResp {
                id: m.id,
                session_id: m.session_id,
                sender_id: m.sender_id,
                sender_role: m.sender_role,
                content_type: m.content_type,
                content,
                is_read: m.is_read,
                created_at: m.created_at,
            }
        })
        .collect();
    Ok(PaginatedResponse { data, total, page, page_size: limit, total_pages })
}
pub async fn create_message(
state: &HealthState,
tenant_id: Uuid,
operator_id: Option<Uuid>,
req: CreateMessageReq,
) -> HealthResult<MessageResp> {
// 校验会话存在且状态为 active 或 waiting
let session = consultation_session::Entity::find()
.filter(consultation_session::Column::Id.eq(req.session_id))
.filter(consultation_session::Column::TenantId.eq(tenant_id))
.filter(consultation_session::Column::DeletedAt.is_null())
.filter(
Condition::any()
.add(consultation_session::Column::Status.eq("active"))
.add(consultation_session::Column::Status.eq("waiting")),
)
.one(&state.db)
.await?
.ok_or(HealthError::ConsultationNotFound)?;
let now = Utc::now();
validate_sender_role(&req.sender_role)?;
let content_type = req.content_type.unwrap_or_else(|| "text".to_string());
validate_content_type(&content_type)?;
let is_patient = req.sender_role == "patient";
let should_activate = session.status == "waiting";
// 强制 sender_id 为认证用户,防止冒充
let sender_id = operator_id.ok_or_else(|| {
HealthError::Validation("sender_id 必须与认证用户匹配".into())
})?;
// 事务包裹:消息 INSERT + 会话 CAS 更新,保证原子性
let txn = state.db.begin().await?;
// 创建消息
let active = consultation_message::ActiveModel {
id: Set(Uuid::now_v7()),
tenant_id: Set(tenant_id),
session_id: Set(req.session_id),
sender_id: Set(sender_id),
sender_role: Set(req.sender_role),
content_type: Set(content_type),
content: Set(pii::encrypt(state.crypto.kek(), &req.content)?),
is_read: Set(false),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(operator_id),
updated_by: Set(operator_id),
deleted_at: Set(None),
version: Set(1),
key_version: Set(Some(1)),
};
let m = active.insert(&txn).await?;
// 更新会话的 last_message_at 和未读计数waiting→active 自动触发
// 使用 CAS 防止并发发消息时丢失 unread_count 更新
let expected_version = session.version;
let mut cas = consultation_session::Entity::update_many()
.col_expr(consultation_session::Column::LastMessageAt, Expr::value(Some(now)))
.col_expr(consultation_session::Column::UpdatedAt, Expr::value(now))
.col_expr(consultation_session::Column::Version, Expr::col(consultation_session::Column::Version).add(1))
.filter(consultation_session::Column::Id.eq(req.session_id))
.filter(consultation_session::Column::TenantId.eq(tenant_id))
.filter(consultation_session::Column::Version.eq(expected_version));
if should_activate {
cas = cas.col_expr(consultation_session::Column::Status, Expr::value("active".to_string()));
}
if is_patient {
cas = cas.col_expr(
consultation_session::Column::UnreadCountDoctor,
Expr::col(consultation_session::Column::UnreadCountDoctor).add(1),
);
} else {
cas = cas.col_expr(
consultation_session::Column::UnreadCountPatient,
Expr::col(consultation_session::Column::UnreadCountPatient).add(1),
);
}
let cas_result = cas.exec(&txn).await?;
if cas_result.rows_affected == 0 {
txn.rollback().await?;
return Err(HealthError::VersionMismatch);
}
txn.commit().await?;
audit_service::record(
AuditLog::new(tenant_id, operator_id, "consultation.message_sent", "consultation")
.with_resource_id(m.id),
&state.db,
).await;
let decrypted_content = pii::decrypt(state.crypto.kek(), &m.content).unwrap_or(m.content);
Ok(MessageResp {
id: m.id, session_id: m.session_id, sender_id: m.sender_id,
sender_role: m.sender_role, content_type: m.content_type,
content: decrypted_content, is_read: m.is_read, created_at: m.created_at,
})
}
// ---------------------------------------------------------------------------
// 标记已读
// ---------------------------------------------------------------------------
/// Marks a session as read by resetting one of its unread counters to zero.
///
/// A `role` of `"doctor"` clears `unread_count_doctor`; any other value clears
/// `unread_count_patient`. The update only applies while the session still has
/// the version that was read at the start of the call. `user_id` is accepted
/// but not read by this function.
///
/// # Errors
/// - `HealthError::ConsultationNotFound` when the session is missing or
///   soft-deleted.
/// - `HealthError::VersionMismatch` when the update affects no rows.
/// - Database errors from the lookup or the update.
pub async fn mark_session_read(
    state: &HealthState,
    tenant_id: Uuid,
    session_id: Uuid,
    user_id: Uuid,
    role: &str,
) -> HealthResult<()> {
    use sea_orm::sea_query::Expr;

    let current = consultation_session::Entity::find()
        .filter(consultation_session::Column::Id.eq(session_id))
        .filter(consultation_session::Column::TenantId.eq(tenant_id))
        .filter(consultation_session::Column::DeletedAt.is_null())
        .one(&state.db)
        .await?
        .ok_or(HealthError::ConsultationNotFound)?;

    // Pick the counter that belongs to the reader.
    let counter = match role {
        "doctor" => consultation_session::Column::UnreadCountDoctor,
        _ => consultation_session::Column::UnreadCountPatient,
    };

    let outcome = consultation_session::Entity::update_many()
        .col_expr(counter, Expr::value(0i32))
        .filter(consultation_session::Column::Id.eq(current.id))
        .filter(consultation_session::Column::TenantId.eq(tenant_id))
        .filter(consultation_session::Column::Version.eq(current.version))
        .exec(&state.db)
        .await?;

    match outcome.rows_affected {
        0 => Err(HealthError::VersionMismatch),
        _ => Ok(()),
    }
}
// ---------------------------------------------------------------------------
// 医生仪表盘
// ---------------------------------------------------------------------------
/// Summary counters shown on a doctor's dashboard.
///
/// All fields are plain counts; a dashboard for a user without a doctor
/// profile has every field set to zero.
#[derive(Debug, serde::Serialize)]
pub struct DoctorDashboard {
/// Number of patient–doctor relation rows linked to the doctor's profile.
pub total_patients: i64,
/// Number of the doctor's sessions whose status is `active` or `waiting`.
pub active_sessions: i64,
/// Sum of `unread_count_doctor` over the doctor's open sessions.
pub unread_messages: i64,
/// Number of follow-up tasks assigned to the doctor with status `pending`.
pub pending_follow_ups: i64,
/// Number of the doctor's sessions created since the start of the current UTC day.
pub today_consultations: i64,
/// Number of dialysis records awaiting review (status `draft`).
pub pending_dialysis_review: i64,
/// Number of lab reports awaiting review (status `pending`).
pub pending_lab_review: i64,
/// Number of the doctor's appointments dated today with status `confirmed` or `pending`.
pub today_appointments: i64,
}
/// Builds the consultation-related part of a doctor's dashboard.
///
/// Looks up the doctor profile for `doctor_user_id` within the tenant. If the
/// user has no profile (is not a doctor), an all-zero dashboard is returned.
/// Otherwise it fills in:
/// - `total_patients`: patient–doctor relations for the doctor's profile id;
/// - `active_sessions`: non-deleted sessions in `active` or `waiting` state;
/// - `unread_messages`: sum of `unread_count_doctor` over those sessions;
/// - `pending_follow_ups`: follow-up tasks assigned to the user with status
///   `pending`;
/// - `today_consultations`: sessions created since 00:00 of the current UTC
///   date.
///
/// `pending_dialysis_review`, `pending_lab_review` and `today_appointments`
/// are left at zero by this function.
///
/// # Errors
/// Returns a database error from the profile lookup, or
/// `HealthError::DbError` if any of the counting queries fails.
pub async fn get_doctor_dashboard(
    state: &HealthState,
    tenant_id: Uuid,
    doctor_user_id: Uuid,
) -> HealthResult<DoctorDashboard> {
    use crate::entity::{doctor_profile, follow_up_task, patient_doctor_relation};
    use sea_orm::ColumnTrait;
    use sea_orm::FromQueryResult;
    use sea_orm::QueryFilter;

    /// Row shape of the unread-sum aggregate query below.
    #[derive(FromQueryResult)]
    struct UnreadSum {
        total: Option<i64>,
    }

    // Resolve the doctor profile behind this user account.
    let doctor = doctor_profile::Entity::find()
        .filter(doctor_profile::Column::UserId.eq(doctor_user_id))
        .filter(doctor_profile::Column::TenantId.eq(tenant_id))
        .filter(doctor_profile::Column::DeletedAt.is_null())
        .one(&state.db)
        .await?;
    let doctor_id = match doctor {
        Some(d) => d.id,
        None => {
            // Not a doctor: return an empty dashboard instead of an error.
            return Ok(DoctorDashboard {
                total_patients: 0,
                active_sessions: 0,
                unread_messages: 0,
                pending_follow_ups: 0,
                today_consultations: 0,
                pending_dialysis_review: 0,
                pending_lab_review: 0,
                today_appointments: 0,
            });
        }
    };

    // Number of patients linked to this doctor (keyed by profile id).
    let total_patients = patient_doctor_relation::Entity::find()
        .filter(patient_doctor_relation::Column::DoctorId.eq(doctor_id))
        .filter(patient_doctor_relation::Column::TenantId.eq(tenant_id))
        .filter(patient_doctor_relation::Column::DeletedAt.is_null())
        .count(&state.db)
        .await
        .map_err(|e| HealthError::DbError(e.to_string()))?;

    // NOTE(review): the session queries below match
    // `consultation_session.doctor_id` against the *user* id, whereas the
    // relation count above uses the *profile* id. Confirm which identifier
    // the session column actually stores.

    // Open sessions. Soft-deleted rows are excluded so that this count covers
    // the same set of sessions as the unread-message sum below.
    let active_sessions = consultation_session::Entity::find()
        .filter(consultation_session::Column::DoctorId.eq(doctor_user_id))
        .filter(consultation_session::Column::TenantId.eq(tenant_id))
        .filter(consultation_session::Column::DeletedAt.is_null())
        .filter(consultation_session::Column::Status.is_in(["active", "waiting"]))
        .count(&state.db)
        .await
        .map_err(|e| HealthError::DbError(e.to_string()))?;

    // Total unread messages across the doctor's open sessions.
    let unread_result = UnreadSum::find_by_statement(sea_orm::Statement::from_sql_and_values(
        sea_orm::DatabaseBackend::Postgres,
        r#"SELECT COALESCE(SUM(unread_count_doctor), 0)::bigint as total
FROM consultation_session
WHERE tenant_id = $1 AND doctor_id = $2 AND deleted_at IS NULL AND status IN ('active', 'waiting')"#,
        [tenant_id.into(), doctor_user_id.into()],
    ))
    .one(&state.db)
    .await
    .map_err(|e| HealthError::DbError(e.to_string()))?;
    let unread_messages = unread_result.and_then(|r| r.total).unwrap_or(0);

    // Follow-up tasks still waiting on this user.
    let pending_follow_ups = follow_up_task::Entity::find()
        .filter(follow_up_task::Column::AssignedTo.eq(doctor_user_id))
        .filter(follow_up_task::Column::TenantId.eq(tenant_id))
        .filter(follow_up_task::Column::Status.eq("pending"))
        .filter(follow_up_task::Column::DeletedAt.is_null())
        .count(&state.db)
        .await
        .map_err(|e| HealthError::DbError(e.to_string()))?;

    // Sessions created today, where "today" starts at 00:00 of the UTC date.
    let today = chrono::Utc::now().date_naive();
    let today_start = today.and_hms_opt(0, 0, 0).unwrap_or_default();
    let today_consultations = consultation_session::Entity::find()
        .filter(consultation_session::Column::DoctorId.eq(doctor_user_id))
        .filter(consultation_session::Column::TenantId.eq(tenant_id))
        .filter(consultation_session::Column::CreatedAt.gte(today_start))
        .filter(consultation_session::Column::DeletedAt.is_null())
        .count(&state.db)
        .await
        .map_err(|e| HealthError::DbError(e.to_string()))?;

    Ok(DoctorDashboard {
        total_patients: total_patients as i64,
        active_sessions: active_sessions as i64,
        unread_messages,
        pending_follow_ups: pending_follow_ups as i64,
        today_consultations: today_consultations as i64,
        pending_dialysis_review: 0,
        pending_lab_review: 0,
        today_appointments: 0,
    })
}
/// Fills in the health-data counters of an existing doctor dashboard.
///
/// Three fields of `dashboard` are written, each right after its own query:
/// - `pending_dialysis_review`: non-deleted `dialysis_record` rows with status
///   `draft`, filtered by tenant only (raw SQL);
/// - `pending_lab_review`: non-deleted lab reports with status `pending`,
///   filtered by tenant only;
/// - `today_appointments`: non-deleted appointments dated on the current UTC
///   date for `doctor_user_id` with status `confirmed` or `pending`.
///
/// # Errors
/// Returns a database error if any query fails. Fields written before the
/// failing query keep their new values.
pub async fn enrich_doctor_dashboard_health(
    state: &HealthState,
    tenant_id: Uuid,
    doctor_user_id: Uuid,
    dashboard: &mut DoctorDashboard,
) -> HealthResult<()> {
    use crate::entity::{lab_report, appointment};
    use sea_orm::{FromQueryResult, Statement, DatabaseBackend};

    /// Row shape of the dialysis count query.
    #[derive(FromQueryResult)]
    struct DialysisCount { count: i64 }

    // Dialysis records pending review, counted with raw SQL against the
    // `dialysis_record` table.
    let dialysis_stmt = Statement::from_sql_and_values(
        DatabaseBackend::Postgres,
        "SELECT COUNT(*)::int8 AS count FROM dialysis_record WHERE tenant_id = $1 AND deleted_at IS NULL AND status = 'draft'",
        [tenant_id.into()],
    );
    let dialysis_row = DialysisCount::find_by_statement(dialysis_stmt)
        .one(&state.db)
        .await?;
    dashboard.pending_dialysis_review = match dialysis_row {
        Some(row) => row.count,
        None => 0,
    };

    // Lab reports pending review.
    let lab_pending = lab_report::Entity::find()
        .filter(lab_report::Column::TenantId.eq(tenant_id))
        .filter(lab_report::Column::DeletedAt.is_null())
        .filter(lab_report::Column::Status.eq("pending"))
        .count(&state.db)
        .await?;
    dashboard.pending_lab_review = lab_pending as i64;

    // Appointments scheduled for today (UTC date) for this doctor.
    let utc_today = chrono::Utc::now().date_naive();
    let appts_today = appointment::Entity::find()
        .filter(appointment::Column::TenantId.eq(tenant_id))
        .filter(appointment::Column::DeletedAt.is_null())
        .filter(appointment::Column::AppointmentDate.eq(utc_today))
        .filter(appointment::Column::DoctorId.eq(doctor_user_id))
        .filter(appointment::Column::Status.is_in(["confirmed", "pending"]))
        .count(&state.db)
        .await?;
    dashboard.today_appointments = appts_today as i64;

    Ok(())
}