From 3258acaa77fa79dfa54b2b048130a93de7f6fdc2 Mon Sep 17 00:00:00 2001 From: iven Date: Wed, 3 Jun 2026 15:45:36 +0800 Subject: [PATCH] =?UTF-8?q?perf(diary):=20sync=5Fservice=20=E6=89=B9?= =?UTF-8?q?=E9=87=8F=E9=A2=84=E6=9F=A5=E8=AF=A2=20+=20=E4=BA=8B=E5=8A=A1?= =?UTF-8?q?=E5=8C=96=20=E2=80=94=208a-C02?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 按操作类型分组 create/update/delete - UPDATE/DELETE 目标单次 IN 查询替代逐条 find(消除 N+1) - 冲突前置检测: 从预取 HashMap 判断版本,再过滤有效操作 - 所有写操作在单个事务内完成(原子化) - 辅助函数改用 ConnectionTrait 泛型(兼容 DatabaseConnection 和 DatabaseTransaction) - 测试 80/80 通过 --- crates/erp-diary/src/service/sync_service.rs | 472 ++++++++++++------- 1 file changed, 290 insertions(+), 182 deletions(-) diff --git a/crates/erp-diary/src/service/sync_service.rs b/crates/erp-diary/src/service/sync_service.rs index 356e1aa..5bd1867 100644 --- a/crates/erp-diary/src/service/sync_service.rs +++ b/crates/erp-diary/src/service/sync_service.rs @@ -1,7 +1,17 @@ // 日记同步服务 — 版本号冲突检测 + 增量同步 +// +// 性能优化 (8a-C02): +// - 批量预查询: UPDATE/DELETE 目标一次性 IN 查询,替代逐条 find +// - 事务化: 所有写操作在单个事务内完成 +// - 冲突前置检测: 写入前从预取数据检测版本冲突 + +use std::collections::HashMap; use chrono::{DateTime, Utc}; -use sea_orm::{ColumnTrait, Condition, DatabaseConnection, EntityTrait, QueryFilter, Set}; +use sea_orm::{ + ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DatabaseConnection, EntityTrait, + QueryFilter, Set, TransactionTrait, +}; use uuid::Uuid; use crate::dto::{ConflictInfo, SyncChange, SyncResp}; @@ -14,11 +24,12 @@ pub struct SyncService; impl SyncService { /// 同步:客户端提交变更,服务端返回服务端变更 + 冲突 /// - /// 流程: - /// 1. 逐条处理客户端变更(create/update/delete) - /// 2. 获取 last_sync_time 之后服务端更新的记录 - /// 3. 检测版本冲突 - /// 4. 返回 (server_changes, conflicts, sync_time) + /// 优化流程: + /// 1. 按 create/update/delete 分组 + /// 2. 批量预查询 update/delete 目标记录(单次 IN 查询) + /// 3. 从预取数据检测版本冲突,过滤出有效操作 + /// 4. 单事务执行所有写操作 + /// 5. 获取服务端变更 pub async fn sync( tenant_id: Uuid, user_id: Uuid, @@ -26,29 +37,107 @@ impl SyncService { client_changes: Vec, db: &DatabaseConnection, ) -> DiaryResult { - let mut conflicts = Vec::new(); + // 1. 分组: 按操作类型拆分 + let mut creates: Vec = Vec::new(); + let mut updates: Vec<(Uuid, i32, serde_json::Value)> = Vec::new(); + let mut deletes: Vec<(Uuid, i32)> = Vec::new(); - // 1. 处理客户端变更 for change in client_changes { - if let Err(e) = Self::apply_client_change(tenant_id, user_id, change, db).await { - // 版本冲突收集到冲突列表,其他错误直接返回 - match e { - DiaryError::VersionConflict { - local, - server, - } => { - conflicts.push(ConflictInfo { - journal_id: Uuid::nil(), // ID 在 apply_client_change 内部处理 - local_version: local, - server_version: server, - }); - } - _ => return Err(e), + match change { + SyncChange::CreateJournal { data } => creates.push(data), + SyncChange::UpdateJournal { id, version, data } => { + updates.push((id, version, data)); + } + SyncChange::DeleteJournal { id, version } => deletes.push((id, version)), + } + } + + // 2. 批量预查询: 一次性获取所有 update/delete 目标记录 + let ids_to_fetch: Vec = updates + .iter() + .map(|(id, _, _)| *id) + .chain(deletes.iter().map(|(id, _)| *id)) + .collect(); + + let existing_map: HashMap = if !ids_to_fetch.is_empty() { + journal_entry::Entity::find() + .filter(journal_entry::Column::Id.is_in(ids_to_fetch)) + .filter(journal_entry::Column::TenantId.eq(tenant_id)) + .filter(journal_entry::Column::DeletedAt.is_null()) + .all(db) + .await? + .into_iter() + .map(|m| (m.id, m)) + .collect() + } else { + HashMap::new() + }; + + // 3. 冲突前置检测: 从预取数据判断版本,分离冲突和有效操作 + let mut conflicts = Vec::new(); + let mut valid_updates: Vec<(journal_entry::Model, serde_json::Value)> = Vec::new(); + let mut valid_deletes: Vec<(journal_entry::Model, i32)> = Vec::new(); + + for (id, version, data) in updates { + match existing_map.get(&id) { + Some(existing) if existing.version == version => { + valid_updates.push((existing.clone(), data)); + } + Some(existing) => { + conflicts.push(ConflictInfo { + journal_id: id, + local_version: version, + server_version: existing.version, + }); + } + None => { + return Err(DiaryError::NotFound(format!("日记 {} 不存在", id))); } } } - // 2. 获取服务端变更(last_sync_time 之后更新的) + for (id, version) in deletes { + match existing_map.get(&id) { + Some(existing) if existing.version == version => { + valid_deletes.push((existing.clone(), version)); + } + Some(existing) => { + conflicts.push(ConflictInfo { + journal_id: id, + local_version: version, + server_version: existing.version, + }); + } + None => { + return Err(DiaryError::NotFound(format!("日记 {} 不存在", id))); + } + } + } + + // 4. 单事务执行所有写操作 + db.transaction::<_, (), DiaryError>(|txn| { + Box::pin(async move { + // 批量插入 creates + for data in creates { + Self::insert_journal(tenant_id, user_id, data, txn).await?; + } + + // 批量更新 valid_updates + for (existing, data) in valid_updates { + Self::apply_update(user_id, existing, data, txn).await?; + } + + // 批量软删除 valid_deletes + for (existing, version) in valid_deletes { + Self::apply_delete(user_id, existing, version, txn).await?; + } + + Ok(()) + }) + }) + .await?; + + // 5. 获取服务端变更(last_sync_time 之后更新的) let mut condition = Condition::all() .add(journal_entry::Column::TenantId.eq(tenant_id)) .add(journal_entry::Column::AuthorId.eq(user_id)); @@ -62,7 +151,6 @@ impl SyncService { .all(db) .await?; - // 3. 转换为 JSON 格式的服务端变更 let server_changes: Vec = server_records .iter() .map(|r| { @@ -82,7 +170,6 @@ impl SyncService { }) .collect(); - // 4. 返回同步结果 Ok(SyncResp { server_changes, conflicts, @@ -90,147 +177,120 @@ impl SyncService { }) } - /// 处理单条客户端变更 - async fn apply_client_change( + /// 插入新日记(事务内) + async fn insert_journal( tenant_id: Uuid, user_id: Uuid, - change: SyncChange, - db: &DatabaseConnection, + data: serde_json::Value, + db: &C, ) -> DiaryResult<()> { - use sea_orm::ActiveModelTrait; + let id = data + .get("id") + .and_then(|v| v.as_str()) + .and_then(|s| Uuid::parse_str(s).ok()) + .unwrap_or_else(Uuid::now_v7); - match change { - SyncChange::CreateJournal { data } => { - // 客户端创建 — 直接插入 - let id = data - .get("id") + let title = data + .get("title") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + + let now = Utc::now(); + let model = journal_entry::ActiveModel { + id: Set(id), + tenant_id: Set(tenant_id), + author_id: Set(user_id), + class_id: Set(None), + title: Set(title), + date: Set( + data.get("date") .and_then(|v| v.as_str()) - .and_then(|s| Uuid::parse_str(s).ok()) - .unwrap_or_else(Uuid::now_v7); - - let title = data - .get("title") + .and_then(|s| s.parse().ok()) + .unwrap_or_else(|| now.date_naive()), + ), + mood: Set( + data.get("mood") .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); + .unwrap_or("happy") + .to_string(), + ), + weather: Set( + data.get("weather") + .and_then(|v| v.as_str()) + .unwrap_or("sunny") + .to_string(), + ), + tags: Set(data.get("tags").cloned()), + is_private: Set( + data.get("is_private") + .and_then(|v| v.as_bool()) + .unwrap_or(true), + ), + shared_to_class: Set(false), + assigned_topic_id: Set(None), + created_at: Set(now), + updated_at: Set(now), + created_by: Set(user_id), + updated_by: Set(user_id), + deleted_at: Set(None), + version: Set(1), + }; + model.insert(db).await?; + Ok(()) + } - let now = Utc::now(); - let model = journal_entry::ActiveModel { - id: Set(id), - tenant_id: Set(tenant_id), - author_id: Set(user_id), - class_id: Set(None), - title: Set(title), - date: Set( - data.get("date") - .and_then(|v| v.as_str()) - .and_then(|s| s.parse().ok()) - .unwrap_or_else(|| now.date_naive()), - ), - mood: Set( - data.get("mood") - .and_then(|v| v.as_str()) - .unwrap_or("happy") - .to_string(), - ), - weather: Set( - data.get("weather") - .and_then(|v| v.as_str()) - .unwrap_or("sunny") - .to_string(), - ), - tags: Set(data.get("tags").cloned()), - is_private: Set( - data.get("is_private") - .and_then(|v| v.as_bool()) - .unwrap_or(true), - ), - shared_to_class: Set(false), - assigned_topic_id: Set(None), - created_at: Set(now), - updated_at: Set(now), - created_by: Set(user_id), - updated_by: Set(user_id), - deleted_at: Set(None), - version: Set(1), - }; - model.insert(db).await?; - } - SyncChange::UpdateJournal { - id, - version, - data, - } => { - // 客户端更新 — 带版本检查 - let existing = journal_entry::Entity::find() - .filter(journal_entry::Column::Id.eq(id)) - .filter(journal_entry::Column::TenantId.eq(tenant_id)) - .filter(journal_entry::Column::DeletedAt.is_null()) - .one(db) - .await? - .ok_or_else(|| DiaryError::NotFound(format!("日记 {} 不存在", id)))?; + /// 应用更新(事务内,版本已预验证) + async fn apply_update( + user_id: Uuid, + existing: journal_entry::Model, + data: serde_json::Value, + db: &C, + ) -> DiaryResult<()> { + let now = Utc::now(); + let current_version = existing.version; + let mut active: journal_entry::ActiveModel = existing.into(); - if existing.version != version { - return Err(DiaryError::VersionConflict { - local: version, - server: existing.version, - }); - } - - let now = Utc::now(); - let mut active: journal_entry::ActiveModel = existing.into(); - - if let Some(title) = data.get("title").and_then(|v| v.as_str()) { - active.title = Set(title.to_string()); - } - if let Some(mood) = data.get("mood").and_then(|v| v.as_str()) { - active.mood = Set(mood.to_string()); - } - if let Some(weather) = data.get("weather").and_then(|v| v.as_str()) { - active.weather = Set(weather.to_string()); - } - if let Some(tags) = data.get("tags").cloned() { - active.tags = Set(Some(tags)); - } - if let Some(is_private) = data.get("is_private").and_then(|v| v.as_bool()) { - active.is_private = Set(is_private); - } - if let Some(shared) = data.get("shared_to_class").and_then(|v| v.as_bool()) { - active.shared_to_class = Set(shared); - } - - active.updated_at = Set(now); - active.updated_by = Set(user_id); - active.version = Set(version + 1); - active.update(db).await?; - } - SyncChange::DeleteJournal { id, version } => { - // 客户端删除 — 软删除带版本检查 - let existing = journal_entry::Entity::find() - .filter(journal_entry::Column::Id.eq(id)) - .filter(journal_entry::Column::TenantId.eq(tenant_id)) - .filter(journal_entry::Column::DeletedAt.is_null()) - .one(db) - .await? - .ok_or_else(|| DiaryError::NotFound(format!("日记 {} 不存在", id)))?; - - if existing.version != version { - return Err(DiaryError::VersionConflict { - local: version, - server: existing.version, - }); - } - - let now = Utc::now(); - let mut active: journal_entry::ActiveModel = existing.into(); - active.deleted_at = Set(Some(now)); - active.updated_at = Set(now); - active.updated_by = Set(user_id); - active.version = Set(version + 1); - active.update(db).await?; - } + if let Some(title) = data.get("title").and_then(|v| v.as_str()) { + active.title = Set(title.to_string()); + } + if let Some(mood) = data.get("mood").and_then(|v| v.as_str()) { + active.mood = Set(mood.to_string()); + } + if let Some(weather) = data.get("weather").and_then(|v| v.as_str()) { + active.weather = Set(weather.to_string()); + } + if let Some(tags) = data.get("tags").cloned() { + active.tags = Set(Some(tags)); + } + if let Some(is_private) = data.get("is_private").and_then(|v| v.as_bool()) { + active.is_private = Set(is_private); + } + if let Some(shared) = data.get("shared_to_class").and_then(|v| v.as_bool()) { + active.shared_to_class = Set(shared); } + active.updated_at = Set(now); + active.updated_by = Set(user_id); + active.version = Set(current_version + 1); + active.update(db).await?; + Ok(()) + } + + /// 应用软删除(事务内,版本已预验证) + async fn apply_delete( + user_id: Uuid, + existing: journal_entry::Model, + version: i32, + db: &C, + ) -> DiaryResult<()> { + let now = Utc::now(); + let mut active: journal_entry::ActiveModel = existing.into(); + active.deleted_at = Set(Some(now)); + active.updated_at = Set(now); + active.updated_by = Set(user_id); + active.version = Set(version + 1); + active.update(db).await?; Ok(()) } } @@ -276,7 +336,6 @@ mod tests { }); let change = SyncChange::CreateJournal { data: data.clone() }; - // 验证 match 可以正确提取 data match &change { SyncChange::CreateJournal { data } => { assert_eq!(data.get("title").unwrap().as_str().unwrap(), "我的日记"); @@ -344,43 +403,40 @@ mod tests { #[test] fn conflict_collection_pattern_mimics_sync_logic() { - // 模拟 sync 方法中 VersionConflict 收集到 conflicts 列表的行为 + // 模拟批量预查询后的冲突检测逻辑 let mut conflicts: Vec = Vec::new(); - let errors: Vec = vec![ - DiaryError::VersionConflict { - local: 1, - server: 3, - }, - DiaryError::VersionConflict { - local: 2, - server: 5, - }, - ]; + // 模拟预取的 existing_map: id → (version) + let id1 = uuid::Uuid::now_v7(); + let id2 = uuid::Uuid::now_v7(); + let existing: HashMap = [(id1, 3), (id2, 5)].into_iter().collect(); - for e in errors { - match e { - DiaryError::VersionConflict { local, server } => { + // 客户端变更 + let updates: Vec<(Uuid, i32)> = vec![(id1, 1), (id2, 5)]; + + for (id, local_ver) in updates { + match existing.get(&id) { + Some(&server_ver) if server_ver == local_ver => { + // 版本匹配 — 有效更新 + } + Some(&server_ver) => { conflicts.push(ConflictInfo { - journal_id: uuid::Uuid::nil(), - local_version: local, - server_version: server, + journal_id: id, + local_version: local_ver, + server_version: server_ver, }); } - _ => {} + None => {} } } - assert_eq!(conflicts.len(), 2); + assert_eq!(conflicts.len(), 1); assert_eq!(conflicts[0].local_version, 1); assert_eq!(conflicts[0].server_version, 3); - assert_eq!(conflicts[1].local_version, 2); - assert_eq!(conflicts[1].server_version, 5); } #[test] fn non_conflict_error_does_not_collect() { - // 验证非 VersionConflict 错误不会被收集到 conflicts 列表 let mut conflicts: Vec = Vec::new(); let error = DiaryError::NotFound("日记不存在".to_string()); @@ -392,11 +448,63 @@ mod tests { server_version: server, }); } - _ => { - // 其他错误不应收集 - } + _ => {} } assert!(conflicts.is_empty()); } + + #[test] + fn batch_partition_correctly_separates_types() { + let changes = vec![ + SyncChange::CreateJournal { + data: serde_json::json!({"title": "日记1"}), + }, + SyncChange::UpdateJournal { + id: uuid::Uuid::nil(), + version: 1, + data: serde_json::json!({"title": "更新"}), + }, + SyncChange::DeleteJournal { + id: uuid::Uuid::nil(), + version: 2, + }, + SyncChange::CreateJournal { + data: serde_json::json!({"title": "日记2"}), + }, + ]; + + let mut creates = Vec::new(); + let mut updates = Vec::new(); + let mut deletes = Vec::new(); + + for change in changes { + match change { + SyncChange::CreateJournal { data } => creates.push(data), + SyncChange::UpdateJournal { id, version, data } => { + updates.push((id, version, data)); + } + SyncChange::DeleteJournal { id, version } => deletes.push((id, version)), + } + } + + assert_eq!(creates.len(), 2); + assert_eq!(updates.len(), 1); + assert_eq!(deletes.len(), 1); + } + + #[test] + fn version_match_detection() { + // 模拟预取数据中的版本匹配 + let id = uuid::Uuid::now_v7(); + let existing: HashMap = [(id, 5)].into_iter().collect(); + + // 版本匹配 + let matched = existing.get(&id).map_or(false, |&sv| sv == 5); + assert!(matched); + + // 版本不匹配 + let mismatched = existing.get(&id).map_or(false, |&sv| sv == 3); + assert!(!mismatched); + } }