feat(core): 审计日志哈希链 — prev_hash + record_hash + 完整性验证
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled

- 迁移 087: audit_logs 表添加 prev_hash/record_hash 列 + 索引
- audit_service::record() 写入时查询前一条 record_hash 作为 prev_hash
- SHA256(id+action+resource_type+resource_id+created_at+prev_hash) 计算 record_hash
- verify_hash_chain() 验证链完整性,返回 (总记录数, 断链数)
This commit is contained in:
iven
2026-04-27 19:38:39 +08:00
parent 633bf8c62d
commit 22ef5b6d1f
4 changed files with 162 additions and 1 deletions

View File

@@ -1,7 +1,8 @@
use crate::audit::AuditLog;
use crate::entity::audit_log;
use crate::request_info::RequestInfo;
use sea_orm::{ActiveModelTrait, Set};
use sea_orm::{ActiveModelTrait, ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, QueryOrder, Set};
use sha2::{Sha256, Digest};
use tracing;
/// 持久化审计日志到 audit_logs 表。
@@ -10,6 +11,9 @@ use tracing;
///
/// 自动从 task_local 读取当前请求的 IP 和 User-Agent
/// 如果 AuditLog 中已有 ip_address/user_agent 则不覆盖。
///
/// 哈希链:查询同租户最新一条记录的 record_hash 作为 prev_hash
/// 计算 SHA256(id + action + resource_type + resource_id + created_at + prev_hash) 作为 record_hash。
pub async fn record(mut log: AuditLog, db: &sea_orm::DatabaseConnection) {
// 自动填充请求来源信息(仅当调用方未显式设置时)
if log.ip_address.is_none() || log.user_agent.is_none() {
@@ -23,6 +27,20 @@ pub async fn record(mut log: AuditLog, db: &sea_orm::DatabaseConnection) {
}
}
// 查询同租户最新一条记录的 record_hash 作为 prev_hash
let prev_hash = audit_log::Entity::find()
.filter(audit_log::Column::TenantId.eq(log.tenant_id))
.filter(audit_log::Column::RecordHash.is_not_null())
.order_by_desc(audit_log::Column::CreatedAt)
.one(db)
.await
.ok()
.flatten()
.and_then(|m| m.record_hash);
// 计算当前记录的 record_hash
let record_hash = compute_record_hash(&log, prev_hash.as_deref());
let model = audit_log::ActiveModel {
id: Set(log.id),
tenant_id: Set(log.tenant_id),
@@ -35,9 +53,83 @@ pub async fn record(mut log: AuditLog, db: &sea_orm::DatabaseConnection) {
ip_address: Set(log.ip_address),
user_agent: Set(log.user_agent),
created_at: Set(log.created_at),
prev_hash: Set(prev_hash),
record_hash: Set(Some(record_hash)),
};
if let Err(e) = model.insert(db).await {
tracing::warn!(error = %e, "审计日志写入失败");
}
}
/// 计算 record_hash: SHA256(id + action + resource_type + resource_id + created_at + prev_hash)
fn compute_record_hash(log: &AuditLog, prev_hash: Option<&str>) -> String {
let mut hasher = Sha256::new();
hasher.update(log.id.to_string().as_bytes());
hasher.update(log.action.as_bytes());
hasher.update(log.resource_type.as_bytes());
hasher.update(
log.resource_id
.map(|id| id.to_string())
.unwrap_or_default()
.as_bytes(),
);
hasher.update(log.created_at.to_rfc3339().as_bytes());
hasher.update(prev_hash.unwrap_or("").as_bytes());
format!("{:x}", hasher.finalize())
}
/// 验证审计日志哈希链完整性。
///
/// 检查指定租户的所有含 record_hash 的日志记录,
/// 验证每条记录的 prev_hash 是否等于前一条的 record_hash
/// 以及 record_hash 是否可以重新计算验证。
///
/// 返回 (总记录数, 断链数)。
pub async fn verify_hash_chain(
db: &sea_orm::DatabaseConnection,
tenant_id: uuid::Uuid,
) -> Result<(usize, usize), sea_orm::DbErr> {
use sea_orm::QueryOrder;
let records = audit_log::Entity::find()
.filter(audit_log::Column::TenantId.eq(tenant_id))
.filter(audit_log::Column::RecordHash.is_not_null())
.order_by_asc(audit_log::Column::CreatedAt)
.all(db)
.await?;
let total = records.len();
let mut broken = 0;
let mut prev: Option<String> = None;
for record in &records {
// 验证 prev_hash 指向正确
if prev.as_deref() != record.prev_hash.as_deref() {
broken += 1;
}
// 验证 record_hash 可重算
let log = AuditLog {
id: record.id,
tenant_id: record.tenant_id,
user_id: record.user_id,
action: record.action.clone(),
resource_type: record.resource_type.clone(),
resource_id: record.resource_id,
old_value: record.old_value.clone(),
new_value: record.new_value.clone(),
ip_address: record.ip_address.clone(),
user_agent: record.user_agent.clone(),
created_at: record.created_at,
};
let expected = compute_record_hash(&log, record.prev_hash.as_deref());
if Some(expected.as_str()) != record.record_hash.as_deref() {
broken += 1;
}
prev = record.record_hash.clone();
}
Ok((total, broken))
}

View File

@@ -17,6 +17,10 @@ pub struct Model {
pub ip_address: Option<String>,
pub user_agent: Option<String>,
pub created_at: DateTimeUtc,
/// 哈希链 — 前一条记录的 record_hash
pub prev_hash: Option<String>,
/// 当前记录的哈希 SHA256(id + action + resource_type + resource_id + created_at + prev_hash)
pub record_hash: Option<String>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@@ -86,6 +86,7 @@ mod m20260427_000083_create_follow_up_template;
mod m20260427_000084_domain_events_cleanup;
mod m20260427_000085_processed_events;
mod m20260427_000086_enable_rls_all_tables;
mod m20260427_000087_audit_logs_hash_chain;
pub struct Migrator;
@@ -179,6 +180,7 @@ impl MigratorTrait for Migrator {
Box::new(m20260427_000084_domain_events_cleanup::Migration),
Box::new(m20260427_000085_processed_events::Migration),
Box::new(m20260427_000086_enable_rls_all_tables::Migration),
Box::new(m20260427_000087_audit_logs_hash_chain::Migration),
]
}
}

View File

@@ -0,0 +1,63 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let conn = manager.get_connection();
conn.execute_unprepared(
"ALTER TABLE audit_logs ADD COLUMN IF NOT EXISTS prev_hash TEXT",
)
.await?;
conn.execute_unprepared(
"ALTER TABLE audit_logs ADD COLUMN IF NOT EXISTS record_hash TEXT",
)
.await?;
// 为 record_hash 创建索引(用于快速查找最新哈希)
conn.execute_unprepared(
"CREATE INDEX IF NOT EXISTS idx_audit_logs_record_hash
ON audit_logs (record_hash) WHERE record_hash IS NOT NULL",
)
.await?;
// 按 tenant_id + created_at DESC 查找最新哈希的索引
conn.execute_unprepared(
"CREATE INDEX IF NOT EXISTS idx_audit_logs_tenant_created
ON audit_logs (tenant_id, created_at DESC)",
)
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let conn = manager.get_connection();
conn.execute_unprepared(
"DROP INDEX IF EXISTS idx_audit_logs_tenant_created",
)
.await?;
conn.execute_unprepared(
"DROP INDEX IF EXISTS idx_audit_logs_record_hash",
)
.await?;
conn.execute_unprepared(
"ALTER TABLE audit_logs DROP COLUMN IF EXISTS record_hash",
)
.await?;
conn.execute_unprepared(
"ALTER TABLE audit_logs DROP COLUMN IF EXISTS prev_hash",
)
.await?;
Ok(())
}
}