perf(diary): sync_service 批量预查询 + 事务化 — 8a-C02

- 按操作类型分组 create/update/delete
- UPDATE/DELETE 目标单次 IN 查询替代逐条 find(消除 N+1)
- 冲突前置检测: 从预取 HashMap 判断版本,再过滤有效操作
- 所有写操作在单个事务内完成(原子化)
- 辅助函数改用 ConnectionTrait 泛型(兼容 DatabaseConnection 和 DatabaseTransaction)
- 测试 80/80 通过
This commit is contained in:
iven
2026-06-03 15:45:36 +08:00
parent 0c9ada242a
commit 3258acaa77

View File

@@ -1,7 +1,17 @@
// 日记同步服务 — 版本号冲突检测 + 增量同步 // 日记同步服务 — 版本号冲突检测 + 增量同步
//
// 性能优化 (8a-C02):
// - 批量预查询: UPDATE/DELETE 目标一次性 IN 查询,替代逐条 find
// - 事务化: 所有写操作在单个事务内完成
// - 冲突前置检测: 写入前从预取数据检测版本冲突
use std::collections::HashMap;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use sea_orm::{ColumnTrait, Condition, DatabaseConnection, EntityTrait, QueryFilter, Set}; use sea_orm::{
ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DatabaseConnection, EntityTrait,
QueryFilter, Set, TransactionTrait,
};
use uuid::Uuid; use uuid::Uuid;
use crate::dto::{ConflictInfo, SyncChange, SyncResp}; use crate::dto::{ConflictInfo, SyncChange, SyncResp};
@@ -14,11 +24,12 @@ pub struct SyncService;
impl SyncService { impl SyncService {
/// 同步:客户端提交变更,服务端返回服务端变更 + 冲突 /// 同步:客户端提交变更,服务端返回服务端变更 + 冲突
/// ///
/// 流程: /// 优化流程:
/// 1. 逐条处理客户端变更(create/update/delete /// 1. create/update/delete 分组
/// 2. 获取 last_sync_time 之后服务端更新的记录 /// 2. 批量预查询 update/delete 目标记录(单次 IN 查询)
/// 3. 检测版本冲突 /// 3. 从预取数据检测版本冲突,过滤出有效操作
/// 4. 返回 (server_changes, conflicts, sync_time) /// 4. 单事务执行所有写操作
/// 5. 获取服务端变更
pub async fn sync( pub async fn sync(
tenant_id: Uuid, tenant_id: Uuid,
user_id: Uuid, user_id: Uuid,
@@ -26,29 +37,107 @@ impl SyncService {
client_changes: Vec<SyncChange>, client_changes: Vec<SyncChange>,
db: &DatabaseConnection, db: &DatabaseConnection,
) -> DiaryResult<SyncResp> { ) -> DiaryResult<SyncResp> {
let mut conflicts = Vec::new(); // 1. 分组: 按操作类型拆分
let mut creates: Vec<serde_json::Value> = Vec::new();
let mut updates: Vec<(Uuid, i32, serde_json::Value)> = Vec::new();
let mut deletes: Vec<(Uuid, i32)> = Vec::new();
// 1. 处理客户端变更
for change in client_changes { for change in client_changes {
if let Err(e) = Self::apply_client_change(tenant_id, user_id, change, db).await { match change {
// 版本冲突收集到冲突列表,其他错误直接返回 SyncChange::CreateJournal { data } => creates.push(data),
match e { SyncChange::UpdateJournal { id, version, data } => {
DiaryError::VersionConflict { updates.push((id, version, data));
local, }
server, SyncChange::DeleteJournal { id, version } => deletes.push((id, version)),
} => { }
conflicts.push(ConflictInfo { }
journal_id: Uuid::nil(), // ID 在 apply_client_change 内部处理
local_version: local, // 2. 批量预查询: 一次性获取所有 update/delete 目标记录
server_version: server, let ids_to_fetch: Vec<Uuid> = updates
}); .iter()
} .map(|(id, _, _)| *id)
_ => return Err(e), .chain(deletes.iter().map(|(id, _)| *id))
.collect();
let existing_map: HashMap<Uuid, journal_entry::Model> = if !ids_to_fetch.is_empty() {
journal_entry::Entity::find()
.filter(journal_entry::Column::Id.is_in(ids_to_fetch))
.filter(journal_entry::Column::TenantId.eq(tenant_id))
.filter(journal_entry::Column::DeletedAt.is_null())
.all(db)
.await?
.into_iter()
.map(|m| (m.id, m))
.collect()
} else {
HashMap::new()
};
// 3. 冲突前置检测: 从预取数据判断版本,分离冲突和有效操作
let mut conflicts = Vec::new();
let mut valid_updates: Vec<(journal_entry::Model, serde_json::Value)> = Vec::new();
let mut valid_deletes: Vec<(journal_entry::Model, i32)> = Vec::new();
for (id, version, data) in updates {
match existing_map.get(&id) {
Some(existing) if existing.version == version => {
valid_updates.push((existing.clone(), data));
}
Some(existing) => {
conflicts.push(ConflictInfo {
journal_id: id,
local_version: version,
server_version: existing.version,
});
}
None => {
return Err(DiaryError::NotFound(format!("日记 {} 不存在", id)));
} }
} }
} }
// 2. 获取服务端变更last_sync_time 之后更新的) for (id, version) in deletes {
match existing_map.get(&id) {
Some(existing) if existing.version == version => {
valid_deletes.push((existing.clone(), version));
}
Some(existing) => {
conflicts.push(ConflictInfo {
journal_id: id,
local_version: version,
server_version: existing.version,
});
}
None => {
return Err(DiaryError::NotFound(format!("日记 {} 不存在", id)));
}
}
}
// 4. 单事务执行所有写操作
db.transaction::<_, (), DiaryError>(|txn| {
Box::pin(async move {
// 批量插入 creates
for data in creates {
Self::insert_journal(tenant_id, user_id, data, txn).await?;
}
// 批量更新 valid_updates
for (existing, data) in valid_updates {
Self::apply_update(user_id, existing, data, txn).await?;
}
// 批量软删除 valid_deletes
for (existing, version) in valid_deletes {
Self::apply_delete(user_id, existing, version, txn).await?;
}
Ok(())
})
})
.await?;
// 5. 获取服务端变更last_sync_time 之后更新的)
let mut condition = Condition::all() let mut condition = Condition::all()
.add(journal_entry::Column::TenantId.eq(tenant_id)) .add(journal_entry::Column::TenantId.eq(tenant_id))
.add(journal_entry::Column::AuthorId.eq(user_id)); .add(journal_entry::Column::AuthorId.eq(user_id));
@@ -62,7 +151,6 @@ impl SyncService {
.all(db) .all(db)
.await?; .await?;
// 3. 转换为 JSON 格式的服务端变更
let server_changes: Vec<serde_json::Value> = server_records let server_changes: Vec<serde_json::Value> = server_records
.iter() .iter()
.map(|r| { .map(|r| {
@@ -82,7 +170,6 @@ impl SyncService {
}) })
.collect(); .collect();
// 4. 返回同步结果
Ok(SyncResp { Ok(SyncResp {
server_changes, server_changes,
conflicts, conflicts,
@@ -90,147 +177,120 @@ impl SyncService {
}) })
} }
/// 处理单条客户端变更 /// 插入新日记(事务内)
async fn apply_client_change( async fn insert_journal<C: ConnectionTrait>(
tenant_id: Uuid, tenant_id: Uuid,
user_id: Uuid, user_id: Uuid,
change: SyncChange, data: serde_json::Value,
db: &DatabaseConnection, db: &C,
) -> DiaryResult<()> { ) -> DiaryResult<()> {
use sea_orm::ActiveModelTrait; let id = data
.get("id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok())
.unwrap_or_else(Uuid::now_v7);
match change { let title = data
SyncChange::CreateJournal { data } => { .get("title")
// 客户端创建 — 直接插入 .and_then(|v| v.as_str())
let id = data .unwrap_or("")
.get("id") .to_string();
let now = Utc::now();
let model = journal_entry::ActiveModel {
id: Set(id),
tenant_id: Set(tenant_id),
author_id: Set(user_id),
class_id: Set(None),
title: Set(title),
date: Set(
data.get("date")
.and_then(|v| v.as_str()) .and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok()) .and_then(|s| s.parse().ok())
.unwrap_or_else(Uuid::now_v7); .unwrap_or_else(|| now.date_naive()),
),
let title = data mood: Set(
.get("title") data.get("mood")
.and_then(|v| v.as_str()) .and_then(|v| v.as_str())
.unwrap_or("") .unwrap_or("happy")
.to_string(); .to_string(),
),
weather: Set(
data.get("weather")
.and_then(|v| v.as_str())
.unwrap_or("sunny")
.to_string(),
),
tags: Set(data.get("tags").cloned()),
is_private: Set(
data.get("is_private")
.and_then(|v| v.as_bool())
.unwrap_or(true),
),
shared_to_class: Set(false),
assigned_topic_id: Set(None),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(user_id),
updated_by: Set(user_id),
deleted_at: Set(None),
version: Set(1),
};
model.insert(db).await?;
Ok(())
}
let now = Utc::now(); /// 应用更新(事务内,版本已预验证)
let model = journal_entry::ActiveModel { async fn apply_update<C: ConnectionTrait>(
id: Set(id), user_id: Uuid,
tenant_id: Set(tenant_id), existing: journal_entry::Model,
author_id: Set(user_id), data: serde_json::Value,
class_id: Set(None), db: &C,
title: Set(title), ) -> DiaryResult<()> {
date: Set( let now = Utc::now();
data.get("date") let current_version = existing.version;
.and_then(|v| v.as_str()) let mut active: journal_entry::ActiveModel = existing.into();
.and_then(|s| s.parse().ok())
.unwrap_or_else(|| now.date_naive()),
),
mood: Set(
data.get("mood")
.and_then(|v| v.as_str())
.unwrap_or("happy")
.to_string(),
),
weather: Set(
data.get("weather")
.and_then(|v| v.as_str())
.unwrap_or("sunny")
.to_string(),
),
tags: Set(data.get("tags").cloned()),
is_private: Set(
data.get("is_private")
.and_then(|v| v.as_bool())
.unwrap_or(true),
),
shared_to_class: Set(false),
assigned_topic_id: Set(None),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(user_id),
updated_by: Set(user_id),
deleted_at: Set(None),
version: Set(1),
};
model.insert(db).await?;
}
SyncChange::UpdateJournal {
id,
version,
data,
} => {
// 客户端更新 — 带版本检查
let existing = journal_entry::Entity::find()
.filter(journal_entry::Column::Id.eq(id))
.filter(journal_entry::Column::TenantId.eq(tenant_id))
.filter(journal_entry::Column::DeletedAt.is_null())
.one(db)
.await?
.ok_or_else(|| DiaryError::NotFound(format!("日记 {} 不存在", id)))?;
if existing.version != version { if let Some(title) = data.get("title").and_then(|v| v.as_str()) {
return Err(DiaryError::VersionConflict { active.title = Set(title.to_string());
local: version, }
server: existing.version, if let Some(mood) = data.get("mood").and_then(|v| v.as_str()) {
}); active.mood = Set(mood.to_string());
} }
if let Some(weather) = data.get("weather").and_then(|v| v.as_str()) {
let now = Utc::now(); active.weather = Set(weather.to_string());
let mut active: journal_entry::ActiveModel = existing.into(); }
if let Some(tags) = data.get("tags").cloned() {
if let Some(title) = data.get("title").and_then(|v| v.as_str()) { active.tags = Set(Some(tags));
active.title = Set(title.to_string()); }
} if let Some(is_private) = data.get("is_private").and_then(|v| v.as_bool()) {
if let Some(mood) = data.get("mood").and_then(|v| v.as_str()) { active.is_private = Set(is_private);
active.mood = Set(mood.to_string()); }
} if let Some(shared) = data.get("shared_to_class").and_then(|v| v.as_bool()) {
if let Some(weather) = data.get("weather").and_then(|v| v.as_str()) { active.shared_to_class = Set(shared);
active.weather = Set(weather.to_string());
}
if let Some(tags) = data.get("tags").cloned() {
active.tags = Set(Some(tags));
}
if let Some(is_private) = data.get("is_private").and_then(|v| v.as_bool()) {
active.is_private = Set(is_private);
}
if let Some(shared) = data.get("shared_to_class").and_then(|v| v.as_bool()) {
active.shared_to_class = Set(shared);
}
active.updated_at = Set(now);
active.updated_by = Set(user_id);
active.version = Set(version + 1);
active.update(db).await?;
}
SyncChange::DeleteJournal { id, version } => {
// 客户端删除 — 软删除带版本检查
let existing = journal_entry::Entity::find()
.filter(journal_entry::Column::Id.eq(id))
.filter(journal_entry::Column::TenantId.eq(tenant_id))
.filter(journal_entry::Column::DeletedAt.is_null())
.one(db)
.await?
.ok_or_else(|| DiaryError::NotFound(format!("日记 {} 不存在", id)))?;
if existing.version != version {
return Err(DiaryError::VersionConflict {
local: version,
server: existing.version,
});
}
let now = Utc::now();
let mut active: journal_entry::ActiveModel = existing.into();
active.deleted_at = Set(Some(now));
active.updated_at = Set(now);
active.updated_by = Set(user_id);
active.version = Set(version + 1);
active.update(db).await?;
}
} }
active.updated_at = Set(now);
active.updated_by = Set(user_id);
active.version = Set(current_version + 1);
active.update(db).await?;
Ok(())
}
/// 应用软删除(事务内,版本已预验证)
async fn apply_delete<C: ConnectionTrait>(
user_id: Uuid,
existing: journal_entry::Model,
version: i32,
db: &C,
) -> DiaryResult<()> {
let now = Utc::now();
let mut active: journal_entry::ActiveModel = existing.into();
active.deleted_at = Set(Some(now));
active.updated_at = Set(now);
active.updated_by = Set(user_id);
active.version = Set(version + 1);
active.update(db).await?;
Ok(()) Ok(())
} }
} }
@@ -276,7 +336,6 @@ mod tests {
}); });
let change = SyncChange::CreateJournal { data: data.clone() }; let change = SyncChange::CreateJournal { data: data.clone() };
// 验证 match 可以正确提取 data
match &change { match &change {
SyncChange::CreateJournal { data } => { SyncChange::CreateJournal { data } => {
assert_eq!(data.get("title").unwrap().as_str().unwrap(), "我的日记"); assert_eq!(data.get("title").unwrap().as_str().unwrap(), "我的日记");
@@ -344,43 +403,40 @@ mod tests {
#[test] #[test]
fn conflict_collection_pattern_mimics_sync_logic() { fn conflict_collection_pattern_mimics_sync_logic() {
// 模拟 sync 方法中 VersionConflict 收集到 conflicts 列表的行为 // 模拟批量预查询后的冲突检测逻辑
let mut conflicts: Vec<ConflictInfo> = Vec::new(); let mut conflicts: Vec<ConflictInfo> = Vec::new();
let errors: Vec<DiaryError> = vec![ // 模拟预取的 existing_map: id → (version)
DiaryError::VersionConflict { let id1 = uuid::Uuid::now_v7();
local: 1, let id2 = uuid::Uuid::now_v7();
server: 3, let existing: HashMap<Uuid, i32> = [(id1, 3), (id2, 5)].into_iter().collect();
},
DiaryError::VersionConflict {
local: 2,
server: 5,
},
];
for e in errors { // 客户端变更
match e { let updates: Vec<(Uuid, i32)> = vec![(id1, 1), (id2, 5)];
DiaryError::VersionConflict { local, server } => {
for (id, local_ver) in updates {
match existing.get(&id) {
Some(&server_ver) if server_ver == local_ver => {
// 版本匹配 — 有效更新
}
Some(&server_ver) => {
conflicts.push(ConflictInfo { conflicts.push(ConflictInfo {
journal_id: uuid::Uuid::nil(), journal_id: id,
local_version: local, local_version: local_ver,
server_version: server, server_version: server_ver,
}); });
} }
_ => {} None => {}
} }
} }
assert_eq!(conflicts.len(), 2); assert_eq!(conflicts.len(), 1);
assert_eq!(conflicts[0].local_version, 1); assert_eq!(conflicts[0].local_version, 1);
assert_eq!(conflicts[0].server_version, 3); assert_eq!(conflicts[0].server_version, 3);
assert_eq!(conflicts[1].local_version, 2);
assert_eq!(conflicts[1].server_version, 5);
} }
#[test] #[test]
fn non_conflict_error_does_not_collect() { fn non_conflict_error_does_not_collect() {
// 验证非 VersionConflict 错误不会被收集到 conflicts 列表
let mut conflicts: Vec<ConflictInfo> = Vec::new(); let mut conflicts: Vec<ConflictInfo> = Vec::new();
let error = DiaryError::NotFound("日记不存在".to_string()); let error = DiaryError::NotFound("日记不存在".to_string());
@@ -392,11 +448,63 @@ mod tests {
server_version: server, server_version: server,
}); });
} }
_ => { _ => {}
// 其他错误不应收集
}
} }
assert!(conflicts.is_empty()); assert!(conflicts.is_empty());
} }
#[test]
fn batch_partition_correctly_separates_types() {
let changes = vec![
SyncChange::CreateJournal {
data: serde_json::json!({"title": "日记1"}),
},
SyncChange::UpdateJournal {
id: uuid::Uuid::nil(),
version: 1,
data: serde_json::json!({"title": "更新"}),
},
SyncChange::DeleteJournal {
id: uuid::Uuid::nil(),
version: 2,
},
SyncChange::CreateJournal {
data: serde_json::json!({"title": "日记2"}),
},
];
let mut creates = Vec::new();
let mut updates = Vec::new();
let mut deletes = Vec::new();
for change in changes {
match change {
SyncChange::CreateJournal { data } => creates.push(data),
SyncChange::UpdateJournal { id, version, data } => {
updates.push((id, version, data));
}
SyncChange::DeleteJournal { id, version } => deletes.push((id, version)),
}
}
assert_eq!(creates.len(), 2);
assert_eq!(updates.len(), 1);
assert_eq!(deletes.len(), 1);
}
#[test]
fn version_match_detection() {
// 模拟预取数据中的版本匹配
let id = uuid::Uuid::now_v7();
let existing: HashMap<Uuid, i32> = [(id, 5)].into_iter().collect();
// 版本匹配
let matched = existing.get(&id).map_or(false, |&sv| sv == 5);
assert!(matched);
// 版本不匹配
let mismatched = existing.get(&id).map_or(false, |&sv| sv == 3);
assert!(!mismatched);
}
} }