Phase 4 — Dead-letter 重试 + 内容推送 + 安全加固: - erp-core: retry_dead_letters() 定时重试 + PII payload 脱敏 - erp-core: audit_service 哈希链定时验证 + 写入失败告警 - erp-health: article.published 消费者匹配 patient_tag 推送消息 - erp-health: care_plan 事件消费者 (激活通知 + 完成积分) Phase 5 — 患者批量操作 + 咨询增强 + 护理事件: - patient: batch_import_patients + bind_by_phone + refer_patient - consultation: rate_session 满意度评价 (rating + feedback) - consent: patient_sign_consent 患者端签署 - validation: source 枚举 (7值) + relationship 枚举 (7值) + 12 单元测试 Phase 6 — 咨询文件上传 + AI 引用标注: - consultation_message: media_id 附件上传端点 - ai_suggestion: references JSONB + [ref:id] 格式引用标注 - AI system prompt 增加引用指令 + output_parser 提取逻辑 迁移: 000161 (media_id + references) + 000162 (rating + feedback) 集成测试: consultation/follow_up/pii_encryption 新字段同步修复 讨论文档: 2026-05-20-business-process-brainstorm.md (10域审核报告)
222 lines
7.3 KiB
Rust
222 lines
7.3 KiB
Rust
use crate::audit::AuditLog;
|
||
use crate::entity::audit_log;
|
||
use crate::request_info::RequestInfo;
|
||
use sea_orm::{
|
||
ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, QueryOrder, QuerySelect, Set,
|
||
};
|
||
use serde::{Deserialize, Serialize};
|
||
use sha2::{Digest, Sha256};
|
||
use tracing;
|
||
use uuid::Uuid;
|
||
|
||
/// 持久化审计日志到 audit_logs 表。
|
||
///
|
||
/// 使用 fire-and-forget 模式:失败仅记录 warning 日志,不影响业务操作。
|
||
///
|
||
/// 自动从 task_local 读取当前请求的 IP 和 User-Agent,
|
||
/// 如果 AuditLog 中已有 ip_address/user_agent 则不覆盖。
|
||
///
|
||
/// 哈希链:查询同租户最新一条记录的 record_hash 作为 prev_hash,
|
||
/// 计算 SHA256(id + action + resource_type + resource_id + created_at + prev_hash) 作为 record_hash。
|
||
pub async fn record(mut log: AuditLog, db: &sea_orm::DatabaseConnection) {
|
||
// 自动填充请求来源信息(仅当调用方未显式设置时)
|
||
if let Some(info) = RequestInfo::try_current() {
|
||
if log.ip_address.is_none() {
|
||
log.ip_address = info.ip_address;
|
||
}
|
||
if log.user_agent.is_none() {
|
||
log.user_agent = info.user_agent;
|
||
}
|
||
}
|
||
|
||
// 查询同租户最新一条记录的 record_hash 作为 prev_hash
|
||
let prev_hash = audit_log::Entity::find()
|
||
.filter(audit_log::Column::TenantId.eq(log.tenant_id))
|
||
.filter(audit_log::Column::RecordHash.is_not_null())
|
||
.order_by_desc(audit_log::Column::CreatedAt)
|
||
.one(db)
|
||
.await
|
||
.ok()
|
||
.flatten()
|
||
.and_then(|m| m.record_hash);
|
||
|
||
// 计算当前记录的 record_hash
|
||
let record_hash = compute_record_hash(&log, prev_hash.as_deref());
|
||
|
||
// 保存日志字段用于错误日志(model 构建会 move String 字段)
|
||
let err_tenant_id = log.tenant_id;
|
||
let err_action = log.action.clone();
|
||
let err_resource_type = log.resource_type.clone();
|
||
let err_resource_id = log.resource_id;
|
||
|
||
let model = audit_log::ActiveModel {
|
||
id: Set(log.id),
|
||
tenant_id: Set(log.tenant_id),
|
||
user_id: Set(log.user_id),
|
||
action: Set(log.action),
|
||
resource_type: Set(log.resource_type),
|
||
resource_id: Set(log.resource_id),
|
||
old_value: Set(log.old_value),
|
||
new_value: Set(log.new_value),
|
||
ip_address: Set(log.ip_address),
|
||
user_agent: Set(log.user_agent),
|
||
created_at: Set(log.created_at),
|
||
prev_hash: Set(prev_hash),
|
||
record_hash: Set(Some(record_hash)),
|
||
};
|
||
|
||
if let Err(e) = model.insert(db).await {
|
||
tracing::error!(
|
||
error = %e,
|
||
tenant_id = ?err_tenant_id,
|
||
action = %err_action,
|
||
resource_type = %err_resource_type,
|
||
resource_id = ?err_resource_id,
|
||
"审计日志写入失败 — 数据完整性风险"
|
||
);
|
||
}
|
||
}
|
||
|
||
/// 计算 record_hash: SHA256(id + action + resource_type + resource_id + created_at + prev_hash)
|
||
fn compute_record_hash(log: &AuditLog, prev_hash: Option<&str>) -> String {
|
||
let mut hasher = Sha256::new();
|
||
hasher.update(log.id.to_string().as_bytes());
|
||
hasher.update(log.action.as_bytes());
|
||
hasher.update(log.resource_type.as_bytes());
|
||
hasher.update(
|
||
log.resource_id
|
||
.map(|id| id.to_string())
|
||
.unwrap_or_default()
|
||
.as_bytes(),
|
||
);
|
||
hasher.update(log.created_at.to_rfc3339().as_bytes());
|
||
hasher.update(prev_hash.unwrap_or("").as_bytes());
|
||
format!("{:x}", hasher.finalize())
|
||
}
|
||
|
||
/// 验证审计日志哈希链完整性。
|
||
///
|
||
/// 检查指定租户的所有含 record_hash 的日志记录,
|
||
/// 验证每条记录的 prev_hash 是否等于前一条的 record_hash,
|
||
/// 以及 record_hash 是否可以重新计算验证。
|
||
///
|
||
/// 返回 (总记录数, 断链数)。
|
||
pub async fn verify_hash_chain(
|
||
db: &sea_orm::DatabaseConnection,
|
||
tenant_id: uuid::Uuid,
|
||
) -> Result<(usize, usize), sea_orm::DbErr> {
|
||
use sea_orm::QueryOrder;
|
||
|
||
let records = audit_log::Entity::find()
|
||
.filter(audit_log::Column::TenantId.eq(tenant_id))
|
||
.filter(audit_log::Column::RecordHash.is_not_null())
|
||
.order_by_asc(audit_log::Column::CreatedAt)
|
||
.all(db)
|
||
.await?;
|
||
|
||
let total = records.len();
|
||
let mut broken = 0;
|
||
let mut prev: Option<String> = None;
|
||
|
||
for record in &records {
|
||
// 验证 prev_hash 指向正确
|
||
if prev.as_deref() != record.prev_hash.as_deref() {
|
||
broken += 1;
|
||
}
|
||
|
||
// 验证 record_hash 可重算
|
||
let log = AuditLog {
|
||
id: record.id,
|
||
tenant_id: record.tenant_id,
|
||
user_id: record.user_id,
|
||
action: record.action.clone(),
|
||
resource_type: record.resource_type.clone(),
|
||
resource_id: record.resource_id,
|
||
old_value: record.old_value.clone(),
|
||
new_value: record.new_value.clone(),
|
||
ip_address: record.ip_address.clone(),
|
||
user_agent: record.user_agent.clone(),
|
||
created_at: record.created_at,
|
||
};
|
||
let expected = compute_record_hash(&log, record.prev_hash.as_deref());
|
||
if Some(expected.as_str()) != record.record_hash.as_deref() {
|
||
broken += 1;
|
||
}
|
||
|
||
prev = record.record_hash.clone();
|
||
}
|
||
|
||
Ok((total, broken))
|
||
}
|
||
|
||
/// 哈希链验证结果
|
||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||
pub struct ChainVerificationResult {
|
||
pub total: usize,
|
||
pub passed: usize,
|
||
pub failed: usize,
|
||
pub failed_ids: Vec<Uuid>,
|
||
}
|
||
|
||
/// 验证最近 N 条审计记录的哈希链完整性。
|
||
pub async fn verify_recent_chain(
|
||
db: &sea_orm::DatabaseConnection,
|
||
tenant_id: Uuid,
|
||
limit: u64,
|
||
) -> Result<ChainVerificationResult, String> {
|
||
let records = audit_log::Entity::find()
|
||
.filter(audit_log::Column::TenantId.eq(tenant_id))
|
||
.filter(audit_log::Column::RecordHash.is_not_null())
|
||
.order_by_desc(audit_log::Column::CreatedAt)
|
||
.limit(limit)
|
||
.all(db)
|
||
.await
|
||
.map_err(|e| format!("查询审计日志失败: {}", e))?;
|
||
|
||
let mut records = records;
|
||
records.sort_by(|a, b| a.created_at.cmp(&b.created_at));
|
||
|
||
let total = records.len();
|
||
let mut passed = 0;
|
||
let mut failed_ids = Vec::new();
|
||
let mut prev: Option<String> = None;
|
||
|
||
for record in &records {
|
||
let mut record_broken = false;
|
||
if prev.as_deref() != record.prev_hash.as_deref() {
|
||
record_broken = true;
|
||
}
|
||
let log = AuditLog {
|
||
id: record.id,
|
||
tenant_id: record.tenant_id,
|
||
user_id: record.user_id,
|
||
action: record.action.clone(),
|
||
resource_type: record.resource_type.clone(),
|
||
resource_id: record.resource_id,
|
||
old_value: record.old_value.clone(),
|
||
new_value: record.new_value.clone(),
|
||
ip_address: record.ip_address.clone(),
|
||
user_agent: record.user_agent.clone(),
|
||
created_at: record.created_at,
|
||
};
|
||
let expected = compute_record_hash(&log, record.prev_hash.as_deref());
|
||
if Some(expected.as_str()) != record.record_hash.as_deref() {
|
||
record_broken = true;
|
||
}
|
||
if record_broken {
|
||
failed_ids.push(record.id);
|
||
} else {
|
||
passed += 1;
|
||
}
|
||
prev = record.record_hash.clone();
|
||
}
|
||
|
||
let failed = total - passed;
|
||
Ok(ChainVerificationResult {
|
||
total,
|
||
passed,
|
||
failed,
|
||
failed_ids,
|
||
})
|
||
}
|