Compare commits

...

4 Commits

Author SHA1 Message Date
iven
b6ffc60331 perf(diary): sticker_service 批量 GROUP BY 替代 N+1 贴纸计数 — 8a-C04
Some checks failed
Main Merge / backend (push) Has been cancelled
Main Merge / frontend (push) Has been cancelled
- list_sticker_packs: 单次 SQL GROUP BY pack_id 获取所有计数
- 2 次查询(packs + counts)替代 N+1 次
- 使用 PostgreSQL ANY() 传递 UUID 数组
- 测试 80/80 通过
2026-06-03 15:51:05 +08:00
iven
4e5c1287a6 perf(diary): parent_service 批量软删除替代逐条 UPDATE — 8a-C03
- delete_child_data 改用单条 SQL UPDATE ... WHERE 批量软删除
- 1 次 SQL 替代 N 次逐条 UPDATE(从 O(N) 降到 O(1) 查询)
- 移除不再需要的 TransactionTrait 导入
- 测试 80/80 通过
2026-06-03 15:48:29 +08:00
iven
3258acaa77 perf(diary): sync_service 批量预查询 + 事务化 — 8a-C02
- 按操作类型分组 create/update/delete
- UPDATE/DELETE 目标单次 IN 查询替代逐条 find(消除 N+1)
- 冲突前置检测: 从预取 HashMap 判断版本,再过滤有效操作
- 所有写操作在单个事务内完成(原子化)
- 辅助函数改用 ConnectionTrait 泛型(兼容 DatabaseConnection 和 DatabaseTransaction)
- 测试 80/80 通过
2026-06-03 15:45:36 +08:00
iven
0c9ada242a perf(diary): mood_stats 改用 SQL GROUP BY 替代全量加载 — 8a-C01
- get_mood_stats: SELECT mood, COUNT(*) GROUP BY 替代 all() + Rust 迭代
- calculate_streak: 仅查 date 列 + DISTINCT + 366天窗口裁剪
- 新增 mood_counts_map_aggregation 单元测试
- 测试 78/78 通过
2026-06-03 15:37:09 +08:00
4 changed files with 455 additions and 271 deletions

View File

@@ -1,12 +1,15 @@
// 心情统计服务 — 心情趋势与连续天数 // 心情统计服务 — 心情趋势与连续天数
//
// 性能优化 (8a-C01):
// - mood_counts: SQL GROUP BY 替代全量加载 + Rust 迭代
// - streak: 仅查 date 列 + DISTINCT + 时间窗口裁剪,避免加载所有日记字段
use chrono::{Duration, NaiveDate, Utc}; use chrono::{Duration, NaiveDate, Utc};
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; use sea_orm::{ConnectionTrait, DatabaseConnection};
use serde::Deserialize; use serde::Deserialize;
use uuid::Uuid; use uuid::Uuid;
use crate::dto::{Mood, MoodCount, MoodStatsResp}; use crate::dto::{Mood, MoodCount, MoodStatsResp};
use crate::entity::journal_entry;
use crate::error::DiaryResult; use crate::error::DiaryResult;
/// 统计查询范围 /// 统计查询范围
@@ -37,8 +40,8 @@ pub struct MoodStatsService;
impl MoodStatsService { impl MoodStatsService {
/// 获取心情统计 /// 获取心情统计
/// ///
/// 统计指定时间范围内各心情出现次数、连续写日记天数、 /// 使用 SQL GROUP BY 聚合,避免全量加载日记到内存。
/// 最常用心情等数据 /// 性能: O(mood_types) 而非 O(total_journals)
pub async fn get_mood_stats( pub async fn get_mood_stats(
tenant_id: Uuid, tenant_id: Uuid,
user_id: Uuid, user_id: Uuid,
@@ -47,24 +50,34 @@ impl MoodStatsService {
) -> DiaryResult<MoodStatsResp> { ) -> DiaryResult<MoodStatsResp> {
let since_date = (Utc::now() - Duration::days(period.days())).date_naive(); let since_date = (Utc::now() - Duration::days(period.days())).date_naive();
// 查询时间范围内的日记 // SQL GROUP BY — 一次查询获取所有心情计数(替代全量加载)
let journals = journal_entry::Entity::find() let sql = r#"
.filter(journal_entry::Column::TenantId.eq(tenant_id)) SELECT mood, COUNT(*) AS count
.filter(journal_entry::Column::AuthorId.eq(user_id)) FROM journal_entry
.filter(journal_entry::Column::Date.gte(since_date)) WHERE tenant_id = $1
.filter(journal_entry::Column::DeletedAt.is_null()) AND author_id = $2
.all(db) AND date >= $3
.await?; AND deleted_at IS NULL
GROUP BY mood
"#;
let total_journals = journals.len() as i32; let stmt = sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
[tenant_id.into(), user_id.into(), since_date.into()],
);
// 计算各心情出现次数 let rows = db.query_all(stmt).await?;
let mut mood_counts_map: std::collections::HashMap<String, i32> =
let mut mood_counts_map: std::collections::HashMap<String, i64> =
std::collections::HashMap::new(); std::collections::HashMap::new();
for journal in &journals { let mut total_journals: i64 = 0;
*mood_counts_map
.entry(journal.mood.clone()) for row in rows {
.or_insert(0) += 1; let mood: String = row.try_get_by_index::<String>(0)?;
let count: i64 = row.try_get_by_index::<i64>(1)?;
total_journals += count;
mood_counts_map.insert(mood, count);
} }
let mood_counts: Vec<MoodCount> = mood_counts_map let mood_counts: Vec<MoodCount> = mood_counts_map
@@ -77,53 +90,64 @@ impl MoodStatsService {
}; };
MoodCount { MoodCount {
mood: parse_mood(mood), mood: parse_mood(mood),
count, count: count as i32,
percentage, percentage,
} }
}) })
.collect(); .collect();
// 查找最常用心情
let dominant_mood = mood_counts let dominant_mood = mood_counts
.iter() .iter()
.max_by_key(|mc| mc.count) .max_by_key(|mc| mc.count)
.map(|mc| mc.mood.clone()); .map(|mc| mc.mood.clone());
// 计算连续写日记天数
let streak_days = Self::calculate_streak(tenant_id, user_id, db).await?; let streak_days = Self::calculate_streak(tenant_id, user_id, db).await?;
Ok(MoodStatsResp { Ok(MoodStatsResp {
mood_counts, mood_counts,
streak_days, streak_days,
total_journals, total_journals: total_journals as i32,
dominant_mood, dominant_mood,
}) })
} }
/// 计算连续写日记天数 /// 计算连续写日记天数
/// ///
/// 从今天开始往前数,连续有日记记录的天数 /// 只查询 date 列 + DISTINCT按日期降序限定 366 天窗口
/// 避免全量加载所有日记字段到内存。
async fn calculate_streak( async fn calculate_streak(
tenant_id: Uuid, tenant_id: Uuid,
user_id: Uuid, user_id: Uuid,
db: &DatabaseConnection, db: &DatabaseConnection,
) -> DiaryResult<i32> { ) -> DiaryResult<i32> {
let journals = journal_entry::Entity::find() let cutoff = (Utc::now() - Duration::days(366)).date_naive();
.filter(journal_entry::Column::TenantId.eq(tenant_id))
.filter(journal_entry::Column::AuthorId.eq(user_id))
.filter(journal_entry::Column::DeletedAt.is_null())
.all(db)
.await?;
// 收集所有有日记的日期 let sql = r#"
let mut dates: std::collections::HashSet<NaiveDate> = SELECT DISTINCT date
journals.into_iter().map(|j| j.date).collect(); FROM journal_entry
WHERE tenant_id = $1
AND author_id = $2
AND date >= $3
AND deleted_at IS NULL
ORDER BY date DESC
"#;
let stmt = sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
[tenant_id.into(), user_id.into(), cutoff.into()],
);
let rows = db.query_all(stmt).await?;
let dates: std::collections::HashSet<NaiveDate> = rows
.into_iter()
.filter_map(|row| row.try_get_by_index::<NaiveDate>(0).ok())
.collect();
let mut streak = 0i32; let mut streak = 0i32;
let mut check_date = Utc::now().date_naive(); let mut check_date = Utc::now().date_naive();
while dates.contains(&check_date) {
// 从今天开始往前检查
while dates.remove(&check_date) {
streak += 1; streak += 1;
check_date -= Duration::days(1); check_date -= Duration::days(1);
} }
@@ -206,8 +230,7 @@ mod tests {
let mut streak = 0i32; let mut streak = 0i32;
let mut check_date = today; let mut check_date = today;
let mut mutable_dates = dates.clone(); while dates.contains(&check_date) {
while mutable_dates.remove(&check_date) {
streak += 1; streak += 1;
check_date -= Duration::days(1); check_date -= Duration::days(1);
} }
@@ -223,8 +246,7 @@ mod tests {
let mut streak = 0i32; let mut streak = 0i32;
let mut check_date = today; let mut check_date = today;
let mut mutable_dates = dates.clone(); while dates.contains(&check_date) {
while mutable_dates.remove(&check_date) {
streak += 1; streak += 1;
check_date -= Duration::days(1); check_date -= Duration::days(1);
} }
@@ -240,8 +262,7 @@ mod tests {
let mut streak = 0i32; let mut streak = 0i32;
let mut check_date = today; let mut check_date = today;
let mut mutable_dates = dates.clone(); while dates.contains(&check_date) {
while mutable_dates.remove(&check_date) {
streak += 1; streak += 1;
check_date -= Duration::days(1); check_date -= Duration::days(1);
} }
@@ -258,8 +279,7 @@ mod tests {
let mut streak = 0i32; let mut streak = 0i32;
let mut check_date = today; let mut check_date = today;
let mut mutable_dates = dates.clone(); while dates.contains(&check_date) {
while mutable_dates.remove(&check_date) {
streak += 1; streak += 1;
check_date -= Duration::days(1); check_date -= Duration::days(1);
} }
@@ -271,9 +291,9 @@ mod tests {
#[test] #[test]
fn mood_counts_percentage_calculation() { fn mood_counts_percentage_calculation() {
// 模拟聚合逻辑3 happy + 2 calm = 5 total // 模拟聚合逻辑3 happy + 2 calm = 5 total
let total = 5i32; let total = 5i64;
let happy_count = 3i32; let happy_count = 3i64;
let calm_count = 2i32; let calm_count = 2i64;
let happy_pct = (happy_count as f64 / total as f64) * 100.0; let happy_pct = (happy_count as f64 / total as f64) * 100.0;
let calm_pct = (calm_count as f64 / total as f64) * 100.0; let calm_pct = (calm_count as f64 / total as f64) * 100.0;
@@ -285,8 +305,38 @@ mod tests {
#[test] #[test]
fn mood_counts_empty_total_zero_percentage() { fn mood_counts_empty_total_zero_percentage() {
// 无日记时,百分比为 0 // 无日记时,百分比为 0
let total = 0i32; let total = 0i64;
let percentage = if total > 0 { 100.0 } else { 0.0 }; let percentage = if total > 0 { 100.0 } else { 0.0 };
assert_eq!(percentage, 0.0); assert_eq!(percentage, 0.0);
} }
#[test]
fn mood_counts_map_aggregation() {
// 模拟 SQL GROUP BY 结果在 Rust 中构建 MoodCount 列表
let mut map: std::collections::HashMap<String, i64> = std::collections::HashMap::new();
*map.entry("happy".to_string()).or_insert(0) += 3;
*map.entry("calm".to_string()).or_insert(0) += 2;
*map.entry("happy".to_string()).or_insert(0) += 1;
let total: i64 = map.values().sum();
assert_eq!(total, 6);
assert_eq!(*map.get("happy").unwrap(), 4);
assert_eq!(*map.get("calm").unwrap(), 2);
let counts: Vec<MoodCount> = map
.iter()
.map(|(mood, &count)| {
let percentage = (count as f64 / total as f64) * 100.0;
MoodCount {
mood: parse_mood(mood),
count: count as i32,
percentage,
}
})
.collect();
assert_eq!(counts.len(), 2);
let happy_mc = counts.iter().find(|c| matches!(c.mood, Mood::Happy)).unwrap();
assert!((happy_mc.percentage - 66.6667).abs() < 0.01);
}
} }

View File

@@ -2,8 +2,8 @@
use chrono::Utc; use chrono::Utc;
use sea_orm::{ use sea_orm::{
ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, PaginatorTrait, ActiveModelTrait, ColumnTrait, ConnectionTrait, DatabaseConnection, EntityTrait,
QueryFilter, QueryOrder, Set, TransactionTrait, PaginatorTrait, QueryFilter, QueryOrder, Set,
}; };
use uuid::Uuid; use uuid::Uuid;
@@ -272,7 +272,8 @@ impl ParentService {
/// 删除孩子数据 — 软删除所有日记PIPL 删除权) /// 删除孩子数据 — 软删除所有日记PIPL 删除权)
/// ///
/// 软删除孩子全部未删除的日记,逐条设置 deleted_at /// 使用单条 SQL UPDATE 批量软删除,替代逐条更新
/// 性能: 1 次 SQL 替代 N 次 UPDATE。
/// 发布 `diary.parent.data_deleted` 事件记录操作。 /// 发布 `diary.parent.data_deleted` 事件记录操作。
pub async fn delete_child_data( pub async fn delete_child_data(
tenant_id: Uuid, tenant_id: Uuid,
@@ -283,32 +284,33 @@ impl ParentService {
) -> DiaryResult<usize> { ) -> DiaryResult<usize> {
Self::verify_binding(tenant_id, parent_id, child_id, db).await?; Self::verify_binding(tenant_id, parent_id, child_id, db).await?;
let journals = journal_entry::Entity::find()
.filter(journal_entry::Column::TenantId.eq(tenant_id))
.filter(journal_entry::Column::AuthorId.eq(child_id))
.filter(journal_entry::Column::DeletedAt.is_null())
.all(db)
.await?;
let count = journals.len();
let now = Utc::now(); let now = Utc::now();
// 事务软删除所有日记PIPL 删除权 — 原子操作,避免部分删除 // 批量软删除 — 单条 SQL 替代逐条 UPDATE性能优化 8a-C03
db.transaction::<_, (), DiaryError>(|txn| { let sql = r#"
Box::pin(async move { UPDATE journal_entries
for journal in journals { SET deleted_at = $1,
let current_version = journal.version; updated_at = $1,
let mut active: journal_entry::ActiveModel = journal.into(); updated_by = $2,
active.deleted_at = Set(Some(now)); version = version + 1
active.updated_at = Set(now); WHERE tenant_id = $3
active.updated_by = Set(parent_id); AND author_id = $4
active.version = Set(current_version + 1); AND deleted_at IS NULL
active.update(txn).await?; "#;
}
Ok(()) let stmt = sea_orm::Statement::from_sql_and_values(
}) sea_orm::DatabaseBackend::Postgres,
}) sql,
.await?; [
now.into(),
parent_id.into(),
tenant_id.into(),
child_id.into(),
],
);
let result = db.execute(stmt).await?;
let count = result.rows_affected() as usize;
event_bus event_bus
.publish( .publish(

View File

@@ -1,8 +1,8 @@
// 贴纸服务 — 贴纸包与贴纸管理 // 贴纸服务 — 贴纸包与贴纸管理
use sea_orm::{ use sea_orm::{
ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, PaginatorTrait, ActiveModelTrait, ColumnTrait, ConnectionTrait, DatabaseConnection, EntityTrait,
QueryFilter, QueryOrder, Set, PaginatorTrait, QueryFilter, QueryOrder, Set,
}; };
use uuid::Uuid; use uuid::Uuid;
@@ -16,7 +16,8 @@ pub struct StickerService;
impl StickerService { impl StickerService {
/// 获取贴纸包列表 /// 获取贴纸包列表
/// ///
/// 返回所有可用的贴纸包,按分类和名称排序 /// 使用 SQL GROUP BY 批量获取贴纸计数,替代逐包 COUNT 查询
/// 性能: 2 次查询packs + counts替代 N+1 次。
pub async fn list_sticker_packs( pub async fn list_sticker_packs(
tenant_id: Uuid, tenant_id: Uuid,
category: Option<String>, category: Option<String>,
@@ -36,24 +37,47 @@ impl StickerService {
.all(db) .all(db)
.await?; .await?;
let mut result = Vec::with_capacity(packs.len()); // 批量获取所有贴纸包的贴纸计数 — 单次 SQL GROUP BY替代 N+1 查询)
for pack in packs { let pack_ids: Vec<Uuid> = packs.iter().map(|p| p.id).collect();
let sticker_count = sticker::Entity::find() let count_map: std::collections::HashMap<Uuid, i64> = if !pack_ids.is_empty() {
.filter(sticker::Column::PackId.eq(pack.id)) let sql = r#"
.filter(sticker::Column::DeletedAt.is_null()) SELECT pack_id, COUNT(*) AS count
.count(db) FROM stickers
.await? as i32; WHERE pack_id = ANY($1)
AND deleted_at IS NULL
GROUP BY pack_id
"#;
result.push(StickerPackResp { let stmt = sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
[pack_ids.into()],
);
let rows = db.query_all(stmt).await?;
rows.into_iter()
.filter_map(|row| {
let pack_id: Uuid = row.try_get_by_index::<Uuid>(0).ok()?;
let count: i64 = row.try_get_by_index::<i64>(1).ok()?;
Some((pack_id, count))
})
.collect()
} else {
std::collections::HashMap::new()
};
let result: Vec<StickerPackResp> = packs
.into_iter()
.map(|pack| StickerPackResp {
id: pack.id, id: pack.id,
name: pack.name, name: pack.name,
description: pack.description, description: pack.description,
cover_image_url: pack.thumbnail_url, cover_image_url: pack.thumbnail_url,
sticker_count, sticker_count: *count_map.get(&pack.id).unwrap_or(&0) as i32,
is_free: pack.is_free, is_free: pack.is_free,
category: pack.category, category: pack.category,
}); })
} .collect();
Ok(result) Ok(result)
} }

View File

@@ -1,7 +1,17 @@
// 日记同步服务 — 版本号冲突检测 + 增量同步 // 日记同步服务 — 版本号冲突检测 + 增量同步
//
// 性能优化 (8a-C02):
// - 批量预查询: UPDATE/DELETE 目标一次性 IN 查询,替代逐条 find
// - 事务化: 所有写操作在单个事务内完成
// - 冲突前置检测: 写入前从预取数据检测版本冲突
use std::collections::HashMap;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use sea_orm::{ColumnTrait, Condition, DatabaseConnection, EntityTrait, QueryFilter, Set}; use sea_orm::{
ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DatabaseConnection, EntityTrait,
QueryFilter, Set, TransactionTrait,
};
use uuid::Uuid; use uuid::Uuid;
use crate::dto::{ConflictInfo, SyncChange, SyncResp}; use crate::dto::{ConflictInfo, SyncChange, SyncResp};
@@ -14,11 +24,12 @@ pub struct SyncService;
impl SyncService { impl SyncService {
/// 同步:客户端提交变更,服务端返回服务端变更 + 冲突 /// 同步:客户端提交变更,服务端返回服务端变更 + 冲突
/// ///
/// 流程: /// 优化流程:
/// 1. 逐条处理客户端变更(create/update/delete /// 1. create/update/delete 分组
/// 2. 获取 last_sync_time 之后服务端更新的记录 /// 2. 批量预查询 update/delete 目标记录(单次 IN 查询)
/// 3. 检测版本冲突 /// 3. 从预取数据检测版本冲突,过滤出有效操作
/// 4. 返回 (server_changes, conflicts, sync_time) /// 4. 单事务执行所有写操作
/// 5. 获取服务端变更
pub async fn sync( pub async fn sync(
tenant_id: Uuid, tenant_id: Uuid,
user_id: Uuid, user_id: Uuid,
@@ -26,29 +37,107 @@ impl SyncService {
client_changes: Vec<SyncChange>, client_changes: Vec<SyncChange>,
db: &DatabaseConnection, db: &DatabaseConnection,
) -> DiaryResult<SyncResp> { ) -> DiaryResult<SyncResp> {
let mut conflicts = Vec::new(); // 1. 分组: 按操作类型拆分
let mut creates: Vec<serde_json::Value> = Vec::new();
let mut updates: Vec<(Uuid, i32, serde_json::Value)> = Vec::new();
let mut deletes: Vec<(Uuid, i32)> = Vec::new();
// 1. 处理客户端变更
for change in client_changes { for change in client_changes {
if let Err(e) = Self::apply_client_change(tenant_id, user_id, change, db).await { match change {
// 版本冲突收集到冲突列表,其他错误直接返回 SyncChange::CreateJournal { data } => creates.push(data),
match e { SyncChange::UpdateJournal { id, version, data } => {
DiaryError::VersionConflict { updates.push((id, version, data));
local, }
server, SyncChange::DeleteJournal { id, version } => deletes.push((id, version)),
} => { }
conflicts.push(ConflictInfo { }
journal_id: Uuid::nil(), // ID 在 apply_client_change 内部处理
local_version: local, // 2. 批量预查询: 一次性获取所有 update/delete 目标记录
server_version: server, let ids_to_fetch: Vec<Uuid> = updates
}); .iter()
} .map(|(id, _, _)| *id)
_ => return Err(e), .chain(deletes.iter().map(|(id, _)| *id))
.collect();
let existing_map: HashMap<Uuid, journal_entry::Model> = if !ids_to_fetch.is_empty() {
journal_entry::Entity::find()
.filter(journal_entry::Column::Id.is_in(ids_to_fetch))
.filter(journal_entry::Column::TenantId.eq(tenant_id))
.filter(journal_entry::Column::DeletedAt.is_null())
.all(db)
.await?
.into_iter()
.map(|m| (m.id, m))
.collect()
} else {
HashMap::new()
};
// 3. 冲突前置检测: 从预取数据判断版本,分离冲突和有效操作
let mut conflicts = Vec::new();
let mut valid_updates: Vec<(journal_entry::Model, serde_json::Value)> = Vec::new();
let mut valid_deletes: Vec<(journal_entry::Model, i32)> = Vec::new();
for (id, version, data) in updates {
match existing_map.get(&id) {
Some(existing) if existing.version == version => {
valid_updates.push((existing.clone(), data));
}
Some(existing) => {
conflicts.push(ConflictInfo {
journal_id: id,
local_version: version,
server_version: existing.version,
});
}
None => {
return Err(DiaryError::NotFound(format!("日记 {} 不存在", id)));
} }
} }
} }
// 2. 获取服务端变更last_sync_time 之后更新的) for (id, version) in deletes {
match existing_map.get(&id) {
Some(existing) if existing.version == version => {
valid_deletes.push((existing.clone(), version));
}
Some(existing) => {
conflicts.push(ConflictInfo {
journal_id: id,
local_version: version,
server_version: existing.version,
});
}
None => {
return Err(DiaryError::NotFound(format!("日记 {} 不存在", id)));
}
}
}
// 4. 单事务执行所有写操作
db.transaction::<_, (), DiaryError>(|txn| {
Box::pin(async move {
// 批量插入 creates
for data in creates {
Self::insert_journal(tenant_id, user_id, data, txn).await?;
}
// 批量更新 valid_updates
for (existing, data) in valid_updates {
Self::apply_update(user_id, existing, data, txn).await?;
}
// 批量软删除 valid_deletes
for (existing, version) in valid_deletes {
Self::apply_delete(user_id, existing, version, txn).await?;
}
Ok(())
})
})
.await?;
// 5. 获取服务端变更last_sync_time 之后更新的)
let mut condition = Condition::all() let mut condition = Condition::all()
.add(journal_entry::Column::TenantId.eq(tenant_id)) .add(journal_entry::Column::TenantId.eq(tenant_id))
.add(journal_entry::Column::AuthorId.eq(user_id)); .add(journal_entry::Column::AuthorId.eq(user_id));
@@ -62,7 +151,6 @@ impl SyncService {
.all(db) .all(db)
.await?; .await?;
// 3. 转换为 JSON 格式的服务端变更
let server_changes: Vec<serde_json::Value> = server_records let server_changes: Vec<serde_json::Value> = server_records
.iter() .iter()
.map(|r| { .map(|r| {
@@ -82,7 +170,6 @@ impl SyncService {
}) })
.collect(); .collect();
// 4. 返回同步结果
Ok(SyncResp { Ok(SyncResp {
server_changes, server_changes,
conflicts, conflicts,
@@ -90,147 +177,120 @@ impl SyncService {
}) })
} }
/// 处理单条客户端变更 /// 插入新日记(事务内)
async fn apply_client_change( async fn insert_journal<C: ConnectionTrait>(
tenant_id: Uuid, tenant_id: Uuid,
user_id: Uuid, user_id: Uuid,
change: SyncChange, data: serde_json::Value,
db: &DatabaseConnection, db: &C,
) -> DiaryResult<()> { ) -> DiaryResult<()> {
use sea_orm::ActiveModelTrait; let id = data
.get("id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok())
.unwrap_or_else(Uuid::now_v7);
match change { let title = data
SyncChange::CreateJournal { data } => { .get("title")
// 客户端创建 — 直接插入 .and_then(|v| v.as_str())
let id = data .unwrap_or("")
.get("id") .to_string();
let now = Utc::now();
let model = journal_entry::ActiveModel {
id: Set(id),
tenant_id: Set(tenant_id),
author_id: Set(user_id),
class_id: Set(None),
title: Set(title),
date: Set(
data.get("date")
.and_then(|v| v.as_str()) .and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok()) .and_then(|s| s.parse().ok())
.unwrap_or_else(Uuid::now_v7); .unwrap_or_else(|| now.date_naive()),
),
let title = data mood: Set(
.get("title") data.get("mood")
.and_then(|v| v.as_str()) .and_then(|v| v.as_str())
.unwrap_or("") .unwrap_or("happy")
.to_string(); .to_string(),
),
weather: Set(
data.get("weather")
.and_then(|v| v.as_str())
.unwrap_or("sunny")
.to_string(),
),
tags: Set(data.get("tags").cloned()),
is_private: Set(
data.get("is_private")
.and_then(|v| v.as_bool())
.unwrap_or(true),
),
shared_to_class: Set(false),
assigned_topic_id: Set(None),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(user_id),
updated_by: Set(user_id),
deleted_at: Set(None),
version: Set(1),
};
model.insert(db).await?;
Ok(())
}
let now = Utc::now(); /// 应用更新(事务内,版本已预验证)
let model = journal_entry::ActiveModel { async fn apply_update<C: ConnectionTrait>(
id: Set(id), user_id: Uuid,
tenant_id: Set(tenant_id), existing: journal_entry::Model,
author_id: Set(user_id), data: serde_json::Value,
class_id: Set(None), db: &C,
title: Set(title), ) -> DiaryResult<()> {
date: Set( let now = Utc::now();
data.get("date") let current_version = existing.version;
.and_then(|v| v.as_str()) let mut active: journal_entry::ActiveModel = existing.into();
.and_then(|s| s.parse().ok())
.unwrap_or_else(|| now.date_naive()),
),
mood: Set(
data.get("mood")
.and_then(|v| v.as_str())
.unwrap_or("happy")
.to_string(),
),
weather: Set(
data.get("weather")
.and_then(|v| v.as_str())
.unwrap_or("sunny")
.to_string(),
),
tags: Set(data.get("tags").cloned()),
is_private: Set(
data.get("is_private")
.and_then(|v| v.as_bool())
.unwrap_or(true),
),
shared_to_class: Set(false),
assigned_topic_id: Set(None),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(user_id),
updated_by: Set(user_id),
deleted_at: Set(None),
version: Set(1),
};
model.insert(db).await?;
}
SyncChange::UpdateJournal {
id,
version,
data,
} => {
// 客户端更新 — 带版本检查
let existing = journal_entry::Entity::find()
.filter(journal_entry::Column::Id.eq(id))
.filter(journal_entry::Column::TenantId.eq(tenant_id))
.filter(journal_entry::Column::DeletedAt.is_null())
.one(db)
.await?
.ok_or_else(|| DiaryError::NotFound(format!("日记 {} 不存在", id)))?;
if existing.version != version { if let Some(title) = data.get("title").and_then(|v| v.as_str()) {
return Err(DiaryError::VersionConflict { active.title = Set(title.to_string());
local: version, }
server: existing.version, if let Some(mood) = data.get("mood").and_then(|v| v.as_str()) {
}); active.mood = Set(mood.to_string());
} }
if let Some(weather) = data.get("weather").and_then(|v| v.as_str()) {
let now = Utc::now(); active.weather = Set(weather.to_string());
let mut active: journal_entry::ActiveModel = existing.into(); }
if let Some(tags) = data.get("tags").cloned() {
if let Some(title) = data.get("title").and_then(|v| v.as_str()) { active.tags = Set(Some(tags));
active.title = Set(title.to_string()); }
} if let Some(is_private) = data.get("is_private").and_then(|v| v.as_bool()) {
if let Some(mood) = data.get("mood").and_then(|v| v.as_str()) { active.is_private = Set(is_private);
active.mood = Set(mood.to_string()); }
} if let Some(shared) = data.get("shared_to_class").and_then(|v| v.as_bool()) {
if let Some(weather) = data.get("weather").and_then(|v| v.as_str()) { active.shared_to_class = Set(shared);
active.weather = Set(weather.to_string());
}
if let Some(tags) = data.get("tags").cloned() {
active.tags = Set(Some(tags));
}
if let Some(is_private) = data.get("is_private").and_then(|v| v.as_bool()) {
active.is_private = Set(is_private);
}
if let Some(shared) = data.get("shared_to_class").and_then(|v| v.as_bool()) {
active.shared_to_class = Set(shared);
}
active.updated_at = Set(now);
active.updated_by = Set(user_id);
active.version = Set(version + 1);
active.update(db).await?;
}
SyncChange::DeleteJournal { id, version } => {
// 客户端删除 — 软删除带版本检查
let existing = journal_entry::Entity::find()
.filter(journal_entry::Column::Id.eq(id))
.filter(journal_entry::Column::TenantId.eq(tenant_id))
.filter(journal_entry::Column::DeletedAt.is_null())
.one(db)
.await?
.ok_or_else(|| DiaryError::NotFound(format!("日记 {} 不存在", id)))?;
if existing.version != version {
return Err(DiaryError::VersionConflict {
local: version,
server: existing.version,
});
}
let now = Utc::now();
let mut active: journal_entry::ActiveModel = existing.into();
active.deleted_at = Set(Some(now));
active.updated_at = Set(now);
active.updated_by = Set(user_id);
active.version = Set(version + 1);
active.update(db).await?;
}
} }
active.updated_at = Set(now);
active.updated_by = Set(user_id);
active.version = Set(current_version + 1);
active.update(db).await?;
Ok(())
}
/// 应用软删除(事务内,版本已预验证)
async fn apply_delete<C: ConnectionTrait>(
user_id: Uuid,
existing: journal_entry::Model,
version: i32,
db: &C,
) -> DiaryResult<()> {
let now = Utc::now();
let mut active: journal_entry::ActiveModel = existing.into();
active.deleted_at = Set(Some(now));
active.updated_at = Set(now);
active.updated_by = Set(user_id);
active.version = Set(version + 1);
active.update(db).await?;
Ok(()) Ok(())
} }
} }
@@ -276,7 +336,6 @@ mod tests {
}); });
let change = SyncChange::CreateJournal { data: data.clone() }; let change = SyncChange::CreateJournal { data: data.clone() };
// 验证 match 可以正确提取 data
match &change { match &change {
SyncChange::CreateJournal { data } => { SyncChange::CreateJournal { data } => {
assert_eq!(data.get("title").unwrap().as_str().unwrap(), "我的日记"); assert_eq!(data.get("title").unwrap().as_str().unwrap(), "我的日记");
@@ -344,43 +403,40 @@ mod tests {
#[test] #[test]
fn conflict_collection_pattern_mimics_sync_logic() { fn conflict_collection_pattern_mimics_sync_logic() {
// 模拟 sync 方法中 VersionConflict 收集到 conflicts 列表的行为 // 模拟批量预查询后的冲突检测逻辑
let mut conflicts: Vec<ConflictInfo> = Vec::new(); let mut conflicts: Vec<ConflictInfo> = Vec::new();
let errors: Vec<DiaryError> = vec![ // 模拟预取的 existing_map: id → (version)
DiaryError::VersionConflict { let id1 = uuid::Uuid::now_v7();
local: 1, let id2 = uuid::Uuid::now_v7();
server: 3, let existing: HashMap<Uuid, i32> = [(id1, 3), (id2, 5)].into_iter().collect();
},
DiaryError::VersionConflict {
local: 2,
server: 5,
},
];
for e in errors { // 客户端变更
match e { let updates: Vec<(Uuid, i32)> = vec![(id1, 1), (id2, 5)];
DiaryError::VersionConflict { local, server } => {
for (id, local_ver) in updates {
match existing.get(&id) {
Some(&server_ver) if server_ver == local_ver => {
// 版本匹配 — 有效更新
}
Some(&server_ver) => {
conflicts.push(ConflictInfo { conflicts.push(ConflictInfo {
journal_id: uuid::Uuid::nil(), journal_id: id,
local_version: local, local_version: local_ver,
server_version: server, server_version: server_ver,
}); });
} }
_ => {} None => {}
} }
} }
assert_eq!(conflicts.len(), 2); assert_eq!(conflicts.len(), 1);
assert_eq!(conflicts[0].local_version, 1); assert_eq!(conflicts[0].local_version, 1);
assert_eq!(conflicts[0].server_version, 3); assert_eq!(conflicts[0].server_version, 3);
assert_eq!(conflicts[1].local_version, 2);
assert_eq!(conflicts[1].server_version, 5);
} }
#[test] #[test]
fn non_conflict_error_does_not_collect() { fn non_conflict_error_does_not_collect() {
// 验证非 VersionConflict 错误不会被收集到 conflicts 列表
let mut conflicts: Vec<ConflictInfo> = Vec::new(); let mut conflicts: Vec<ConflictInfo> = Vec::new();
let error = DiaryError::NotFound("日记不存在".to_string()); let error = DiaryError::NotFound("日记不存在".to_string());
@@ -392,11 +448,63 @@ mod tests {
server_version: server, server_version: server,
}); });
} }
_ => { _ => {}
// 其他错误不应收集
}
} }
assert!(conflicts.is_empty()); assert!(conflicts.is_empty());
} }
#[test]
fn batch_partition_correctly_separates_types() {
let changes = vec![
SyncChange::CreateJournal {
data: serde_json::json!({"title": "日记1"}),
},
SyncChange::UpdateJournal {
id: uuid::Uuid::nil(),
version: 1,
data: serde_json::json!({"title": "更新"}),
},
SyncChange::DeleteJournal {
id: uuid::Uuid::nil(),
version: 2,
},
SyncChange::CreateJournal {
data: serde_json::json!({"title": "日记2"}),
},
];
let mut creates = Vec::new();
let mut updates = Vec::new();
let mut deletes = Vec::new();
for change in changes {
match change {
SyncChange::CreateJournal { data } => creates.push(data),
SyncChange::UpdateJournal { id, version, data } => {
updates.push((id, version, data));
}
SyncChange::DeleteJournal { id, version } => deletes.push((id, version)),
}
}
assert_eq!(creates.len(), 2);
assert_eq!(updates.len(), 1);
assert_eq!(deletes.len(), 1);
}
#[test]
fn version_match_detection() {
// 模拟预取数据中的版本匹配
let id = uuid::Uuid::now_v7();
let existing: HashMap<Uuid, i32> = [(id, 5)].into_iter().collect();
// 版本匹配
let matched = existing.get(&id).map_or(false, |&sv| sv == 5);
assert!(matched);
// 版本不匹配
let mismatched = existing.get(&id).map_or(false, |&sv| sv == 3);
assert!(!mismatched);
}
} }